Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fixed Cell, minor bugs

  • Loading branch information...
commit 4a4fbf9ff9a97fe1f7a27babea890eb5e0c6d7b3 1 parent e8a4c11
sriram srinivasan authored
71 src/kilim/Cell.java
View
@@ -16,7 +16,6 @@
*/
public class Cell<T> implements PauseReason, EventPublisher {
- // TODO. Give mbox a config name and id and make monitorable
T msg;
EventSubscriber sink;
@@ -46,19 +45,23 @@ public Cell() {
*/
public T get(EventSubscriber eo) {
EventSubscriber producer = null;
+ T ret;
synchronized(this) {
- if (msg == null) {
+ if (msg == null) {
+ ret = null;
addMsgAvailableListener(eo);
- } else {
+ } else {
+ ret = msg;
+ msg = null;
if (srcs.size() > 0) {
producer = srcs.poll();
}
- }
+ }
}
if (producer != null) {
producer.onEvent(this, spaceAvailble);
}
- return msg;
+ return ret;
}
/**
@@ -128,11 +131,11 @@ public T get(long timeoutMillis) throws Pausable {
long begin = System.currentTimeMillis();
while (msg == null) {
TimerTask tt = new TimerTask() {
- public void run() {
- Cell.this.removeMsgAvailableListener(t);
- t.onEvent(Cell.this, timedOut);
- }
- };
+ public void run() {
+ Cell.this.removeMsgAvailableListener(t);
+ t.onEvent(Cell.this, timedOut);
+ }
+ };
Task.timer.schedule(tt, timeoutMillis);
Task.pause(this);
tt.cancel();
@@ -145,28 +148,28 @@ public void run() {
}
public synchronized void addSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.add(spcSub);
- }
+ srcs.add(spcSub);
+ }
public synchronized void removeSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.remove(spcSub);
- }
+ srcs.remove(spcSub);
+ }
public synchronized void addMsgAvailableListener(EventSubscriber msgSub) {
- if (sink != null) {
- throw new AssertionError(
- "Error: A mailbox can not be shared by two consumers. New = "
- + msgSub + ", Old = " + sink);
+ if (sink != null) {
+ throw new AssertionError(
+ "Error: A mailbox can not be shared by two consumers. New = "
+ + msgSub + ", Old = " + sink);
+ }
+ sink = msgSub;
}
- sink = msgSub;
- }
public synchronized void removeMsgAvailableListener(EventSubscriber msgSub) {
- if (sink == msgSub) {
- sink = null;
+ if (sink == msgSub) {
+ sink = null;
+ }
}
- }
public boolean putnb(T msg) {
return put(msg, null);
@@ -184,11 +187,11 @@ public boolean put(T msg, int timeoutMillis) throws Pausable {
long begin = System.currentTimeMillis();
while (!put(msg, t)) {
TimerTask tt = new TimerTask() {
- public void run() {
- Cell.this.removeSpaceAvailableListener(t);
- t.onEvent(Cell.this, timedOut);
- }
- };
+ public void run() {
+ Cell.this.removeSpaceAvailableListener(t);
+ t.onEvent(Cell.this, timedOut);
+ }
+ };
Task.timer.schedule(tt, timeoutMillis);
Task.pause(this);
if (System.currentTimeMillis() - begin >= timeoutMillis) {
@@ -237,12 +240,12 @@ public void putb(T msg, final long timeoutMillis) {
}
public synchronized boolean hasMessage() {
- return msg != null;
- }
+ return msg != null;
+ }
public synchronized boolean hasSpace() {
- return msg == null;
- }
+ return msg == null;
+ }
/**
* retrieve a message, blocking the thread indefinitely. Note, this is a
@@ -279,8 +282,8 @@ public T getb(final long timeoutMillis) {
}
public synchronized String toString() {
- return "id:" + System.identityHashCode(this) + " " + msg;
- }
+ return "id:" + System.identityHashCode(this) + " " + msg;
+ }
// Implementation of PauseReason
public boolean isValid(Task t) {
4 src/kilim/Mailbox.java
View
@@ -389,8 +389,8 @@ public boolean isValid(Task t) {
}
public boolean isValid(Task t) {
- // The pauseReason is "Empty" if the none of the mboxes have any
- // elements
+ // The pauseReason is true (there is valid reason to continue
+ // pausing) if none of the mboxes have any elements
for (Mailbox mb : mbxs) {
if (mb.hasMessage())
return false;
2  src/kilim/Scheduler.java
View
@@ -56,6 +56,7 @@ public void schedule(Task t) {
WorkerThread wt = null;
synchronized(this) {
+ assert t.running == true : "Task " + t + " scheduled even though running is false";
runnableTasks.put(t);
if (!waitingThreads.isEmpty())
wt = waitingThreads.poll();
@@ -97,6 +98,7 @@ Task getNextTask(WorkerThread wt) {
if (t == null) {
wt.waitForMsgOrSignal();
} else if (prefThread == null || prefThread == wt) {
+ assert t.currentThread == null: " Task " + t + " already running";
return t;
} else {
prefThread.addRunnableTask(t);
18 src/kilim/Task.java
View
@@ -19,6 +19,8 @@
*
*/
public abstract class Task implements EventSubscriber {
+ public volatile Thread currentThread = null;
+
static PauseReason yieldReason = new YieldReason();
/**
* Task id, automatically generated
@@ -194,7 +196,7 @@ public void informOnExit(Mailbox<ExitMsg> exit) {
public static void exit(Object aExitValue) throws Pausable { }
public static void exit(Object aExitValue, Fiber f) {
- assert f.pc == 0;
+ assert f.pc == 0 : "f.pc != 0";
f.task.setPauseReason(new TaskDoneReason(aExitValue));
f.togglePause();
}
@@ -206,7 +208,7 @@ public static void exit(Object aExitValue, Fiber f) {
*/
public static void errorExit(Throwable ex) throws Pausable { }
public static void errorExit(Throwable ex, Fiber f) {
- assert f.pc == 0;
+ assert f.pc == 0 : "fc.pc != 0";
f.task.setPauseReason(new TaskDoneReason(ex));
f.togglePause();
}
@@ -250,6 +252,8 @@ public void run() {
public static void yield(Fiber f) {
if (f.pc == 0) {
f.task.setPauseReason(yieldReason);
+ } else {
+ f.task.setPauseReason(null);
}
f.togglePause();
}
@@ -330,13 +334,14 @@ public void _runExecute(WorkerThread thread) throws NotPausable {
Fiber f = fiber;
boolean isDone = false;
try {
+ currentThread = Thread.currentThread();
assert (preferredResumeThread == null || preferredResumeThread == thread) : "Resumed " + id + " in incorrect thread. ";
// start execute. fiber is wound to the beginning.
execute(f.begin());
// execute() done. Check fiber if it is pausing and reset it.
isDone = f.end() || (pauseReason instanceof TaskDoneReason);
-
+ assert (pauseReason == null && isDone) || (pauseReason != null && !isDone) : "pauseReason:" + pauseReason + ",isDone =" + isDone;
} catch (Throwable th) {
th.printStackTrace();
// Definitely done
@@ -368,15 +373,18 @@ public void _runExecute(WorkerThread thread) throws NotPausable {
preferredResumeThread = null;
}
}
+
+ PauseReason pr = this.pauseReason;
synchronized (this) {
running = false;
+ currentThread = null;
}
-
+
// The task has been in "running" mode until now, and may have missed
// notifications to the pauseReason object (that is, it would have
// resisted calls to resume(). If the pauseReason is not valid any
// more, we'll resume.
- if (!pauseReason.isValid(this)) {
+ if (!pr.isValid(this)) {
resume();
}
}
5 src/kilim/WorkerThread.java
View
@@ -27,12 +27,17 @@
}
public void run() {
+ try {
while (true) {
Task t = scheduler.getNextTask(this); // blocks until task available
if (t == null) break; // scheduler shut down
runningTask = t;
t._runExecute(this);
}
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ System.out.println(runningTask);
+ }
}
public Task getCurrentTask() {
2  src/kilim/analysis/MethodWeaver.java
View
@@ -67,7 +67,7 @@
this.classWeaver = cw;
this.methodFlow = mf;
isPausable = mf.isPausable();
- fiberVar = methodFlow.maxLocals; // alloc fresh var
+ fiberVar = methodFlow.maxLocals;
maxVars = fiberVar + 1;
maxStack = methodFlow.maxStack + 1; // plus Fiber
if (!mf.isAbstract()) {
Please sign in to comment.
Something went wrong with that request. Please try again.