Skip to content

Commit

Permalink
Included _thread() implementation for threading in a thread pool, rat…
Browse files Browse the repository at this point in the history
…her than cfthread.
  • Loading branch information
markmandel committed Apr 9, 2012
1 parent 32096b6 commit f9576fb
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 36 deletions.
15 changes: 14 additions & 1 deletion README.md
Expand Up @@ -119,7 +119,20 @@ Do each iteration in it's own thread, and then join it all back again at the end

* data - the array/struct to perform a closure on
* closure - the closure to pass through the elements from data to.
* numberOfThreads - number of threads to use in the thread pool for processing.
* numberOfThreads - number of threads to use in the thread pool for processing. (Only needed if you aren't using _withPool())

### _thread(function closure) : any ###

Run the closure in a thread. Must be run inside a _withPool() block to set up the ExecutorService, and close it off at the end. For example:<br/> _withPool( 10, function() {<br/> _thread(function() { ... });<br/> _thread(function() { ... });<br/> }); <br/> Return an instance of java.util.concurrent.Future to give you control over the closure, and/or retrieve the value returned from the closure.

* closure - the closure to call asynchronously

### _withPool(numeric numberOfThreads, function closure) : void ###

This function allows you to share the underlying ExecutorService with multiple concurrent methods.<br/> For example, this shares a threadpool of 10 threads across multiple _eachParrallel calls:<br/> _withPool( 10, function() {<br/> _eachParrallel(array, function() { ... });<br/> _eachParrallel(array, function() { ... });<br/> _eachParrallel(array, function() { ... });<br/> });

* numberOfThreads - the number of threads to use in the thread pool for processing.
* closure - the closure that contains the calls to other concurrent library functions.


Contributions
Expand Down
116 changes: 94 additions & 22 deletions sesame/concurrency.cfm
Expand Up @@ -15,8 +15,8 @@
Make it easier to do things in parrallel.
These functions often used the variables scope to perform what they do in UUID'd scoped contexts.
Make it easier to do things in parallel.
They can put objects in the request scope, under the key 'sesame-concurrency-es'
--->

<cfscript>
Expand All @@ -25,46 +25,118 @@
*
* @data the array/struct to perform a closure on
* @closure the closure to pass through the elements from data to.
* @numberOfThreads number of threads to use in the thread pool for processing.
* @numberOfThreads number of threads to use in the thread pool for processing. (Only needed if you aren't using _withPool())
*/
public void function _eachParallel(required any data, required function closure, numeric numberOfThreads=5)
{
var futures = [];
var executorService = createObject("java", "java.util.concurrent.Executors").newFixedThreadPool(arguments.numberOfThreads);
var _closure = arguments.closure;
if(isArray(arguments.data))
if(!structKeyExists(request, "sesame-concurrency-es"))
{
for(var item in arguments.data)
var executorService = createObject("java", "java.util.concurrent.Executors").newFixedThreadPool(arguments.numberOfThreads);
var shutDownEs = true;
}
else
{
var executorService = request["sesame-concurrency-es"];
var shutDownEs = false;
}
try
{
if(isArray(arguments.data))
{
var args ={1=item};
var runnable = new sesame.concurrency.ClosureRunnable(_closure, args);
for(var item in arguments.data)
{
var args ={1=item};
var runnable = new sesame.concurrency.ClosureConcurrent(_closure, args);
runnable = createDynamicProxy(runnable, ["java.lang.Runnable"]);
runnable = createDynamicProxy(runnable, ["java.lang.Runnable"]);
var future = executorService.submit(runnable);
var future = executorService.submit(runnable);
arrayAppend(futures, future);
arrayAppend(futures, future);
}
}
}
if(isStruct(arguments.data))
{
for(var key in arguments.data)
if(isStruct(arguments.data))
{
var args ={1=key, 2=arguments.data[key]};
var runnable = new sesame.concurrency.ClosureRunnable(_closure, args);
for(var key in arguments.data)
{
var args ={1=key, 2=arguments.data[key]};
var runnable = new sesame.concurrency.ClosureConcurrent(_closure, args);
runnable = createDynamicProxy(runnable, ["java.lang.Runnable"]);
var future = executorService.submit(runnable.toRunnable());
var future = executorService.submit(runnable);
arrayAppend(futures, future);
}
}
arrayAppend(futures, future);
//join it all back up
ArrayEach(futures, function(it) { it.get(); });
}
catch(Any exc)
{
rethrow;
}
finally
{
if(shutDownEs)
{
executorService.shutdown();
}
}
}
/**
* This function allows you to share the underlying ExecutorService with multiple concurrent methods.<br/>
* For example, this shares a threadpool of 10 threads across multiple _eachParrallel calls:<br/>
* _withPool( 10, function() {<br/>
* _eachParrallel(array, function() { ... });<br/>
* _eachParrallel(array, function() { ... });<br/>
* _eachParrallel(array, function() { ... });<br/>
* });
*
* @numberOfThreads the number of threads to use in the thread pool for processing.
* @closure the closure that contains the calls to other concurrent library functions.
*/
public void function _withPool(required numeric numberOfThreads, required function closure)
{
request["sesame-concurrency-es"] = createObject("java", "java.util.concurrent.Executors").newFixedThreadPool(arguments.numberOfThreads);
//join it all back up
ArrayEach(futures, function(it) { it.get(); });
try
{
arguments.closure();
}
catch(Any exc)
{
rethrow;
}
finally
{
request["sesame-concurrency-es"].shutdown();
StructDelete(request, "sesame-concurrency-es");
}
}
/**
* Run the closure in a thread. Must be run inside a _withPool() block to set up the ExecutorService, and close it off at the end.
* For example:<br/>
* _withPool( 10, function() {<br/>
* _thread(function() { ... });<br/>
* _thread(function() { ... });<br/>
* });
*<br/>
* Return an instance of java.util.concurrent.Future to give you control over the closure, and/or retrieve the value returned from the closure.
*
* @closure the closure to call asynchronously
*/
public any function _thread(required function closure)
{
var executorService = request["sesame-concurrency-es"];
var callable = new sesame.concurrency.ClosureConcurrent(arguments.closure);
return executorService.submit(callable.toCallable());
}
</cfscript>
Expand Up @@ -15,7 +15,7 @@
*/

/**
* Implementation of java.util.Runnable that takes a closure and runs it. No value is returned.
* Implementation of java.util.Runnable and java.util.concurrent.Callable that takes a closure and runs it.
*/
component accessors="true"
{
Expand All @@ -26,19 +26,44 @@ component accessors="true"
* Constructor
*
* @func The function/closure to be called.
* @args the arguments to pass through. Defaults to none.
*/
public ClosureRunnable function init(required function func, struct args={})
public ClosureConcurrent function init(required function func, struct args={})
{
setFunc(arguments.func);
setArgs(args);
return this;
}

/**
* Call the function.
* Call the function, and returns nothing.
*/
public void function run()
{
variables.func(argumentCollection=variables.args);
}

/**
* Call the function, and returns it's value
*/
public any function call()
{
return variables.func(argumentCollection=variables.args);
}

/**
* convert to runnable
*/
public any function toRunnable()
{
return createDynamicProxy(this, ["java.lang.Runnable"]);
}

/**
* convert to Callable
*/
public any function toCallable()
{
return createDynamicProxy(this, ["java.util.concurrent.Callable"]);
}
}
87 changes: 77 additions & 10 deletions tests/cases/ConcurrencyTest.cfc
Expand Up @@ -24,6 +24,34 @@ component extends="tests.AbstractTestCase"
assertEquals([2,4,6,8], collected);
}

/**
* test each in parrallel with an array
*/
public void function testEachParrallelwithArrayWithPool()
{
var data = [1, 2, 3, 4];
var collected = [];

//gate
assertFalse(structKeyExists(request, "sesame-concurrency-es"));

_withPool(10, function()
{
assertTrue(structKeyExists(request, "sesame-concurrency-es"));

_eachParallel(data, function(it) { ArrayAppend(collected, 2*it); });
});

assertFalse(structKeyExists(request, "sesame-concurrency-es"));

debug(data);
debug(collected);

arraySort(collected, "numeric");

assertEquals([2,4,6,8], collected);
}

/**
* test each in parrallel with a struct
*/
Expand All @@ -38,38 +66,77 @@ component extends="tests.AbstractTestCase"
}

