Permalink
Browse files

use priority queue for runnableTasks, add priority API

  • Loading branch information...
krestenkrab committed Dec 12, 2009
1 parent 85e2abd commit 3409e1837975541e7f9064039c7c5e2736fa6056
Showing with 53 additions and 22 deletions.
  1. +2 −1 src/kilim/Mailbox.java
  2. +17 −3 src/kilim/Scheduler.java
  3. +34 −18 src/kilim/Task.java
View
@@ -151,8 +151,9 @@ public void run() {
*/
public boolean untilHasMessages(int num, long timeoutMillis) throws Pausable {
final Task t = Task.getCurrentTask();
+ final long end = System.currentTimeMillis() + timeoutMillis;
+
boolean has_msg = hasMessages(num, t);
- long end = System.currentTimeMillis() + timeoutMillis;
while (has_msg == false) {
TimerTask tt = new TimerTask() {
public void run() {
View
@@ -6,7 +6,9 @@
package kilim;
+import java.util.Comparator;
import java.util.LinkedList;
+import java.util.PriorityQueue;
/**
* This is a basic FIFO Executor. It maintains a list of
@@ -23,7 +25,17 @@
LinkedList<WorkerThread> allThreads = new LinkedList<WorkerThread>();
LinkedList<WorkerThread> waitingThreads = new LinkedList<WorkerThread>();
protected boolean shutdown = false;
- protected RingQueue<Task> runnableTasks = new RingQueue<Task>(1000);
+ // protected RingQueue<Task> runnableTasks = new RingQueue<Task>(1000);
+ protected PriorityQueue<Task> runnableTasks = new PriorityQueue<Task>(1000, new Comparator<Task>() {
+
+ public int compare(Task o1, Task o2) {
+ if (o1.priority < o2.priority)
+ return -1;
+ else if (o1.priority == o2.priority)
+ return 0;
+ else
+ return 1;
+ }});
static {
String s = System.getProperty("kilim.Scheduler.numThreads");
@@ -57,7 +69,7 @@ public void schedule(Task t) {
synchronized(this) {
assert t.running == true : "Task " + t + " scheduled even though running is false";
- runnableTasks.put(t);
+ runnableTasks.add(t);
if (!waitingThreads.isEmpty())
wt = waitingThreads.poll();
}
@@ -87,10 +99,12 @@ Task getNextTask(WorkerThread wt) {
return t;
}
- t = runnableTasks.get();
+ t = runnableTasks.peek();
if (t == null) {
waitingThreads.add(wt);
} else {
+ Task tt = runnableTasks.remove();
+ assert t == tt : "queue not in order?";
prefThread = t.preferredResumeThread;
}
}
View
@@ -26,6 +26,18 @@
*/
public final int id;
static final AtomicInteger idSource = new AtomicInteger();
+
+ public static final int PRIORITY_MAX = 0;
+ public static final int PRIORITY_HIGH = 1;
+ public static final int PRIORITY_NORMAL = 2;
+ public static final int PRIORITY_LOW = 3;
+
+ /**
+ * The priority of this task.
+ * One of {@link #PRIORITY_MAX}, {@link #PRIORITY_HIGH},
+ * {@link #PRIORITY_NORMAL} or {@link #PRIORITY_LOW}.
+ */
+ int priority = PRIORITY_NORMAL;
/**
* The stack manager in charge of rewinding and unwinding
@@ -82,8 +94,6 @@
public Object exitResult = "OK";
- private Error death_ex;
-
// TODO: move into a separate timer service or into the schduler.
public final static Timer timer = new Timer(true);
@@ -123,22 +133,10 @@ public Task start() {
return this;
}
- /**
- * Used to make an exception happen inside this task
- * @param ex
- */
- public void kill(Error ex) {
- System.err.println("killing "+this+" setting "+death_ex);
-
- this.death_ex = ex;
- resume();
- }
-
- public void checkKill() {
- if (this.death_ex != null) {
- // System.err.println("killing "+this+": throw "+death_ex);
- throw this.death_ex;
- }
+ /** can be overridden in subclasses to check if the task needs killing,
+ * which is done by simply throwing some subclass of java.lang.Error
+ * from here. */
+ public void checkKill() throws java.lang.Error {
}
/**
@@ -343,6 +341,24 @@ public final PauseReason getPauseReason() {
return pauseReason;
}
+
+ /**
+ * We only allow a task to set this on itself, so that
+ * it does not happen while it is runnable [in the priority queue],
+ * which would invalidate the internal invsariants of the queue.
+ *
+ * @param value
+ * @param f
+ * @throws Pausable
+ */
+ public final void setPriority(int value) throws Pausable {
+ assert Task.getCurrentTask() == this : "task can only call setPriority this on self";
+ this.priority = value;
+ }
+
+ public final int getPriority() {
+ return priority;
+ }
public synchronized boolean isDone() {
return done;

0 comments on commit 3409e18

Please sign in to comment.