Permalink
Browse files

Make all port operations synchroneous "port ! msg" is now a synchrone…

…ous call; and all port-operations are non-Pausable. (BIG DIFF).
  • Loading branch information...
1 parent 00067b2 commit 0780c8b406133d175f4c85c6c5996b0939c4ab08 @krestenkrab krestenkrab committed Oct 28, 2010
Showing with 521 additions and 592 deletions.
  1. +11 −5 src/main/java/erjang/EAbstractNode.java
  2. +11 −6 src/main/java/erjang/EExternalPID.java
  3. +18 −9 src/main/java/erjang/EHandle.java
  4. +5 −5 src/main/java/erjang/EInternalPID.java
  5. +5 −5 src/main/java/erjang/EInternalPort.java
  6. +10 −3 src/main/java/erjang/ELocalNode.java
  7. +15 −15 src/main/java/erjang/EPeer.java
  8. +59 −4 src/main/java/erjang/EProc.java
  9. +13 −50 src/main/java/erjang/ETask.java
  10. +1 −1 src/main/java/erjang/ETimerTask.java
  11. +1 −1 src/main/java/erjang/ExitHook.java
  12. +2 −3 src/main/java/erjang/console/TTYTextAreaDriverControl.java
  13. +1 −2 src/main/java/erjang/driver/EAsync.java
  14. +15 −12 src/main/java/erjang/driver/EDriverControl.java
  15. +24 −24 src/main/java/erjang/driver/EDriverInstance.java
  16. +154 −125 src/main/java/erjang/driver/EDriverTask.java
  17. +1 −2 src/main/java/erjang/driver/EPortControl.java
  18. +4 −64 src/main/java/erjang/driver/ExecDriverInstance.java
  19. +4 −36 src/main/java/erjang/driver/FDDriverInstance.java
  20. +12 −12 src/main/java/erjang/driver/LockingDriverInstance.java
  21. +28 −44 src/main/java/erjang/driver/efile/EFile.java
  22. +2 −2 src/main/java/erjang/driver/efile/FileAsync.java
  23. +4 −28 src/main/java/erjang/driver/inet_gethost/GetHostDriver.java
  24. +7 −8 src/main/java/erjang/driver/js/EJSDriverInstance.java
  25. +6 −44 src/main/java/erjang/driver/ram_file/RamFile.java
  26. +2 −2 src/main/java/erjang/driver/tcp_inet/Packet.java
  27. +5 −5 src/main/java/erjang/driver/tcp_inet/PacketCallbacks.java
  28. +48 −54 src/main/java/erjang/driver/tcp_inet/TCPINet.java
  29. +5 −5 src/main/java/erjang/driver/tcp_inet/TCPINetCallbacks.java
  30. +6 −8 src/main/java/erjang/driver/zlib/ZLibDriver.java
  31. +41 −6 src/main/java/erjang/m/erlang/ErlProc.java
  32. +1 −2 src/main/java/erjang/m/ets/ETable.java
