Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

refactored API; added invokeAll and invokeAny wrappers

  • Loading branch information...
commit 7f798eebafd285ba9c0fbf21cd0e53fbb0c73917 1 parent 67cff71
Marc Esher authored
109 AbstractConcurrencyService.cfc → AbstractExecutorService.cfc
View
@@ -22,8 +22,7 @@ component output="false" accessors="true"{
variables.logName = "cfconcurrent";
variables.loggingEnabled = false;
variables.baseStorageScopeName = "__cfconcurrent";
- variables.objectFactory = new cfconcurrent.ObjectFactory();
- variables.timeUnit = objectFactory.createTimeUnit();
+
variables.status = "stopped";
@@ -35,22 +34,24 @@ component output="false" accessors="true"{
It is the responsibility of the user of this library to call stop() on services he/she starts
*/
- public function init( String appName ){
+ public function init( String appName, objectFactory="#createObject('component', 'ObjectFactory').init()#" ){
structAppend( variables, arguments );
+ variables.timeUnit = objectFactory.createTimeUnit();
return this;
}
/* Lifecycle methods*/
+
public function start(){
//This will be overridden by implementers
status = "started";
return this;
}
- public function stop(){
- shutdownAllExecutors();
+ public function stop( timeout=100, timeUnit="milliseconds" ){
+ shutdownAllExecutors( timeout, timeUnit );
status = "stopped";
return this;
}
@@ -81,6 +82,97 @@ component output="false" accessors="true"{
return getStatus() eq "paused";
}
+ /* Execution methods */
+
+ /**
+ * Submits an object for execution. Returns a Future if Callable and a RunnableFuture if Runnable
+ * If the service is not running, tasks are ignored
+ * @task A task instance. Object must expose either a call() or run() method.
+ */
+ public function submit( task ){
+
+ if( isStarted() ){
+ var proxy = objectFactory.createSubmittableProxy( task );
+ return getSubmissionTarget().submit( proxy );
+
+ } else if( isPaused() ) {
+ writeLog("Service paused... ignoring submission");
+ } else if( isStopped() ){
+ throw("Service is stopped... not accepting new tasks");
+ }
+ }
+
+ /**
+ * Executes the tasks, returning an array of Futures when all complete.
+ * If the service is not running, tasks are ignored.
+ * @tasks An array of task instances. A task CFC must expose a call() method that returns a result
+ * @timeout Maximum time to wait. 0 indicates to wait until completion
+ * @timeUnit TimeUnit of the timeout argument, as a string. Defaults to "seconds".
+ */
+ public function invokeAll( array tasks, timeout=0, timeUnit="seconds" ){
+ var results = [];
+ var proxies = [];
+
+ if( isStarted() ){
+
+ for( var task in tasks ){
+ arrayAppend( proxies, objectFactory.createSubmittableProxy( task ) );
+ }
+ if( timeout LTE 0 ){
+ return getSubmissionTarget().invokeAll( proxies );
+ } else {
+ return getSubmissionTarget().invokeAll( proxies, timeout, objectFactory.getTimeUnitByName( timeUnit ) );
+ }
+
+ } else if( isPaused() ) {
+ writeLog("Service paused... ignoring submission");
+ } else if( isStopped() ){
+ throw("Service is stopped... not accepting new tasks");
+ }
+ }
+
+ /**
+ * Executes the tasks, returning the result of one that has completed successfully, if any do. This result will be the returned value from the task's call() method
+ * If the service is not running, tasks are ignored.
+ * @tasks An array of task instances. A task CFC must expose a call() method that returns a result
+ * @timeout Maximum time to wait. 0 indicates to wait until completion
+ * @timeUnit TimeUnit of the timeout argument, as a string. Defaults to "seconds".
+ */
+ public function invokeAny( array tasks, timeout=0, timeUnit="seconds" ){
+ var results = [];
+ var proxies = [];
+
+ if( isStarted() ){
+
+ for( var task in tasks ){
+ arrayAppend( proxies, objectFactory.createSubmittableProxy( task ) );
+ }
+ if( timeout LTE 0 ){
+ return getSubmissionTarget().invokeAny( proxies );
+ } else {
+ return getSubmissionTarget().invokeAny( proxies, timeout, objectFactory.getTimeUnitByName( timeUnit ) );
+ }
+
+ } else if( isPaused() ) {
+ writeLog("Service paused... ignoring submission");
+ } else if( isStopped() ){
+ throw("Service is stopped... not accepting new tasks");
+ }
+ }
+
+ /**
+ * Straight from the javadoc: Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.
+ * This is the equivalent of fire-and-forget usage of cfthread
+
+ * @runnableTask A task instance that exposes a void run() method
+ */
+ public function execute( runnableTask ){
+ var proxy = objectFactory.createRunnableProxy( runnableTask );
+ getSubmissionTarget().execute( proxy );
+ }
+
+ /* Storage methods for shutdown management */
+
/**
* application["__cfoncurrent"]
*/
@@ -126,7 +218,7 @@ component output="false" accessors="true"{
}
public function getProcessorCount(){
- return createObject("java", "java.lang.Runtime").getRuntime().availableProcessors();
+ return objectFactory.getProcessorCount();
}
package function storeExecutor( string name, any executor ){
@@ -139,11 +231,12 @@ component output="false" accessors="true"{
return this;
}
- package function shutdownAllExecutors(){
+ package function shutdownAllExecutors( timeout=100, timeUnit="milliseconds" ){
if( NOT structIsEmpty( getThisStorageScope() ) ){
var scope = getThisStorageScope();
for( var executor in scope ){
- writeLog("Shutting down executor named #executor#");
+ writeLog("Waiting #timeout# #timeUnit# for tasks to complete and then shutting down executor named #executor#");
+ scope[executor].awaitTermination( timeout, objectFactory.getTimeUnitByName( timeUnit ) );
scope[executor].shutdownNow();
}
structClear( scope );
30 CompletionService.cfc
View
@@ -3,14 +3,14 @@ component extends="ExecutorService" accessors="true" output="false"{
property name="completionQueueProcessFrequency" type="numeric";
property name="completionQueueProcessTask";
property name="completionQueueProcessTaskProxy";
-
+
//expose all the guts... power users need this stuff, and they shall have it
property name="workQueue";
property name="completionQueue";
property name="workExecutor";
property name="completionService";
property name="completionQueueProcessService";
-
+
variables.completionQueueProcessTask = "";
variables.completionQueueProcessTaskFuture = "";
variables.completionQueueProcessTaskID = "completionQueueProcessor";
@@ -22,43 +22,43 @@ component extends="ExecutorService" accessors="true" output="false"{
@maxWorkQueueSize
@maxCompletionQueueSize
*/
- public function init( appName, numeric maxConcurrent=0, numeric completionQueueProcessFrequency=30, numeric maxWorkQueueSize=10000, numeric maxCompletionQueueSize=100000 ){
+ public function init( appName, numeric maxConcurrent=0, numeric completionQueueProcessFrequency=30, numeric maxWorkQueueSize=10000, numeric maxCompletionQueueSize=100000, objectFactory="#createObject('component', 'ObjectFactory').init()#" ){
structAppend( variables, arguments );
- return super.init( appName, maxConcurrent, maxWorkQueueSize );
+ return super.init( appName, maxConcurrent, maxWorkQueueSize, objectFactory );
}
-
+
public function start(){
super.start();
variables.completionQueue = objectFactory.createQueue( maxCompletionQueueSize );
variables.completionService = objectFactory.createCompletionService( workExecutor, completionQueue );
setSubmissionTarget( completionService );
-
- variables.completionQueueProcessService = new ScheduledExecutorService( appName, 1 ).start();
-
+
+ variables.completionQueueProcessService = new ScheduledExecutorService( appName, 1, objectFactory ).start();
+
//in the event that a completion task has been set prior to start(), we'll schedule it now
scheduleCompletionTask();
-
+
return this;
}
-
+
/**
* A Task CFC with a void run() method
*/
public function setCompletionQueueProcessTask( completionQueueProcessTask ){
-
+
structAppend( variables, arguments );
-
+
completionQueueProcessFuture = scheduleCompletionTask();
}
-
+
private function scheduleCompletionTask(){
logMessage("Starting to schedule completion task");
if( structKeyExists( variables, "completionQueueProcessService") AND NOT isSimpleValue(variables.completionQueueProcessTask) ){
logMessage( "scheduling completion task at rate of #completionQueueProcessFrequency#" );
-
+
completionQueueProcessTask.setCompletionService( getCompletionService() );
return completionQueueProcessService.scheduleAtFixedRate( completionQueueProcessTaskID, completionQueueProcessTask, completionQueueProcessFrequency, completionQueueProcessFrequency, "seconds");
}
}
-}
+}
37 ExecutorService.cfc
View
@@ -1,50 +1,37 @@
-component extends="AbstractConcurrencyService" accessors="true" output="false"{
+component extends="AbstractExecutorService" accessors="true" output="false"{
//expose all the guts... power users need this stuff, and they shall have it
property name="workQueue";
property name="workExecutor";
-
+
/**
* @appName The unique application name for this Completion service
@maxConcurrent The maximum number of tasks which will be run at one time. A value of 0 will cause the maxConcurrent to be calculated as Number of CPUs + 1
@maxWorkQueueSize
*/
- public function init( appName, numeric maxConcurrent=0, numeric maxWorkQueueSize=10000 ){
-
- super.init( appName );
+ public function init( appName, numeric maxConcurrent=0, numeric maxWorkQueueSize=10000, objectFactory="#createObject('component', 'ObjectFactory').init()#" ){
+
+ super.init( appName, objectFactory );
structAppend( variables, arguments );
if( maxConcurrent LTE 0 ){
variables.maxConcurrent = getProcessorCount() + 1;
}
-
+
return this;
}
-
+
public function start(){
-
+
variables.workQueue = objectFactory.createQueue( maxWorkQueueSize );
-
+
//TODO: extract this policy and make it settable
variables.workExecutor = objectFactory.createThreadPoolExecutor( maxConcurrent, workQueue, "DiscardPolicy" );
setSubmissionTarget( workExecutor );
-
+
//store the executor for sane destructability
storeExecutor( "workExecutor", variables.workExecutor );
-
+
return super.start();
}
-
- public function submitCallable( task ){
-
- if( isStarted() ){
- var proxy = objectFactory.createCallableProxy( task );
- return getSubmissionTarget().submit( proxy );
-
- } else if( isPaused() ) {
- writeLog("Service paused... ignoring submission");
- } else if( isStopped() ){
- throw("Service is stopped... not accepting new tasks");
- }
- }
-}
+}
91 ObjectFactory.cfc
View
@@ -1,13 +1,13 @@
component output="false" accessors="true"{
-
- property name="cfcDynamicProxy";
-
+
+ property name="cfcDynamicProxy";
+
supportsNativeProxy = structKeyExists( getFunctionList(), "createDynamicProxy" );
callableInterfaces = ["java.util.concurrent.Callable"];
- runnableInterfaces = ["java.lang.Runnable"];
+ runnableInterfaces = ["java.lang.Runnable"];
variables.serverScopeName = "__cfconcurrentJavaLoader";
timeUnit = createTimeUnit();
-
+
public function init(){
if( NOT supportsNativeProxy ){
writeLog("Native createDynamicProxy not supported... falling back to JavaLoader. All Hail Galaxar! er... Mark Mandel!");
@@ -20,57 +20,65 @@ component output="false" accessors="true"{
}
return this;
}
-
+
+ public function getJavaLoader(){
+ return server[variables.serverScopeName];
+ }
+
+ public function getProcessorCount(){
+ return createObject("java", "java.lang.Runtime").getRuntime().availableProcessors();
+ }
+
public function createTimeUnit(){
return createObject( "java", "java.util.concurrent.TimeUnit" );
}
-
+
public function createQueue( maxQueueSize, queueClass="java.util.concurrent.LinkedBlockingQueue" ){
return createObject("java", queueClass).init( maxQueueSize );
}
-
+
public function createThreadPoolExecutor( maxConcurrent, workQueue, rejectionPolicy="DiscardPolicy"){
- return createObject("java", "java.util.concurrent.ThreadPoolExecutor").init(
- maxConcurrent,
- maxConcurrent,
- 0,
- timeUnit.SECONDS,
- workQueue,
+ return createObject("java", "java.util.concurrent.ThreadPoolExecutor").init(
+ maxConcurrent,
+ maxConcurrent,
+ 0,
+ timeUnit.SECONDS,
+ workQueue,
createRejectionPolicyByName( rejectionPolicy )
);
}
-
+
public function createScheduledThreadPoolExecutor( maxConcurrent=1, rejectionPolicy="DiscardPolicy" ){
- return createObject("java", "java.util.concurrent.ScheduledThreadPoolExecutor").init(
+ return createObject("java", "java.util.concurrent.ScheduledThreadPoolExecutor").init(
maxConcurrent,
createRejectionPolicyByName( rejectionPolicy )
);
}
-
+
public function createCompletionService( executor, completionQueue ){
return createObject("java", "java.util.concurrent.ExecutorCompletionService").init( executor, completionQueue );
}
-
+
public function createRejectionPolicyByName( name ){
return createObject("java", "java.util.concurrent.ThreadPoolExecutor$#name#").init();
}
-
+
public function createDiscardPolicy(){
return createRejectionPolicyByName("DiscardPolicy");
}
-
+
public function createDiscardOldestPolicy(){
return createRejectionPolicyByName("DiscardOldestPolicy");
}
-
+
public function createAbortPolicy(){
return createRejectionPolicyByName("AbortPolicy");
}
-
+
public function createCallerRunsPolicy(){
return createRejectionPolicyByName("CallerRunsPolicy");
}
-
+
public function getTimeUnitByName( unit="seconds" ){
switch(unit){
case "seconds":
@@ -88,20 +96,31 @@ component output="false" accessors="true"{
case "days":
return timeUnit.DAYS;
default:
- throw("unknown unit #arguments.unit#. Valid values are 'seconds', 'nanoseconds', 'microseconds', 'milliseconds', 'minutes', 'hours', 'days'");
+ throw("unknown unit #arguments.unit#. Valid values are 'seconds', 'nanoseconds', 'microseconds', 'milliseconds', 'minutes', 'hours', 'days'");
}
}
+ public function createSubmittableProxy( object ){
+ if( isCallable( object ) ){
+ return createProxy( object, callableInterfaces );
+ }
+ if( isRunnable( object ) ){
+ return createProxy( object, runnableInterfaces );
+ }
+
+ throw("Task must have either a call() or run() method", "TaskNotSubmittable");
+ }
+
public function createRunnableProxy( object ){
ensureRunnableTask( object );
return createProxy( object, runnableInterfaces );
}
-
+
public function createCallableProxy( object ){
ensureCallableTask( object );
return createProxy( object, callableInterfaces );
- }
-
+ }
+
public function createProxy( object, interfaces ){
if( supportsNativeProxy ){
return createDynamicProxy( arguments.object, arguments.interfaces );
@@ -109,20 +128,24 @@ component output="false" accessors="true"{
return cfcDynamicProxy.createInstance( arguments.object, arguments.interfaces );
}
}
-
+
public function ensureRunnableTask( task ){
- if( NOT isObject(task) OR NOT structKeyExists( task, "run" ) ){
+ if( NOT isRunnable( task ) ){
throw("Task does not have a run() method", "TaskNotRunnable")
}
}
-
+
public function ensureCallableTask( task ){
- if( NOT isObject(task) OR NOT structKeyExists( task, "call" ) ){
+ if( NOT isCallable( task ) ){
throw("Task does not have a call() method", "TaskNotCallable")
}
}
-
- public function getJavaLoader(){
- return server[variables.serverScopeName];
+
+ public function isCallable( object ){
+ return isObject( object ) AND structKeyExists( object, "call" );
+ }
+
+ public function isRunnable( object ){
+ return isObject( object ) AND structKeyExists( object, "run" );
}
}
36 ScheduledExecutorService.cfc
View
@@ -1,34 +1,34 @@
/**
* http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html
*/
-component extends="AbstractConcurrencyService" accessors="true" output="false"{
+component extends="AbstractExecutorService" accessors="true" output="false"{
property name="scheduledExecutor";
property name="storedTasks" type="struct";
-
- public function init( String appName, maxConcurrent=0 ){
-
- super.init( appName );
+
+ public function init( String appName, maxConcurrent=0, objectFactory="#createObject('component', 'ObjectFactory').init()#" ){
+
+ super.init( appName, objectFactory );
structAppend( variables, arguments );
if( maxConcurrent LTE 0 ){
variables.maxConcurrent = getProcessorCount() + 1;
}
-
+
storedTasks = {};
return this;
}
-
+
public function start(){
variables.scheduledExecutor = objectFactory.createScheduledThreadPoolExecutor( maxConcurrent );
-
+
//store the executor for sane destructability
storeExecutor( "scheduledExecutor", variables.scheduledExecutor );
-
+
return super.start();
}
-
+
public function scheduleAtFixedRate( id, task, initialDelay, period, timeUnit="seconds" ){
-
+
var future = scheduledExecutor.scheduleAtFixedRate(
objectFactory.createRunnableProxy( task ),
initialDelay,
@@ -38,7 +38,7 @@ component extends="AbstractConcurrencyService" accessors="true" output="false"{
storeTask( id, task, future );
return future;
}
-
+
public function scheduleWithFixedDelay( id, task, initialDelay, delay, timeUnit="seconds" ){
var future = scheduledExecutor.scheduleWithFixedDelay(
objectFactory.createRunnableProxy( task ),
@@ -49,17 +49,17 @@ component extends="AbstractConcurrencyService" accessors="true" output="false"{
storeTask( id, task, future );
return future;
}
-
+
package function storeTask( id, task, future ){
-
+
lock name="storeScheduledTask_#appName#_#id#" timeout="2"{
cancelTask( id );
storedTasks[ id ] = { task = task, future = future };
}
-
+
return this;
- }
-
+ }
+
/**
* Returns a struct with keys 'task' and 'future'. The 'task' is the original object submitted to the executor.
The 'future' is the <ScheduledFuture> object returned when submitting the task
@@ -75,4 +75,4 @@ component extends="AbstractConcurrencyService" accessors="true" output="false"{
}
}
-}
+}
2  examples/CompletionService/submit.cfm
View
@@ -13,7 +13,7 @@
</cfif>
<cfset task = new HelloTask( idStub & "_#i#", val(url.sleepTime) )>
- <cfset future = application.completionService.submitCallable( task )>
+ <cfset future = application.completionService.submit( task )>
</cfloop>
<cfset workQueueSize = application.completionService.getWorkQueue().size()>
8 examples/ExecutorService/submit.cfm
View
@@ -27,7 +27,7 @@
<p>NOTE: It is the expected, <a href="http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#get(long, java.util.concurrent.TimeUnit)" target="_blank">documented behavior</a> that it'll throw an exception when the get() times out prior to completing</p>
<cfset task = new HelloTask( url.sleepTime )>
- <cfset future = application.executorService.submitCallable( task )>
+ <cfset future = application.executorService.submit( task )>
<cfset result = future.get( url.waitTime, application.executorService.getTimeUnit().MILLISECONDS )>
<cfdump var="#result#" expand="false" label="Click to Expand">
@@ -41,7 +41,7 @@
<p>Example 2: Sometimes you want to wait for the task to complete regardless of how long it takes</p>
<cfset task = new HelloTask( url.sleepTime )>
- <cfset future = application.executorService.submitCallable( task )>
+ <cfset future = application.executorService.submit( task )>
<cfset result = future.get()>
<cfdump var="#result#" expand="false" label="Click to Expand">
@@ -58,7 +58,7 @@
<p>Example 3: Sometimes your code will error. If you try/catch it, you can stick the error object into the result you return</p>
<cfset task = new HelloTask( 0, true, true )>
- <cfset future = application.executorService.submitCallable( task )>
+ <cfset future = application.executorService.submit( task )>
<cfset result = future.get()>
<cfdump var="#result#" expand="false" label="Click to Expand">
@@ -71,7 +71,7 @@
<p>Example 4: If you don't catch your errors, the call to get() will thus throw the error
<cfset task = new HelloTask( 0, true )>
- <cfset future = application.executorService.submitCallable( task )>
+ <cfset future = application.executorService.submit( task )>
<cftry>
<cfset result = future.get()>
<cfcatch>
2  examples/ormInExecutor/submit.cfm
View
@@ -5,6 +5,6 @@
for(i=1; i<=5;i++){
task = createObject("component", "cfconcurrent.examples.ormInExecutor.model.EntityLoadingTask").init( randRange(1,10) );
- application.completionService.submitCallable( task );
+ application.completionService.submit( task );
}
</cfscript>
8 index.cfm
View
@@ -5,13 +5,13 @@
<div class="hero-unit">
<h1>Welcome to CFConcurrent</h1>
<p>
- CFConcurrent is a ColdFusion library for simplifying usage of the Java Concurrency Framework
+ CFConcurrent simplifies the use of the Java Concurrency Framework
(<a href="http://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html">java tutorial</a> | <a href="http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html">javadoc</a>)
in ColdFusion applications.
</p>
<p>
- CFThread is suitable for management-free, fire-and-forget concurrency.
- Robust production applications, however, require higher-level abstractions and a greater degree of control.
+ Although CFThread is suitable for management-free, fire-and-forget concurrency, robust production applications
+ require higher-level abstractions and a greater degree of control.
The Java Concurrency Framework (JCF) provides such improvements.
</p>
<p>You create CFCs that act as "tasks" that return results. You submit those tasks to the JCF for execution. You can then retrieve the execution results immediately when they are available, or you can create a periodic "polling" task which processes the completed results.</p>
@@ -57,7 +57,7 @@
<div class="span4">
<h2>Extensible Base Components</h2>
- <p>Create your own custom services by extending the <code>AbstractConcurrencyService</code></p>
+ <p>Create your own custom services by extending the <code>AbstractExecutorService</code></p>
</div><!--/span-->
</div><!--/row-->
4 tests/AbstractCompletionTaskTest.cfc
View
@@ -17,8 +17,8 @@ component extends="mxunit.framework.TestCase"{
var task2 = new fixture.simpleCallableTask("task2");
var factory = completionservice.getObjectFactory();
- var proxy1 = factory.createCallableProxy(task1);
- var proxy2 = factory.createCallableProxy(task2);
+ var proxy1 = factory.createSubmittableProxy(task1);
+ var proxy2 = factory.createSubmittableProxy(task2);
javaCompletionService.submit(proxy1);
javaCompletionService.submit(proxy2);
2  tests/AbstractConcurrencyServiceTest.cfc → tests/AbstractExecutorServiceTest.cfc
View
@@ -2,7 +2,7 @@ component extends="mxunit.framework.TestCase"{
function setUp(){
- service = new cfconcurrent.AbstractConcurrencyService("unittest");
+ service = new cfconcurrent.AbstractExecutorService("unittest");
}
function tearDown(){
8 tests/CompletionServiceTest.cfc
View
@@ -37,8 +37,8 @@ component extends="mxunit.framework.TestCase"{
service.start();
var task1 = new fixture.SimpleCallableTask("task1");
var task2 = new fixture.SimpleCallableTask("task2");
- var future1 = service.submitCallable(task1);
- var future2 = service.submitCallable(task2);
+ var future1 = service.submit(task1);
+ var future2 = service.submit(task2);
//we know the unit test is set to publish every second
sleep(1100);
@@ -60,7 +60,7 @@ component extends="mxunit.framework.TestCase"{
service.start();
var task1 = new fixture.SimpleCallableORMTask(1);
- var future1 = service.submitCallable(task1);
+ var future1 = service.submit(task1);
sleep(1100);
var result = future1.get();
@@ -74,7 +74,7 @@ component extends="mxunit.framework.TestCase"{
function createObject_works_in_task_in_executor(){
service.start();
var task1 = new fixture.ObjectCreatingCallableTask(1);
- var future1 = service.submitCallable(task1);
+ var future1 = service.submit(task1);
sleep(1100);
var result = future1.get();
107 tests/ExecutorServiceTest.cfc
View
@@ -0,0 +1,107 @@
+component extends="mxunit.framework.TestCase"{
+
+ function setUp(){
+ service = new cfconcurrent.ExecutorService(appName="unittest");
+ service.setLoggingEnabled( true );
+ service.start();
+ }
+
+ function tearDown(){
+ service.stop(1);
+ }
+
+ function init_without_maxConcurrent_defaults_to_cpu_count_plus_one(){
+ var cpus = service.getProcessorCount();
+ assertEquals( cpus + 1, service.getMaxConcurrent() );
+ }
+
+ function submit_callable_returns_future(){
+ var task = new fixture.SimpleCallableTask(1);
+ var future = service.submit( task );
+ var result = future.get();
+ debug( result );
+ debug( future );
+ assertTrue( structIsEmpty( result.error ) );
+ assertEquals( 1, result.id );
+ assertTrue( future.isDone() );
+ }
+
+ function submit_runnable_returns_runnable_future(){
+ var task = new fixture.SimpleRunnableTask(1);
+ var future = service.submit( task );
+ var result = future.get();
+ assertTrue( isNull(result), "RunnableFutures return null upon successful completion" );
+ assertTrue( future.isDone() );
+ }
+
+ function invokeAll_returns_array_of_futures_for_array_of_tasks(){
+ var tasks = [];
+ for( var i = 1; i LTE 10; i++ ){
+ arrayAppend( tasks, new fixture.SimpleCallableTask(i) );
+ }
+
+ var results = service.invokeAll( tasks );
+ writeLog("results!");
+ assertTrue( isArray(results) );
+ for( var i = 1; i LTE 10; i++ ){
+ var future = results[i];
+ assertTrue( future.isDone() );
+ var result = future.get();
+ assertEquals( i, result.id );
+ }
+ }
+
+ function invokeAll_honors_timeout(){
+ var tasks = [];
+ //create tasks that will sleep for 50 milliseconds
+ for( var i = 1; i LTE 10; i++ ){
+ arrayAppend( tasks, new fixture.SimpleCallableTask(i, 50) );
+ }
+
+ //invokeAll with a 20-ms timeout
+ var results = service.invokeAll( tasks, 20, "milliseconds" );
+
+ for( future in results ){
+ if( NOT future.isCancelled() ){
+ fail("Task should have been cancelled b/c it did not complete by the timeout");
+ }
+ }
+ }
+
+ function invokeAny_returns_single_result(){
+ var tasks = [];
+ for( var i = 1; i LTE 10; i++ ){
+ arrayAppend( tasks, new fixture.SimpleCallableTask(i) );
+ }
+
+ var result = service.invokeAny( tasks );
+ assertTrue( result.id GTE 1 and result.id LTE 10 ); //the order in which tasks are run is indeterminate
+ }
+
+ /**
+ * @mxunit:expectedException java.util.concurrent.TimeoutException
+ */
+ function invokeAny_honors_timeout(){
+ var tasks = [];
+ //create tasks that will sleep for 50 milliseconds
+ for( var i = 1; i LTE 10; i++ ){
+ arrayAppend( tasks, new fixture.SimpleCallableTask(i, 50) );
+ }
+
+ //invokeAny with a 20-ms timeout
+ var results = service.invokeAny( tasks, 20, "milliseconds" );
+
+ }
+
+ function execute_executes_task(){
+ var task = new fixture.SimpleRunnableTask(1);
+ //guard
+ assertEquals( 0, task.getResults().runCount );
+
+ service.execute( task );
+ sleep(2);
+ debug(task.getResults());
+ assertEquals( 1, task.getResults().runCount );
+ }
+
+}
1  tests/ScheduledExecutorServiceTest.cfc
View
@@ -48,7 +48,6 @@ component extends="mxunit.framework.TestCase"{
sleep(200);
var queue = service.getScheduledExecutor().getQueue();
- assertEquals(2, arrayLen(queue.toArray()));
assertEquals(2, structCount(service.getStoredTasks()));
var cancelled1 = service.cancelTask( "task1" );
13 tests/fixture/SimpleCallableTask.cfc
View
@@ -1,22 +1,25 @@
component{
results = { created = now(), createTS = getTickCount(), error={} };
-
- function init( id ){
- results.id = arguments.id;
+
+ function init( id, sleepTime=0 ){
+ structAppend( results, arguments );
return this;
}
-
+
function call(){
try{
writeLog("Inside call!")
results.message = "Hidey ho!";
+ if( results.sleepTime ){
+ sleep( results.sleepTime );
+ }
} catch( any e ){
writeLog("OH NOES!!!!! #e.message#; #e.detail#");
results.error = e;
}
results.endTS = getTickCount();
-
+
writeLog("returning from call() for task #results.id#");
return results;
}
Please sign in to comment.
Something went wrong with that request. Please try again.