From 48553900b67ff4ac94efd1b762b295d168c83056 Mon Sep 17 00:00:00 2001 From: "calvin.xiao" Date: Sun, 29 Nov 2015 16:40:37 +0800 Subject: [PATCH] =?UTF-8?q?Threads=E5=A2=9E=E5=8A=A0=E6=9B=B4=E5=A4=9AThre?= =?UTF-8?q?adPool=E5=88=9B=E5=BB=BA=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/springside/modules/utils/Threads.java | 147 +++++++++++++++--- 1 file changed, 128 insertions(+), 19 deletions(-) diff --git a/modules/utils/src/main/java/org/springside/modules/utils/Threads.java b/modules/utils/src/main/java/org/springside/modules/utils/Threads.java index d35ed207a..49e598313 100644 --- a/modules/utils/src/main/java/org/springside/modules/utils/Threads.java +++ b/modules/utils/src/main/java/org/springside/modules/utils/Threads.java @@ -5,8 +5,13 @@ *******************************************************************************/ package org.springside.modules.utils; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -23,18 +28,18 @@ public class Threads { /** - * sleep等待, 单位为毫秒. 已捕捉并处理InterruptedException. + * sleep等待, 单位为毫秒, 已捕捉并处理InterruptedException. */ - public static void sleep(long durationInMillis) { + public static void sleep(long durationMillis) { try { - Thread.sleep(durationInMillis); + Thread.sleep(durationMillis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** - * sleep等待. 已捕捉并处理InterruptedException. + * sleep等待,已捕捉并处理InterruptedException. */ public static void sleep(long duration, TimeUnit unit) { try { @@ -57,22 +62,12 @@ public static void sleep(long duration, TimeUnit unit) { * * 返回线程最后是否被中断. */ - public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeoutInMills) { - return MoreExecutors.shutdownAndAwaitTermination(threadPool, shutdownTimeoutInMills, TimeUnit.MILLISECONDS); + public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeoutMills) { + return MoreExecutors.shutdownAndAwaitTermination(threadPool, shutdownTimeoutMills, TimeUnit.MILLISECONDS); } /** - * 按照ExecutorService JavaDoc示例代码编写的Graceful Shutdown方法. - * - * 先使用shutdown, 停止接收新任务并尝试完成所有已存在任务. - * - * 如果1/2超时时间后, 则调用shutdownNow,取消在workQueue中Pending的任务,并中断所有阻塞函数. - * - * 如果1/2超时仍然超時,則強制退出. - * - * 另对在shutdown时线程本身被调用中断做了处理. - * - * 返回线程最后是否被中断. + * @see #gracefulShutdown(ExecutorService, int) */ public static boolean gracefulShutdown(ExecutorService threadPool, int shutdownTimeout, TimeUnit timeUnit) { return MoreExecutors.shutdownAndAwaitTermination(threadPool, shutdownTimeout, timeUnit); @@ -88,14 +83,128 @@ public static ThreadFactory buildThreadFactory(String nameFormat) { } /** - * 创建ThreadFactory,使得创建的线程有自己的名字而不是默认的"pool-x-thread-y" + * 可设定是否daemon, daemon线程在主线程已执行完毕时, 不会阻塞应用不退出, 而非daemon线程则会阻塞. * - * 格式如"mythread-%d",使用了Guava的工具类 + * @see #buildThreadFactory(String) */ public static ThreadFactory buildThreadFactory(String nameFormat, boolean daemon) { return new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(daemon).build(); } + /** + * 创建FixedThreadPool, 等价Executors的默认实现. + * + * 任务提交时, 如果线程数还没达到nThreads即创建新线程(即n次提交后线程总数必达到n) + * + * 线程不会空闲回收. + * + * 当所有线程繁忙时, 放入无限长的LinkedBlockingQueue中等待. + */ + public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + /** + * @see #newFixedThreadPool(int) + */ + public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), threadFactory); + } + + /** + * 创建Queue长度受限的FixedThreadPool. + * + * 当到达maxQueue时,调用RejectHandler,默认为AbortPolicy,抛出RejectedExecutionException异常. + * 其他可选的Policy包括静默放弃当前任务(Discard),放弃Queue里最老的任务,或由主线程来直接执行(CallerRuns). + * + * @see #newFixedThreadPool(int) + */ + public static ExecutorService newFixedThreadPool(int nThreads, int maxQueueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(maxQueueSize)); + } + + /** + * @see #newFixedThreadPool(int, int) + */ + public static ExecutorService newFixedThreadPool(int nThreads, int maxQueueSize, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(maxQueueSize), threadFactory); + } + + /** + * @see #newFixedThreadPool(int, int) + */ + public static ExecutorService newFixedThreadPool(int nThreads, int maxQueueSize, ThreadFactory threadFactory, + RejectedExecutionHandler rejectHanlder) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(maxQueueSize), threadFactory, rejectHanlder); + } + + /** + * 创建CachedThreadPool,等价Executors的默认实现. + * + * 任务提交时, 如果没有空闲线程, 立刻创建新线程, 总线程数无上限. + * + * 如果线程空闲超过一分钟, 进行回收. + */ + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); + } + + /** + * @see #newCachedThreadPool() + */ + public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), + threadFactory); + } + + /** + * 创建CachedThreadPool,与Executors的默认实现相比, 线程总数依然无上限,但可设置KeepAlive时间(默认1分钟). + */ + public static ExecutorService newCachedThreadPool(long keepAliveSecs) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveSecs, TimeUnit.SECONDS, + new SynchronousQueue()); + } + + /** + * @see #newCachedThreadPool(long) + */ + public static ExecutorService newCachedThreadPool(long keepAliveSecs, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveSecs, TimeUnit.SECONDS, + new SynchronousQueue(), threadFactory); + } + + /** + * 创建CachedThreadPool,与Executors的默认实现相比, 可设置maxThreads(默认无限)与keepAlive时间(默认1分钟). + * + * 当到达maxThreads时,调用RejectHandler,默认为AbortPolicy,抛出RejectedExecutionException异常, + * 其他可选的Policy包括静默放弃任务(Discard)或由主线程来直接执行(CallerRuns). + */ + public static ExecutorService newCachedThreadPool(int maxThreads, long keepAliveSecs) { + return new ThreadPoolExecutor(0, maxThreads, keepAliveSecs, TimeUnit.SECONDS, new SynchronousQueue()); + } + + /** + * @see #newCachedThreadPool(int, long) + */ + public static ExecutorService newCachedThreadPool(int maxThreads, long keepAliveSecs, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, maxThreads, keepAliveSecs, TimeUnit.SECONDS, new SynchronousQueue(), + threadFactory); + } + + /** + * @see #newCachedThreadPool(int, long) + */ + public static ExecutorService newCachedThreadPool(int maxThreads, long keepAliveSecs, ThreadFactory threadFactory, + RejectedExecutionHandler rejectHanlder) { + return new ThreadPoolExecutor(0, maxThreads, keepAliveSecs, TimeUnit.SECONDS, new SynchronousQueue(), + threadFactory, rejectHanlder); + } + /** * 保证不会有Exception抛出到线程池的Runnable,防止用户没有捕捉异常导致中断了线程池中的线程。 */