diff --git a/src/main/java/zmq/DecoderBase.java b/src/main/java/zmq/DecoderBase.java index 41127865a..d1a9b097b 100644 --- a/src/main/java/zmq/DecoderBase.java +++ b/src/main/java/zmq/DecoderBase.java @@ -89,6 +89,7 @@ public ByteBuffer get_buffer() { } else b = read_buf; } else { + zero_copy = false; b = buf; b.clear(); } diff --git a/src/main/java/zmq/Poller.java b/src/main/java/zmq/Poller.java index c47a8eec3..7b471aabd 100644 --- a/src/main/java/zmq/Poller.java +++ b/src/main/java/zmq/Poller.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -205,8 +204,6 @@ public void run() { int rc; try { rc = selector.select(timeout); - } catch (ClosedSelectorException e) { - break; } catch (IOException e) { throw new ZError.IOException(e); } @@ -227,13 +224,14 @@ public void run() { continue; } - - if (key.isAcceptable()) { - evt.accept_event(); - } else if (key.isReadable() ) { - evt.in_event(); - } else if (key.isWritable()) { + if (key.isWritable()) { evt.out_event(); + } + + if (key.isReadable() ) { + evt.in_event(); + } else if (key.isAcceptable()) { + evt.accept_event(); } else if (key.isConnectable()) { evt.connect_event(); } diff --git a/src/main/java/zmq/SessionBase.java b/src/main/java/zmq/SessionBase.java index a1fd0fed3..384ae74da 100644 --- a/src/main/java/zmq/SessionBase.java +++ b/src/main/java/zmq/SessionBase.java @@ -26,47 +26,47 @@ public class SessionBase extends Own implements Pipe.IPipeEvents, IPollEvents { - Logger LOG = LoggerFactory.getLogger(SessionBase.class); + private static Logger LOG = LoggerFactory.getLogger(SessionBase.class); // If true, this session (re)connects to the peer. Otherwise, it's // a transient session created by the listener. - boolean connect; + private boolean connect; // Pipe connecting the session to its socket. - Pipe pipe; + private Pipe pipe; // This flag is true if the remainder of the message being processed // is still in the in pipe. - boolean incomplete_in; + private boolean incomplete_in; // True if termination have been suspended to push the pending // messages to the network. - boolean pending; + private boolean pending; // The protocol I/O engine connected to the session. - IEngine engine; + private IEngine engine; // The socket the session belongs to. - SocketBase socket; + private SocketBase socket; // I/O thread the session is living in. It will be used to plug in // the engines into the same thread. - IOThread io_thread; + private IOThread io_thread; // ID of the linger timer private static int linger_timer_id = 0x20; // True is linger timer is running. - boolean has_linger_timer; + private boolean has_linger_timer; // If true, identity is to be sent/recvd from the network. - boolean send_identity; - boolean recv_identity; + private boolean send_identity; + private boolean recv_identity; // Protocol and address to use when connecting. - final Address addr; + private final Address addr; - IOObject io_object; + private IOObject io_object; public static SessionBase create(IOThread io_thread_, boolean connect_, SocketBase socket_, Options options_, Address addr_) { diff --git a/src/main/java/zmq/Signaler.java b/src/main/java/zmq/Signaler.java index 43950fe9e..662bcc8b0 100644 --- a/src/main/java/zmq/Signaler.java +++ b/src/main/java/zmq/Signaler.java @@ -131,8 +131,11 @@ boolean wait_event (long timeout_) { ZError.EAGAIN(); return false; } + selector.selectedKeys().clear(); + + assert (rc == 1); return true; diff --git a/src/main/java/zmq/SocketBase.java b/src/main/java/zmq/SocketBase.java index de2ecd61c..dfb3eef59 100644 --- a/src/main/java/zmq/SocketBase.java +++ b/src/main/java/zmq/SocketBase.java @@ -575,8 +575,8 @@ public boolean send (Msg msg_, int flags_) } // Process pending commands, if any. - boolean brc = process_commands (0, true); - if (!brc) + boolean rc = process_commands (0, true); + if (!rc) return false; // Clear any user-visible flags that are set on the message. @@ -587,8 +587,8 @@ public boolean send (Msg msg_, int flags_) msg_.set_flags (Msg.more); // Try to send the message. - brc = xsend (msg_, flags_); - if (brc) + rc = xsend (msg_, flags_); + if (rc) return true; if (!ZError.is(ZError.EAGAIN)) return false; @@ -609,8 +609,8 @@ public boolean send (Msg msg_, int flags_) while (true) { if (!process_commands (timeout, false) ) return false; - brc = xsend (msg_, flags_); - if (brc) + rc = xsend (msg_, flags_); + if (rc) break; if (!ZError.is(ZError.EAGAIN)) return false; diff --git a/src/main/java/zmq/StreamEngine.java b/src/main/java/zmq/StreamEngine.java index f8ace2ecc..0323cef56 100644 --- a/src/main/java/zmq/StreamEngine.java +++ b/src/main/java/zmq/StreamEngine.java @@ -31,25 +31,25 @@ public class StreamEngine implements IEngine, IPollEvents { //final private IOObject io_object; private SocketChannel handle; - ByteBuffer inbuf; - int insize; - final DecoderBase decoder; - boolean input_error; + private ByteBuffer inbuf; + private int insize; + private final DecoderBase decoder; + private boolean input_error; private Transfer outbuf; - int outsize; - final EncoderBase encoder; + private int outsize; + private final EncoderBase encoder; // The session this engine is attached to. - SessionBase session; + private SessionBase session; // Detached transient session. - SessionBase leftover_session; + //private SessionBase leftover_session; - Options options; + private Options options; // String representation of endpoint - String endpoint; + private String endpoint; boolean plugged; diff --git a/src/main/java/zmq/YPipe.java b/src/main/java/zmq/YPipe.java index d2f858ebc..fc1c731db 100644 --- a/src/main/java/zmq/YPipe.java +++ b/src/main/java/zmq/YPipe.java @@ -122,13 +122,13 @@ public boolean check_read () { // Was the value prefetched already? If so, return. long h = queue.front_pos(); - if (h != r && r != -1) + if (h != r) return true; // There's no prefetched value, so let us prefetch more values. // Prefetching is to simply retrieve the // pointer from c in atomic fashion. If there are no - // items to prefetch, set c to NULL (using compare-and-swap). + // items to prefetch, set c to -1 (using compare-and-swap). if (c.compareAndSet (h, -1)) { // nothing to read, h == r must be the same } else { @@ -140,7 +140,7 @@ public boolean check_read () // During pipe's lifetime r should never be NULL, however, // it can happen during pipe shutdown when items // are being deallocated. - if (h == r || r == -1) + if (h == r) return false; // There was at least one value prefetched. diff --git a/src/main/java/zmq/YQueue.java b/src/main/java/zmq/YQueue.java index 3103d716c..7d1f9270c 100644 --- a/src/main/java/zmq/YQueue.java +++ b/src/main/java/zmq/YQueue.java @@ -52,13 +52,6 @@ protected Chunk(Class klass, int size, long memory_ptr, boolean allocate) { } } - protected void reset() { - if (!allocate) { - for (int i=0; i != values.length; i++) { - values[i] = null; - } - } - } }; // Back position may point to invalid memory if the queue is empty, @@ -129,9 +122,10 @@ public T back(T val) { public void pop() { + if (!allocate) + begin_chunk.values [begin_pos] = null; begin_pos++; if (begin_pos == size) { - begin_chunk.reset(); begin_chunk = begin_chunk.next; begin_chunk.prev = null; begin_pos = 0; diff --git a/src/main/java/zmq/ZError.java b/src/main/java/zmq/ZError.java index 3c320903e..dda6b6e2b 100644 --- a/src/main/java/zmq/ZError.java +++ b/src/main/java/zmq/ZError.java @@ -151,7 +151,7 @@ public static void ETERM() { } public static void EAGAIN() { - errno(EAGAIN); + errno.set(EAGAIN); } public static void EINTR() { diff --git a/src/test/java/perf/RemoteThr.java b/src/test/java/perf/RemoteThr.java index 1dc59f85a..29c97a959 100644 --- a/src/test/java/perf/RemoteThr.java +++ b/src/test/java/perf/RemoteThr.java @@ -85,7 +85,6 @@ public static void main(String[] argv) { ZMQ.zmq_term (ctx); - }