Permalink
Browse files

now works with kilim threads

  • Loading branch information...
krestenkrab committed Dec 2, 2009
1 parent ff9698d commit b64dfc81929dc9d69b43e39a2b057d3a510b2ab2
@@ -95,6 +95,86 @@ public T get(EventSubscriber eo) {
return msg;
}
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public void untilHasMessage() throws Pausable{
+ Task t = Task.getCurrentTask();
+ boolean msg = hasMessage(t);
+ while (msg == false) {
+ Task.pause(this);
+ msg = hasMessage(t);
+ }
+ }
+
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public boolean untilHasMessage(long timeoutMillis) throws Pausable {
+ final Task t = Task.getCurrentTask();
+ boolean has_msg = hasMessage(t);
+ long begin = System.currentTimeMillis();
+ while (has_msg == false) {
+ TimerTask tt = new TimerTask() {
+ public void run() {
+ Mailbox.this.removeMsgAvailableListener(t);
+ t.onEvent(Mailbox.this, timedOut);
+ }
+ };
+ Task.timer.schedule(tt, timeoutMillis);
+ Task.pause(this);
+ tt.cancel();
+ if (System.currentTimeMillis() - begin > timeoutMillis) {
+ break;
+ }
+ has_msg = hasMessage(t);
+ }
+ return has_msg;
+ }
+
+
+ /**
+ * Non-blocking, nonpausing "wait-until-message-available".
+ * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when
+ * a put() is done.
+ * @return true's one, or false
+ */
+ public boolean hasMessage(EventSubscriber eo) {
+ boolean has_msg;
+ synchronized(this) {
+ int n = numMsgs;
+ if (n > 0) {
+ has_msg = true;
+ } else {
+ has_msg = false;
+ addMsgAvailableListener(eo);
+ }
+ }
+ return has_msg;
+ }
+
+ /**
+ * Non-blocking, nonpausing peek.
+ * @return buffered message if there's one, or null
+ */
+ public T peek() {
+ T msg;
+ synchronized(this) {
+ int n = numMsgs;
+ if (n > 0) {
+ int ic = icons;
+ msg = msgs[ic];
+ } else {
+ msg = null;
+ }
+ }
+ return msg;
+ }
+
/**
* Non-blocking, nonpausing put.
* @param eo. If non-null, registers this observer and calls it with an SpaceAvailable event
@@ -143,6 +223,7 @@ public boolean put(T msg, EventSubscriber eo) {
}
// notify get's subscriber that something is available
if (subscriber != null) {
+ System.err.println("sending messageAvailable to "+subscriber);
subscriber.onEvent(this, messageAvailable);
}
return ret;
@@ -81,6 +81,9 @@
public Object exitResult = "OK";
+
+ private Error death_ex;
+
// TODO: move into a separate timer service or into the schduler.
public final static Timer timer = new Timer(true);
@@ -120,6 +123,24 @@ public Task start() {
return this;
}
+ /**
+ * Used to make an exception happen inside this task
+ * @param ex
+ */
+ public void kill(Error ex) {
+ System.err.println("killing "+this+" setting "+death_ex);
+
+ this.death_ex = ex;
+ resume();
+ }
+
+ private void maybeKill() {
+ if (this.death_ex != null) {
+ System.err.println("killing "+this+": throw "+death_ex);
+ throw this.death_ex;
+ }
+ }
+
/**
* The generated code calls Fiber.upEx, which in turn calls
* this to find out out where the current method is w.r.t
@@ -146,12 +167,15 @@ public void onEvent(EventPublisher ep, Event e) {
// is mailbox.put or get(), and that it'll be the pausereason as well.
if (ep == pauseReason) {
resume();
- }
+ } else {
+ System.err.println("not current pause reason");
+ }
}
/**
* This is typically called by a pauseReason to resume the task.
*/
public void resume() {
+ System.err.println("resuming "+this+" on "+scheduler);
if (scheduler == null) return;
boolean doSchedule = false;
@@ -271,6 +295,7 @@ public static void pause(PauseReason pauseReason, Fiber f) {
f.task.setPauseReason(null);
}
f.togglePause();
+ f.task.maybeKill();
}
/*
@@ -335,6 +360,9 @@ void _runExecute(WorkerThread thread) throws NotPausable {
try {
currentThread = Thread.currentThread();
assert (preferredResumeThread == null || preferredResumeThread == thread) : "Resumed " + id + " in incorrect thread. ";
+
+ System.err.println("scheduling "+this);
+
// start execute. fiber is wound to the beginning.
execute(f.begin());
@@ -23,6 +23,7 @@
public int numResumes = 0;
WorkerThread(Scheduler ascheduler) {
+ setDaemon(true);
scheduler=ascheduler;
}
@@ -74,7 +74,7 @@ public MethodVisitor visitMethod(
public ArrayList<MethodFlow> analyze(boolean forceAnalysis) throws KilimException {
// cr.accept(this, ClassReader.SKIP_DEBUG);
- cr.accept(this, ClassReader.SKIP_DEBUG);
+ cr.accept(this, 0);
for (Object o: this.fields) {
FieldNode fn = (FieldNode)o;
if (fn.name.equals(Constants.WOVEN_FIELD)) {
@@ -175,6 +175,15 @@ private String toString(String className, String methName, String desc) {
return className.replace('/', '.') + '.' + methName + desc;
}
+ /* (non-Javadoc)
+ * @see org.objectweb.asm.tree.MethodNode#visitLineNumber(int, org.objectweb.asm.Label)
+ */
+ @Override
+ public void visitLineNumber(int line, Label start) {
+ // TODO Auto-generated method stub
+ super.visitLineNumber(line, start);
+ }
+
@Override
public void visitLabel(Label label) {
// if (hasPausableAnnotation)
@@ -19,6 +19,8 @@
package erjang;
+import kilim.Pausable;
+
/**
*
*/
@@ -38,13 +40,13 @@ public EInternalPort self() {
}
@Override
- public void send_exit(EHandle from, EObject reason) {
+ public void send_exit(EHandle from, EObject reason) throws Pausable {
// TODO Auto-generated method stub
}
@Override
- public void mbox_send(EObject msg) {
+ public void mbox_send(EObject msg) throws Pausable {
// TODO Auto-generated method stub
}
@@ -110,6 +110,8 @@ static EFun make(Method method) {
// make sure we have it's superclass loaded
get_fun_class(ary);
+ data = weave(data);
+
Class<? extends EFun> res_class = ERT.defineClass(cl, clname.replace(
'/', '.'), data, 0, data.length);
@@ -135,14 +137,7 @@ static EFun make(Method method) {
byte[] data = gen_fun_class_data(arity);
- ClassWeaver w = new ClassWeaver(data, new Compiler.ErjangDetector("/xx/"));
- for (ClassInfo ci : w.getClassInfos()) {
- ETuple.dump(ci.className, ci.bytes);
-
- if (ci.className.equals(EFUN_NAME)) {
- data = ci.bytes;
- }
- }
+ data = weave(data);
return ERT.defineClass(EFun.class.getClassLoader(), self_type.replace(
@@ -254,7 +249,7 @@ static EFun get_fun_with_handler(int arity, EFunHandler handler) {
cw.visitEnd();
byte[] data = cw.toByteArray();
- ETuple.dump(self_type, data);
+ data = weave(data);
Class<? extends EFun> clazz = ERT.defineClass(EFun.class
.getClassLoader(), self_type.replace('/', '.'), data, 0,
@@ -275,6 +270,17 @@ static EFun get_fun_with_handler(int arity, EFunHandler handler) {
throw new Error(e);
}
}
+
+ public static byte[] weave(byte[] data) {
+ ClassWeaver w = new ClassWeaver(data, new Compiler.ErjangDetector("/xx/"));
+ for (ClassInfo ci : w.getClassInfos()) {
+ ETuple.dump(ci.className, ci.bytes);
+
+ if (!ci.className.startsWith("kilim"))
+ data = ci.bytes;
+ }
+ return data;
+ }
private static void create_cast(ClassWriter cw, int n) {
MethodVisitor mv = cw.visitMethod(Opcodes.ACC_PUBLIC|Opcodes.ACC_STATIC, "cast",
@@ -304,7 +310,7 @@ private static void create_cast(ClassWriter cw, int n) {
private static void make_go_method(ClassWriter cw, String self_type,
int arity) {
MethodVisitor mv;
- mv = cw.visitMethod(ACC_PUBLIC, "go", GO_DESC, null, null);
+ mv = cw.visitMethod(ACC_PUBLIC, "go", GO_DESC, null, PAUSABLE_EX);
mv.visitCode();
for (int i = 0; i < arity; i++) {
@@ -349,7 +355,7 @@ private static void make_go_method(ClassWriter cw, String self_type,
private static void make_invoke_method(ClassWriter cw, String self_type,
int arity) {
MethodVisitor mv = cw.visitMethod(ACC_PUBLIC, "invoke", EUtil
- .getSignature(arity, true), null, null);
+ .getSignature(arity, true), null, PAUSABLE_EX);
mv.visitCode();
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
@@ -19,6 +19,8 @@
package erjang;
+import kilim.Pausable;
+
/**
* An EHandle is either an EPort or an EPID. EHandles can be sent messages
*/
@@ -30,16 +32,18 @@
/**
* @param msg
+ * @throws Pausable
*/
- public void send(EObject msg) {
+ public void send(EObject msg) throws Pausable {
task().mbox_send(msg);
}
/**
* @param self
* @param result
+ * @throws Pausable
*/
- public void exit_signal(EHandle from, EObject reason) {
+ public void exit_signal(EHandle from, EObject reason) throws Pausable {
task().send_exit(from, reason);
}
@@ -33,4 +33,11 @@ public EInteger asInteger() {
return this;
}
+ /**
+ * @return
+ */
+ public long longValue() {
+ return bigintValue().longValue();
+ }
+
}
@@ -19,6 +19,8 @@
package erjang;
+import kilim.Pausable;
+
/**
* This is a PID on this node
*/
@@ -39,15 +41,15 @@ public EInternalPID(EProc self) {
}
@Override
- public void send(EObject msg) {
+ public void send(EObject msg) throws Pausable {
proc.mbox_send(msg);
}
/* (non-Javadoc)
* @see erjang.EPID#send_exit(erjang.EPID, erjang.EObject)
*/
@Override
- public void exit_signal(EHandle from, EObject reason) {
+ public void exit_signal(EHandle from, EObject reason) throws Pausable {
proc.send_exit(from, reason);
}
@@ -73,4 +75,12 @@ public void set_group_leader(EPID group_leader) {
public int internal_pid_number() {
throw new NotImplemented();
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "PID<" + task() + ">";
+ }
}
Oops, something went wrong.

0 comments on commit b64dfc8

Please sign in to comment.