Skip to content

Commit

Permalink
enhance send performance
Browse files Browse the repository at this point in the history
  • Loading branch information
miniway committed Sep 1, 2012
1 parent c53333c commit 4e350f3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
23 changes: 15 additions & 8 deletions src/main/java/zmq/Signaler.java
Expand Up @@ -25,6 +25,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicBoolean;

// This is a cross-platform equivalent to signal_fd. However, as opposed
// to signal_fd there can be at most one signal in the signaler at any
Expand All @@ -36,12 +37,17 @@ public class Signaler {
private Pipe.SinkChannel w;
private Pipe.SourceChannel r;
private Selector selector;
ByteBuffer sdummy;
private ByteBuffer sdummy;
private ByteBuffer rdummy;

// Selector.selectNow at every sending message doesn't show enough performance
private final AtomicBoolean hasevt;

public Signaler() {
// Create the socketpair for signaling.
make_fdpair ();

hasevt = new AtomicBoolean(false);
// Set both fds to non-blocking mode.
try {
Utils.unblock_socket (w);
Expand All @@ -58,6 +64,7 @@ public Signaler() {
}

sdummy = ByteBuffer.allocate(1);
rdummy = ByteBuffer.allocate(1);
sdummy.put((byte)0);

}
Expand Down Expand Up @@ -107,7 +114,7 @@ public void send ()
assert (nbytes == 1);
break;
}

hasevt.set(true);
}

boolean wait_event (long timeout_) {
Expand All @@ -119,7 +126,9 @@ boolean wait_event (long timeout_) {
if (timeout_ < 0) {
rc = selector.select(0);
} else if (timeout_ == 0) {
rc = selector.selectNow();
if (hasevt.compareAndSet(true, false)) {
rc = selector.selectNow();
}
} else {
rc = selector.select(timeout_);
}
Expand All @@ -134,8 +143,6 @@ boolean wait_event (long timeout_) {

selector.selectedKeys().clear();



assert (rc == 1);
return true;

Expand All @@ -144,14 +151,14 @@ boolean wait_event (long timeout_) {

public void recv ()
{
ByteBuffer dummy = ByteBuffer.allocate(1);
int nbytes;
try {
nbytes = r.read(dummy);
nbytes = r.read(rdummy);
rdummy.rewind();
} catch (IOException e) {
throw new ZError.IOException(e);
}
assert (nbytes >= 0);
assert (nbytes == 1);
}


Expand Down
8 changes: 7 additions & 1 deletion src/main/java/zmq/ZError.java
Expand Up @@ -101,7 +101,13 @@ public static void exc(java.io.IOException e) {
}

public static boolean is(int code) {
return errno.get() == code;
switch(code) {
case EINTR:
return false;
default:
return errno.get() == code;
}

}

public static void ENOTSUP() {
Expand Down

0 comments on commit 4e350f3

Please sign in to comment.