Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Distribution: Support properly that messages can e sent to a peer nod…
…e before the port for that node has been set.
  • Loading branch information
Erik Søe Sørensen authored and krestenkrab committed Dec 26, 2010
1 parent a531ffb commit 17d9a08
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 26 deletions.
8 changes: 4 additions & 4 deletions src/main/java/erjang/EPID.java
Expand Up @@ -112,14 +112,14 @@ public static EPID make(EAtom node, int id, int serial, int creation) {
// return DEADPID?
}
EAbstractNode peer = EPeer.get(node);

if (peer instanceof EPeer) {
return new EExternalPID((EPeer) peer, id, serial, creation);
} else {
System.err.println("localnode="+ERT.getLocalNode().node+"; asking="+node);

// must be local with different name
return new EExternalPID(EPeer.get_or_create(node, creation, null, 0, 0), id, serial, creation);
// Presumably another node
// might be local with different name
return new EExternalPID(EPeer.get_or_create(node, creation, 0, 0), id, serial, creation);
}
}

Expand Down
117 changes: 95 additions & 22 deletions src/main/java/erjang/EPeer.java
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -58,34 +59,122 @@ public class EPeer extends EAbstractNode {
private static final EAtom am_ = EAtom.intern("");

static ConcurrentHashMap<EAtom, EPeer> peers = new ConcurrentHashMap<EAtom, EPeer>();
/** Invariant: (port != null) xor (port_queue != null). */
private EInternalPort port;
private LinkedList<ByteBuffer[]> port_queue;

public EPeer(EAtom node, int creation, EInternalPort port, int flags,
int version) {

super(node);
this.flags = flags;
this.creation = creation;
this.port = port;
this.ntype = version == 5 ? NTYPE_R6 : 0;
this.port_queue = (this.port == null)
? new LinkedList<ByteBuffer[]>() : null;
}

private void setPort(EInternalPort port) throws Pausable {
assert(port != null);
assert(this.port == null);
assert(this.port_queue != null);
//System.err.println("EPeer: Patching port for "+node+" to "+port);
EDriverTask task = port.task();
assert(task != null);
/* Note: The following could be simpler if
* EDriverTask.outputv() wasn't Pausable - which it
* shouldn't be, but is as long as we're using
* bounded-size Mailboxes. -- eriksoe */

try {
// Empty port_queue, then set port:
while (true) {
ByteBuffer[] msg;
synchronized (this) {
if (port_queue.isEmpty()) { // Go from queueing mode to direct-port mode:
this.port = port;
this.port_queue = null;
break;
}
msg = port_queue.removeFirst();
assert(msg != null);
}
task.outputv(null, msg); // Pausable so called outside lock
}
} catch (IOException e) {
e.printStackTrace();
close_and_finish(port);
}
}

private void send_to_port(ByteBuffer[] ev, EHandle sender) throws Pausable {
// Either send to port, or enqueue...:
EInternalPort port;
synchronized (this) { // get port; enqueue if necessary
port = this.port;
if (port == null) { // In queueing mode
assert(port_queue != null);
port_queue.addLast(ev);
return;
}
}
assert (port != null); // In direct-to-port mode

EDriverTask task = port.task();
if (task != null) {
try {
task.outputv(null, ev);
} catch (IOException e) {
e.printStackTrace();
close_and_finish(port);
}
} else {
System.err.println("sending cast to dead task (port="+port+", this="+this+", sender="+sender+")");
}
}


// TODO: who closes/deletes these peers?

/** get_or_create - "no port" case. */
public static EPeer get_or_create(EAtom node, int creation, int flags, int version) {
EPeer peer = peers.get(node);
if (peer != null) {
// check version, etc?
return peer;
}

peer = new EPeer(node, creation, null, flags, version);
peers.put(node, peer);

return peer;
}

/** get_or_create - "port given" case. */
public static EPeer get_or_create(EAtom node, int creation,
EInternalPort port, int flags, int version) {
EInternalPort port, int flags, int version) throws Pausable {

EPeer peer = peers.get(node);
if (peer != null) {
// check version, etc?

boolean do_set_port;
synchronized (peer) {
do_set_port = (peer.port == null && port != null);
}
if (do_set_port) {
peer.setPort(port);
} else if (port != null) {
System.err.println("Port already set for node (to "+peer.port+"); refraining from setting it again (to "+port+")");
}

return peer;
}

peer = new EPeer(node, creation, port, flags, version);
peers.put(node, peer);

return peer;

}

public void net_message(EInternalPort port, ByteBuffer hdr, ByteBuffer buf) throws Pausable {
Expand Down Expand Up @@ -423,18 +512,7 @@ void dsig_cast(EHandle sender, ETuple hdr) throws Pausable {

ByteBuffer[] ev = new ByteBuffer[] { disthdr, barr };

try {
EDriverTask task;
if (this.port != null && (task = this.port.task()) != null) {
task.outputv(null, ev);
} else {
System.err.println("sending cast to dead task");
}
} catch (IOException e) {
e.printStackTrace();
close_and_finish(port);
}

send_to_port(ev, sender);
}

int dsig_cast(EHandle sender, ETuple hdr, EObject payload) throws Pausable {
Expand All @@ -453,13 +531,8 @@ int dsig_cast(EHandle sender, ETuple hdr, EObject payload) throws Pausable {

ByteBuffer[] ev = new ByteBuffer[] { disthdr, barr };

try {
this.port.task().outputv(null, ev);
} catch (IOException e) {
e.printStackTrace();
close_and_finish(port);
}

send_to_port(ev, sender);

return eos.size();

}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/erjang/driver/EDriverTask.java
Expand Up @@ -377,6 +377,7 @@ public void execute() throws Pausable {
*/
protected void main_loop() throws Exception, Pausable {

/** out is used locally later, but we allocate it once and for all. */
List<ByteBuffer> out = new ArrayList<ByteBuffer>();
EObject msg;

Expand Down

0 comments on commit 17d9a08

Please sign in to comment.