@@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import erjang.driver.EDriverTask;
import erjang.m.erlang.ErlDist;
import kilim.Pausable;
@@ -264,15 +265,15 @@ public void monitor_node(EHandle caller, boolean on) {
/** network driver exited!
* @throws Pausable */
- public void node_going_down(EHandle sender, EObject reason) throws Pausable {
+ public void node_going_down(EHandle sender, EObject reason) {
for (Map.Entry<EHandle,AtomicInteger> ent : node_monitors.entrySet()) {
EHandle handle = ent.getKey();
AtomicInteger howmany = ent.getValue();
ETuple nd = ETuple.make(am_nodedown, this.node());
while (howmany.decrementAndGet() >= 0) {
- handle.send(sender, nd);
+ handle.sendb(sender, nd);
}
}
@@ -281,14 +282,14 @@ public void node_going_down(EHandle sender, EObject reason) throws Pausable {
}
public abstract EObject dsig_reg_send(EInternalPID caller, EAtom name,
- EObject msg) throws Pausable;
+ EObject msg) ;
public abstract void dsig_demonitor(EHandle sender, ERef ref,
- EObject to_pid_or_name) throws Pausable;
+ EObject to_pid_or_name);
- public static EAbstractNode get_or_connect(ETask proc, EAtom n) throws Pausable {
+ public static EAbstractNode get_or_connect(EProc proc, EAtom n) throws Pausable {
EAbstractNode res = EPeer.get(n);
if (res == null && (proc instanceof EProc)) {
if (ErlDist.net_kernel__connect__1.invoke((EProc) proc, new EObject[] { n }) == ERT.TRUE) {
@@ -297,4 +298,9 @@ public static EAbstractNode get_or_connect(ETask proc, EAtom n) throws Pausable
}
return res;
}
+
+ public static EAbstractNode get_or_connect(EDriverTask proc, EAtom n) {
+ EAbstractNode res = EPeer.get(n);
+ return res;
+ }
}
@@ -78,29 +78,29 @@ public void set_group_leader(EPID gl) {
}
@Override
- public void exit_signal(EHandle from_pid, EObject reason, boolean exitToSender) throws Pausable {
+ public void exit_signal(EHandle from_pid, EObject reason, boolean exitToSender) {
peer().dsig_exit(from_pid, this, reason);
}
@Override
- public boolean add_monitor(EHandle from_pid, ERef ref) throws Pausable {
+ public boolean add_monitor(EHandle from_pid, ERef ref) {
peer().dsig_monitor(from_pid, this, ref);
return true;
}
@Override
- public boolean link_oneway(EHandle other) throws Pausable {
+ public boolean link_oneway(EHandle other) {
peer().dsig_link(other, this);
return true;
}
@Override
- public void unlink_oneway(EHandle other) throws Pausable {
+ public void unlink_oneway(EHandle other) {
peer().dsig_unlink(other, this);
}
@Override
- public void remove_monitor(EHandle sender, ERef ref, boolean flush) throws Pausable {
+ public void remove_monitor(EHandle sender, ERef ref, boolean flush) {
peer().dsig_demonitor(sender, ref, this);
}
@@ -110,7 +110,12 @@ public int send(EHandle sender, EObject msg) throws Pausable {
}
@Override
- public void send_monitor_exit(EHandle from, ERef ref, EObject reason) throws Pausable {
+ public int sendb(EHandle sender, EObject msg) {
+ return peer().dsig_send(sender, this, msg);
+ }
+
+ @Override
+ public void send_monitor_exit(EHandle from, ERef ref, EObject reason) {
peer().dsig_send_monitor_exit(from, this, ref, reason);
}
@@ -56,8 +56,8 @@ public boolean exists() {
public int send(EHandle sender, EObject msg) throws Pausable {
ETask<?> task = task();
if (task != null) {
- task.mbox.put(msg);
- return task.mbox.size();
+ task.mbox_send(msg);
+ return task.mbox_size();
} else {
if (ERT.log.isLoggable(Level.FINE)) {
ERT.log.fine("sending message to dead process/port ignored "+this+" ! "+msg);
@@ -69,18 +69,27 @@ public int send(EHandle sender, EObject msg) throws Pausable {
public void sendb(EObject msg) {
ETask<?> task = task();
if (task != null) {
- task.mbox().putb(msg);
+ task.mbox_sendb(msg);
}
}
+ public int sendb(EHandle sender, EObject msg) {
+ ETask<?> task = task();
+ if (task != null) {
+ task.mbox_sendb(msg);
+ return task.mbox_size();
+ }
+ return 0;
+ }
+
/**
* @param is_erlang_exit2 TODO
* @param self
* @param result
* @throws Pausable
* @throws Pausable
*/
- public void exit_signal(EHandle from, EObject reason, boolean is_erlang_exit2) throws Pausable {
+ public void exit_signal(EHandle from, EObject reason, boolean is_erlang_exit2) {
ETask<?> task = task();
if (task != null) {
task.send_exit(from, reason, is_erlang_exit2);
@@ -94,16 +103,16 @@ public void exit_signal(EHandle from, EObject reason, boolean is_erlang_exit2) t
* @throws Pausable
* @throws Pausable
*/
- public abstract boolean link_oneway(EHandle other) throws Pausable;
+ public abstract boolean link_oneway(EHandle other);
- public abstract void unlink_oneway(EHandle other) throws Pausable;
+ public abstract void unlink_oneway(EHandle other);
/**
* @param ref TODO
* @param selfHandle
* @throws Pausable
*/
- public abstract boolean add_monitor(EHandle observer, ERef ref) throws Pausable;
+ public abstract boolean add_monitor(EHandle observer, ERef ref);
/**
@@ -146,8 +155,8 @@ public EAtom node() {
* @param r
* @throws Pausable
*/
- public abstract void remove_monitor(EHandle sender, ERef r, boolean flush) throws Pausable;
+ public abstract void remove_monitor(EHandle sender, ERef r, boolean flush);
- public abstract void send_monitor_exit(EHandle from, ERef ref, EObject reason) throws Pausable;
+ public abstract void send_monitor_exit(EHandle from, ERef ref, EObject reason);
}
@@ -88,21 +88,21 @@ EProc task() {
* @see erjang.EHandle#link_oneway(erjang.EHandle)
*/
@Override
- public boolean link_oneway(EHandle other) throws Pausable {
+ public boolean link_oneway(EHandle other) {
EProc task = this.task;
if (task != null)
return task.link_oneway(other);
return false;
}
@Override
- public void unlink_oneway(EHandle other) throws Pausable {
+ public void unlink_oneway(EHandle other) {
EProc task = this.task;
if (task != null)
task.unlink_oneway(other);
}
- public synchronized boolean add_monitor(EHandle observer, ERef ref) throws Pausable {
+ public synchronized boolean add_monitor(EHandle observer, ERef ref) {
EProc task = this.task;
if (task != null)
return task.add_monitor(observer, ref);
@@ -111,7 +111,7 @@ public synchronized boolean add_monitor(EHandle observer, ERef ref) throws Pausa
}
@Override
- public void send_monitor_exit(EHandle from, ERef ref, EObject reason) throws Pausable {
+ public void send_monitor_exit(EHandle from, ERef ref, EObject reason) {
EProc task = this.task;
if (task != null) {
task.send_monitor_exit(from, ref, reason);
@@ -120,7 +120,7 @@ public void send_monitor_exit(EHandle from, ERef ref, EObject reason) throws Pau
}
@Override
- public void remove_monitor(EHandle sender, ERef r, boolean flush) throws Pausable {
+ public void remove_monitor(EHandle sender, ERef r, boolean flush) {
EProc task = this.task;
if (task != null)
task.remove_monitor(r, flush);
@@ -65,28 +65,28 @@ public boolean exists() {
* @see erjang.EHandle#link_oneway(erjang.EHandle)
*/
@Override
- public boolean link_oneway(EHandle other) throws Pausable {
+ public boolean link_oneway(EHandle other) {
return task.link_oneway(other);
}
@Override
- public void unlink_oneway(EHandle other) throws Pausable {
+ public void unlink_oneway(EHandle other) {
task.unlink_oneway(other);
}
- public boolean add_monitor(EHandle target, ERef ref) throws Pausable {
+ public boolean add_monitor(EHandle target, ERef ref) {
// TODO: check if task is alive!
return task.add_monitor(target, ref);
}
@Override
- public void remove_monitor(EHandle sender, ERef r, boolean flush) throws Pausable {
+ public void remove_monitor(EHandle sender, ERef r, boolean flush) {
task.remove_monitor(r, flush);
}
@Override
public void send_monitor_exit(EHandle from, ERef ref, EObject reason)
- throws Pausable {
+ {
EDriverTask task = this.task;
if (task != null) {
task.send_monitor_exit(from, ref, reason);
@@ -95,12 +95,19 @@ public synchronized int createPortID() {
}
public EObject dsig_reg_send(EInternalPID caller, EAtom name,
- EObject msg) throws Pausable {
- return ERT.send(caller.task(), name, msg);
+ EObject msg) {
+
+ EObject rcv = ERT.whereis(name);
+ EHandle hdl;
+ if (rcv != null && (hdl = rcv.testHandle()) != null) {
+ hdl.sendb(caller, msg);
+ }
+
+ return msg;
}
public void dsig_demonitor(EHandle sender, ERef ref,
- EObject to_pid_or_name) throws Pausable
+ EObject to_pid_or_name)
{
EInternalPID pid;
EAtom name;
@@ -88,7 +88,7 @@ public static EPeer get_or_create(EAtom node, int creation,
}
- public void net_message(EInternalPort port, ByteBuffer hdr, ByteBuffer buf) throws Pausable {
+ public void net_message(EInternalPort port, ByteBuffer hdr, ByteBuffer buf) {
try {
net_message2(port, hdr, buf);
} catch (IOException e) {
@@ -108,7 +108,7 @@ public void net_message(EInternalPort port, ByteBuffer hdr, ByteBuffer buf) thro
}
public void net_message2(EInternalPort port, ByteBuffer hdr, ByteBuffer buf)
- throws IOException, Pausable {
+ throws IOException {
if (buf.remaining() == 0) {
if (ERT.DEBUG_DIST) {
@@ -208,7 +208,7 @@ public void net_message2(EInternalPort port, ByteBuffer hdr, ByteBuffer buf)
if (dst == null)
throw new IOException("protocol error");
if (dst != null) {
- dst.send(null, msg);
+ dst.sendb(null, msg);
}
return;
}
@@ -408,7 +408,7 @@ public static EAbstractNode get(EAtom node) {
return peers.get(node);
}
- void dsig_cast(EHandle sender, ETuple hdr) throws Pausable {
+ void dsig_cast(EHandle sender, ETuple hdr) {
ByteBuffer disthdr = ByteBuffer.allocate(3);
disthdr.put((byte) 131);
@@ -430,14 +430,14 @@ void dsig_cast(EHandle sender, ETuple hdr) throws Pausable {
} else {
System.err.println("sending cast to dead task");
}
- } catch (IOException e) {
+ } catch (RuntimeException e) {
e.printStackTrace();
close_and_finish(port);
}
}
- int dsig_cast(EHandle sender, ETuple hdr, EObject payload) throws Pausable {
+ int dsig_cast(EHandle sender, ETuple hdr, EObject payload) {
ByteBuffer disthdr = ByteBuffer.allocate(3);
disthdr.put((byte) 131);
@@ -455,7 +455,7 @@ int dsig_cast(EHandle sender, ETuple hdr, EObject payload) throws Pausable {
try {
this.port.task().outputv(null, ev);
- } catch (IOException e) {
+ } catch (RuntimeException e) {
e.printStackTrace();
close_and_finish(port);
}
@@ -464,50 +464,50 @@ int dsig_cast(EHandle sender, ETuple hdr, EObject payload) throws Pausable {
}
- public int dsig_send(EHandle sender, EExternalPID pid, EObject msg) throws Pausable {
+ public int dsig_send(EHandle sender, EExternalPID pid, EObject msg) {
ETuple hdr = ETuple.make(ERT.box(SEND), (EAtom) am_, pid);
return dsig_cast(sender, hdr, msg);
}
public void dsig_send_monitor_exit(EHandle sender, EPID to_pid,
- ERef ref, EObject reason) throws Pausable {
+ ERef ref, EObject reason) {
ETuple hdr = ETuple.make(ERT.box(MONITOR_P_EXIT), sender, to_pid,
reason);
dsig_cast(sender, hdr);
}
- public void dsig_monitor(EHandle sender, EObject to_pid, ERef ref) throws Pausable {
+ public void dsig_monitor(EHandle sender, EObject to_pid, ERef ref) {
ETuple hdr = ETuple.make(ERT.box(MONITOR_P), sender, to_pid, ref);
dsig_cast(sender, hdr);
}
public void dsig_exit(EHandle sender, EPID to_pid,
- EObject reason) throws Pausable {
+ EObject reason) {
ETuple hdr = ETuple.make(ERT.box(EXIT), sender, to_pid, reason);
dsig_cast(sender, hdr);
}
- public void dsig_link(EHandle sender, EExternalPID to_pid) throws Pausable {
+ public void dsig_link(EHandle sender, EExternalPID to_pid) {
ETuple hdr = ETuple.make(ERT.box(LINK), sender, to_pid);
dsig_cast(sender, hdr);
}
- public void dsig_unlink(EHandle sender, EExternalPID to_pid) throws Pausable {
+ public void dsig_unlink(EHandle sender, EExternalPID to_pid) {
ETuple hdr = ETuple.make(ERT.box(UNLINK), sender, to_pid);
dsig_cast(sender, hdr);
}
public void dsig_demonitor(EHandle sender, ERef ref,
- EObject to_pid_or_name) throws Pausable {
+ EObject to_pid_or_name) {
ETuple hdr = ETuple.make(ERT.box(DEMONITOR_P), sender, to_pid_or_name, ref);
dsig_cast(sender, hdr);
}
- public EObject dsig_reg_send(EInternalPID sender, EAtom to_name, EObject msg) throws Pausable {
+ public EObject dsig_reg_send(EInternalPID sender, EAtom to_name, EObject msg) {
ETuple hdr = ETuple.make(ERT.box(REG_SEND), sender, am_, to_name);
dsig_cast(sender, hdr, msg);
return msg;
Oops, something went wrong.

0 comments on commit 0780c8b

Please sign in to comment.