Skip to content

Java线程池详解 #12

@jingegebuguai

Description

@jingegebuguai

线程池

为什么设计线程池

如果不使用线程池,当并发的线程数量很多,并且每个线程执行很短时间就结束,这要频繁创建线程会造成大量资源消,降低系统效率。

作用

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池构造函数

源码构造方式一

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }

源码构造方式二

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

源码构造方式三

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

源码构造方式四

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

源码构造函数分析

前三个线程池构造函数都使用的this()来调用其他构造函数,而真正初始化作用的构造函数是第四个,所以前三个构造函数是基于第四个构造器初始化的。

下面详细说明每个参数的含义:

  • corePollSize: 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

  • maximumPoolSize: 线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

  • keepAliveTime: 线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。默认线程池的线程数大于线程池大小时起作用,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

  • workQueue: 一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响.

    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    PriorityBlockingQueue:一个具有优先级得无限阻塞队列。

  • unit: 时间单位

  • threadFactory: 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

  • handler: 拒绝处理任务时的策略

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

再来看看以上的构造函数,可以发现四个构造函数在对threadFactory和handler的处理不同。

线程池流程图

  • 1、当有新任务时,先判断线程数是否达到corePoolSize,未满的话直接继续创建新线程,满的话进入下一步。
  • 2、判断工作队列workQueue是否已满,未满则将新任务添加到队列中等待,满的话进入下一步。
  • 3、判断线程池是否达到maximumPoolSize,是的话按饱和策略处理,无法处理新任务,没有达到的话则继续创建线程。

AbstractExecutorService

ExecutorService是JDK并发工具包提供的一个核心接口,相当于一个线程池,提供执行任务和管理生命周期的方法。AbstractExecutorService主要实现了这个接口。
处理后的源码如下:

    public abstract class AbstractExecutorService implements ExecutorService {
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        //submit函数1
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        //submit函数2
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        //submit函数3
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {······}
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {······}
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {
    }    

ExecutorService接口

源码如下所示:

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

Executor接口

   public interface Executor {
       void execute(Runnable command);
   }
   

各方法源码分析

在分析之前先梳理一下各个类和接口的实现继承关系。

ThreadPoolExector——>(extends)AbstractExecutorService——>(implement)ExectorService——>(implement)Executor

现在我们来依次分析:

ThreadPoolExecutor类分析

先看这个类的相关方法的使用与源码

线程池状态码

    volatile int runState;
    static final int RUNNING    = 0;
    static final int SHUTDOWN   = 1;
    static final int STOP       = 2;
    static final int TERMINATED = 3;
  • 当创建线程池后,初始时,线程池处于RUNNING状态;
  • 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  • 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
  • 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

线程池成员变量

    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(runState等)的改变都要使用这个锁
    private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
    private volatile long  keepAliveTime;    //线程存货时间   
    private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
    private volatile int   poolSize;       //线程池中当前的线程数
    private volatile RejectedExecutionHandler handler; //任务拒绝策略
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
    private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
    private long completedTaskCount;   //用来记录已经执行完毕的任务个数

线程池的线程初始化

    //初始化核心线程池一个线程
    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null); //注意传进去的参数是null
    }
    //初始化核心线程池所有线程
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
            ++n;
        return n;
    }

execute()方法分析

首先是Executor接口中的execute的方法,为核心方法。这个方法主要在ThreadPoolExecutor中实现,这个方法可以向线程池提交一个任务,交由线程池去执行。源码如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取ctl,主线程控制状态ctl是包装两个概念的原子整数,其中workerCount指代有效线程数,runState指代线程是运行还是停止等。
        int c = ctl.get();
        //保证增加工作线程时,ctl可以实时更新。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //保证任务可以在新线程或合并线程中运行
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }    

从传入的参数可知它是执行的实现Runnable接口的线程。

shutdown()方法分析

这个方法主要是有序关闭线程,先前提交的任务将会执行,但不会再接受新任务。一般执行关闭线程池任务主要使用shutdown()方法。

    public void shutdown() {
        //声明一个重入锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        //关闭线程
        try {
            //确保有权限关闭线程
            checkShutdownAccess();
            //将运行状态转化为跟定目标
            advanceRunState(SHUTDOWN);
            //中断线程,等待未执行的任务
            interruptIdleWorkers();
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

ShutdownNow()方法分析

中断所有已经执行的任务,并且不接受新的任务,源码与方法shutdown()类似。不同点是ShutdownNow()遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

isShutdown()方法分析

只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。判断是否在关闭线程池。

    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
 ```java   
#### isTerminated()方法分析
当所有的任务都已关闭后,才表示线程池关闭成功这时调用isTerminaed方法会返回true。
```java
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

remove()方法分析

移除线程池中正在执行的任务,线程未执行的话则不执行该方法。

    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); 
        return removed;
    }

purge()方法分析

删除线程池中所有已经被取消的任务,用于回收存储回收操作。

    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }
        tryTerminate(); 
    }

getPoolSize()方法分析

用于返回当前线程池存在的线程数量

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

getActiveCount()方法分析

返回正在执行任务的线程数量

      public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

getLargestPoolSize()方法分析

返回线程池中曾经创建过的线程最大数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

getTaskCount()方法分析

返回计划任务数量的估计值(已经执行完的任务数+正在执行的任务数+工作队列等待的线程数)。

    public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //已经完成的任务数
                long n = completedTaskCount;
                //加上正在线程池执行的任务数
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                //再加上工作队列中等待的任务数
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }

getCompletedTaskCount()方法分析

返回已经完成的任务数,其实是已完成的加上正执行的任务数。源码和getTaskCount()基本类似。

toString()方法分析

返回线程池的相关信息

    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));
        return super.toString() +
            "[" + rs +
            ", pool size = " + nworkers +
            ", active threads = " + nactive +
            ", queued tasks = " + workQueue.size() +
            ", completed tasks = " + ncompleted +
            "]";
    }

线程池简单示例

    public class Test {
         public static void main(String[] args) {   
             ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                     new ArrayBlockingQueue<Runnable>(5));
              
             for(int i=0;i<15;i++){
                 MyTask myTask = new MyTask(i);
                 executor.execute(myTask);
                 System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                 executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
             }
             executor.shutdown();
         }
    }
    class MyTask implements Runnable {
        private int taskNum;
         
        public MyTask(int num) {
            this.taskNum = num;
        }
         
        @Override
        public void run() {
            System.out.println("正在执行task "+taskNum);
            try {
                Thread.currentThread().sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"执行完毕");
        }
    }

在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

具体实现如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  • newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

  • newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

配置线程池大小

如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,如果线程池过小,那么会导致许多空闲的处理器无法执行工作,从而降低吞吐率。

NCPU是系统处理器的数量

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

  当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions