From ba3c4f8e28235c1e0780a88d7cd087abfaddc61b Mon Sep 17 00:00:00 2001 From: Michael Bien Date: Wed, 4 May 2011 22:39:18 +0200 Subject: [PATCH] added finish action and several utility methods, WIP. --- .../util/concurrent/CLCommandQueuePool.java | 76 +++++++++++++++++-- 1 file changed, 68 insertions(+), 8 deletions(-) diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index ef788d6..205f039 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -20,20 +20,23 @@ /** * A multithreaded pool of OpenCL command queues. * It serves as a multiplexer distributing tasks over N queues. + * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s + * instead of {@link Callable}s. * @author Michael Bien */ public class CLCommandQueuePool implements CLResource { private final List queues; private final ExecutorService excecutor; + private FinishAction finishAction = FinishAction.DO_NOTHING; private CLCommandQueuePool(Collection queues) { this.queues = Collections.unmodifiableList(new ArrayList(queues)); this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues)); } - public static CLCommandQueuePool create(CLMultiContext mc) { - return create(mc.getDevices()); + public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(mc.getDevices(), modes); } public static CLCommandQueuePool create(Collection devices, CLCommandQueue.Mode... modes) { @@ -48,8 +51,16 @@ public static CLCommandQueuePool create(Collection queues) { return new CLCommandQueuePool(queues); } - public Future submit(CLTask task) { - return excecutor.submit(new TaskWrapper(task)); + public Future submit(CLTask task) { + return excecutor.submit(new TaskWrapper(task, finishAction)); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + List> wrapper = new ArrayList>(tasks.size()); + for (CLTask task : tasks) { + wrapper.add(new TaskWrapper(task, finishAction)); + } + return excecutor.invokeAll(wrapper); } /** @@ -77,15 +88,34 @@ public void release() { for (CLCommandQueue queue : queues) { queue.finish().release(); } + excecutor.shutdown(); } + /** + * Returns the command queues used in this pool. + */ public List getQueues() { return queues; } + /** + * Returns the size of this pool (number of command queues). + */ + public int getSize() { + return queues.size(); + } + + public FinishAction getFinishAction() { + return finishAction; + } + + public void setFinishAction(FinishAction action) { + this.finishAction = action; + } + @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+queues.size()+"]"; + return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]"; } private static class QueueThreadFactory implements ThreadFactory { @@ -115,16 +145,46 @@ public QueueThread(CLCommandQueue queue) { private static class TaskWrapper implements Callable { private final CLTask task; + private final FinishAction mode; - public TaskWrapper(CLTask task) { + public TaskWrapper(CLTask task, FinishAction mode) { this.task = task; + this.mode = mode; } public T call() throws Exception { - QueueThread thread = (QueueThread) Thread.currentThread(); - return task.run(thread.queue); + CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue; + T result = task.run(queue); + if(mode.equals(FinishAction.FLUSH)) { + queue.flush(); + }else if(mode.equals(FinishAction.FINISH)) { + queue.finish(); + } + return result; } } + /** + * The action executed after a task completes. + */ + public enum FinishAction { + + /** + * Does nothing, the task is responsible to make sure all computations + * have finished when the task finishes + */ + DO_NOTHING, + + /** + * Flushes the queue on task completion. + */ + FLUSH, + + /** + * Finishes the queue on task completion. + */ + FINISH + } + }