Skip to content

Java线程池拓展—Callable,FutureTask #14

@jingegebuguai

Description

@jingegebuguai

AbstractExecutorService类源码分析

线程池ThreadPoolExecutor继承于AbstractorExecutorService类,所以分析该类的构成很有必要。

上一篇博文提到ExecutorService是JDK并发工具包提供的一个核心接口,相当于一个线程池,提供执行任务和管理生命周期的方法。AbstractExecutorService主要实现了这个接口。 处理后的源码如下:

    public abstract class AbstractExecutorService implements ExecutorService {
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        //submit函数1
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        //submit函数2
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        //submit函数3
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {······}
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {······}
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {
    }

submit()方法源码分析

下面三个submit()方法是ExecutorService接口中的,并且由AbstructExecutorService类具体实现。

  public Future<?> submit(Runnable task) {···}
      
  public <T> Future<T> submit(Runnable task, T result) {···}

  public <T> Future<T> submit(Callable<T> task) {···}

Callable()接口源码

    @FunctionalInterface
    public interface Callable<V> {
        //计算结果
        V call() throws Exception;
    }

#### newTaskFor()方法源码    

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        
#### FutureTask类部分源码

    public class FutureTask<V> implements RunnableFuture<V> {
        ···
        public FutureTask(Callable<V> callable) {···}
        public FutureTask(Runnable runnable, V result) {···}
        ···
    }

RunnableFuture接口源码

   public interface RunnableFuture<V> extends Runnable, Future<V> {
       void run();
   }

Runnable接口源码

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }

Future接口源码

    public interface Future<V> {
        //试图取消对任务的执行,参数为true表示可以取消正在执行的任务
        boolean cancel(boolean mayInterruptIfRunning);
        //任务是否取消成功
        boolean isCancelled();
        //任务是否已经完成
        boolean isDone();
        //获取执行结果,会产生阻塞,等到任务执行完再返回结果
        V get() throws InterruptedException, ExecutionException;
        //在指定时间内未返回结果,则返回null
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

从上述源码可知,想要分析submit()方法,转接这需要分析很多FutureTask类和若干接口。梳理一下如下所示

ThreadPoolExecutor-->(extends)AbstractExecutorService
AbstractExecutorService-->具有三个submit()方法,参数类型包括Runnable,Callable
submit()-->返回FutureTask类对象-->(implements)RunnableFuture-->(extends)Runnable,Future

我们来看Callable接口,从源码可知,这个接口只有一个call()方法,用以调用并返回结果,和Runnable相似,用以返回一个对象或异常。

Call接口使用泛型定义返回类型,Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。

java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

可以用ExecutorService接口中的submit()方法实现这个功能,submit()负责向线程池提交Callable任务,然后返回Future对象,我们在借助Future对象的get()方法获取Callable接口中call()方法的结果:

<T> Future<T> submit(Callable<T> task);
submit()具体实现源码
    public <T> Future<T> submit(Callable<T> task) {
        //Callable任务为空,抛出无参异常
        if (task == null) throw new NullPointerException();
        //返回FutureTask类对象,
        RunnableFuture<T> ftask = newTaskFor(task);
        //向线程池提交一个任务
        execute(ftask);
        //返回处理后的任务
        return ftask;
    }

我们分析的是传入Callable类型参数的submit方法,其实其他两个源码都类似, 注解基本都已经备注好,第一步判断参数的有无,我们来看看第二步返回是FutureTask类的对象,注意这个类实现的是RunnableFuture接口,而这个接口继承的又是Runnable和Future接口(接口也是可以extends接口的哦!)。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。来看第二步代码FutureTask类的构造函数源码:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }

FutureTask类实现了Future接口的一系列方法,如get()。第三步是向线程池提交任务,最后返回这个任务,下面看一个实例:

Callable,Future示例
    public class Test {
        public static void main(String[] args) {
            //建议使用Executors类中的方法构建线程池,具体使用方法和源码分析看我关于线程池本主题的其他博文
            ExecutorService executor = Executors.newCachedThreadPool();
            Task task = new Task();
            Future<Integer> result = executor.submit(task);
            executor.shutdown();
             
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
             
            System.out.println("主线程在执行任务");
             
            try {
                System.out.println("task运行结果"+result.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
             
            System.out.println("所有任务执行完毕");
        }
    }
    class Task implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for(int i=0;i<100;i++)
                sum += i;
            return sum;
        }
    }

执行结果如下:

子线程在进行计算
主线程在执行任务
task运行结果4950
所有任务执行完毕
Callable,FutureTask示例
   public class Test {
       public static void main(String[] args) {
           //第一种方式,线程池方式
           ExecutorService executor = Executors.newCachedThreadPool();
           Task task = new Task();
           FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
           executor.submit(futureTask);
           executor.shutdown();
            
           //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
           /*Task task = new Task();
           FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
           Thread thread = new Thread(futureTask);
           thread.start();*/
            
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e1) {
               e1.printStackTrace();
           }
            
           System.out.println("主线程在执行任务");
            
           try {
               System.out.println("task运行结果"+futureTask.get());
           } catch (InterruptedException e) {
               e.printStackTrace();
           } catch (ExecutionException e) {
               e.printStackTrace();
           }
            
           System.out.println("所有任务执行完毕");
       }
   }
   class Task implements Callable<Integer>{
       @Override
       public Integer call() throws Exception {
           System.out.println("子线程在进行计算");
           Thread.sleep(3000);
           int sum = 0;
           for(int i=0;i<100;i++)
               sum += i;
           return sum;
       }
   }

上面一段代码是对Callable+FutureTask方法的演示,若是Runnable+FutureTask也可以用此调用方式,只是task类中的不是call方法,而是实现run方法。

如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions