Permalink
Browse files

learn java util concurrent interfaces.

  • Loading branch information...
jasonwee committed Nov 6, 2015
1 parent 3d29161 commit ce479e5befaf7abe84d3d85930d5196a639e2643
@@ -0,0 +1,55 @@
package org.just4fun.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DemoExecutor {
/**
*
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
*/
public static void main(String[] args) {
Integer threadCounter = 0;
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("DemoTask Rejected : " + ((DemoThread ) r).getName());
System.out.println("Waiting for a second !!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Lets add another time : " + ((DemoThread) r).getName());
executor.execute(r);
}
});
// Let start all core threads initially
executor.prestartAllCoreThreads();
while (true) {
threadCounter++;
// Adding threads one by one
System.out.println("Adding DemoTask : " + threadCounter);
executor.execute(new DemoThread(threadCounter.toString()));
if (threadCounter == 100)
break;
}
}
}
@@ -0,0 +1,58 @@
package org.just4fun.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExampleThreadPoolExecutor {
private BlockingQueue<Runnable> worksQueue;
private RejectedExecutionHandler executionHandler;
private ThreadPoolExecutor executor;
private Thread monitor;
public ExampleThreadPoolExecutor() {
worksQueue = new ArrayBlockingQueue<Runnable>(2);
executionHandler = new MyRejectedExecutionHandlerImpl();
// create the threadpoolExecutor
executor = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, worksQueue, executionHandler);
executor.allowCoreThreadTimeOut(true);
monitor = new Thread(new MyMonitorThread(executor));
monitor.setDaemon(true);
}
public boolean start() {
monitor.start();
// change this to listener.
executor.execute(new MyWork("1"));
executor.execute(new MyWork("2"));
executor.execute(new MyWork("3"));
executor.execute(new MyWork("4"));
executor.execute(new MyWork("5"));
executor.execute(new MyWork("6"));
executor.execute(new MyWork("7"));
executor.execute(new MyWork("8"));
try
{
Thread.sleep(30000);
executor.execute(new MyWork("9"));
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public static void main(String[] args) {
ExampleThreadPoolExecutor server = new ExampleThreadPoolExecutor();
server.start();
}
}
@@ -0,0 +1,13 @@
package org.just4fun.concurrent;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class MyRejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " : I've been rejected !");
}
}
@@ -0,0 +1,75 @@
package play.learn.java.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TransferQueue;
public class LearnConcurrentInterfaceP2 {
public static void main(String[] args) throws Exception {
// no need to implement this for basic developer, just use class ForkJoinPool which
// inner implementation of ForkJoinWorkerThreadFactory.
// ForkJoinPool.ForkJoinWorkerThreadFactory
// --------------------
BlockingQueue<String> bq = new ArrayBlockingQueue<String>(2);
bq.put("A");
bq.put("B");
QueueManagedBlocker<String> blocker = new QueueManagedBlocker<String>(bq);
ForkJoinPool.managedBlock(blocker);
System.out.println(blocker.isReleasable());
System.out.println(blocker.getValue());
// --------------------
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Summer(11,22));
executorService.shutdown();
// --------------------
// RejectedExecutionHandler see
//src/java/org/just4fun/concurrent/MyRejectedExecutionHandlerImpl.java
//src/java/org/just4fun/concurrent/DemoExecutor.java
//src/java/org/just4fun/concurrent/ExampleThreadPoolExecutor.java
// --------------------
RunnableFuture<Integer> rf = new FutureTask<Integer>(new Summer(22,33));
// --------------------
RunnableScheduledFuture<Integer> rsf = new Summer1();
System.out.println(rsf.isPeriodic());
// --------------------
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> System.out.println("hihi"), 1, 1, TimeUnit.SECONDS);
Thread.sleep(3000);
scheduler.shutdown();
// --------------------
ScheduledFuture<Integer> sf = new ScheduledFutureImpl();
sf.isCancelled();
// --------------------
ThreadFactory tf = Executors.defaultThreadFactory();
tf.newThread(()->System.out.println("ThreadFactory")).start();
// --------------------
TransferQueue<Integer> tq = new LinkedTransferQueue<Integer>();
}
}
@@ -0,0 +1,31 @@
package play.learn.java.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
public class QueueManagedBlocker<T> implements ManagedBlocker {
final BlockingQueue<T> queue;
volatile T value = null;
QueueManagedBlocker(BlockingQueue<T> queue) {
this.queue = queue;
}
@Override
public boolean block() throws InterruptedException {
if (value == null)
value = queue.take();
return true;
}
@Override
public boolean isReleasable() {
return value != null || (value = queue.poll()) != null;
}
public T getValue() {
return value;
}
}
@@ -0,0 +1,54 @@
package play.learn.java.concurrent;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ScheduledFutureImpl implements ScheduledFuture<Integer> {
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
return 0;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
return 0;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
// TODO Auto-generated method stub
return false;
}
@Override
public Integer get() throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
return null;
}
@Override
public Integer get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// TODO Auto-generated method stub
return null;
}
}
@@ -0,0 +1,66 @@
package play.learn.java.concurrent;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Summer1 implements RunnableScheduledFuture<Integer> {
@Override
public void run() {
// TODO Auto-generated method stub
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
// TODO Auto-generated method stub
return false;
}
@Override
public Integer get() throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
return null;
}
@Override
public Integer get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// TODO Auto-generated method stub
return null;
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
return 0;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
return 0;
}
@Override
public boolean isPeriodic() {
// TODO Auto-generated method stub
return false;
}
}

0 comments on commit ce479e5

Please sign in to comment.