From 3f1f0ed615f494b22827e21842811351d064b8b2 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Thu, 18 Mar 2010 21:05:39 +0800 Subject: [PATCH] Rework: Use RingQueue (not priority queue), change event handling, reformat Mailbox. --- examples/kilim/examples/Group.java | 29 - src/kilim/Cell.java | 295 -------- src/kilim/Mailbox.java | 1102 ++++++++++++++-------------- src/kilim/Scheduler.java | 25 +- src/kilim/Task.java | 28 +- src/kilim/TaskGroup.java | 49 -- src/kilim/WorkerThread.java | 7 +- src/kilim/analysis/CallWeaver.java | 2 + test/kilim/test/TestMailbox.java | 59 -- 9 files changed, 567 insertions(+), 1029 deletions(-) delete mode 100644 examples/kilim/examples/Group.java delete mode 100644 src/kilim/Cell.java delete mode 100644 src/kilim/TaskGroup.java diff --git a/examples/kilim/examples/Group.java b/examples/kilim/examples/Group.java deleted file mode 100644 index 4e4a423..0000000 --- a/examples/kilim/examples/Group.java +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright (c) 2006, Sriram Srinivasan - * - * You may distribute this software under the terms of the license - * specified in the file "License" - */ - -package kilim.examples; - -import kilim.Pausable; -import kilim.Task; -import kilim.TaskGroup; - -public class Group { - public static void main(String[] args) { - TaskGroup tg = new TaskGroup(); - tg.add(new GroupTask().start()); - tg.add(new GroupTask().start()); - tg.joinb(); - System.exit(0); - } - - static class GroupTask extends Task { - public void execute() throws Pausable { - System.out.println("Task #" + id + "sleeping"); - Task.sleep(1000); - System.out.println("Task #" + id + "done"); - } - } -} \ No newline at end of file diff --git a/src/kilim/Cell.java b/src/kilim/Cell.java deleted file mode 100644 index d3d4741..0000000 --- a/src/kilim/Cell.java +++ /dev/null @@ -1,295 +0,0 @@ -/* Copyright (c) 2006, Sriram Srinivasan - * - * You may distribute this software under the terms of the license - * specified in the file "License" - */ - -package kilim; - -import java.util.LinkedList; -import java.util.TimerTask; - -/** - * A cell is a single-space buffer that supports multiple producers and a single - * consumer, functionally identical to Mailbox bounded to a size of 1 (and hence - * optimized for this size) - */ - -public class Cell implements PauseReason, EventPublisher { - T msg; - EventSubscriber sink; - - public static final int SPACE_AVAILABLE = 1; - public static final int MSG_AVAILABLE = 2; - public static final int TIMED_OUT = 3; - public static final Event spaceAvailble = new Event(MSG_AVAILABLE); - public static final Event messageAvailable = new Event(SPACE_AVAILABLE); - public static final Event timedOut = new Event(TIMED_OUT); - - LinkedList srcs = new LinkedList(); - - // DEBUG stuff - // To do: move into monitorable stat object - /* - * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0; - * public int nWastedGets = 0; - */ - public Cell() { - } - - /** - * Non-blocking, nonpausing get. - * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when - * a put() is done. - * @return buffered message if there's one, or null - */ - public T get(EventSubscriber eo) { - EventSubscriber producer = null; - T ret; - synchronized(this) { - if (msg == null) { - ret = null; - addMsgAvailableListener(eo); - } else { - ret = msg; - msg = null; - if (srcs.size() > 0) { - producer = srcs.poll(); - } - } - } - if (producer != null) { - producer.onEvent(this, spaceAvailble); - } - return ret; - } - - /** - * Non-blocking, nonpausing put. - * @param eo. If non-null, registers this observer and calls it with an SpaceAvailable event - * when there's space. - * @return buffered message if there's one, or null - */ - public boolean put(T amsg, EventSubscriber eo) { - boolean ret = true; // assume we'll be able to enqueue - EventSubscriber subscriber; - synchronized(this) { - if (amsg == null) { - throw new NullPointerException("Null message supplied to put"); - } - if (msg == null) { // space available - msg = amsg; - subscriber = sink; - sink = null; - } else { - ret = false; - // unable to enqueue. Cell is full - subscriber = null; - if (eo != null) { - srcs.add(eo); - } - } - } - // notify get's subscriber that something is available - if (subscriber != null) { - subscriber.onEvent(this, messageAvailable); - } - return ret; - } - - /** - * Get, don't pause or block. - * - * @return stored message, or null if no message found. - */ - public T getnb() { - return get(null); - } - - /** - * @return non-null message. - * @throws Pausable - */ - public T get() throws Pausable{ - Task t = Task.getCurrentTask(); - T msg = get(t); - while (msg == null) { - Task.pause(this); - msg = get(t); - } - return msg; - } - - - /** - * @return non-null message. - * @throws Pausable - */ - public T get(long timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - T msg = get(t); - long begin = System.currentTimeMillis(); - while (msg == null) { - TimerTask tt = new TimerTask() { - public void run() { - Cell.this.removeMsgAvailableListener(t); - t.onEvent(Cell.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - tt.cancel(); - if (System.currentTimeMillis() - begin > timeoutMillis) { - break; - } - msg = get(t); - } - return msg; - } - - public synchronized void addSpaceAvailableListener(EventSubscriber spcSub) { - srcs.add(spcSub); - } - - public synchronized void removeSpaceAvailableListener(EventSubscriber spcSub) { - srcs.remove(spcSub); - } - - - public synchronized void addMsgAvailableListener(EventSubscriber msgSub) { - if (sink != null) { - throw new AssertionError( - "Error: A mailbox can not be shared by two consumers. New = " - + msgSub + ", Old = " + sink); - } - sink = msgSub; - } - - public synchronized void removeMsgAvailableListener(EventSubscriber msgSub) { - if (sink == msgSub) { - sink = null; - } - } - - public boolean putnb(T msg) { - return put(msg, null); - } - - public void put(T msg) throws Pausable { - Task t = Task.getCurrentTask(); - while (!put(msg, t)) { - Task.pause(this); - } - } - - public boolean put(T msg, int timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - long begin = System.currentTimeMillis(); - while (!put(msg, t)) { - TimerTask tt = new TimerTask() { - public void run() { - Cell.this.removeSpaceAvailableListener(t); - t.onEvent(Cell.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - if (System.currentTimeMillis() - begin >= timeoutMillis) { - return false; - } - } - return true; - } - - public void putb(T msg) { - putb(msg, 0 /* infinite wait */); - } - - public class BlockingSubscriber implements EventSubscriber { - public volatile boolean eventRcvd = false; - public void onEvent(EventPublisher ep, Event e) { - synchronized (Cell.this) { - eventRcvd = true; - Cell.this.notify(); - } - } - public void blockingWait(final long timeoutMillis) { - long start = System.currentTimeMillis(); - long remaining = timeoutMillis; - boolean infiniteWait = timeoutMillis == 0; - synchronized (Cell.this) { - while (!eventRcvd && (infiniteWait || remaining > 0)) { - try { - Cell.this.wait(infiniteWait? 0 : remaining); - } catch (InterruptedException ie) {} - long elapsed = System.currentTimeMillis() - start; - remaining -= elapsed; - } - } - } - } - - public void putb(T msg, final long timeoutMillis) { - BlockingSubscriber evs = new BlockingSubscriber(); - if (!put(msg, evs)) { - evs.blockingWait(timeoutMillis); - } - if (!evs.eventRcvd) { - removeSpaceAvailableListener(evs); - } - } - - public synchronized boolean hasMessage() { - return msg != null; - } - - public synchronized boolean hasSpace() { - return msg == null; - } - - /** - * retrieve a message, blocking the thread indefinitely. Note, this is a - * heavyweight block, unlike #get() that pauses the Fiber but doesn't block - * the thread. - */ - - public T getb() { - return getb(0); - } - - /** - * retrieve a msg, and block the Java thread for the time given. - * - * @param millis. - * max wait time - * @return null if timed out. - */ - public T getb(final long timeoutMillis) { - BlockingSubscriber evs = new BlockingSubscriber(); - T msg; - - if ((msg = get(evs)) == null) { - evs.blockingWait(timeoutMillis); - if (evs.eventRcvd) { - msg = get(null); // non-blocking get. - assert msg != null: "Received event, but message is null"; - } - } - if (msg == null) { - removeMsgAvailableListener(evs); - } - return msg; - } - - public synchronized String toString() { - return "id:" + System.identityHashCode(this) + " " + msg; - } - - // Implementation of PauseReason - public boolean isValid(Task t) { - synchronized(this) { - return (t == sink) || srcs.contains(t); - } - } -} - diff --git a/src/kilim/Mailbox.java b/src/kilim/Mailbox.java index 2059b32..17d07cf 100755 --- a/src/kilim/Mailbox.java +++ b/src/kilim/Mailbox.java @@ -15,577 +15,553 @@ * synchronize with each other (as opposed to direct java calls or static member * variables). put() and get() are the two essential functions. * - * We use the term "block" to mean thread block, and "pause" to mean - * fiber pausing. The suffix "nb" on some methods (such as getnb()) - * stands for non-blocking. + * We use the term "block" to mean thread block, and "pause" to mean fiber + * pausing. The suffix "nb" on some methods (such as getnb()) stands for + * non-blocking. */ public class Mailbox implements PauseReason, EventPublisher { - // TODO. Give mbox a config name and id and make monitorable - T[] msgs; - private int iprod = 0; // producer index - private int icons = 0; // consumer index; - private int numMsgs = 0; - private int maxMsgs = 300; - EventSubscriber sink; - - // FIX: I don't like this event design. The only good thing is that - // we don't create new event objects every time we signal a client - // (subscriber) that's blocked on this mailbox. - public static final int SPACE_AVAILABLE = 1; - public static final int MSG_AVAILABLE = 2; - public static final int TIMED_OUT = 3; - public static final Event spaceAvailble = new Event(MSG_AVAILABLE); - public static final Event messageAvailable = new Event(SPACE_AVAILABLE); - public static final Event timedOut = new Event(TIMED_OUT); - - LinkedList srcs = new LinkedList(); - - // DEBUG stuff - // To do: move into monitorable stat object - /* - * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0; - * public int nWastedGets = 0; - */ - public Mailbox() { - this(10); - } - - public Mailbox(int initialSize) { - this(initialSize, Integer.MAX_VALUE); - } - - @SuppressWarnings("unchecked") - public Mailbox(int initialSize, int maxSize) { - if (initialSize > maxSize) - throw new IllegalArgumentException("initialSize: " + initialSize - + " cannot exceed maxSize: " + maxSize); - msgs = (T[]) new Object[initialSize]; - maxMsgs = maxSize; - } - - /** - * Non-blocking, nonpausing get. - * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when - * a put() is done. - * @return buffered message if there's one, or null - */ - public T get(EventSubscriber eo) { - T msg; - EventSubscriber producer = null; - synchronized(this) { - int n = numMsgs; - if (n > 0) { - int ic = icons; - msg = msgs[ic]; msgs[ic]=null; - icons = (ic + 1) % msgs.length; - numMsgs = n - 1; - - if (srcs.size() > 0) { - producer = srcs.poll(); - } - } else { - msg = null; - addMsgAvailableListener(eo); - } - } - if (producer != null) { - producer.onEvent(this, spaceAvailble); - } - return msg; - } - - - /** - * @return non-null message. - * @throws Pausable - */ - public void untilHasMessage() throws Pausable{ - while (hasMessage(Task.getCurrentTask()) == false) { - Task.pause(this); - } - } - - /** - * @return non-null message. - * @throws Pausable - */ - public void untilHasMessages(int num) throws Pausable{ - while (hasMessages(num, Task.getCurrentTask()) == false) { - Task.pause(this); - } - } - - - /** - * @return non-null message. - * @throws Pausable - */ - public boolean untilHasMessage(long timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - boolean has_msg = hasMessage(t); - long end = System.currentTimeMillis() + timeoutMillis; - while (has_msg == false) { - TimerTask tt = new TimerTask() { - public void run() { - Mailbox.this.removeMsgAvailableListener(t); - t.onEvent(Mailbox.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - tt.cancel(); - has_msg = hasMessage(t); - timeoutMillis = end - System.currentTimeMillis(); - if (timeoutMillis <= 0) { - removeMsgAvailableListener(t); - break; - } - } - return has_msg; - } - - /** - * @return non-null message. - * @throws Pausable - */ - public boolean untilHasMessages(int num, long timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - final long end = System.currentTimeMillis() + timeoutMillis; - - boolean has_msg = hasMessages(num, t); - while (has_msg == false) { - TimerTask tt = new TimerTask() { - public void run() { - Mailbox.this.removeMsgAvailableListener(t); - t.onEvent(Mailbox.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - tt.cancel(); - has_msg = hasMessages(num, t); - timeoutMillis = end - System.currentTimeMillis(); - if (timeoutMillis <= 0) { - removeMsgAvailableListener(t); - break; - } - } - return has_msg; - } - - - /** - * Non-blocking, nonpausing "wait-until-message-available". - * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when - * a put() is done. - * @return true's one, or false - */ - public boolean hasMessage(EventSubscriber eo) { - boolean has_msg; - synchronized(this) { - int n = numMsgs; - if (n > 0) { - has_msg = true; - } else { - has_msg = false; - addMsgAvailableListener(eo); - } - } - return has_msg; - } - - public boolean hasMessages(int num, EventSubscriber eo) { - boolean has_msg; - synchronized(this) { - int n = numMsgs; - if (n >= num) { - has_msg = true; - } else { - has_msg = false; - addMsgAvailableListener(eo); - } - } - return has_msg; - } - - /** - * Non-blocking, nonpausing peek. - * @return buffered message if there's one, or null - */ - public T peek(int idx) { - assert idx >= 0 : "negative index"; - T msg; - synchronized(this) { - int n = numMsgs; + // TODO. Give mbox a config name and id and make monitorable + T[] msgs; + private int iprod = 0; // producer index + private int icons = 0; // consumer index; + private int numMsgs = 0; + private int maxMsgs = 300; + MessageConsumer sink; + + // FIX: I don't like this event design. The only good thing is that + // we don't create new event objects every time we signal a client + // (subscriber) that's blocked on this mailbox. + public static final int SPACE_AVAILABLE = 1; + public static final int MSG_AVAILABLE = 2; + public static final int TIMED_OUT = 3; + public static final Event spaceAvailble = new Event(MSG_AVAILABLE); + public static final Event messageAvailable = new Event(SPACE_AVAILABLE); + public static final Event timedOut = new Event(TIMED_OUT); + + LinkedList pendingProducers = new LinkedList(); + + // DEBUG stuff + // To do: move into monitorable stat object + /* + * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0; + * public int nWastedGets = 0; + */ + public Mailbox() { + this(10); + } + + public Mailbox(int initialSize) { + this(initialSize, Integer.MAX_VALUE); + } + + @SuppressWarnings("unchecked") + public Mailbox(int initialSize, int maxSize) { + if (initialSize > maxSize) + throw new IllegalArgumentException("initialSize: " + initialSize + + " cannot exceed maxSize: " + maxSize); + msgs = (T[]) new Object[initialSize]; + maxMsgs = maxSize; + } + + /** + * Non-blocking, nonpausing get. + * + * @param eo + * . If non-null, registers this observer and calls it with a + * MessageAvailable event when a put() is done. + * @return buffered message if there's one, or null + */ + public T get(MessageConsumer eo) { + T msg; + MessageProducer producer = null; + synchronized (this) { + int n = numMsgs; + if (n > 0) { + int ic = icons; + msg = msgs[ic]; + msgs[ic] = null; + icons = (ic + 1) % msgs.length; + numMsgs = n - 1; + + assert msg != null : "received null message!"; + + if (pendingProducers.size() > 0) { + producer = pendingProducers.poll(); + } + } else { + msg = null; + addMessageConsumer(eo); + } + } + if (producer != null) { + producer.spaceAvailable(this); + } + return msg; + } + + /** + * @return non-null message. + * @throws Pausable + */ + public void untilHasMessage() throws Pausable { + while (hasMessage(Task.getCurrentTask()) == false) { + Task.pause(this); + } + } + + /** + * @return non-null message. + * @throws Pausable + */ + public void untilHasMessages(int num) throws Pausable { + while (hasMessages(num, Task.getCurrentTask()) == false) { + Task.pause(this); + } + } + + /** + * @return non-null message. + * @throws Pausable + */ + public boolean untilHasMessage(long timeoutMillis) throws Pausable { + final Task t = Task.getCurrentTask(); + boolean has_msg = hasMessage(t); + long end = System.currentTimeMillis() + timeoutMillis; + while (has_msg == false) { + TimerTask tt = new TimerTask() { + public void run() { + if (Mailbox.this.removeMessageConsumer(t)) { + t.consumeTimeout(Mailbox.this); + } + } + }; + Task.timer.schedule(tt, timeoutMillis); + Task.pause(this); + tt.cancel(); + has_msg = hasMessage(t); + timeoutMillis = end - System.currentTimeMillis(); + if (timeoutMillis <= 0) { + removeMessageConsumer(t); + break; + } + } + return has_msg; + } + + /** + * @return non-null message. + * @throws Pausable + */ + public boolean untilHasMessages(int num, long timeoutMillis) + throws Pausable { + final Task t = Task.getCurrentTask(); + final long end = System.currentTimeMillis() + timeoutMillis; + + boolean has_msg = hasMessages(num, t); + while (has_msg == false) { + TimerTask tt = new TimerTask() { + public void run() { + if (removeMessageConsumer(t)) { + t.consumeTimeout(Mailbox.this); + } + } + }; + Task.timer.schedule(tt, timeoutMillis); + Task.pause(this); + if (!tt.cancel()) { + removeMessageConsumer(t); + } + + has_msg = hasMessages(num, t); + timeoutMillis = end - System.currentTimeMillis(); + if (!has_msg && timeoutMillis <= 0) { + removeMessageConsumer(t); + break; + } + } + return has_msg; + } + + /** + * Non-blocking, nonpausing "wait-until-message-available". + * + * @param eo + * . If non-null, registers this observer and calls it with a + * MessageAvailable event when a put() is done. + * @return true's one, or false + */ + public boolean hasMessage(MessageConsumer eo) { + boolean has_msg; + synchronized (this) { + int n = numMsgs; + if (n > 0) { + has_msg = true; + } else { + has_msg = false; + addMessageConsumer(eo); + } + } + return has_msg; + } + + public boolean hasMessages(int num, MessageConsumer eo) { + boolean has_msg; + synchronized (this) { + int n = numMsgs; + if (n >= num) { + has_msg = true; + } else { + has_msg = false; + addMessageConsumer(eo); + } + } + return has_msg; + } + + /** + * Non-blocking, nonpausing peek. + * + * @return buffered message if there's one, or null + */ + public T peek(int idx) { + assert idx >= 0 : "negative index"; + T msg; + synchronized (this) { + int n = numMsgs; + if (idx < n) { + int ic = icons; + msg = msgs[(ic + idx) % msgs.length]; + + assert msg != null : "peeked null message!"; + } else { + msg = null; + } + } + return msg; + } + + public T remove(final int idx) { + assert idx >= 0 : "negative index"; + T msg; + synchronized (this) { + int n = numMsgs; + assert idx < numMsgs; if (idx < n) { - int ic = icons; - msg = msgs[(ic+idx)%msgs.length]; - } else { - msg = null; - } - } - return msg; - } - - public T remove(final int idx) { - assert idx >= 0 : "negative index"; - T msg; - synchronized(this) { - int n = numMsgs; - if (idx < n) { - int ic = icons; - int mlen = msgs.length; - msg = msgs[(ic+idx)%mlen]; - for (int i = idx; i > 0; i--) { - msgs[(ic+i)%mlen] = msgs[(ic+i-1)%mlen]; - } - msgs[icons] = null; - numMsgs -= 1; - icons = (icons+1)%mlen; - } else { - throw new IllegalStateException(); - } - } - return msg; - } - - /** - * Non-blocking, nonpausing put. - * @param eo. If non-null, registers this observer and calls it with an SpaceAvailable event - * when there's space. - * @return buffered message if there's one, or null - */ - public boolean put(T msg, EventSubscriber eo) { - boolean ret = true; // assume we will be able to enqueue - EventSubscriber subscriber; - synchronized(this) { - if (msg == null) { - throw new NullPointerException("Null message supplied to put"); - } - int ip = iprod; - int ic = icons; - int n = numMsgs; - if (n == msgs.length) { - assert ic == ip : "numElements == msgs.length && ic != ip"; - if (n < maxMsgs) { - T[] newmsgs = (T[]) new Object[Math.min(n * 2, maxMsgs)]; - System.arraycopy(msgs, ic, newmsgs, 0, n - ic); - if (ic > 0) { - System.arraycopy(msgs, 0, newmsgs, n - ic, ic); - } - msgs = newmsgs; - ip = n; - ic = 0; - } else { - ret = false; - } - } - if (ret) { - numMsgs = n + 1; - msgs[ip] = msg; - iprod = (ip + 1) % msgs.length; - icons = ic; - subscriber = sink; - sink = null; - } else { - subscriber = null; - // unable to enqueue - if (eo != null) { - srcs.add(eo); - } - } - } - // notify get's subscriber that something is available - if (subscriber != null) { - subscriber.onEvent(this, messageAvailable); - } - return ret; - } - - /** - * Get, don't pause or block. - * - * @return stored message, or null if no message found. - */ - public T getnb() { - return get(null); - } - - /** - * @return non-null message. - * @throws Pausable - */ - public T get() throws Pausable{ - Task t = Task.getCurrentTask(); - T msg = get(t); - while (msg == null) { - Task.pause(this); - msg = get(t); - } - return msg; - } - - - /** - * @return non-null message. - * @throws Pausable - */ - public T get(long timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - T msg = get(t); - long end = System.currentTimeMillis() + timeoutMillis; - while (msg == null) { - TimerTask tt = new TimerTask() { - public void run() { - Mailbox.this.removeMsgAvailableListener(t); - t.onEvent(Mailbox.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - tt.cancel(); - - msg = get(t); - - timeoutMillis = end - System.currentTimeMillis(); - if (timeoutMillis <= 0) { - removeMsgAvailableListener(t); - break; - } - } - return msg; - } - - - /** - * Takes an array of mailboxes and returns the index of the first mailbox - * that has a message. It is possible that because of race conditions, an - * earlier mailbox in the list may also have received a message. - */ - public static int select(Mailbox... mboxes) throws Pausable { - while (true) { - for (int i = 0; i < mboxes.length; i++) { - if (mboxes[i].hasMessage()) { - return i; - } - } - Task t = Task.getCurrentTask(); - EmptySet_MsgAvListener pauseReason = - new EmptySet_MsgAvListener(t, mboxes); - for (int i = 0; i < mboxes.length; i++) { - mboxes[i].addMsgAvailableListener(pauseReason); - } - Task.pause(pauseReason); - for (int i = 0; i < mboxes.length; i++) { - mboxes[i].removeMsgAvailableListener(pauseReason); - } - } - } - - public synchronized void addSpaceAvailableListener(EventSubscriber spcSub) { - srcs.add(spcSub); - } - - public synchronized void removeSpaceAvailableListener(EventSubscriber spcSub) { - srcs.remove(spcSub); - } - - - public synchronized void addMsgAvailableListener(EventSubscriber msgSub) { - if (sink != null && sink != msgSub) { - throw new AssertionError( - "Error: A mailbox can not be shared by two consumers. New = " - + msgSub + ", Old = " + sink); - } - sink = msgSub; - } - - public synchronized void removeMsgAvailableListener(EventSubscriber msgSub) { - if (sink == msgSub) { - sink = null; - } - } - - public boolean putnb(T msg) { - return put(msg, null); - } - - public void put(T msg) throws Pausable { - Task t = Task.getCurrentTask(); - t.checkKill(); - while (!put(msg, t)) { - Task.pause(this); - } - } - - public boolean put(T msg, int timeoutMillis) throws Pausable { - final Task t = Task.getCurrentTask(); - long begin = System.currentTimeMillis(); - while (!put(msg, t)) { - TimerTask tt = new TimerTask() { - public void run() { - Mailbox.this.removeSpaceAvailableListener(t); - t.onEvent(Mailbox.this, timedOut); - } - }; - Task.timer.schedule(tt, timeoutMillis); - Task.pause(this); - if (System.currentTimeMillis() - begin >= timeoutMillis) { - return false; - } - } - return true; - } - - public void putb(T msg) { - putb(msg, 0 /* infinite wait */); - } - - public class BlockingSubscriber implements EventSubscriber { - public volatile boolean eventRcvd = false; - public void onEvent(EventPublisher ep, Event e) { - synchronized (Mailbox.this) { - eventRcvd = true; - Mailbox.this.notify(); - } - } - public void blockingWait(final long timeoutMillis) { - long start = System.currentTimeMillis(); - long remaining = timeoutMillis; - boolean infiniteWait = timeoutMillis == 0; - synchronized (Mailbox.this) { - while (!eventRcvd && (infiniteWait || remaining > 0)) { - try { - Mailbox.this.wait(infiniteWait? 0 : remaining); - } catch (InterruptedException ie) {} - long elapsed = System.currentTimeMillis() - start; - remaining -= elapsed; - } - } - } - } - - public void putb(T msg, final long timeoutMillis) { - BlockingSubscriber evs = new BlockingSubscriber(); - if (!put(msg, evs)) { - evs.blockingWait(timeoutMillis); - } - if (!evs.eventRcvd) { - removeSpaceAvailableListener(evs); - } - } - - public synchronized int size() { - return numMsgs; - } - - public synchronized boolean hasMessage() { - return numMsgs > 0; - } - - /** return true if the mailbox has at least num messages. */ - public synchronized boolean hasMessages(int num) { - return numMsgs >= num; - } - - public synchronized boolean hasSpace() { - return (maxMsgs - numMsgs) > 0; - } - - /** - * retrieve a message, blocking the thread indefinitely. Note, this is a - * heavyweight block, unlike #get() that pauses the Fiber but doesn't block - * the thread. - */ - - public T getb() { - return getb(0); - } - - /** - * retrieve a msg, and block the Java thread for the time given. - * - * @param millis. - * max wait time - * @return null if timed out. - */ - public T getb(final long timeoutMillis) { - BlockingSubscriber evs = new BlockingSubscriber(); - T msg; - - if ((msg = get(evs)) == null) { - evs.blockingWait(timeoutMillis); - if (evs.eventRcvd) { - msg = get(null); // non-blocking get. - assert msg != null: "Received event, but message is null"; - } - } - if (msg == null) { - removeMsgAvailableListener(evs); - } - return msg; - } - - public synchronized String toString() { - return "id:" + System.identityHashCode(this) + " " + - // DEBUG "nGet:" + nGet + " " + - // "nPut:" + nPut + " " + - // "numWastedPuts:" + nWastedPuts + " " + - // "nWastedGets:" + nWastedGets + " " + - "numMsgs:" + numMsgs; - } - - // Implementation of PauseReason - public boolean isValid(Task t) { - synchronized(this) { - return (t == sink) || srcs.contains(t); - } - } - - public synchronized Object[] messages() { - synchronized (this) { + int ic = icons; + int mlen = msgs.length; + msg = msgs[(ic + idx) % mlen]; + for (int i = idx; i > 0; i--) { + msgs[(ic + i) % mlen] = msgs[(ic + i - 1) % mlen]; + } + msgs[icons] = null; + numMsgs -= 1; + icons = (icons + 1) % mlen; + } else { + throw new IllegalStateException(); + } + } + return msg; + } + + /** + * Non-blocking, nonpausing put. + * + * @param eo + * . If non-null, registers this observer and calls it with an + * SpaceAvailable event when there's space. + * @return buffered message if there's one, or null + */ + public boolean put(T msg, MessageProducer eo) { + boolean ret = true; // assume we will be able to enqueue + MessageConsumer consumer; + synchronized (this) { + if (msg == null) { + throw new NullPointerException("Null message supplied to put"); + } + int ip = iprod; + int ic = icons; + int n = numMsgs; + if (n == msgs.length) { + assert ic == ip : "numElements == msgs.length && ic != ip"; + if (n < maxMsgs) { + T[] newmsgs = (T[]) new Object[Math.min(n * 2, maxMsgs)]; + System.arraycopy(msgs, ic, newmsgs, 0, n - ic); + if (ic > 0) { + System.arraycopy(msgs, 0, newmsgs, n - ic, ic); + } + msgs = newmsgs; + ip = n; + ic = 0; + } else { + ret = false; + } + } + if (ret) { + numMsgs = n + 1; + msgs[ip] = msg; + iprod = (ip + 1) % msgs.length; + icons = ic; + consumer = sink; + sink = null; + } else { + consumer = null; + // unable to enqueue + if (eo != null) { + pendingProducers.add(eo); + } + } + } + // notify get's subscriber that something is available + if (consumer != null) { + consumer.messageAvailable(this); + } + return ret; + } + + /** + * Get, don't pause or block. + * + * @return stored message, or null if no message found. + */ + public T getnb() { + return get(null); + } + + /** + * @return non-null message. + * @throws Pausable + */ + public T get() throws Pausable { + Task t = Task.getCurrentTask(); + T msg = get(t); + while (msg == null) { + Task.pause(this); + removeMessageConsumer(t); + msg = get(t); + } + return msg; + } + + /** + * @return non-null message. + * @throws Pausable + */ + public T get(long timeoutMillis) throws Pausable { + final Task t = Task.getCurrentTask(); + T msg = get(t); + long end = System.currentTimeMillis() + timeoutMillis; + while (msg == null) { + TimerTask tt = new TimerTask() { + public void run() { + if (Mailbox.this.removeMessageConsumer(t)) { + t.consumeTimeout(Mailbox.this); + } + } + }; + Task.timer.schedule(tt, timeoutMillis); + Task.pause(this); + tt.cancel(); + removeMessageConsumer(t); + msg = get(t); + + timeoutMillis = end - System.currentTimeMillis(); + if (timeoutMillis <= 0) { + removeMessageConsumer(t); + break; + } + } + return msg; + } + + public synchronized void addMessageProducer(MessageProducer spcSub) { + pendingProducers.add(spcSub); + } + + public synchronized boolean removeMessageProducer(MessageProducer spcSub) { + return pendingProducers.remove(spcSub); + } + + public synchronized void addMessageConsumer(MessageConsumer msgSub) { + if (sink != null && sink != msgSub) { + throw new AssertionError( + "Error: A mailbox can not be shared by two consumers. New = " + + msgSub + ", Old = " + sink); + } + sink = msgSub; + } + + public synchronized boolean removeMessageConsumer( + MessageConsumer msgSub) { + if (sink == msgSub) { + sink = null; + return true; + } else { + return false; + } + } + + public boolean putnb(T msg) { + return put(msg, null); + } + + public void put(T msg) throws Pausable { + Task t = Task.getCurrentTask(); + t.checkKill(); + while (!put(msg, t)) { + Task.pause(this); + } + } + + public boolean put(T msg, int timeoutMillis) throws Pausable { + final Task t = Task.getCurrentTask(); + long begin = System.currentTimeMillis(); + while (!put(msg, t)) { + TimerTask tt = new TimerTask() { + public void run() { + if (Mailbox.this.removeMessageProducer(t)) { + t.produceTimeout(Mailbox.this); + } + } + }; + Task.timer.schedule(tt, timeoutMillis); + Task.pause(this); + tt.cancel(); + removeMessageProducer(t); + if (System.currentTimeMillis() - begin >= timeoutMillis) { + return false; + } + } + return true; + } + + public boolean putb(T msg) { + return putb(msg, 0 /* infinite wait */); + } + + public class BlockingSubscriber implements MessageProducer, MessageConsumer { + public synchronized void spaceAvailable(Mailbox ep) { + eventRcvd = true; + notifyAll(); + } + + public synchronized void produceTimeout(Mailbox pub) { + notifyAll(); + } + + public synchronized void messageAvailable(Mailbox ep) { + eventRcvd = true; + notifyAll(); + } + + public synchronized void consumeTimeout(Mailbox pub) { + notifyAll(); + } + + public volatile boolean eventRcvd = false; + + public boolean blockingWait(final long timeoutMillis) { + long absEnd = timeoutMillis == 0 + ? Long.MAX_VALUE + : System.currentTimeMillis() + timeoutMillis; + synchronized (this) { + long now = System.currentTimeMillis(); + while (!eventRcvd && (now < absEnd)) { + try { + this.wait(absEnd - now); + } catch (InterruptedException ie) { + } + now = System.currentTimeMillis(); + } + } + + return eventRcvd; + } + } + + public boolean putb(T msg, final long timeoutMillis) { + BlockingSubscriber evs = new BlockingSubscriber(); + if (!put(msg, evs)) { + boolean did_put = evs.blockingWait(timeoutMillis); + removeMessageProducer(evs); + return did_put; + } else { + return true; + } + } + + public synchronized int size() { + return numMsgs; + } + + public synchronized boolean hasMessage() { + return numMsgs > 0; + } + + /** return true if the mailbox has at least num messages. */ + public synchronized boolean hasMessages(int num) { + return numMsgs >= num; + } + + public synchronized boolean hasSpace() { + return (maxMsgs - numMsgs) > 0; + } + + /** + * retrieve a message, blocking the thread indefinitely. Note, this is a + * heavyweight block, unlike #get() that pauses the Fiber but doesn't block + * the thread. + */ + + public T getb() { + return getb(0); + } + + /** + * retrieve a msg, and block the Java thread for the time given. + * + * @param millis + * . max wait time + * @return null if timed out. + */ + public T getb(final long timeoutMillis) { + BlockingSubscriber evs = new BlockingSubscriber(); + T msg; + + if ((msg = get(evs)) == null) { + if (evs.blockingWait(timeoutMillis)) { + msg = get(null); // non-blocking get. + assert msg != null : "Received event, but message is null"; + } + } + removeMessageConsumer(evs); + return msg; + } + + public synchronized String toString() { + return "id:" + System.identityHashCode(this) + " " + + // DEBUG "nGet:" + nGet + " " + + // "nPut:" + nPut + " " + + // "numWastedPuts:" + nWastedPuts + " " + + // "nWastedGets:" + nWastedGets + " " + + "numMsgs:" + numMsgs; + } + + // Implementation of PauseReason + public boolean isValid(Task t) { + synchronized (this) { + return (t == sink) || pendingProducers.contains(t); + } + } + + public synchronized Object[] messages() { + synchronized (this) { Object[] result = new Object[numMsgs]; for (int i = 0; i < numMsgs; i++) { - result[i] = msgs[(icons+i)%msgs.length]; + result[i] = msgs[(icons + i) % msgs.length]; } return result; } - - } -} -class EmptySet_MsgAvListener implements PauseReason, EventSubscriber { - final Task task; - final Mailbox[] mbxs; - - EmptySet_MsgAvListener(Task t, Mailbox[] mbs) { - task = t; - mbxs = mbs; - } - - public boolean isValid(Task t) { - // The pauseReason is true (there is valid reason to continue - // pausing) if none of the mboxes have any elements - for (Mailbox mb : mbxs) { - if (mb.hasMessage()) - return false; - } - return true; - } - - public void onEvent(EventPublisher ep, Event e) { - for (Mailbox m : mbxs) { - if (m != ep) { - ((Mailbox)ep).removeMsgAvailableListener(this); - } - } - task.resume(); - } - - public void cancel() { - for (Mailbox mb : mbxs) { - mb.removeMsgAvailableListener(this); - } - } + } } diff --git a/src/kilim/Scheduler.java b/src/kilim/Scheduler.java index 3994985..f5dacd6 100755 --- a/src/kilim/Scheduler.java +++ b/src/kilim/Scheduler.java @@ -8,7 +8,6 @@ import java.util.Comparator; import java.util.LinkedList; -import java.util.PriorityQueue; /** * This is a basic FIFO Executor. It maintains a list of @@ -25,17 +24,7 @@ public class Scheduler { LinkedList allThreads = new LinkedList(); LinkedList waitingThreads = new LinkedList(); protected boolean shutdown = false; - // protected RingQueue runnableTasks = new RingQueue(1000); - protected PriorityQueue runnableTasks = new PriorityQueue(1000, new Comparator() { - - public int compare(Task o1, Task o2) { - if (o1.priority < o2.priority) - return -1; - else if (o1.priority == o2.priority) - return 0; - else - return 1; - }}); + protected RingQueue runnableTasks = new RingQueue(1000); static { String s = System.getProperty("kilim.Scheduler.numThreads"); @@ -69,8 +58,9 @@ public void schedule(Task t) { synchronized(this) { assert t.running == true : "Task " + t + " scheduled even though running is false"; - runnableTasks.add(t); - wt = waitingThreads.poll(); + runnableTasks.put(t); + if (!waitingThreads.isEmpty()) + wt = waitingThreads.poll(); } if (wt != null) { synchronized(wt) { @@ -98,19 +88,14 @@ Task getNextTask(WorkerThread wt) { return t; } - t = runnableTasks.peek(); + t = runnableTasks.get(); if (t == null) { waitingThreads.add(wt); } else { - boolean removed = runnableTasks.remove(t); - assert removed : "queue not in order?"; prefThread = t.preferredResumeThread; } } ///////////// - // race here: added to waiting threads above, - // and received notify before entering waitForMsg - ///////////// if (t == null) { wt.waitForMsgOrSignal(); } else if (prefThread == null || prefThread == wt) { diff --git a/src/kilim/Task.java b/src/kilim/Task.java index b77fd9a..fa6871c 100755 --- a/src/kilim/Task.java +++ b/src/kilim/Task.java @@ -17,7 +17,7 @@ * provide a pausable execute method. * */ -public abstract class Task implements EventSubscriber { +public abstract class Task implements MessageConsumer, MessageProducer { public volatile Thread currentThread = null; static PauseReason yieldReason = new YieldReason(); @@ -160,15 +160,25 @@ public int getStackDepth() { throw new AssertionError("Expected task to be run by WorkerThread"); } - public void onEvent(EventPublisher ep, Event e) { - // This is sneaky. We _know_ that the only time a task will get registered - // is mailbox.put or get(), and that it'll be the pausereason as well. - // @drkrab: no we don't! This can be a timedOut event, and that also - // needs us to resume. - //if (ep == pauseReason) { - resume(); - //} + public void consumeTimeout(Mailbox pub) { + resume(); } + + public void produceTimeout(Mailbox pub) { + resume(); + } + + public void messageAvailable(Mailbox ep) { + ep.removeMessageConsumer(this); + resume(); + } + + public void spaceAvailable(Mailbox ep) { + ep.removeMessageProducer(this); + resume(); + } + + /** * This is typically called by a pauseReason to resume the task. */ diff --git a/src/kilim/TaskGroup.java b/src/kilim/TaskGroup.java deleted file mode 100644 index f3dcb6b..0000000 --- a/src/kilim/TaskGroup.java +++ /dev/null @@ -1,49 +0,0 @@ -package kilim; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; - -public class TaskGroup extends Task { - private Mailbox addedTasksMB = new Mailbox(); - private Mailbox exitmb = new Mailbox(); - private HashSet tasks = new HashSet(); - - public List results = Collections.synchronizedList(new ArrayList()); - - public void execute() throws Pausable { - while (!tasks.isEmpty() || addedTasksMB.hasMessage()) { - switch (Mailbox.select(addedTasksMB, exitmb)) { - case 0: - Task t = addedTasksMB.getnb(); - t.informOnExit(exitmb); - tasks.add(t); - break; - case 1: - ExitMsg em = exitmb.getnb(); - results.add(em); - tasks.remove(em.task); - break; - } - } - exit(results); - } - - @Override - public ExitMsg joinb() { - start(); - return super.joinb(); - } - - @Override - public ExitMsg join() throws Pausable { - start(); - return super.join(); - } - - public void add(Task t) { - t.informOnExit(exitmb); - addedTasksMB.putnb(t); // will wake up join if it is waiting. - } -} diff --git a/src/kilim/WorkerThread.java b/src/kilim/WorkerThread.java index 45ed68c..1196eec 100755 --- a/src/kilim/WorkerThread.java +++ b/src/kilim/WorkerThread.java @@ -66,11 +66,8 @@ public synchronized boolean hasTasks() { return tasks.size() > 0; } public synchronized Task getNextTask() { - Task task = tasks.peek(); - if (task != null) { - tasks.remove(task); - } - return task; + Task task = tasks.poll(); + return task; } public synchronized void waitForMsgOrSignal() { diff --git a/src/kilim/analysis/CallWeaver.java b/src/kilim/analysis/CallWeaver.java index b76a507..9daa87e 100755 --- a/src/kilim/analysis/CallWeaver.java +++ b/src/kilim/analysis/CallWeaver.java @@ -764,6 +764,8 @@ private void genRestoreVars(MethodVisitor mv, int stateVar) { if (!u.isLiveIn(i)) continue; Value v = f.getLocal(i); + if (v.getTypeDesc() == D_UNDEFINED) + continue; int vmt = VMType.toVmType(v.getTypeDesc()); if (v.isConstant()) { loadConstant(mv, v); diff --git a/test/kilim/test/TestMailbox.java b/test/kilim/test/TestMailbox.java index bd22aed..fc44473 100755 --- a/test/kilim/test/TestMailbox.java +++ b/test/kilim/test/TestMailbox.java @@ -102,32 +102,6 @@ public void testTasks() { assertTrue(mb.getnb() == null); } - // Send messages on two mailboxes and collect them back on one mailbox. - public void testSelectSimple() { - Mailbox mainmb = new Mailbox(); - SelectTaskMB t = new SelectTaskMB(mainmb); - t.start(); - - // Make sure the task is blocked on select and hasn't already - // sent us a message - try {Thread.sleep(100);} catch (InterruptedException ignore) {} - assertTrue(! mainmb.hasMessage()); - HashSet sentMsgs = new HashSet(); - final int n = 10; - for (int i = 0; i < n; i++) { - Msg m = new Msg(); - assertTrue(t.mymb2.putnb(m)); - sentMsgs.add(m); - try {Thread.sleep(10);} catch (InterruptedException ignore) {} - m = new Msg(); - assertTrue(t.mymb1.putnb(m)); - sentMsgs.add(m); - } - for (int i = 0; i < n*2; i++) { - Msg m = mainmb.getb(1000); - assertTrue(m != null && sentMsgs.contains(m)); - } - } } class Msg { @@ -166,36 +140,3 @@ public void execute() throws Pausable { } - -class SelectTaskMB extends Task { - Mailbox mymb1, mymb2; - Mailbox mainmb; - - SelectTaskMB(Mailbox mb) { - mymb1 = new Mailbox(); - mymb2 = new Mailbox(); - mainmb = mb; - } - - public void execute() throws Pausable { - while (true) { - Msg m; - // Receive a message on mymb1 or 2 and forward to mainmb - // If some error, send a dummy message and testSelect() - // will flag an error. - switch (Mailbox.select(mymb1, mymb2)) { - case 0: - m = mymb1.getnb(); - mainmb.put(m); - break; - case 1: - m = mymb2.getnb(); - if (m == null) m = new Msg(); - mainmb.put(m); - break; - default: - mainmb.put(new Msg()); - } - } - } -} \ No newline at end of file