Skip to content

Commit

Permalink
added finish action and several utility methods, WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbien committed May 4, 2011
1 parent 3a20670 commit ba3c4f8
Showing 1 changed file with 68 additions and 8 deletions.
76 changes: 68 additions & 8 deletions src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
Expand Up @@ -20,20 +20,23 @@
/**
* A multithreaded pool of OpenCL command queues.
* It serves as a multiplexer distributing tasks over N queues.
* The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s
* instead of {@link Callable}s.
* @author Michael Bien
*/
public class CLCommandQueuePool implements CLResource {

private final List<CLCommandQueue> queues;
private final ExecutorService excecutor;
private FinishAction finishAction = FinishAction.DO_NOTHING;

private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues));
this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues));
}

public static CLCommandQueuePool create(CLMultiContext mc) {
return create(mc.getDevices());
public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
return create(mc.getDevices(), modes);
}

public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) {
Expand All @@ -48,8 +51,16 @@ public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) {
return new CLCommandQueuePool(queues);
}

public <T> Future<T> submit(CLTask<T> task) {
return excecutor.submit(new TaskWrapper(task));
public <R> Future<R> submit(CLTask<R> task) {
return excecutor.submit(new TaskWrapper(task, finishAction));
}

public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException {
List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
for (CLTask<R> task : tasks) {
wrapper.add(new TaskWrapper<R>(task, finishAction));
}
return excecutor.invokeAll(wrapper);
}

/**
Expand Down Expand Up @@ -77,15 +88,34 @@ public void release() {
for (CLCommandQueue queue : queues) {
queue.finish().release();
}
excecutor.shutdown();
}

/**
* Returns the command queues used in this pool.
*/
public List<CLCommandQueue> getQueues() {
return queues;
}

/**
* Returns the size of this pool (number of command queues).
*/
public int getSize() {
return queues.size();
}

public FinishAction getFinishAction() {
return finishAction;
}

public void setFinishAction(FinishAction action) {
this.finishAction = action;
}

@Override
public String toString() {
return getClass().getSimpleName()+" [queues: "+queues.size()+"]";
return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]";
}

private static class QueueThreadFactory implements ThreadFactory {
Expand Down Expand Up @@ -115,16 +145,46 @@ public QueueThread(CLCommandQueue queue) {
private static class TaskWrapper<T> implements Callable<T> {

private final CLTask<T> task;
private final FinishAction mode;

public TaskWrapper(CLTask<T> task) {
public TaskWrapper(CLTask<T> task, FinishAction mode) {
this.task = task;
this.mode = mode;
}

public T call() throws Exception {
QueueThread thread = (QueueThread) Thread.currentThread();
return task.run(thread.queue);
CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue;
T result = task.run(queue);
if(mode.equals(FinishAction.FLUSH)) {
queue.flush();
}else if(mode.equals(FinishAction.FINISH)) {
queue.finish();
}
return result;
}

}

/**
* The action executed after a task completes.
*/
public enum FinishAction {

/**
* Does nothing, the task is responsible to make sure all computations
* have finished when the task finishes
*/
DO_NOTHING,

/**
* Flushes the queue on task completion.
*/
FLUSH,

/**
* Finishes the queue on task completion.
*/
FINISH
}

}

0 comments on commit ba3c4f8

Please sign in to comment.