Skip to content

Commit

Permalink
Simplified mailbox now works.
Browse files Browse the repository at this point in the history
  • Loading branch information
laforge49 committed Feb 12, 2012
1 parent ec9d7a6 commit f55a002
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 137 deletions.
26 changes: 0 additions & 26 deletions src/main/java/org/agilewiki/jactor/Mailbox.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,39 +44,13 @@
* </p>
*/
public interface Mailbox extends APCMailbox {

/**
* Returns the controlling mailbox, or null.
*/
public Mailbox getControllingMailbox();

/**
* Gains control over the mailbox.
*
* @param activeMailbox The mailbox gaining control.
* @return True when control was acquired.
*/
public boolean acquireMailboxControl(Mailbox activeMailbox);

/**
* Relinquish control over the mailbox.
*/
public void relinquishMailboxControl();

/**
* Returns true when all requests are to be processed asynchronously.
*
* @return True when all requests are to be processed asynchronously.
*/
public boolean isAsync();

/**
* Dispatch any enqueued requests, if possible.
*
* @param activeMailbox The mailbox that was just in control.
*/
public void dispatchRemaining(Mailbox activeMailbox);

/**
* Returns the mailbox factory.
*
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/org/agilewiki/jactor/bind/JBActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.agilewiki.jactor.apc.*;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsDestination;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsQueue;
import org.agilewiki.jactor.events.EventQueue;
import org.agilewiki.jactor.lpc.RequestSource;
import org.agilewiki.jactor.lpc.TransparentException;
import org.agilewiki.jactor.stateMachine.ExtendedResponseProcessor;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentSkipListMap;

/**
Expand Down Expand Up @@ -494,8 +496,8 @@ final protected void routeRequest(final RequestSource requestSource,
final ResponseProcessor rp,
final Binding binding)
throws Exception {
final Mailbox sourceMailbox = requestSource.getMailbox();
final ExceptionHandler sourceExceptionHandler = requestSource.getExceptionHandler();
Mailbox sourceMailbox = requestSource.getMailbox();
ExceptionHandler sourceExceptionHandler = requestSource.getExceptionHandler();
if (sourceMailbox == mailbox) {
syncProcess(request, rp, sourceExceptionHandler, requestSource, binding);
return;
Expand All @@ -504,22 +506,21 @@ final protected void routeRequest(final RequestSource requestSource,
asyncSend(requestSource, request, rp, sourceExceptionHandler);
return;
}
final Mailbox srcControllingMailbox = sourceMailbox.getControllingMailbox();
if (mailbox.getControllingMailbox() == srcControllingMailbox) {
EventQueue<ArrayList<JAMessage>> srcController = sourceMailbox.getEventQueue().getController();
EventQueue<ArrayList<JAMessage>> eventQueue = mailbox.getEventQueue();
if (eventQueue.getController() == srcController) {
syncSend(requestSource, request, rp, sourceExceptionHandler, binding);
return;
}
if (!mailbox.acquireMailboxControl(srcControllingMailbox)) {
if (!eventQueue.acquireControl(srcController)) {
asyncSend(requestSource, request, rp, sourceExceptionHandler);
mailbox.dispatchRemaining(srcControllingMailbox);
return;
}
try {
syncSend(requestSource, request, rp, sourceExceptionHandler, binding);
} finally {
mailbox.sendPendingMessages();
mailbox.relinquishMailboxControl();
mailbox.dispatchRemaining(srcControllingMailbox);
eventQueue.relinquishControl();
}
}

Expand Down Expand Up @@ -662,21 +663,21 @@ public void process(final Object response)
asyncException((Exception) response, sourceExceptionHandler, rs.getMailbox());
else try {
Mailbox sourceMailbox = rs.getMailbox();
Mailbox srcControllingMailbox = sourceMailbox.getControllingMailbox();
Mailbox controllingMailbox = mailbox.getControllingMailbox();
if (srcControllingMailbox == controllingMailbox) {
EventQueue<ArrayList<JAMessage>> srcController = sourceMailbox.getEventQueue().getController();
EventQueue<ArrayList<JAMessage>> eventQueue = mailbox.getEventQueue();
EventQueue<ArrayList<JAMessage>> controller = eventQueue.getController();
if (srcController == controller) {
rp.process(response);
} else if (sourceMailbox.isAsync()) {
asyncResponse(rs, request, response, rp);
} else if (!mailbox.acquireMailboxControl(srcControllingMailbox)) {
} else if (!eventQueue.acquireControl(srcController)) {
asyncResponse(rs, request, response, rp);
} else {
try {
rp.process(response);
} finally {
mailbox.sendPendingMessages();
mailbox.relinquishMailboxControl();
mailbox.dispatchRemaining(srcControllingMailbox);
eventQueue.relinquishControl();
}
}
} catch (Exception ex) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/agilewiki/jactor/events/EventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface EventQueue<E> extends EventDispatcher<E>, EventDestination<E> {
* @param controller A queue.
* @return True when control was acquired.
*/
public boolean acquireControl(JAEventQueue<E> controller);
public boolean acquireControl(EventQueue<E> controller);

/**
* Relinquish foreign control over the queue.
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/org/agilewiki/jactor/events/JAEventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public JAEventQueue(ThreadManager threadManager) {
/**
* Gain control of the queue.
*
* @param controller A queue.
* @param eventQueue A queue.
* @return True when control was acquired.
*/
public boolean acquireControl(JAEventQueue<E> controller) {
return atomicControl.compareAndSet(null, controller);
public boolean acquireControl(EventQueue<E> eventQueue) {
return atomicControl.compareAndSet(null, eventQueue.getController());
}

/**
Expand All @@ -102,8 +102,7 @@ public void relinquishControl() {
if (c == this)
return;
atomicControl.set(null);
if (queue.poll() != null)
threadManager.process(task);
threadManager.process(task);
}

/**
Expand Down
31 changes: 17 additions & 14 deletions src/main/java/org/agilewiki/jactor/lpc/JLPCActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import org.agilewiki.jactor.bind.ConstrainedRequest;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsDestination;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsQueue;
import org.agilewiki.jactor.events.EventQueue;
import org.agilewiki.jactor.stateMachine.ExtendedResponseProcessor;
import org.agilewiki.jactor.stateMachine._SMBuilder;

import java.util.ArrayList;

/**
* <p>
* An actor which implements Local Procedure Calls (LPC)
Expand Down Expand Up @@ -186,9 +189,9 @@ public void acceptRequest(final APCRequestSource apcRequestSource,
final Object request,
final ResponseProcessor rp)
throws Exception {
final RequestSource rs = (RequestSource) apcRequestSource;
final Mailbox sourceMailbox = rs.getMailbox();
final ExceptionHandler sourceExceptionHandler = rs.getExceptionHandler();
RequestSource rs = (RequestSource) apcRequestSource;
Mailbox sourceMailbox = rs.getMailbox();
ExceptionHandler sourceExceptionHandler = rs.getExceptionHandler();
if (sourceMailbox == mailbox) {
syncProcess(request, rp, sourceExceptionHandler, rs);
return;
Expand All @@ -197,21 +200,21 @@ public void acceptRequest(final APCRequestSource apcRequestSource,
asyncSend(rs, request, rp, sourceExceptionHandler);
return;
}
final Mailbox srcControllingMailbox = sourceMailbox.getControllingMailbox();
if (mailbox.getControllingMailbox() == srcControllingMailbox) {
EventQueue<ArrayList<JAMessage>> srcController = sourceMailbox.getEventQueue().getController();
EventQueue<ArrayList<JAMessage>> eventQueue = mailbox.getEventQueue();
if (eventQueue.getController() == srcController) {
syncSend(rs, request, rp, sourceExceptionHandler);
return;
}
if (!mailbox.acquireMailboxControl(srcControllingMailbox)) {
if (!eventQueue.acquireControl(srcController)) {
asyncSend(rs, request, rp, sourceExceptionHandler);
return;
}
try {
syncSend(rs, request, rp, sourceExceptionHandler);
} finally {
mailbox.sendPendingMessages();
mailbox.relinquishMailboxControl();
mailbox.dispatchRemaining(srcControllingMailbox);
eventQueue.relinquishControl();
}
}

Expand Down Expand Up @@ -350,21 +353,21 @@ public void process(final Object response)
asyncException((Exception) response, sourceExceptionHandler, rs.getMailbox());
else try {
Mailbox sourceMailbox = rs.getMailbox();
Mailbox srcControllingMailbox = sourceMailbox.getControllingMailbox();
Mailbox controllingMailbox = mailbox.getControllingMailbox();
if (srcControllingMailbox == controllingMailbox) {
EventQueue<ArrayList<JAMessage>> srcController = sourceMailbox.getEventQueue().getController();
EventQueue<ArrayList<JAMessage>> eventQueue = mailbox.getEventQueue();
EventQueue<ArrayList<JAMessage>> controller = eventQueue.getController();
if (srcController == controller) {
rp.process(response);
} else if (sourceMailbox.isAsync()) {
asyncResponse(rs, request, response, rp);
} else if (!mailbox.acquireMailboxControl(srcControllingMailbox)) {
} else if (!eventQueue.acquireControl(srcController)) {
asyncResponse(rs, request, response, rp);
} else {
try {
rp.process(response);
} finally {
mailbox.sendPendingMessages();
mailbox.relinquishMailboxControl();
mailbox.dispatchRemaining(srcControllingMailbox);
eventQueue.relinquishControl();
}
}
} catch (Exception ex) {
Expand Down
75 changes: 0 additions & 75 deletions src/main/java/org/agilewiki/jactor/lpc/JLPCMailbox.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import org.agilewiki.jactor.apc.JAPCMailbox;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsQueue;

import java.util.concurrent.atomic.AtomicReference;

/**
* Implements Mailbox.
*/
Expand All @@ -18,12 +16,6 @@ final public class JLPCMailbox extends JAPCMailbox implements Mailbox {
*/
private MailboxFactory mailboxFactory;

/**
* Tracks which mailbox has control. If an exchange can gain control
* over another exchange, it can send requests to it synchronously.
*/
final private AtomicReference<Mailbox> atomicControl = new AtomicReference<Mailbox>();

/**
* Set to true when all requests are to be processed asynchronously.
*/
Expand Down Expand Up @@ -87,71 +79,4 @@ public MailboxFactory getMailboxFactory() {
public boolean isAsync() {
return async;
}

/**
* Returns the controlling mailbox.
*/
@Override
public Mailbox getControllingMailbox() {
Mailbox c = atomicControl.get();
if (c == null) return this;
return c;
}

/**
* Gains control over the mailbox.
*
* @param srcControllingMailbox The mailbox gaining control.
* @return True when control was acquired.
*/
@Override
public boolean acquireMailboxControl(Mailbox srcControllingMailbox) {
return atomicControl.compareAndSet(null, srcControllingMailbox);
}

/**
* Relinquish control over the mailbox.
*/
@Override
public void relinquishMailboxControl() {
atomicControl.set(null);
}

/**
* The dispatchMessages method processes any messages in the queue.
* True is returned if any messages were actually processed.
*/
@Override
public boolean dispatchEvents() {
if (async) return super.dispatchEvents();
boolean dispatched = false;
if (acquireMailboxControl(this)) {
try {
dispatched = super.dispatchEvents();
} finally {
relinquishMailboxControl();
}
}
return dispatched;
}

/**
* Dispatch any enqueued requests, if possible.
*
* @param controllingMailbox The mailbox that was just in control.
*/
public void dispatchRemaining(final Mailbox controllingMailbox) {
while (!isEmpty()) {
if (getControllingMailbox() == controllingMailbox) {
super.dispatchEvents();
} else if (acquireMailboxControl(controllingMailbox)) {
try {
super.dispatchEvents();
} finally {
sendPendingMessages();
relinquishMailboxControl();
}
} else return;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void test4() {

public void test5() {
System.out.println("start ResponsePrinterTest 5");
JAMailboxFactory mailboxFactory = JAMailboxFactory.newMailboxFactory(1);
JAMailboxFactory mailboxFactory = JAMailboxFactory.newMailboxFactory(10);
try {

JCActor a = new JCActor(mailboxFactory.createMailbox());
Expand All @@ -123,7 +123,7 @@ public void test5() {
PrintResponse printResponse = new PrintResponse(new Hi(), a);
PrintParallelResponse printParallelResponse = new PrintParallelResponse(count, bs, printResponse);
int j = 0;
while (j < 10) {
while (j < 1) {
printParallelResponse.send(future, c);
j += 1;
}
Expand Down

0 comments on commit f55a002

Please sign in to comment.