ExecutorCompletionService

marcesher edited this page · 10 revisions

From the Javadocs, an ExecutorCompletionService is:

A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

An ExecutorCompletionService has an output queue, and as tasks finish, their results are moved onto that queue. You then have a separate polling task that processes those results, in completion order, as they arrive.
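To make "completion order" concrete, here is a minimal sketch in plain Java (not CFML), using only java.util.concurrent. The class and method names (CompletionOrderDemo, runInCompletionOrder) are made up for illustration and are not part of CFConcurrent. Each task just sleeps for a given delay and returns it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of CFConcurrent.
public class CompletionOrderDemo {

    // Submits one task per delay. Each task sleeps for its delay and returns it.
    // Results are collected in the order tasks finish, not the order submitted.
    static List<Long> runInCompletionOrder(long... delaysMs) throws Exception {
        // One thread per task so that every task starts right away
        ExecutorService pool = Executors.newFixedThreadPool(delaysMs.length);
        CompletionService<Long> service = new ExecutorCompletionService<>(pool);
        try {
            for (long delay : delaysMs) {
                service.submit(() -> {
                    Thread.sleep(delay);
                    return delay;
                });
            }
            List<Long> finished = new ArrayList<>();
            for (int i = 0; i < delaysMs.length; i++) {
                // take() blocks until the next completed task is on the queue
                finished.add(service.take().get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted as 400, 50, 200; the shortest sleep should come back first
        System.out.println(runInCompletionOrder(400, 50, 200));
    }
}
```

The sketch uses the blocking take() only to keep things short. The rest of this page uses a scheduled task that polls instead.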

Java APIs

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html

In Action

To see the ExecutorCompletionService in action, download CFConcurrent and run examples/ExecutorCompletionService/index.cfm

Difference between an ExecutorService and an ExecutorCompletionService

When using an ExecutorService, you submit tasks and either a) ignore the results or b) wait for the tasks to complete and then fetch the results with Future.get().

With an ExecutorCompletionService, you submit tasks, but you do not wait for them to complete. Instead, when you set up the ExecutorCompletionService, you supply it with a "polling" task and configure that task to run on a schedule (every 10 milliseconds, every minute, every 15 minutes... it depends on your application's needs). As your submitted tasks complete, they are put onto the output queue. When the polling task executes on its schedule, it poll()s the completed tasks off of the output queue.

Thus, rather than fetch results in the same process that submitted tasks, you fetch results in a separate process.
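As a rough sketch of what one run of such a polling task does in plain Java: it calls the non-blocking poll() until the queue is empty and hands back whatever was ready. PollDrainDemo, drainCompleted and submitThenDrain are hypothetical names for this example only. This is not CFConcurrent's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of CFConcurrent.
public class PollDrainDemo {

    // Removes every result that is ready right now and returns them.
    // poll() never blocks: it returns null as soon as the queue is empty.
    static <T> List<T> drainCompleted(CompletionService<T> service)
            throws InterruptedException, ExecutionException {
        List<T> results = new ArrayList<>();
        Future<T> done;
        while ((done = service.poll()) != null) {
            // The future is already complete, so get() returns immediately
            results.add(done.get());
        }
        return results;
    }

    // Submits `count` trivial tasks, waits for the pool to finish, then drains once.
    static List<Integer> submitThenDrain(int count) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < count; i++) {
            final int id = i;
            service.submit(() -> id);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return drainCompleted(service);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Drained " + submitThenDrain(5).size() + " results");
    }
}
```

In a real application you would not wait for the pool to terminate before draining. That is done here only so a single drain sees every result.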

When this might be useful

ExecutorCompletionService has been useful to me in at least these two scenarios:

  1. When I don't need to fetch results immediately, but I do want to inspect them at some time in the future, in a different process
  2. When I need to submit a lot of tasks, and wish to process the results as tasks complete, rather than waiting for the entire batch to complete

I wrote an application once that constantly searched the file system for files to delete. A ScheduledThreadPoolExecutor was configured to run every few seconds, finding deletable files. Once found, new Tasks were created for deleting each of the files. Those tasks were submitted to a completion service for asynchronous execution. Then, every 15 minutes or so, the Completion Service would gather up all the deletion results and store metadata about the deletions in the database. In this case, I cared more about the aggregate deletion information than about the results of each individual deletion; thus, separating task submission from task result processing made sense.

Imagine submitting 10K tasks to an ExecutorService with invokeAll(). To process the results, you'd need to wait for all 10K tasks to complete. What if, instead, you'd rather process the results as they complete? This is where an ExecutorCompletionService fits: you can configure it to poll on a tight schedule (every few ms), and can thus process the results in near real-time, even as other tasks are still running or waiting to run.

Create Tasks that do the work

As discussed on Page 1, we use Tasks to do our program's actual work. These are CFCs with a result-returning call() method. Please read the documentation for Tasks for more detail.

In this example, we'll use the HelloTask you've seen in the rest of this documentation. In your application, you'll replace the call() method with the implementation that solves your problem (obviously).

Create a Task that post-processes completed worker tasks

This Task is a Runnable, meaning it provides a run() method which returns void. You'll create a single instance of this task and submit it to the Completion Service. On the configured schedule, the completion service will run your completion task, and in that task you'll pull all the completed results off of the completion queue.

CFConcurrent provides a convenience AbstractCompletionTask which you can extend. It handles polling the completion queue, pulling off completed tasks and passing an array of results to a process() method which you implement. In this documentation, I'm going to use that approach. If you want to create your own Completion Task entirely from scratch, use AbstractCompletionTask as a guide.

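component extends="cfconcurrent.AbstractCompletionTask"{

    // running total of results seen across all process() calls
    publishCount = 0;

    // receives an array of result structs, one per completed task
    function process( results ){
        // guard: results[1] below would throw on an empty array
        if( arrayIsEmpty(results) ){
            return;
        }
        publishCount += arrayLen(results);
        writeLog("Process: received #arrayLen(results)# tasks. Total: #publishCount#");
        writeLog("Process: first task ID: #results[1].id#; it slept for #results[1].sleepTime# ms. last task ID: #results[arrayLen(results)].id#; it slept for #results[arrayLen(results)].sleepTime# ms");
    }

}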

If you'll recall from the HelloTask, call() returns a struct. Here, in our Completion Task, we'll receive an array of those structs, one per completed task.

In this simple example, we do nothing noteworthy. In your applications, you'll be responding to each completion with more important behavior.

Create the ExecutorCompletionService

Creating an ExecutorCompletionService is similar to creating an ExecutorService. The difference is that you'll also create that single instance of your Completion Task and set that as the completion task for the Completion Service.

component{
    this.name = "executorCompletionServiceExample";

    function onApplicationStart(){
        //a maxConcurrent of 0 will cause the service to default to the number of Available Processors + 1
        application.executorCompletionService = createObject("component", "cfconcurrent.ExecutorCompletionService")
            .init( serviceName = "executorCompletionServiceExample",
                maxConcurrent = 0,
                completionQueueProcessFrequency = 2 );
        application.executorCompletionService.setLoggingEnabled( true );
        application.completionTask = createObject("component", "CompletionTask");
        application.executorCompletionService.setCompletionQueueProcessTask( application.completionTask );

        application.executorCompletionService.start();
    }

    function onRequestStart(){
        if( structKeyExists(url, "stop") OR structKeyExists(url, "reinit") ){
            applicationStop();
            onApplicationStop();
        }
    }

    function onApplicationStop(){
        writeLog("Stopping #application.applicationName# Completion Service");
        application.executorCompletionService.stop();
    }
}

It's important to note here that when working directly with Java, the ExecutorCompletionService simply adds the "output queue" onto an ExecutorService. You need to create a ScheduledThreadPoolExecutor, a completion task, and schedule that completion task separately.

Because this is the most common usage of an ExecutorCompletionService, CFConcurrent simplifies this by creating both the completion service and the ScheduledThreadPoolExecutor for you.
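For comparison, here is roughly what that wiring looks like if you do it yourself in plain Java. This is a sketch under my own assumptions, not CFConcurrent's source: ScheduledCompletionDemo and runAndCount are hypothetical names, and the "completion task" here only counts results.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of CFConcurrent.
public class ScheduledCompletionDemo {

    // Runs taskCount tasks and returns how many results the scheduled
    // completion task pulled off the queue.
    static int runAndCount(int taskCount, long pollEveryMs) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        CompletionService<Integer> service = new ExecutorCompletionService<>(workers);

        AtomicInteger processed = new AtomicInteger();
        CountDownLatch allProcessed = new CountDownLatch(taskCount);

        // The "completion task": drains whatever finished since its last run
        Runnable completionTask = () -> {
            while (service.poll() != null) {
                processed.incrementAndGet();
                allProcessed.countDown();
            }
        };
        // Scheduled separately from the worker pool, as described above
        scheduler.scheduleAtFixedRate(completionTask, 0, pollEveryMs, TimeUnit.MILLISECONDS);

        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            service.submit(() -> {
                Thread.sleep(5); // stand-in for real work
                return id;
            });
        }

        try {
            if (!allProcessed.await(10, TimeUnit.SECONDS)) {
                throw new TimeoutException("not all results were processed in time");
            }
            return processed.get();
        } finally {
            scheduler.shutdownNow();
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Processed: " + runAndCount(20, 10));
    }
}
```

Note that there are two executors involved: one runs the worker tasks, and the other does nothing but run the completion task on its schedule.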

Determining Appropriate Configuration Parameters

See Determining Configuration Options for guidance

Submit Tasks

Submitting tasks is largely the same as described in ExecutorService, except that you have fewer choices. With an ExecutorService, you can call execute() and submit(), as well as invokeAll() and invokeAny(). With the first two, you fire and forget. With the latter two, you wait for results.

With a Completion Service, you do not wait for results because they are handled in a separate process; thus, the latter two are irrelevant. And execute() accepts a non-result-returning Runnable, which doesn't fit with the idea of "processing results" (who'd want to process a null?).

Consequently, the only method you care about is submit( Callable ).
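In plain Java terms, that looks something like the sketch below. SubmitCallableDemo and submitAndCollect are hypothetical names for this example. The Future returned by submit() can be ignored, because the completed task also turns up on the completion queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of CFConcurrent.
public class SubmitCallableDemo {

    // Submits a single result-returning task and collects it from the queue.
    static String submitAndCollect(String name) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            // A Callable returns a value, so there is something to process later
            Callable<String> task = () -> "Hello, " + name;

            // submit() hands back a Future, but nothing here waits on it
            service.submit(task);

            // The result is read from the completion queue instead
            Future<String> completed = service.take();
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitAndCollect("World"));
    }
}
```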

Typical usage:

task = new HelloTask( args );
future = application.executorCompletionService.submit( task );

Pause and Resume the ExecutorCompletionService

CFConcurrent's ExecutorCompletionService provides a convenient ability to pause/unpause a service. Effectively, this enables you to ignore new tasks, if you need to.

application.executorCompletionService.pause(); will trigger this ignore behavior. Any tasks that are submitted while the service is paused are simply thrown away.

application.executorCompletionService.unPause(); will re-enable acceptance of new tasks.
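To illustrate the idea (and only the idea), here is one way a pause flag could sit in front of a completion service in plain Java. This is my own sketch, not how CFConcurrent implements it: PausableCompletionService is a hypothetical class, and returning an empty Optional for a dropped task is an assumption made for the example.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper; not CFConcurrent's implementation.
public class PausableCompletionService<T> {

    private final CompletionService<T> delegate;
    private final AtomicBoolean paused = new AtomicBoolean(false);

    public PausableCompletionService(Executor executor) {
        this.delegate = new ExecutorCompletionService<>(executor);
    }

    public void pause()   { paused.set(true); }
    public void unPause() { paused.set(false); }

    // While paused, the task is dropped: it is never queued and never run.
    public Optional<Future<T>> submit(Callable<T> task) {
        if (paused.get()) {
            return Optional.empty();
        }
        return Optional.of(delegate.submit(task));
    }

    // Blocks until the next completed task is available
    public Future<T> take() throws InterruptedException {
        return delegate.take();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        PausableCompletionService<String> service = new PausableCompletionService<>(pool);

        service.pause();
        System.out.println("Accepted while paused: " + service.submit(() -> "dropped").isPresent());

        service.unPause();
        service.submit(() -> "kept");
        System.out.println("Completed: " + service.take().get());

        pool.shutdown();
    }
}
```

Dropped tasks are gone for good in this sketch, so callers that care need to check whether their submission was accepted.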