/**
* test ClosureRunnable
* test ClosureConcurrent
*/
public void function testClosureRunnableNoArguments()
public void function testClosureConcurrentNoArguments()
{
var func = function() { request.foo = "bar"; };

var runnable = new sesame.concurrency.ClosureRunnable(func);
var runnable = new sesame.concurrency.ClosureConcurrent(func);

assertFalse(structKeyExists(request, "foo"));

runnable.run();

assertTrue(structKeyExists(request, "foo"));
assertEquals("bar", request.foo);

var func = function() { return "Hello World"; };

var callable = new sesame.concurrency.ClosureConcurrent(func);
assertEquals(func(), callable.call());
}

/**
* test ClosureRunnable
* test ClosureConcurrent
*/
public void function testClosureRunnableWithArguments()
public void function testClosureConcurrentWithArguments()
{
var func = function(it) { request.testClosureRunnableWithArguments = it; };
var func = function(it) { request.testClosureConcurrentWithArguments = it; };

var runnable = new sesame.concurrency.ClosureRunnable(func, {1="bar"});
var runnable = new sesame.concurrency.ClosureConcurrent(func, {1="bar"});

assertFalse(structKeyExists(request, "testClosureRunnableWithArguments"));
assertFalse(structKeyExists(request, "testClosureConcurrentWithArguments"));

runnable.run();

assertTrue(structKeyExists(request, "testClosureRunnableWithArguments"));
assertEquals("bar", request.testClosureRunnableWithArguments);
assertTrue(structKeyExists(request, "testClosureConcurrentWithArguments"));
assertEquals("bar", request.testClosureConcurrentWithArguments);

var func = function(it) { return "Hello World #it#"; };
var callable = new sesame.concurrency.ClosureConcurrent(func, {1="GOATS!"});

assertEquals(func("GOATS!"), callable.call());
}

/**
* test thread with a return value
*/
public void function testThreadWithReturnValue()
{
_withPool(5, function()
{
var future = _thread(function() { return "Hello World!"; });

assertEquals("Hello World!", future.get());
});
}

/**
* test thread without a return value
*/
public void function testThreadWithoutReturnValue()
{
var key = createUUID();
var result = {};
var future = 0;
_withPool(5, function()
{
future = _thread(function() { result[key] = 1; });
});

future.get();

AssertTrue(structKeyExists(result, key));
}
}

0 comments on commit f9576fb

Please sign in to comment.