Skip to content

Commit

Permalink
PubSub actor is now faster, but still slower than PubSubComponent.
Browse files Browse the repository at this point in the history
  • Loading branch information
laforge49 committed Jan 24, 2012
1 parent 0207438 commit c57981e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
53 changes: 37 additions & 16 deletions src/main/java/org/agilewiki/jactor/pubsub/PubSub.java
Expand Up @@ -24,6 +24,7 @@
package org.agilewiki.jactor.pubsub;

import org.agilewiki.jactor.*;
import org.agilewiki.jactor.apc.APCRequestSource;
import org.agilewiki.jactor.lpc.JLPCActor;

import java.util.Collections;
Expand All @@ -34,30 +35,21 @@
/**
* Implements publish/subscribe over JLPCActor.
*/
public class PubSub extends JLPCActor {
final public class PubSub implements Actor {
/**
* The subscribing actors.
*/
private Set<Actor> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<Actor, Boolean>());

/**
* Create a PubSub actor.
* Wraps and enqueues an unwrapped request in the requester's inbox.
*
* @param mailbox The mailbox.
*/
public PubSub(Mailbox mailbox) {
super(mailbox);
}

/**
* The application method for processing requests sent to the actor.
*
* @param request A request.
* @param rp The response processor.
* @throws Exception Any uncaught exceptions raised while processing the request.
* @param requestSource The originator of the request.
* @param request The unwrapped request to be sent.
* @param rp The request processor.
*/
@Override
protected void processRequest(final Object request, final ResponseProcessor rp) throws Exception {
public void acceptRequest(final APCRequestSource requestSource, final Object request, final ResponseProcessor rp) throws Exception {
if (request instanceof Publish) {
final Iterator<Actor> sit = subscribers.iterator();
JAIterator jaIterator = new JAIterator() {
Expand All @@ -74,7 +66,7 @@ protected void process(ResponseProcessor rp1) throws Exception {
}
Actor subscriber = sit.next();
psrp.sent += 1;
send(subscriber, broadcastRequest, psrp);
subscriber.acceptRequest(requestSource, broadcastRequest, psrp);
rp1.process(null);
}
};
Expand All @@ -99,4 +91,33 @@ protected void process(ResponseProcessor rp1) throws Exception {
}
throw new UnsupportedOperationException(request.getClass().getName());
}

/**
* Set the initial capacity for buffered outgoing messages.
*
* @param initialBufferCapacity The initial capacity for buffered outgoing messages.
*/
@Override
public void setInitialBufferCapacity(int initialBufferCapacity) {}

/**
* Returns true when the concurrent data of the actor, or its parent, contains the named data item.
*
* @param name The key for the data item.
* @return True when the concurrent data of the actor, or its parent, contains the named data item.
*/
@Override
public boolean hasDataItem(String name) {
return false;
}

/**
* Returns the actor's mailbox.
*
* @return The actor's mailbox.
*/
@Override
public Mailbox getMailbox() {
throw new UnsupportedOperationException("PubSub has no mailbox");
}
}
2 changes: 1 addition & 1 deletion src/test/java/org/agilewiki/jactor/pubsub/PubSubTest.java
Expand Up @@ -8,7 +8,7 @@ public void test() {
MailboxFactory mailboxFactory = JAMailboxFactory.newMailboxFactory(1);
try {
Mailbox mailbox = mailboxFactory.createMailbox();
Actor publisher = new PubSub(mailbox);
Actor publisher = new PubSub();
JAFuture future = new JAFuture();
Actor subscriber1 = new Subscriber(mailbox);
Actor subscriber2 = new Subscriber(mailbox);
Expand Down
Expand Up @@ -10,7 +10,7 @@ final public class Driver1 extends JLPCActor {

public Driver1(Mailbox mailbox) {
super(mailbox);
pubsub = new PubSub(mailbox);
pubsub = new PubSub();
}

@Override
Expand Down
Expand Up @@ -8,46 +8,46 @@
public class SharedTimingTest extends TestCase {
public void test() {

//int c = 10;
//int s = 1000;
//int p = 1;
//int t = 4;
int c = 10;
int s = 1000;
int p = 1;
int t = 4;

//int c = 50000;
//int s = 1000;
//int p = 4;
//int t = 4;

//4 parallel runs of 50000 requests sent to 1000 subscribers
//publications per sec = 34470872
//response time 116 nanoseconds
//publications per sec = 47058823
//response time 85 nanoseconds

//int c = 10000;
//int s = 1000;
//int p = 8;
//int t = 4;

//8 parallel runs of 10000 requests sent to 1000 subscribers
//publications per sec = 29293299
//response time 137 nanoseconds
//publications per sec = 49844236
//response time 80 nanoseconds

//int c = 1000000;
//int s = 10;
//int p = 8;
//int t = 4;

//8 parallel runs of 1000000 requests sent to 10 subscribers
//publications per sec = 18939393
//response time 211 nanoseconds
//publications per sec = 37330844
//response time 107 nanoseconds

int c = 10000;
int s = 1000;
int p = 16;
int t = 4;
//int c = 10000;
//int s = 1000;
//int p = 16;
//int t = 4;

//16 parallel runs of 10000 requests sent to 1000 subscribers
//publications per sec = 31904287
//response time 53 nanoseconds
//publications per sec = 40660736
//response time 98 nanoseconds

MailboxFactory mailboxFactory = JAMailboxFactory.newMailboxFactory(t);
try {
Expand Down

0 comments on commit c57981e

Please sign in to comment.