Permalink
Browse files

- initial task class

  • Loading branch information...
1 parent 22cf108 commit 2619428afcff0fececd0d1e7a7b0d6f57643efd2 @sushantk committed Sep 18, 2012
Showing with 254 additions and 9 deletions.
  1. +20 −0 ants/src/ants/Data.java
  2. +114 −0 ants/src/ants/Task.java
  3. +109 −0 ants/src/ants/TaskManager.java
  4. +11 −9 ants/src/ants/Util.java
View
@@ -0,0 +1,20 @@
+package ants;
+
+public class Data {
+
+ private byte[] bytes;
+ private String mimeType;
+
+ public Data(byte[] bytes, String mimeType) {
+ this.bytes = bytes;
+ this.mimeType = mimeType;
+ }
+
+ byte[] getBytes() {
+ return this.bytes;
+ }
+
+ String getMimeType() {
+ return this.mimeType;
+ }
+}
View
@@ -0,0 +1,114 @@
+package ants;
+
+import java.util.LinkedList;
+
+/**
+ * The most basic unit of activity
+ */
+public abstract class Task {
+
+ static public enum Status {
+ DONE, NOT_DONE
+ }
+
+ static public enum Result {
+ FAILED, SUCCEDED, TIMEDOUT
+ }
+
+ static public enum Type {
+ CPU, SYNC_IO, ASYNC_IO
+ }
+
+ /**
+ * Callback to send the task completion notification to one
+ * or more listeners
+ */
+ public interface Callback {
+ Iterable<Task> onComplete(Task task);
+ }
+
+ private Type type;
+ private Status status = Status.NOT_DONE;
+ private Result result = Result.FAILED;
+ private Object userData;
+ private Data data;
+
+ private LinkedList<Callback> callbacks = new LinkedList<Callback>();
+
+ public Task(Type type, Object userData) {
+ this.userData = userData;
+ this.type = type;
+ }
+
+ public Object getUserData() {
+ return this.userData;
+ }
+
+ public Type getType() {
+ return this.type;
+ }
+
+ public Status getStatus() {
+ return this.status;
+ }
+
+ public Result getResult() {
+ return this.result;
+ }
+
+ public Data getData() {
+ return this.data;
+ }
+
+ synchronized public void setData(Data data, Result result) {
+ this.status = Status.DONE;
+
+ this.data = data;
+ this.result = result;
+
+ // let the monitor know that the async task has finished fetching
+ if ((Type.ASYNC_IO == this.type) && (null != this.asyncMonitor)) {
+ this.asyncMonitor.onAsyncDataReady(this);
+ this.asyncMonitor = null;
+ }
+ }
+
+ public abstract Iterable<Task> run();
+
+ Iterable<Callback> getCallbacks() {
+ return this.callbacks;
+ }
+
+ public void addCallback(Callback callback) {
+ this.callbacks.add(callback);
+ }
+
+ public String toString() {
+ return "Task<" + userData + ">";
+ }
+
+ /**
+ * Used by the task manager to monitor async task activity
+ */
+ public interface AsyncMonitor {
+ /**
+ * Notify the monitor when the async task has finished fetching
+ * the data
+ */
+ void onAsyncDataReady(Task task);
+ }
+
+ private AsyncMonitor asyncMonitor;
+
+ /**
+ * if the data has already arrived, we will notify the monitor
+ * right away, else save it for future notification in setData
+ */
+ synchronized public void setAsyncMonitor(AsyncMonitor monitor) {
+ if (Status.DONE == this.status) {
+ monitor.onAsyncDataReady(this);
+ } else {
+ this.asyncMonitor = monitor;
+ }
+ }
+}
@@ -0,0 +1,109 @@
+package ants;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides foundation for task execution
+ */
+public class TaskManager implements Task.AsyncMonitor {
+
+ static ThreadPoolExecutor cpuTaskExecutor;
+ static ThreadPoolExecutor syncIOTaskExecutor;
+
+ /*
+ * One time configuration to tune thread pools as per the need and system
+ * capacity
+ */
+ public static void configure(int cpuTaskPoolSize, int syncIOTaskPoolSize) {
+ // cpu tasks have a fixed size pre-started thread pool with an unbounded
+ // queue. it is recommended that the pool size be a little more than the
+ // number of available cores in the system
+ ThreadPoolExecutor cpuTaskExecutor = new ThreadPoolExecutor(
+ cpuTaskPoolSize, cpuTaskPoolSize, 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(cpuTaskPoolSize));
+ cpuTaskExecutor.allowCoreThreadTimeOut(false);
+ cpuTaskExecutor.prestartAllCoreThreads();
+ TaskManager.cpuTaskExecutor = cpuTaskExecutor;
+
+ // sync io tasks have a fixed size thread pool with an unbounded queue.
+ // threads start and grow up to the pool size as per the need and also
+ // stay alive after the use.
+ ThreadPoolExecutor syncIOTaskExecutor = new ThreadPoolExecutor(
+ syncIOTaskPoolSize, syncIOTaskPoolSize, 0,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(
+ syncIOTaskPoolSize));
+ syncIOTaskExecutor.allowCoreThreadTimeOut(false);
+ TaskManager.syncIOTaskExecutor = syncIOTaskExecutor;
+ }
+
+ boolean logging;
+
+ public TaskManager(boolean logging) {
+ this.logging = logging;
+ }
+
+ public void run(final Task task) {
+ final TaskManager self = this;
+
+ switch (task.getType()) {
+ case CPU: {
+ TaskManager.cpuTaskExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ Iterable<Task> next = task.run();
+ self.complete(task, next);
+ }
+ });
+ }
+ break;
+ case SYNC_IO: {
+ TaskManager.syncIOTaskExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ Iterable<Task> next = task.run();
+ self.complete(task, next);
+ }
+ });
+ }
+ break;
+ case ASYNC_IO: {
+ task.setAsyncMonitor(this);
+ }
+ break;
+ }
+ }
+
+ /**
+ * @see ants.Task.AsyncMonitor#onAsyncDataReady(ants.Task)
+ */
+ @Override
+ public void onAsyncDataReady(final Task task) {
+ // thread pool is not used to run the async io task, as it is already
+ // done fetching data. it is supposedly going to advice for any further
+ // activity via returned tasks
+ Iterable<Task> next = task.run();
+ this.complete(task, next);
+ }
+
+ /**
+ * finalize the task after completing its run
+ */
+ private void complete(Task task, Iterable<Task> nextTasks) {
+ // run the new set of tasks returned by the completed task
+ for (Task next : nextTasks) {
+ this.run(next);
+ }
+
+ // notify the callbacks and run the new set of tasks returned
+ // by the callback
+ Iterable<Task.Callback> callbacks = task.getCallbacks();
+ for (Task.Callback callback : callbacks) {
+ Iterable<Task> cbNextTasks = callback.onComplete(task);
+ for (Task next : cbNextTasks) {
+ this.run(next);
+ }
+ }
+ }
+}
View
@@ -1,18 +1,20 @@
package ants;
-import java.util.List;
-
public class Util {
- public static String implode(List<String> a_list, String a_seperator) {
- String value = "";
- int length = a_list.size();
- for(int i = 0; i < length; i++) {
- if(i > 0) value += a_seperator;
- value += a_list.get(i);
+ public static String implode(Iterable<String> list, String seperator) {
+ StringBuffer value = new StringBuffer();
+
+ int i = 0;
+ for(String string : list) {
+ if(i > 0) {
+ value.append(seperator);
+ }
+ value.append(string);
+ i++;
}
- return value;
+ return value.toString();
}
}

0 comments on commit 2619428

Please sign in to comment.