Permalink
Browse files

Implement basic flow control: sending a message costs reductions equa…

…l to the size of the target queue. Also, minor updates to debug messages.
  • Loading branch information...
1 parent 6c40220 commit 3cde8cd510428c8addbb7785ec6da8e17e35ea1e @krestenkrab krestenkrab committed Mar 18, 2010
View
4 erl.sh
@@ -4,8 +4,8 @@ if [ "x${OTPROOT}" == "x" ]; then
OTPROOT=/sw/lib/erlang
fi
-java -ea -cp erjang-0.1.jar \
- -Derj.threads=1 \
+java -server -ea -cp erjang-0.1.jar \
+ -Derj.threads=2 \
-Derjpath=$OTPROOT/lib/erts-5.7.3/ebin erjang.OTPMain \
-root $OTPROOT \
-progname erl \
View
Binary file not shown.
@@ -20,6 +20,7 @@
package erjang;
import kilim.Pausable;
+import kilim.Task;
/**
* An EHandle is either an EPort or an EPID. EHandles can be sent messages
@@ -51,6 +52,13 @@ public boolean exists() {
public void send(EObject msg) throws Pausable {
ETask<?> task = task();
if (task != null) {
+
+ task.reds += task.mbox.size();
+ if (task.reds > 1000) {
+ task.reds = 0;
+ Task.yield();
+ }
+
task.mbox_send(msg);
}
}
@@ -20,6 +20,7 @@
package erjang;
import kilim.Pausable;
+import kilim.Task;
/**
* This is a PID on this node
@@ -63,6 +64,13 @@ EProc task() {
public void send(EObject msg) throws Pausable {
EProc task = this.task;
if (task != null) {
+
+ task.reds += task.mbox.size();
+ if (task.reds > 1000) {
+ task.reds = 0;
+ Task.yield();
+ }
+
task.mbox.put(msg);
}
}
@@ -458,6 +458,8 @@ public static EObject send(EProc proc, EObject pid, EObject msg)
throws Pausable {
// TODO handle ports also?
proc.check_exit();
+
+ //System.out.println(""+proc+" :: "+pid+" ! "+msg);
EHandle p;
if ((p = pid.testHandle()) != null) {
@@ -711,7 +713,9 @@ public static void run(Task task) {
/** peek mbox at current index (proc.midx), which is 0 upon entry to the loop. */
public static EObject loop_rec(EProc proc) {
- EObject msg = proc.mbox.peek(proc.midx);
+ int idx = proc.midx;
+ EObject msg = proc.mbox.peek(idx);
+ if (DEBUG_WAIT) System.err.println("WAIT| entered loop #"+idx+" message="+msg);
return msg;
}
@@ -748,13 +752,15 @@ public static boolean wait_timeout(EProc proc, EObject howlong)
/** wait forever, for one more message to be available */
public static void wait(EProc proc) throws Pausable {
- if (DEBUG_WAIT) System.err.println("WAIT| "+proc+" waits for messages");
- proc.mbox.untilHasMessages(proc.midx + 1);
- if (DEBUG_WAIT) System.err.println("WAIT| "+proc+" wakes up after timeout");
+ int idx = proc.midx + 1;
+ if (DEBUG_WAIT) System.err.println("WAIT| "+proc+" waits for "+idx+" messages");
+ proc.mbox.untilHasMessages(idx);
+ if (DEBUG_WAIT) System.err.println("WAIT| "+proc+" wakes up after timeout; now has "+(idx));
}
/** message reception timed out, reset message index */
public static void timeout(EProc proc) {
+ if (DEBUG_WAIT) System.err.println("WAIT| "+proc+" timed out");
proc.midx = 0;
}
@@ -129,8 +129,8 @@ public void remove_monitor(ERef r, boolean flush) {
}
-
- protected Mailbox<EObject> mbox = new Mailbox<EObject>(10, 1000);
+ static final int MAX_MAILBOX_SIZE = 1000;
+ protected Mailbox<EObject> mbox = new Mailbox<EObject>(10, MAX_MAILBOX_SIZE);
protected static enum State {
INIT, // has not started yet
@@ -144,6 +144,8 @@ public void remove_monitor(ERef r, boolean flush) {
protected State pstate = State.INIT;
protected EObject exit_reason;
+ public int reds;
+
/**
* @throws Pausable
*

3 comments on commit 3cde8cd

What a neat trick! Hopefully it works out well.

Owner

krestenkrab replied Mar 19, 2010

BEAM does something similar unless you pass the command-line argument +snsp (scheduler no sender punish).

Owner

krestenkrab replied Mar 19, 2010

From bif.c

    erts_send_message(p, rp, &rp_locks, msg, 0);
    if (!erts_use_sender_punish)
        res = 0;
    else {
#ifdef ERTS_SMP
        res = rp->msg_inq.len*4;
        if (ERTS_PROC_LOCK_MAIN & rp_locks)
        res += rp->msg.len*4;
#else
        res = rp->msg.len*4;
#endif
    }
Please sign in to comment.