Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: krestenkrab/kilim-erjang
base: master
...
head fork: alepharchives/kilim
compare: master
Checking mergeability… Don't worry, you can still create the pull request.
  • 9 commits
  • 16 files changed
  • 0 commit comments
  • 2 contributors
View
29 examples/kilim/examples/Group.java
@@ -1,29 +0,0 @@
-/* Copyright (c) 2006, Sriram Srinivasan
- *
- * You may distribute this software under the terms of the license
- * specified in the file "License"
- */
-
-package kilim.examples;
-
-import kilim.Pausable;
-import kilim.Task;
-import kilim.TaskGroup;
-
-public class Group {
- public static void main(String[] args) {
- TaskGroup tg = new TaskGroup();
- tg.add(new GroupTask().start());
- tg.add(new GroupTask().start());
- tg.joinb();
- System.exit(0);
- }
-
- static class GroupTask extends Task {
- public void execute() throws Pausable {
- System.out.println("Task #" + id + "sleeping");
- Task.sleep(1000);
- System.out.println("Task #" + id + "done");
- }
- }
-}
View
295 src/kilim/Cell.java
@@ -1,295 +0,0 @@
-/* Copyright (c) 2006, Sriram Srinivasan
- *
- * You may distribute this software under the terms of the license
- * specified in the file "License"
- */
-
-package kilim;
-
-import java.util.LinkedList;
-import java.util.TimerTask;
-
-/**
- * A cell is a single-space buffer that supports multiple producers and a single
- * consumer, functionally identical to Mailbox bounded to a size of 1 (and hence
- * optimized for this size)
- */
-
-public class Cell<T> implements PauseReason, EventPublisher {
- T msg;
- EventSubscriber sink;
-
- public static final int SPACE_AVAILABLE = 1;
- public static final int MSG_AVAILABLE = 2;
- public static final int TIMED_OUT = 3;
- public static final Event spaceAvailble = new Event(MSG_AVAILABLE);
- public static final Event messageAvailable = new Event(SPACE_AVAILABLE);
- public static final Event timedOut = new Event(TIMED_OUT);
-
- LinkedList<EventSubscriber> srcs = new LinkedList<EventSubscriber>();
-
- // DEBUG stuff
- // To do: move into monitorable stat object
- /*
- * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0;
- * public int nWastedGets = 0;
- */
- public Cell() {
- }
-
- /**
- * Non-blocking, nonpausing get.
- * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when
- * a put() is done.
- * @return buffered message if there's one, or null
- */
- public T get(EventSubscriber eo) {
- EventSubscriber producer = null;
- T ret;
- synchronized(this) {
- if (msg == null) {
- ret = null;
- addMsgAvailableListener(eo);
- } else {
- ret = msg;
- msg = null;
- if (srcs.size() > 0) {
- producer = srcs.poll();
- }
- }
- }
- if (producer != null) {
- producer.onEvent(this, spaceAvailble);
- }
- return ret;
- }
-
- /**
- * Non-blocking, nonpausing put.
- * @param eo. If non-null, registers this observer and calls it with an SpaceAvailable event
- * when there's space.
- * @return buffered message if there's one, or null
- */
- public boolean put(T amsg, EventSubscriber eo) {
- boolean ret = true; // assume we'll be able to enqueue
- EventSubscriber subscriber;
- synchronized(this) {
- if (amsg == null) {
- throw new NullPointerException("Null message supplied to put");
- }
- if (msg == null) { // space available
- msg = amsg;
- subscriber = sink;
- sink = null;
- } else {
- ret = false;
- // unable to enqueue. Cell is full
- subscriber = null;
- if (eo != null) {
- srcs.add(eo);
- }
- }
- }
- // notify get's subscriber that something is available
- if (subscriber != null) {
- subscriber.onEvent(this, messageAvailable);
- }
- return ret;
- }
-
- /**
- * Get, don't pause or block.
- *
- * @return stored message, or null if no message found.
- */
- public T getnb() {
- return get(null);
- }
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public T get() throws Pausable{
- Task t = Task.getCurrentTask();
- T msg = get(t);
- while (msg == null) {
- Task.pause(this);
- msg = get(t);
- }
- return msg;
- }
-
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public T get(long timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- T msg = get(t);
- long begin = System.currentTimeMillis();
- while (msg == null) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Cell.this.removeMsgAvailableListener(t);
- t.onEvent(Cell.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- tt.cancel();
- if (System.currentTimeMillis() - begin > timeoutMillis) {
- break;
- }
- msg = get(t);
- }
- return msg;
- }
-
- public synchronized void addSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.add(spcSub);
- }
-
- public synchronized void removeSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.remove(spcSub);
- }
-
-
- public synchronized void addMsgAvailableListener(EventSubscriber msgSub) {
- if (sink != null) {
- throw new AssertionError(
- "Error: A mailbox can not be shared by two consumers. New = "
- + msgSub + ", Old = " + sink);
- }
- sink = msgSub;
- }
-
- public synchronized void removeMsgAvailableListener(EventSubscriber msgSub) {
- if (sink == msgSub) {
- sink = null;
- }
- }
-
- public boolean putnb(T msg) {
- return put(msg, null);
- }
-
- public void put(T msg) throws Pausable {
- Task t = Task.getCurrentTask();
- while (!put(msg, t)) {
- Task.pause(this);
- }
- }
-
- public boolean put(T msg, int timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- long begin = System.currentTimeMillis();
- while (!put(msg, t)) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Cell.this.removeSpaceAvailableListener(t);
- t.onEvent(Cell.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- if (System.currentTimeMillis() - begin >= timeoutMillis) {
- return false;
- }
- }
- return true;
- }
-
- public void putb(T msg) {
- putb(msg, 0 /* infinite wait */);
- }
-
- public class BlockingSubscriber implements EventSubscriber {
- public volatile boolean eventRcvd = false;
- public void onEvent(EventPublisher ep, Event e) {
- synchronized (Cell.this) {
- eventRcvd = true;
- Cell.this.notify();
- }
- }
- public void blockingWait(final long timeoutMillis) {
- long start = System.currentTimeMillis();
- long remaining = timeoutMillis;
- boolean infiniteWait = timeoutMillis == 0;
- synchronized (Cell.this) {
- while (!eventRcvd && (infiniteWait || remaining > 0)) {
- try {
- Cell.this.wait(infiniteWait? 0 : remaining);
- } catch (InterruptedException ie) {}
- long elapsed = System.currentTimeMillis() - start;
- remaining -= elapsed;
- }
- }
- }
- }
-
- public void putb(T msg, final long timeoutMillis) {
- BlockingSubscriber evs = new BlockingSubscriber();
- if (!put(msg, evs)) {
- evs.blockingWait(timeoutMillis);
- }
- if (!evs.eventRcvd) {
- removeSpaceAvailableListener(evs);
- }
- }
-
- public synchronized boolean hasMessage() {
- return msg != null;
- }
-
- public synchronized boolean hasSpace() {
- return msg == null;
- }
-
- /**
- * retrieve a message, blocking the thread indefinitely. Note, this is a
- * heavyweight block, unlike #get() that pauses the Fiber but doesn't block
- * the thread.
- */
-
- public T getb() {
- return getb(0);
- }
-
- /**
- * retrieve a msg, and block the Java thread for the time given.
- *
- * @param millis.
- * max wait time
- * @return null if timed out.
- */
- public T getb(final long timeoutMillis) {
- BlockingSubscriber evs = new BlockingSubscriber();
- T msg;
-
- if ((msg = get(evs)) == null) {
- evs.blockingWait(timeoutMillis);
- if (evs.eventRcvd) {
- msg = get(null); // non-blocking get.
- assert msg != null: "Received event, but message is null";
- }
- }
- if (msg == null) {
- removeMsgAvailableListener(evs);
- }
- return msg;
- }
-
- public synchronized String toString() {
- return "id:" + System.identityHashCode(this) + " " + msg;
- }
-
- // Implementation of PauseReason
- public boolean isValid(Task t) {
- synchronized(this) {
- return (t == sink) || srcs.contains(t);
- }
- }
-}
-
View
94 src/kilim/Lock.java
@@ -0,0 +1,94 @@
+package kilim;
+
+import java.util.LinkedList;
+
+/**
+ * Simple reentrant lock that uses Kilim's means for suspending
+ *
+ * @author krab@trifork.com
+ */
+public class Lock implements PauseReason, EventPublisher {
+
+ Task owner = null;
+ int count;
+ LinkedList<Task> waiters;
+
+ static boolean $isWoven = true;
+
+ public void lock() throws Pausable {
+ Task.errNotWoven();
+ }
+
+ public void lock(Fiber f) {
+ while (!lock(f.task)) {
+ f.down();
+ Task.pause(this, f);
+ if(f.up() == Fiber.PAUSING__NO_STATE) {
+ f.setState(this, 0);
+ }
+ }
+ }
+
+ private boolean lock(Task currentTask) {
+ synchronized (this) {
+ if (owner == null) {
+ owner = currentTask;
+ count = 1;
+ return true;
+ } else if (owner == currentTask) {
+ count = count + 1;
+ return true;
+ } else {
+ addListener(currentTask);
+ return false;
+ }
+ }
+ }
+
+ private synchronized Task _unlock(Task currentTask) {
+ Task next = null;
+
+ if (owner != currentTask) {
+ throw new IllegalStateException();
+ }
+
+ if ((count = count-1) == 0) {
+ owner = null;
+ if (waiters != null && !waiters.isEmpty()) {
+ next = waiters.removeFirst();
+ }
+ }
+
+ return next;
+ }
+
+ public void unlock() throws Pausable {
+ Task.errNotWoven();
+ }
+
+ public void unlock(Fiber f) {
+ Task next = _unlock(f.task);
+ _unlock2(next);
+ }
+
+ private void _unlock2(Task next) {
+ if (next != null) {
+ next.resume();
+ }
+ }
+
+ private void addListener(Task eo) {
+ if (waiters == null) {
+ waiters = new LinkedList<Task>();
+ }
+
+ waiters.add(eo);
+ }
+
+ public boolean isValid(Task t) {
+ synchronized(this) {
+ return waiters != null && waiters.contains(t);
+ }
+ }
+
+}
View
1,114 src/kilim/Mailbox.java
@@ -15,577 +15,565 @@
* synchronize with each other (as opposed to direct java calls or static member
* variables). put() and get() are the two essential functions.
*
- * We use the term "block" to mean thread block, and "pause" to mean
- * fiber pausing. The suffix "nb" on some methods (such as getnb())
- * stands for non-blocking.
+ * We use the term "block" to mean thread block, and "pause" to mean fiber
+ * pausing. The suffix "nb" on some methods (such as getnb()) stands for
+ * non-blocking.
*/
public class Mailbox<T> implements PauseReason, EventPublisher {
- // TODO. Give mbox a config name and id and make monitorable
- T[] msgs;
- private int iprod = 0; // producer index
- private int icons = 0; // consumer index;
- private int numMsgs = 0;
- private int maxMsgs = 300;
- EventSubscriber sink;
-
- // FIX: I don't like this event design. The only good thing is that
- // we don't create new event objects every time we signal a client
- // (subscriber) that's blocked on this mailbox.
- public static final int SPACE_AVAILABLE = 1;
- public static final int MSG_AVAILABLE = 2;
- public static final int TIMED_OUT = 3;
- public static final Event spaceAvailble = new Event(MSG_AVAILABLE);
- public static final Event messageAvailable = new Event(SPACE_AVAILABLE);
- public static final Event timedOut = new Event(TIMED_OUT);
-
- LinkedList<EventSubscriber> srcs = new LinkedList<EventSubscriber>();
-
- // DEBUG stuff
- // To do: move into monitorable stat object
- /*
- * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0;
- * public int nWastedGets = 0;
- */
- public Mailbox() {
- this(10);
- }
-
- public Mailbox(int initialSize) {
- this(initialSize, Integer.MAX_VALUE);
- }
-
- @SuppressWarnings("unchecked")
- public Mailbox(int initialSize, int maxSize) {
- if (initialSize > maxSize)
- throw new IllegalArgumentException("initialSize: " + initialSize
- + " cannot exceed maxSize: " + maxSize);
- msgs = (T[]) new Object[initialSize];
- maxMsgs = maxSize;
- }
-
- /**
- * Non-blocking, nonpausing get.
- * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when
- * a put() is done.
- * @return buffered message if there's one, or null
- */
- public T get(EventSubscriber eo) {
- T msg;
- EventSubscriber producer = null;
- synchronized(this) {
- int n = numMsgs;
- if (n > 0) {
- int ic = icons;
- msg = msgs[ic]; msgs[ic]=null;
- icons = (ic + 1) % msgs.length;
- numMsgs = n - 1;
-
- if (srcs.size() > 0) {
- producer = srcs.poll();
- }
- } else {
- msg = null;
- addMsgAvailableListener(eo);
- }
- }
- if (producer != null) {
- producer.onEvent(this, spaceAvailble);
- }
- return msg;
- }
-
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public void untilHasMessage() throws Pausable{
- while (hasMessage(Task.getCurrentTask()) == false) {
- Task.pause(this);
- }
- }
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public void untilHasMessages(int num) throws Pausable{
- while (hasMessages(num, Task.getCurrentTask()) == false) {
- Task.pause(this);
- }
- }
-
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public boolean untilHasMessage(long timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- boolean has_msg = hasMessage(t);
- long end = System.currentTimeMillis() + timeoutMillis;
- while (has_msg == false) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Mailbox.this.removeMsgAvailableListener(t);
- t.onEvent(Mailbox.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- tt.cancel();
- has_msg = hasMessage(t);
- timeoutMillis = end - System.currentTimeMillis();
- if (timeoutMillis <= 0) {
- removeMsgAvailableListener(t);
- break;
- }
- }
- return has_msg;
- }
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public boolean untilHasMessages(int num, long timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- final long end = System.currentTimeMillis() + timeoutMillis;
-
- boolean has_msg = hasMessages(num, t);
- while (has_msg == false) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Mailbox.this.removeMsgAvailableListener(t);
- t.onEvent(Mailbox.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- tt.cancel();
- has_msg = hasMessages(num, t);
- timeoutMillis = end - System.currentTimeMillis();
- if (timeoutMillis <= 0) {
- removeMsgAvailableListener(t);
- break;
- }
- }
- return has_msg;
- }
-
-
- /**
- * Non-blocking, nonpausing "wait-until-message-available".
- * @param eo. If non-null, registers this observer and calls it with a MessageAvailable event when
- * a put() is done.
- * @return true's one, or false
- */
- public boolean hasMessage(EventSubscriber eo) {
- boolean has_msg;
- synchronized(this) {
- int n = numMsgs;
- if (n > 0) {
- has_msg = true;
- } else {
- has_msg = false;
- addMsgAvailableListener(eo);
- }
- }
- return has_msg;
- }
-
- public boolean hasMessages(int num, EventSubscriber eo) {
- boolean has_msg;
- synchronized(this) {
- int n = numMsgs;
- if (n >= num) {
- has_msg = true;
- } else {
- has_msg = false;
- addMsgAvailableListener(eo);
- }
- }
- return has_msg;
- }
-
- /**
- * Non-blocking, nonpausing peek.
- * @return buffered message if there's one, or null
- */
- public T peek(int idx) {
- assert idx >= 0 : "negative index";
- T msg;
- synchronized(this) {
- int n = numMsgs;
+ // TODO. Give mbox a config name and id and make monitorable
+ T[] msgs;
+ private int iprod = 0; // producer index
+ private int icons = 0; // consumer index;
+ private int numMsgs = 0;
+ private int maxMsgs = 300;
+ MessageConsumer sink;
+
+ // FIX: I don't like this event design. The only good thing is that
+ // we don't create new event objects every time we signal a client
+ // (subscriber) that's blocked on this mailbox.
+ public static final int SPACE_AVAILABLE = 1;
+ public static final int MSG_AVAILABLE = 2;
+ public static final int TIMED_OUT = 3;
+ public static final Event spaceAvailble = new Event(MSG_AVAILABLE);
+ public static final Event messageAvailable = new Event(SPACE_AVAILABLE);
+ public static final Event timedOut = new Event(TIMED_OUT);
+
+ LinkedList<MessageProducer> pendingProducers = new LinkedList<MessageProducer>();
+
+ // DEBUG stuff
+ // To do: move into monitorable stat object
+ /*
+ * public int nPut = 0; public int nGet = 0; public int nWastedPuts = 0;
+ * public int nWastedGets = 0;
+ */
+ public Mailbox() {
+ this(10);
+ }
+
+ public Mailbox(int initialSize) {
+ this(initialSize, Integer.MAX_VALUE);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Mailbox(int initialSize, int maxSize) {
+ if (initialSize > maxSize)
+ throw new IllegalArgumentException("initialSize: " + initialSize
+ + " cannot exceed maxSize: " + maxSize);
+ msgs = (T[]) new Object[initialSize];
+ maxMsgs = maxSize;
+ }
+
+ /**
+ * Non-blocking, nonpausing get.
+ *
+ * @param eo
+ * . If non-null, registers this observer and calls it with a
+ * MessageAvailable event when a put() is done.
+ * @return buffered message if there's one, or null
+ */
+ public T get(MessageConsumer eo) {
+ T msg;
+ MessageProducer producer = null;
+ synchronized (this) {
+ int n = numMsgs;
+ if (n > 0) {
+ int ic = icons;
+ msg = msgs[ic];
+ msgs[ic] = null;
+ icons = (ic + 1) % msgs.length;
+ numMsgs = n - 1;
+
+ assert msg != null : "received null message!";
+
+ if (pendingProducers.size() > 0) {
+ producer = pendingProducers.poll();
+ }
+ } else {
+ msg = null;
+ addMessageConsumer(eo);
+ }
+ }
+ if (producer != null) {
+ producer.spaceAvailable(this);
+ }
+ return msg;
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public void untilHasMessage() throws Pausable {
+ while (hasMessage(Task.getCurrentTask()) == false) {
+ Task.pause(this);
+ }
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public void untilHasMessages(int num) throws Pausable {
+ while (hasMessages(num, Task.getCurrentTask()) == false) {
+ Task.pause(this);
+ }
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public boolean untilHasMessage(long timeoutMillis) throws Pausable {
+ final Task t = Task.getCurrentTask();
+ boolean has_msg = hasMessage(t);
+ long end = System.currentTimeMillis() + timeoutMillis;
+ int iteration = 0;
+ while (has_msg == false) {
+ if (++iteration <= 3) {
+ // Timer creation is costly, and frequently it has to
+ // be cancelled anyway. Try to avoid it:
+ Task.yield();
+ } else {
+ TimerTask tt = startConsumerTimeoutTimer(t, timeoutMillis);
+ Task.pause(this);
+ tt.cancel();
+ }
+ has_msg = hasMessage(t);
+ timeoutMillis = end - System.currentTimeMillis();
+ if (timeoutMillis <= 0) {
+ removeMessageConsumer(t);
+ break;
+ }
+ }
+ return has_msg;
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public boolean untilHasMessages(int num, long timeoutMillis)
+ throws Pausable {
+ final Task t = Task.getCurrentTask();
+ final long end = System.currentTimeMillis() + timeoutMillis;
+
+ boolean has_msg = hasMessages(num, t);
+ int iteration = 0;
+ while (has_msg == false) {
+ if (++iteration <= 3) {
+ // Timer creation is costly, and frequently it has to
+ // be cancelled anyway. Try to avoid it:
+ Task.yield();
+ } else {
+ TimerTask tt = startConsumerTimeoutTimer(t, timeoutMillis);
+ Task.pause(this);
+ if (!tt.cancel()) {
+ removeMessageConsumer(t);
+ }
+ }
+
+ has_msg = hasMessages(num, t);
+ timeoutMillis = end - System.currentTimeMillis();
+ if (!has_msg && timeoutMillis <= 0) {
+ removeMessageConsumer(t);
+ break;
+ }
+ }
+ return has_msg;
+ }
+
+ private TimerTask startConsumerTimeoutTimer(final Task t, long timeoutMillis) {
+ TimerTask tt = new TimerTask() {
+ public void run() {
+ if (removeMessageConsumer(t)) {
+ t.consumeTimeout(Mailbox.this);
+ }
+ }
+ };
+ Task.timer.schedule(tt, timeoutMillis);
+ return tt;
+ }
+
+ /**
+ * Non-blocking, nonpausing "wait-until-message-available".
+ *
+ * @param eo
+ * . If non-null, registers this observer and calls it with a
+ * MessageAvailable event when a put() is done.
+ * @return true's one, or false
+ */
+ public boolean hasMessage(MessageConsumer eo) {
+ boolean has_msg;
+ synchronized (this) {
+ int n = numMsgs;
+ if (n > 0) {
+ has_msg = true;
+ } else {
+ has_msg = false;
+ addMessageConsumer(eo);
+ }
+ }
+ return has_msg;
+ }
+
+ public boolean hasMessages(int num, MessageConsumer eo) {
+ boolean has_msg;
+ synchronized (this) {
+ int n = numMsgs;
+ if (n >= num) {
+ has_msg = true;
+ } else {
+ has_msg = false;
+ addMessageConsumer(eo);
+ }
+ }
+ return has_msg;
+ }
+
+ /**
+ * Non-blocking, nonpausing peek.
+ *
+ * @return buffered message if there's one, or null
+ */
+ public T peek(int idx) {
+ assert idx >= 0 : "negative index";
+ T msg;
+ synchronized (this) {
+ int n = numMsgs;
if (idx < n) {
- int ic = icons;
- msg = msgs[(ic+idx)%msgs.length];
- } else {
- msg = null;
- }
- }
- return msg;
- }
-
- public T remove(final int idx) {
- assert idx >= 0 : "negative index";
- T msg;
- synchronized(this) {
- int n = numMsgs;
- if (idx < n) {
- int ic = icons;
- int mlen = msgs.length;
- msg = msgs[(ic+idx)%mlen];
- for (int i = idx; i > 0; i--) {
- msgs[(ic+i)%mlen] = msgs[(ic+i-1)%mlen];
- }
- msgs[icons] = null;
- numMsgs -= 1;
- icons = (icons+1)%mlen;
- } else {
- throw new IllegalStateException();
- }
- }
- return msg;
- }
-
- /**
- * Non-blocking, nonpausing put.
- * @param eo. If non-null, registers this observer and calls it with an SpaceAvailable event
- * when there's space.
- * @return buffered message if there's one, or null
- */
- public boolean put(T msg, EventSubscriber eo) {
- boolean ret = true; // assume we will be able to enqueue
- EventSubscriber subscriber;
- synchronized(this) {
- if (msg == null) {
- throw new NullPointerException("Null message supplied to put");
- }
- int ip = iprod;
- int ic = icons;
- int n = numMsgs;
- if (n == msgs.length) {
- assert ic == ip : "numElements == msgs.length && ic != ip";
- if (n < maxMsgs) {
- T[] newmsgs = (T[]) new Object[Math.min(n * 2, maxMsgs)];
- System.arraycopy(msgs, ic, newmsgs, 0, n - ic);
- if (ic > 0) {
- System.arraycopy(msgs, 0, newmsgs, n - ic, ic);
- }
- msgs = newmsgs;
- ip = n;
- ic = 0;
- } else {
- ret = false;
- }
- }
- if (ret) {
- numMsgs = n + 1;
- msgs[ip] = msg;
- iprod = (ip + 1) % msgs.length;
- icons = ic;
- subscriber = sink;
- sink = null;
- } else {
- subscriber = null;
- // unable to enqueue
- if (eo != null) {
- srcs.add(eo);
- }
- }
- }
- // notify get's subscriber that something is available
- if (subscriber != null) {
- subscriber.onEvent(this, messageAvailable);
- }
- return ret;
- }
-
- /**
- * Get, don't pause or block.
- *
- * @return stored message, or null if no message found.
- */
- public T getnb() {
- return get(null);
- }
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public T get() throws Pausable{
- Task t = Task.getCurrentTask();
- T msg = get(t);
- while (msg == null) {
- Task.pause(this);
- msg = get(t);
- }
- return msg;
- }
-
-
- /**
- * @return non-null message.
- * @throws Pausable
- */
- public T get(long timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- T msg = get(t);
- long end = System.currentTimeMillis() + timeoutMillis;
- while (msg == null) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Mailbox.this.removeMsgAvailableListener(t);
- t.onEvent(Mailbox.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- tt.cancel();
-
- msg = get(t);
-
- timeoutMillis = end - System.currentTimeMillis();
- if (timeoutMillis <= 0) {
- removeMsgAvailableListener(t);
- break;
- }
- }
- return msg;
- }
-
-
- /**
- * Takes an array of mailboxes and returns the index of the first mailbox
- * that has a message. It is possible that because of race conditions, an
- * earlier mailbox in the list may also have received a message.
- */
- public static int select(Mailbox... mboxes) throws Pausable {
- while (true) {
- for (int i = 0; i < mboxes.length; i++) {
- if (mboxes[i].hasMessage()) {
- return i;
- }
- }
- Task t = Task.getCurrentTask();
- EmptySet_MsgAvListener pauseReason =
- new EmptySet_MsgAvListener(t, mboxes);
- for (int i = 0; i < mboxes.length; i++) {
- mboxes[i].addMsgAvailableListener(pauseReason);
- }
- Task.pause(pauseReason);
- for (int i = 0; i < mboxes.length; i++) {
- mboxes[i].removeMsgAvailableListener(pauseReason);
- }
- }
- }
-
- public synchronized void addSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.add(spcSub);
- }
-
- public synchronized void removeSpaceAvailableListener(EventSubscriber spcSub) {
- srcs.remove(spcSub);
- }
-
-
- public synchronized void addMsgAvailableListener(EventSubscriber msgSub) {
- if (sink != null && sink != msgSub) {
- throw new AssertionError(
- "Error: A mailbox can not be shared by two consumers. New = "
- + msgSub + ", Old = " + sink);
- }
- sink = msgSub;
- }
-
- public synchronized void removeMsgAvailableListener(EventSubscriber msgSub) {
- if (sink == msgSub) {
- sink = null;
- }
- }
-
- public boolean putnb(T msg) {
- return put(msg, null);
- }
-
- public void put(T msg) throws Pausable {
- Task t = Task.getCurrentTask();
- t.checkKill();
- while (!put(msg, t)) {
- Task.pause(this);
- }
- }
-
- public boolean put(T msg, int timeoutMillis) throws Pausable {
- final Task t = Task.getCurrentTask();
- long begin = System.currentTimeMillis();
- while (!put(msg, t)) {
- TimerTask tt = new TimerTask() {
- public void run() {
- Mailbox.this.removeSpaceAvailableListener(t);
- t.onEvent(Mailbox.this, timedOut);
- }
- };
- Task.timer.schedule(tt, timeoutMillis);
- Task.pause(this);
- if (System.currentTimeMillis() - begin >= timeoutMillis) {
- return false;
- }
- }
- return true;
- }
-
- public void putb(T msg) {
- putb(msg, 0 /* infinite wait */);
- }
-
- public class BlockingSubscriber implements EventSubscriber {
- public volatile boolean eventRcvd = false;
- public void onEvent(EventPublisher ep, Event e) {
- synchronized (Mailbox.this) {
- eventRcvd = true;
- Mailbox.this.notify();
- }
- }
- public void blockingWait(final long timeoutMillis) {
- long start = System.currentTimeMillis();
- long remaining = timeoutMillis;
- boolean infiniteWait = timeoutMillis == 0;
- synchronized (Mailbox.this) {
- while (!eventRcvd && (infiniteWait || remaining > 0)) {
- try {
- Mailbox.this.wait(infiniteWait? 0 : remaining);
- } catch (InterruptedException ie) {}
- long elapsed = System.currentTimeMillis() - start;
- remaining -= elapsed;
- }
- }
- }
- }
-
- public void putb(T msg, final long timeoutMillis) {
- BlockingSubscriber evs = new BlockingSubscriber();
- if (!put(msg, evs)) {
- evs.blockingWait(timeoutMillis);
- }
- if (!evs.eventRcvd) {
- removeSpaceAvailableListener(evs);
- }
- }
-
- public synchronized int size() {
- return numMsgs;
- }
-
- public synchronized boolean hasMessage() {
- return numMsgs > 0;
- }
-
- /** return true if the mailbox has at least <code>num</code> messages. */
- public synchronized boolean hasMessages(int num) {
- return numMsgs >= num;
- }
-
- public synchronized boolean hasSpace() {
- return (maxMsgs - numMsgs) > 0;
- }
-
- /**
- * retrieve a message, blocking the thread indefinitely. Note, this is a
- * heavyweight block, unlike #get() that pauses the Fiber but doesn't block
- * the thread.
- */
-
- public T getb() {
- return getb(0);
- }
-
- /**
- * retrieve a msg, and block the Java thread for the time given.
- *
- * @param millis.
- * max wait time
- * @return null if timed out.
- */
- public T getb(final long timeoutMillis) {
- BlockingSubscriber evs = new BlockingSubscriber();
- T msg;
-
- if ((msg = get(evs)) == null) {
- evs.blockingWait(timeoutMillis);
- if (evs.eventRcvd) {
- msg = get(null); // non-blocking get.
- assert msg != null: "Received event, but message is null";
- }
- }
- if (msg == null) {
- removeMsgAvailableListener(evs);
- }
- return msg;
- }
-
- public synchronized String toString() {
- return "id:" + System.identityHashCode(this) + " " +
- // DEBUG "nGet:" + nGet + " " +
- // "nPut:" + nPut + " " +
- // "numWastedPuts:" + nWastedPuts + " " +
- // "nWastedGets:" + nWastedGets + " " +
- "numMsgs:" + numMsgs;
- }
-
- // Implementation of PauseReason
- public boolean isValid(Task t) {
- synchronized(this) {
- return (t == sink) || srcs.contains(t);
- }
- }
-
- public synchronized Object[] messages() {
- synchronized (this) {
+ int ic = icons;
+ msg = msgs[(ic + idx) % msgs.length];
+
+ assert msg != null : "peeked null message!";
+ } else {
+ msg = null;
+ }
+ }
+ return msg;
+ }
+
+ public T remove(final int idx) {
+ assert idx >= 0 : "negative index";
+ T msg;
+ synchronized (this) {
+ int n = numMsgs;
+ assert idx < numMsgs;
+ if (idx < n) {
+ int ic = icons;
+ int mlen = msgs.length;
+ msg = msgs[(ic + idx) % mlen];
+ for (int i = idx; i > 0; i--) {
+ msgs[(ic + i) % mlen] = msgs[(ic + i - 1) % mlen];
+ }
+ msgs[icons] = null;
+ numMsgs -= 1;
+ icons = (icons + 1) % mlen;
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+ return msg;
+ }
+
+ /**
+ * Non-blocking, nonpausing put.
+ *
+ * @param eo
+ * . If non-null, registers this observer and calls it with an
+ * SpaceAvailable event when there's space.
+ * @return buffered message if there's one, or null
+ */
+ public boolean put(T msg, MessageProducer eo) {
+ boolean ret = true; // assume we will be able to enqueue
+ MessageConsumer consumer;
+ synchronized (this) {
+ if (msg == null) {
+ throw new NullPointerException("Null message supplied to put");
+ }
+ int ip = iprod;
+ int ic = icons;
+ int n = numMsgs;
+ if (n == msgs.length) {
+ assert ic == ip : "numElements == msgs.length && ic != ip";
+ if (n < maxMsgs) {
+ T[] newmsgs = (T[]) new Object[Math.min(n * 2, maxMsgs)];
+ System.arraycopy(msgs, ic, newmsgs, 0, n - ic);
+ if (ic > 0) {
+ System.arraycopy(msgs, 0, newmsgs, n - ic, ic);
+ }
+ msgs = newmsgs;
+ ip = n;
+ ic = 0;
+ } else {
+ ret = false;
+ }
+ }
+ if (ret) {
+ numMsgs = n + 1;
+ msgs[ip] = msg;
+ iprod = (ip + 1) % msgs.length;
+ icons = ic;
+ consumer = sink;
+ sink = null;
+ } else {
+ consumer = null;
+ // unable to enqueue
+ if (eo != null) {
+ pendingProducers.add(eo);
+ }
+ }
+ }
+ // notify get's subscriber that something is available
+ if (consumer != null) {
+ consumer.messageAvailable(this);
+ }
+ return ret;
+ }
+
+ /**
+ * Get, don't pause or block.
+ *
+ * @return stored message, or null if no message found.
+ */
+ public T getnb() {
+ return get(null);
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public T get() throws Pausable {
+ Task t = Task.getCurrentTask();
+ T msg = get(t);
+ while (msg == null) {
+ Task.pause(this);
+ removeMessageConsumer(t);
+ msg = get(t);
+ }
+ return msg;
+ }
+
+ /**
+ * @return non-null message.
+ * @throws Pausable
+ */
+ public T get(long timeoutMillis) throws Pausable {
+ final Task t = Task.getCurrentTask();
+ T msg = get(t);
+ long end = System.currentTimeMillis() + timeoutMillis;
+ while (msg == null) {
+ TimerTask tt = new TimerTask() {
+ public void run() {
+ if (Mailbox.this.removeMessageConsumer(t)) {
+ t.consumeTimeout(Mailbox.this);
+ }
+ }
+ };
+ Task.timer.schedule(tt, timeoutMillis);
+ Task.pause(this);
+ tt.cancel();
+ removeMessageConsumer(t);
+ msg = get(t);
+
+ timeoutMillis = end - System.currentTimeMillis();
+ if (timeoutMillis <= 0) {
+ removeMessageConsumer(t);
+ break;
+ }
+ }
+ return msg;
+ }
+
+ public synchronized void addMessageProducer(MessageProducer spcSub) {
+ pendingProducers.add(spcSub);
+ }
+
+ public synchronized boolean removeMessageProducer(MessageProducer spcSub) {
+ return pendingProducers.remove(spcSub);
+ }
+
+ public synchronized void addMessageConsumer(MessageConsumer msgSub) {
+ if (sink != null && sink != msgSub) {
+ throw new AssertionError(
+ "Error: A mailbox can not be shared by two consumers. New = "
+ + msgSub + ", Old = " + sink);
+ }
+ sink = msgSub;
+ }
+
+ public synchronized boolean removeMessageConsumer(
+ MessageConsumer msgSub) {
+ if (sink == msgSub) {
+ sink = null;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean putnb(T msg) {
+ return put(msg, null);
+ }
+
+ public void put(T msg) throws Pausable {
+ Task t = Task.getCurrentTask();
+ t.checkKill();
+ while (!put(msg, t)) {
+ Task.pause(this);
+ }
+ }
+
+ public boolean put(T msg, int timeoutMillis) throws Pausable {
+ final Task t = Task.getCurrentTask();
+ long begin = System.currentTimeMillis();
+ while (!put(msg, t)) {
+ TimerTask tt = new TimerTask() {
+ public void run() {
+ if (Mailbox.this.removeMessageProducer(t)) {
+ t.produceTimeout(Mailbox.this);
+ }
+ }
+ };
+ Task.timer.schedule(tt, timeoutMillis);
+ Task.pause(this);
+ tt.cancel();
+ removeMessageProducer(t);
+ if (System.currentTimeMillis() - begin >= timeoutMillis) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean putb(T msg) {
+ return putb(msg, 0 /* infinite wait */);
+ }
+
+ public class BlockingSubscriber implements MessageProducer, MessageConsumer {
+ public synchronized void spaceAvailable(Mailbox ep) {
+ eventRcvd = true;
+ notifyAll();
+ }
+
+ public synchronized void produceTimeout(Mailbox pub) {
+ notifyAll();
+ }
+
+ public synchronized void messageAvailable(Mailbox ep) {
+ eventRcvd = true;
+ notifyAll();
+ }
+
+ public synchronized void consumeTimeout(Mailbox pub) {
+ notifyAll();
+ }
+
+ public volatile boolean eventRcvd = false;
+
+ public boolean blockingWait(final long timeoutMillis) {
+ long absEnd = timeoutMillis == 0
+ ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeoutMillis;
+ synchronized (this) {
+ long now = System.currentTimeMillis();
+ while (!eventRcvd && (now < absEnd)) {
+ try {
+ this.wait(absEnd - now);
+ } catch (InterruptedException ie) {
+ }
+ now = System.currentTimeMillis();
+ }
+ }
+
+ return eventRcvd;
+ }
+ }
+
+ public boolean putb(T msg, final long timeoutMillis) {
+ BlockingSubscriber evs = new BlockingSubscriber();
+ if (!put(msg, evs)) {
+ boolean did_put = evs.blockingWait(timeoutMillis);
+ removeMessageProducer(evs);
+ return did_put;
+ } else {
+ return true;
+ }
+ }
+
+ public synchronized int size() {
+ return numMsgs;
+ }
+
+ public synchronized boolean hasMessage() {
+ return numMsgs > 0;
+ }
+
+ /** return true if the mailbox has at least <code>num</code> messages. */
+ public synchronized boolean hasMessages(int num) {
+ return numMsgs >= num;
+ }
+
+ public synchronized boolean hasSpace() {
+ return (maxMsgs - numMsgs) > 0;
+ }
+
+ /**
+ * retrieve a message, blocking the thread indefinitely. Note, this is a
+ * heavyweight block, unlike #get() that pauses the Fiber but doesn't block
+ * the thread.
+ */
+
+ public T getb() {
+ return getb(0);
+ }
+
+ /**
+ * retrieve a msg, and block the Java thread for the time given.
+ *
+ * @param millis
+ * . max wait time
+ * @return null if timed out.
+ */
+ public T getb(final long timeoutMillis) {
+ BlockingSubscriber evs = new BlockingSubscriber();
+ T msg;
+
+ if ((msg = get(evs)) == null) {
+ if (evs.blockingWait(timeoutMillis)) {
+ msg = get(null); // non-blocking get.
+ assert msg != null : "Received event, but message is null";
+ }
+ }
+ removeMessageConsumer(evs);
+ return msg;
+ }
+
+ public synchronized String toString() {
+ return "id:" + System.identityHashCode(this) + " " +
+ // DEBUG "nGet:" + nGet + " " +
+ // "nPut:" + nPut + " " +
+ // "numWastedPuts:" + nWastedPuts + " " +
+ // "nWastedGets:" + nWastedGets + " " +
+ "numMsgs:" + numMsgs;
+ }
+
+ // Implementation of PauseReason
+ public boolean isValid(Task t) {
+ synchronized (this) {
+ return (t == sink) || pendingProducers.contains(t);
+ }
+ }
+
+ public synchronized Object[] messages() {
+ synchronized (this) {
Object[] result = new Object[numMsgs];
for (int i = 0; i < numMsgs; i++) {
- result[i] = msgs[(icons+i)%msgs.length];
+ result[i] = msgs[(icons + i) % msgs.length];
}
return result;
}
-
- }
-}
-class EmptySet_MsgAvListener implements PauseReason, EventSubscriber {
- final Task task;
- final Mailbox[] mbxs;
-
- EmptySet_MsgAvListener(Task t, Mailbox[] mbs) {
- task = t;
- mbxs = mbs;
- }
-
- public boolean isValid(Task t) {
- // The pauseReason is true (there is valid reason to continue
- // pausing) if none of the mboxes have any elements
- for (Mailbox mb : mbxs) {
- if (mb.hasMessage())
- return false;
- }
- return true;
- }
-
- public void onEvent(EventPublisher ep, Event e) {
- for (Mailbox m : mbxs) {
- if (m != ep) {
- ((Mailbox)ep).removeMsgAvailableListener(this);
- }
- }
- task.resume();
- }
-
- public void cancel() {
- for (Mailbox mb : mbxs) {
- mb.removeMsgAvailableListener(this);
- }
- }
+ }
}
View
6 src/kilim/MessageConsumer.java
@@ -0,0 +1,6 @@
+package kilim;
+
+public interface MessageConsumer {
+ void messageAvailable(Mailbox pub);
+ void consumeTimeout(Mailbox pub);
+}
View
6 src/kilim/MessageProducer.java
@@ -0,0 +1,6 @@
+package kilim;
+
+public interface MessageProducer {
+ void spaceAvailable(Mailbox src);
+ void produceTimeout(Mailbox pub);
+}
View
14 src/kilim/RingQueue.java
@@ -18,6 +18,20 @@ public RingQueue(int initialSize, int maxSize) {
public int size() {return size;}
+ public T peek() {
+ T elem;
+ T[] elems;
+ int n = size;
+ if (n > 0) {
+ elems = elements;
+ int ic = icons;
+ elem = elems[ic];
+ } else {
+ elem = null;
+ }
+ return elem;
+ }
+
public T get() {
T elem;
T[] elems;
View
25 src/kilim/Scheduler.java
@@ -8,7 +8,6 @@
import java.util.Comparator;
import java.util.LinkedList;
-import java.util.PriorityQueue;
/**
* This is a basic FIFO Executor. It maintains a list of
@@ -25,17 +24,7 @@
LinkedList<WorkerThread> allThreads = new LinkedList<WorkerThread>();
LinkedList<WorkerThread> waitingThreads = new LinkedList<WorkerThread>();
protected boolean shutdown = false;
- // protected RingQueue<Task> runnableTasks = new RingQueue<Task>(1000);
- protected PriorityQueue<Task> runnableTasks = new PriorityQueue<Task>(1000, new Comparator<Task>() {
-
- public int compare(Task o1, Task o2) {
- if (o1.priority < o2.priority)
- return -1;
- else if (o1.priority == o2.priority)
- return 0;
- else
- return 1;
- }});
+ protected RingQueue<Task> runnableTasks = new RingQueue<Task>(1000);
static {
String s = System.getProperty("kilim.Scheduler.numThreads");
@@ -69,8 +58,9 @@ public void schedule(Task t) {
synchronized(this) {
assert t.running == true : "Task " + t + " scheduled even though running is false";
- runnableTasks.add(t);
- wt = waitingThreads.poll();
+ runnableTasks.put(t);
+ if (!waitingThreads.isEmpty())
+ wt = waitingThreads.poll();
}
if (wt != null) {
synchronized(wt) {
@@ -98,19 +88,14 @@ Task getNextTask(WorkerThread wt) {
return t;
}
- t = runnableTasks.peek();
+ t = runnableTasks.get();
if (t == null) {
waitingThreads.add(wt);
} else {
- boolean removed = runnableTasks.remove(t);
- assert removed : "queue not in order?";
prefThread = t.preferredResumeThread;
}
}
/////////////
- // race here: added to waiting threads above,
- // and received notify before entering waitForMsg
- /////////////
if (t == null) {
wt.waitForMsgOrSignal();
} else if (prefThread == null || prefThread == wt) {
View
73 src/kilim/Task.java
@@ -17,7 +17,7 @@
* provide a pausable execute method.
*
*/
-public abstract class Task implements EventSubscriber {
+public abstract class Task implements MessageConsumer, MessageProducer {
public volatile Thread currentThread = null;
static PauseReason yieldReason = new YieldReason();
@@ -160,15 +160,25 @@ public int getStackDepth() {
throw new AssertionError("Expected task to be run by WorkerThread");
}
- public void onEvent(EventPublisher ep, Event e) {
- // This is sneaky. We _know_ that the only time a task will get registered
- // is mailbox.put or get(), and that it'll be the pausereason as well.
- // @drkrab: no we don't! This can be a timedOut event, and that also
- // needs us to resume.
- //if (ep == pauseReason) {
- resume();
- //}
+ public void consumeTimeout(Mailbox pub) {
+ resume();
}
+
+ public void produceTimeout(Mailbox pub) {
+ resume();
+ }
+
+ public void messageAvailable(Mailbox ep) {
+ ep.removeMessageConsumer(this);
+ resume();
+ }
+
+ public void spaceAvailable(Mailbox ep) {
+ ep.removeMessageProducer(this);
+ resume();
+ }
+
+
/**
* This is typically called by a pauseReason to resume the task.
*/
@@ -188,17 +198,30 @@ public void resume() {
}
}
- public void informOnExit(Mailbox<ExitMsg> exit) {
- if (isDone()) {
- exit.putnb(new ExitMsg(this, exitResult));
- return;
- }
- synchronized (this) {
- if (exitMBs == null) exitMBs = new LinkedList<Mailbox<ExitMsg>>();
- exitMBs.add(exit);
+ public synchronized void informOnExit(Mailbox<ExitMsg> exit) {
+ if (isDone()) {
+ exit.putnb(new ExitMsg(this, exitResult));
+ return;
+ } else {
+ if (exitMBs == null) exitMBs = new LinkedList<Mailbox<ExitMsg>>();
+ exitMBs.add(exit);
}
}
-
+
+ public synchronized void informOfExit(TaskDoneReason doneReason) {
+ if (exitMBs != null) {
+ if (doneReason != null) {
+ exitResult = doneReason.exitObj;
+ }
+ ExitMsg msg = new ExitMsg(this, exitResult);
+ for (Mailbox<ExitMsg> exitMB: exitMBs) {
+ exitMB.putnb(msg);
+ }
+ }
+ done = true;
+ }
+
+
/**
* This is a placeholder that doesn't do anything useful.
* Weave replaces the call in the bytecode from
@@ -351,8 +374,8 @@ public final PauseReason getPauseReason() {
* @param f
* @throws Pausable
*/
- public final void setPriority(int value) throws Pausable {
- assert Task.getCurrentTask() == this : "task can only call setPriority this on self";
+ public final void setPriority(int value) /*throws Pausable*/ {
+ //assert Task.getCurrentTask() == this : "task can only call setPriority this on self";
this.priority = value;
}
@@ -394,15 +417,7 @@ void _runExecute(WorkerThread thread) throws NotPausable {
if (numActivePins > 0) {
throw new AssertionError("Task ended but has active locks");
}
- if (exitMBs != null) {
- if (pauseReason instanceof TaskDoneReason) {
- exitResult = ((TaskDoneReason)pauseReason).exitObj;
- }
- ExitMsg msg = new ExitMsg(this, exitResult);
- for (Mailbox<ExitMsg> exitMB: exitMBs) {
- exitMB.putnb(msg);
- }
- }
+ informOfExit((TaskDoneReason)pauseReason);
preferredResumeThread = null;
} else {
if (thread != null) { // it is null for generators
View
49 src/kilim/TaskGroup.java
@@ -1,49 +0,0 @@
-package kilim;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-public class TaskGroup extends Task {
- private Mailbox<Task> addedTasksMB = new Mailbox<Task>();
- private Mailbox<ExitMsg> exitmb = new Mailbox<ExitMsg>();
- private HashSet<Task> tasks = new HashSet<Task>();
-
- public List<ExitMsg> results = Collections.synchronizedList(new ArrayList<ExitMsg>());
-
- public void execute() throws Pausable {
- while (!tasks.isEmpty() || addedTasksMB.hasMessage()) {
- switch (Mailbox.select(addedTasksMB, exitmb)) {
- case 0:
- Task t = addedTasksMB.getnb();
- t.informOnExit(exitmb);
- tasks.add(t);
- break;
- case 1:
- ExitMsg em = exitmb.getnb();
- results.add(em);
- tasks.remove(em.task);
- break;
- }
- }
- exit(results);
- }
-
- @Override
- public ExitMsg joinb() {
- start();
- return super.joinb();
- }
-
- @Override
- public ExitMsg join() throws Pausable {
- start();
- return super.join();
- }
-
- public void add(Task t) {
- t.informOnExit(exitmb);
- addedTasksMB.putnb(t); // will wake up join if it is waiting.
- }
-}
View
7 src/kilim/WorkerThread.java
@@ -66,11 +66,8 @@ public synchronized boolean hasTasks() {
return tasks.size() > 0;
}
public synchronized Task getNextTask() {
- Task task = tasks.peek();
- if (task != null) {
- tasks.remove(task);
- }
- return task;
+ Task task = tasks.poll();
+ return task;
}
public synchronized void waitForMsgOrSignal() {
View
4 src/kilim/analysis/AsmDetector.java
@@ -1,6 +1,8 @@
package kilim.analysis;
import java.util.LinkedList;
+import java.util.Map;
+import java.util.WeakHashMap;
import java.io.IOException;
import java.util.HashMap;
@@ -14,7 +16,7 @@
* of trying to classload it.
*/
public class AsmDetector {
- static HashMap<String, ClassCache> classCacheMap= new HashMap<String, ClassCache>();
+ static Map<String, ClassCache> classCacheMap= new WeakValueHashMap<String, ClassCache>();
public static int getPausableStatus(String className, String methodName,
String desc, Detector detector)
{
View
2  src/kilim/analysis/CallWeaver.java
@@ -764,6 +764,8 @@ private void genRestoreVars(MethodVisitor mv, int stateVar) {
if (!u.isLiveIn(i))
continue;
Value v = f.getLocal(i);
+ if (v.getTypeDesc() == D_UNDEFINED)
+ continue;
int vmt = VMType.toVmType(v.getTypeDesc());
if (v.isConstant()) {
loadConstant(mv, v);
View
16 src/kilim/analysis/ClassWeaver.java
@@ -29,7 +29,16 @@
public class ClassWeaver {
ClassFlow classFlow;
List<ClassInfo> classInfoList = new LinkedList<ClassInfo>();
- static HashSet<String> stateClasses = new HashSet<String>();
+
+ static ThreadLocal<HashSet<String>> stateClasses = new ThreadLocal<HashSet<String>>() {
+ protected java.util.HashSet<String> initialValue() {
+ return new HashSet<String>();
+ }
+ };
+
+ public static void reset() {
+ stateClasses.set(new HashSet<String>());
+ }
public ClassWeaver(byte[] data, Detector detector) {
classFlow = new ClassFlow(data, detector);
@@ -183,10 +192,11 @@ String createStateClass(ValInfoList valInfoList) {
numByType[vi.vmt]++;
}
String className = makeClassName(numByType);
- if (stateClasses.contains(className)) {
+ HashSet<String> classes = stateClasses.get();
+ if (classes.contains(className)) {
return className;
}
- stateClasses.add(className);
+ classes.add(className);
ClassWriter cw = new ClassWriter(false);
cw.visit(V1_1, ACC_PUBLIC | ACC_FINAL, className, null, "kilim/State", null);
View
460 src/kilim/analysis/WeakValueHashMap.java
@@ -0,0 +1,460 @@
+/*
+ * Geotools 2 - OpenSource mapping toolkit
+ * (C) 2003, Geotools Project Managment Committee (PMC)
+ * (C) 2001, Institut de Recherche pour le Developpement
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+package kilim.analysis;
+
+// Collections and references
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+
+/**
+ * A hashtable-based {@link Map} implementation with <em>weak values</em>. An entry in a
+ * {@code WeakValueHashMap} will automatically be removed when its value is no longer
+ * in ordinary use. This class is similar to the standard {@link WeakHashMap} class provided
+ * is J2SE, except that weak references are hold on values instead of keys.
+ * <p>
+ * The {@code WeakValueHashMap} class is thread-safe.
+ *
+ * @since 2.0
+ * @source $URL: http://svn.geotools.org/geotools/tags/2.2-RC3/module/referencing/src/org/geotools/util/WeakValueHashMap.java $
+ * @version $Id: WeakValueHashMap.java 17672 2006-01-19 00:25:55Z desruisseaux $
+ * @author Martin Desruisseaux
+ *
+ * @see WeakHashMap
+ * @see WeakHashSet
+ */
+public class WeakValueHashMap<K,V> extends AbstractMap<K,V> {
+ /**
+ * Minimal capacity for {@link #table}.
+ */
+ private static final int MIN_CAPACITY = 7;
+
+ /**
+ * Load factor. Control the moment
+ * where {@link #table} must be rebuild.
+ */
+ private static final float LOAD_FACTOR = 0.75f;
+
+ public ReferenceQueue<V> queue = new ReferenceQueue<V>();
+
+ private void clean() {
+ Reference<? extends V> x;
+ while ((x=queue.poll()) != null) {
+ x.clear();
+ }
+ }
+
+ /**
+ * An entry in the {@link WeakValueHashMap}. This is a weak reference
+ * to a value together with a strong reference to a key.
+ */
+ @SuppressWarnings("unchecked")
+ private final static class Entry<K,V> extends WeakReference<V> implements Map.Entry<K,V> {
+ /**
+ * The key.
+ */
+ K key;
+
+ /**
+ * The next entry, or {@code null} if there is none.
+ */
+ Entry next;
+
+ /**
+ * Index for this element in {@link #table}. This index
+ * must be updated at every {@link #rehash} call.
+ */
+ int index;
+
+ private WeakValueHashMap<K, V> owner;
+
+ /**
+ * Constructs a new weak reference.
+ */
+ Entry(final K key, final V value, final Entry next, final int index, WeakValueHashMap<K, V> owner) {
+ super(value, owner.queue);
+ this.key = key;
+ this.next = next;
+ this.index = index;
+ this.owner = owner;
+ }
+
+ /**
+ * Returns the key corresponding to this entry.
+ */
+ public K getKey() {
+ return key;
+ }
+
+ /**
+ * Returns the value corresponding to this entry.
+ */
+ public V getValue() {
+ return (V) get();
+ }
+
+ /**
+ * Replaces the value corresponding to this entry with the specified value.
+ */
+ public V setValue(final V value) {
+ if (value != null) {
+ throw new UnsupportedOperationException();
+ }
+ V old = (V) get();
+ clear();
+ return old;
+ }
+
+ /**
+ * Clear the reference. The {@link WeakCollectionCleaner} requires that this method is
+ * overriden in order to remove this entry from the enclosing hash map.
+ */
+ public void clear() {
+ super.clear();
+ owner.removeEntry(this);
+ key = null;
+ }
+
+ /**
+ * Compares the specified object with this entry for equality.
+ */
+ public boolean equals(final Object other) {
+ if (other instanceof Map.Entry) {
+ final Map.Entry that = (Map.Entry) other;
+
+ return eq(this.getKey(), that.getKey()) &&
+ eq(this.getValue(), that.getValue());
+ }
+ return false;
+ }
+
+ /**
+ * Returns the hash code value for this map entry.
+ */
+ public int hashCode() {
+ final Object val = get();
+ return (key==null ? 0 : key.hashCode()) ^
+ (val==null ? 0 : val.hashCode());
+ }
+ }
+
+ static boolean eq(Object o1, Object o2) {
+ return o1 == null ? o2 == null : o1.equals(o2);
+ }
+
+ /**
+ * Table of weak references.
+ */
+ private Entry<K,V>[] table;
+
+ /**
+ * Number of non-nul elements in {@link #table}.
+ */
+ private int count;
+
+ /**
+ * The next size value at which to resize. This value should
+ * be <code>{@link #table}.length*{@link #loadFactor}</code>.
+ */
+ private int threshold;
+
+ /**
+ * The timestamp when {@link #table} was last rehashed. This information
+ * is used to avoid too early table reduction. When the garbage collector
+ * collected a lot of elements, we will wait at least 20 seconds before
+ * rehashing {@link #table}. Too early table reduction leads to many cycles
+ * like "reduce", "expand", "reduce", "expand", etc.
+ */
+ private long lastRehashTime;
+
+ /**
+ * Number of millisecond to wait before to rehash
+ * the table for reducing its size.
+ */
+ private static final long HOLD_TIME = 20*1000L;
+
+ /**
+ * Construct a {@code WeakValueHashMap}.
+ */
+ public WeakValueHashMap() {
+ table = new Entry[MIN_CAPACITY];
+ threshold = Math.round(table.length*LOAD_FACTOR);
+ lastRehashTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Invoked by {@link Entry} when an element has been collected
+ * by the garbage collector. This method will remove the weak reference
+ * from {@link #table}.
+ */
+ private synchronized void removeEntry(final Entry toRemove) {
+ assert valid() : count;
+ final int i = toRemove.index;
+ // Index 'i' may not be valid if the reference 'toRemove'
+ // has been already removed in a previous rehash.
+ if (i < table.length) {
+ Entry prev = null;
+ Entry e = table[i];
+ while (e != null) {
+ if (e == toRemove) {
+ if (prev != null) {
+ prev.next = e.next;
+ } else {
+ table[i] = e.next;
+ }
+ count--;
+ assert valid();
+
+ // If the number of elements has dimunished
+ // significatively, rehash the table.
+ if (count <= threshold/4) {
+ rehash(false);
+ }
+ // We must not continue the loop, since
+ // variable 'e' is no longer valid.
+ return;
+ }
+ prev = e;
+ e = e.next;
+ }
+ }
+ assert valid();
+ /*
+ * If we reach this point, its mean that reference 'toRemove' has not
+ * been found. This situation may occurs if 'toRemove' has already been
+ * removed in a previous run of {@link #rehash}.
+ */
+ }
+
+ /**
+ * Rehash {@link #table}.
+ *
+ * @param augmentation {@code true} if this method is invoked
+ * for augmenting {@link #table}, or {@code false} if
+ * it is invoked for making the table smaller.
+ */
+ private void rehash(final boolean augmentation) {
+ assert Thread.holdsLock(this);
+ assert valid();
+ final long currentTime = System.currentTimeMillis();
+ final int capacity = Math.max(Math.round(count/(LOAD_FACTOR/2)), count+MIN_CAPACITY);
+ if (augmentation ? (capacity<=table.length) :
+ (capacity>=table.length || currentTime-lastRehashTime<HOLD_TIME))
+ {
+ return;
+ }
+ lastRehashTime = currentTime;
+ final Entry[] oldTable = table;
+ table = new Entry[capacity];
+ threshold = Math.round(capacity*LOAD_FACTOR);
+ for (int i=0; i<oldTable.length; i++) {
+ for (Entry old=oldTable[i]; old!=null;) {
+ final Entry e=old;
+ old = old.next; // On retient 'next' tout de suite car sa valeur va changer...
+ final Object key = e.key;
+ if (key != null) {
+ final int index=(key.hashCode() & 0x7FFFFFFF) % table.length;
+ e.index = index;
+ e.next = table[index];
+ table[index] = e;
+ } else {
+ count--;
+ }
+ }
+ }
+ final Logger logger = Logger.getLogger("org.geotools.util");
+ final Level level = Level.FINEST;
+ if (logger.isLoggable(level)) {
+ final LogRecord record = new LogRecord(level, "Rehash from " + oldTable.length +
+ " to " + table.length);
+ record.setSourceMethodName(augmentation ? "canonicalize" : "remove");
+ record.setSourceClassName("WeakValueHashMap");
+ logger.log(record);
+ }
+ assert valid();
+ }
+
+ /**
+ * Check if this {@code WeakValueHashMap} is valid. This method counts the
+ * number of elements and compare it to {@link #count}. If the check fails,
+ * the number of elements is corrected (if we didn't, an {@link AssertionError}
+ * would be thrown for every operations after the first error, which make
+ * debugging more difficult). The set is otherwise unchanged, which should
+ * help to get similar behaviour as if assertions hasn't been turned on.
+ */
+ private boolean valid() {
+ int n=0;
+ for (int i=0; i<table.length; i++) {
+ for (Entry e=table[i]; e!=null