-
Notifications
You must be signed in to change notification settings - Fork 2
Thread
Xin Wan edited this page Dec 2, 2019
·
1 revision
ThreadTest.java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.omg.CORBA.WCharSeqHelper;
public class ThreadTest {
public static void main(String[] args) throws Exception {
ThreadTest test = new ThreadTest();
//test.twoThreadsJoinTest();
//test.volatileTest();
//test.atomicTest();
//test.synchronizedTest();
//test.longAdderTest();
//test.scheduledTasksTest();
//test.futureAndCallableTest();
//test.countDownLatchTest();
//test.cyclicBarrierTest();
//test.lockAndConditionTest();
//test.synchronizedAndWaitNotifyTest();
//test.semaphoreTest();
//test.readWriteLockTest();
//test.deadLockTest();
test.simpleProducerAndConsumerTest();
}
private class MyBlockingQueue<E> {
private int max;
private Queue<E> queue;
private ReentrantLock lock = new ReentrantLock();
private Condition nonEmpty = lock.newCondition();
private Condition non
}
private void simpleProducerAndConsumerTest() {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
Runnable producer = () -> {
while (true) {
try {
int item = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + " thread: add item" + item);
queue.put(item);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
Runnable consumer = () -> {
while (true) {
int item;
try {
item = queue.take();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " thread: take item" + item);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
new Thread(producer).start();
new Thread(producer).start();
new Thread(consumer).start();
new Thread(consumer).start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void deadLockTest() {
DeadLock deadLock = new DeadLock();
ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(new Runnable() {
public void run() {
deadLock.operation1();
}
});
service.submit(new Runnable() {
public void run() {
deadLock.operation2();
}
});
}
private class DeadLock {
private Lock lock1 = new ReentrantLock();
private Lock lock2 = new ReentrantLock();
public void operation1() {
lock1.lock();
System.out.println(Thread.currentThread().getId() + ": get lock1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (lock2.tryLock()) {
System.out.println(Thread.currentThread().getId() + ": get lock2");
lock2.unlock();
} else {
System.out.println(Thread.currentThread().getId() + ": no lock2");
}
lock1.unlock();
}
public void operation2() {
lock2.lock();
System.out.println(Thread.currentThread().getId() + ": get lock2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock1.lock();
System.out.println(Thread.currentThread().getId() + ": get lock1");
lock1.unlock();
lock2.unlock();
}
}
private void readWriteLockTest() {
ReadWriteLock lock = new ReadWriteLock();
for (int i = 0; i < 100; i++) {
if (i % 5 == 4) {
new Thread(() -> lock.write()).start();
} else {
new Thread(() -> lock.read()).start();
}
}
}
private class ReadWriteLock {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
public void read() {
readLock.lock();
System.out.println(Thread.currentThread().getId() + " : reading");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
readLock.unlock();
}
public void write() {
writeLock.lock();
System.out.println(Thread.currentThread().getId() + " : writing");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
writeLock.unlock();
}
}
private void semaphoreTest() {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SemaphoreRunnable(semaphore));
thread.start();
}
}
private class SemaphoreRunnable implements Runnable {
private Semaphore semaphore;
public SemaphoreRunnable(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void run() {
System.out.println("Thread " + Thread.currentThread().getId() + ": before get permit");
semaphore.acquireUninterruptibly();
System.out.println("Thread " + Thread.currentThread().getId() + ": get permit");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
semaphore.release();
System.out.println("Thread " + Thread.currentThread().getId() + ": release permit");
}
}
private void synchronizedAndWaitNotifyTest() throws Exception {
WaitNotify wn = new WaitNotify();
Thread thread1 = new Thread(new Runnable() {
public void run() {
wn.callWait();
}
});
Thread thread2 = new Thread(new Runnable() {
public void run() {
wn.callNotify();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
private class WaitNotify {
private synchronized void callWait() {
System.out.println("I am thread1 and I am going to wait.");
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("I am thread1 and I am done");
}
private synchronized void callNotify() {
System.out.println("I am thread2 and I am going to notify.");
try {
Thread.sleep(1000);
notify();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("I am thread2 and I am done");
}
}
private void lockAndConditionTest() throws InterruptedException {
Lock lock = new ReentrantLock();
Condition conditionMet = lock.newCondition();
Thread thread1 = new Thread(new LockThread(lock, conditionMet, true));
Thread thread2 = new Thread(new LockThread(lock, conditionMet, false));
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
private class LockThread implements Runnable {
private Lock lock;
private Condition conditionMet;
private boolean isWait;
public LockThread(Lock lock, Condition conditionMet, boolean isWait) {
this.lock = lock;
this.conditionMet = conditionMet;
this.isWait = isWait;
}
public void run() {
if (isWait) {
lock.lock();
try {
System.out.println("I am thread1 and I am going to wait.");
conditionMet.await();
System.out.println("I am thread1 and I am done");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
lock.lock();
try {
System.out.println("I am thread2 and I am start.");
Thread.sleep(1000);
conditionMet.signal();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("I am thread2 and I am unlock.");
lock.unlock();
}
}
}
}
private void cyclicBarrierTest() {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService service = Executors.newFixedThreadPool(4);
for (int i = 0; i < 3; i++) {
service.submit(new CyclicBarrierService(barrier));
}
}
private class CyclicBarrierService implements Runnable {
private CyclicBarrier barrier;
public CyclicBarrierService(CyclicBarrier barrier) {
this.barrier = barrier;
}
public void run() {
while(true) {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Thread " + Thread.currentThread().getId() + " message" + new Random().nextInt());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
private void countDownLatchTest() throws InterruptedException {
CountDownLatch counter = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
for (int i = 0; i < 3; i++) {
service.submit(new DependentService(counter));
}
counter.await();
System.out.println("Main thread is done.");
}
private class DependentService implements Runnable {
private CountDownLatch counter;
public DependentService(CountDownLatch counter) {
this.counter = counter;
}
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("I am worker " + Thread.currentThread().getId());
counter.countDown();
}
}
private void futureAndCallableTest() throws InterruptedException, ExecutionException {
Callable<Integer> task = new Callable<Integer>() {
public Integer call() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new Random().nextInt();
}
};
ExecutorService service = Executors.newCachedThreadPool();
List<Future<Integer>> list = new ArrayList<Future<Integer>>();
int size = 2000;
for (int i = 0; i < size; i++) {
Future<Integer> future = service.submit(task);
list.add(future);
}
System.out.println("start time: " + new Date());
for (int i = 0; i < size; i++) {
list.get(i).get();
}
System.out.println("end time: " + new Date());
}
private void scheduledTasksTest() {
Runnable task = new Runnable(){
public void run() {
System.out.println("Current time:" + new Date());
}
};
Runnable task1 = new Runnable(){
public void run() {
System.out.println("Current time:" + new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
System.out.println("Current time:" + new Date());
// service.schedule(task, 1, TimeUnit.SECONDS);
//service.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(task1, 1, 2, TimeUnit.SECONDS);
}
private void longAdderTest() throws InterruptedException {
LongAdder counter = new LongAdder();
ExecutorService service = Executors.newFixedThreadPool(16);
for (int i = 0; i < 10000; i++) {
service.submit(new AdderTaskRunnable(counter));
}
Thread.sleep(2000);
System.out.println("result: " + counter.sum ());
}
private class AdderTaskRunnable implements Runnable {
private LongAdder counter;
public AdderTaskRunnable(LongAdder counter) {
this.counter = counter;
}
public void run() {
counter.increment();
System.out.println("Thread name: " + Thread.currentThread().getId() + " counter: " + counter.intValue());
}
}
private void synchronizedTest() {
Runnable runnable = new SynchronizedRunnable();
int size = 10000;
for (int i = 0; i < size; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
System.out.println("the vaule should be " + size * 2);
}
private void atomicTest() {
Runnable runnable = new AtomicRunnable();
int size = 100000;
for (int i = 0; i < size; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
System.out.println("the vaule should be " + size);
}
private void volatileTest() {
Runnable runnable = new VolatileRunnable();
for (int i = 0; i < 10000; i++) {
Thread thread = new Thread(runnable);
thread.start();
}
System.out.println("the vaule should be 2000");
}
private void twoThreadsJoinTest() {
Thread thread1 = new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("I am a runnable thread.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End of runnable thread.");
}
});
thread1.start();
Thread thread2 = new MyThread();
thread2.start();
try {
thread1.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of Main thread");
}
private class MyThread extends Thread {
@Override
public void run() {
System.out.println("I am Mythread");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End of Mythread.");
}
}
}
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicRunnable implements Runnable {
static AtomicInteger counter = new AtomicInteger();
public void run() {
System.out.println("Atomic runnable value: " + counter.get());
System.out.println("Atomic runnable value: " + counter.addAndGet(1));
}
}
public class SynchronizedRunnable implements Runnable {
private static int counter = 0;
public void run() {
addCounter();
addCounter2();
}
private synchronized void addCounter() {
System.out.println("sychronized counter " + ++counter);
}
private void addCounter2() {
synchronized(this) {
System.out.println("sychronized counter2 " + ++counter);
}
}
}
public class VolatileRunnable implements Runnable {
static volatile int counter = 0;
@Override
public void run() {
System.out.println("Vilotile Runnable: val = " + counter++);
System.out.println("Vilotile Runnable: val = " + counter++);
}
}