Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

consolidate different usage pattern with jzmq

  • Loading branch information...
commit a3aa4a6a1e6c75ec8783d4d1444687ae768ef143 1 parent 0c0f9b0
@miniway authored
View
3  src/main/java/org/jeromq/ZContext.java
@@ -145,6 +145,8 @@ public void destroySocket(Socket s) {
public static ZContext shadow(ZContext ctx) {
ZContext shadow = new ZContext();
shadow.setContext(ctx.getContext());
+ shadow.setMain (false);
+
return shadow;
}
@@ -218,7 +220,6 @@ public Context getContext() {
*/
public void setContext (Context ctx) {
this.context = ctx;
- setMain (false);
}
/**
View
195 src/main/java/org/jeromq/ZMQ.java
@@ -21,7 +21,6 @@
package org.jeromq;
-import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
@@ -1133,7 +1132,40 @@ public void setFlags(int flags) {
}
}
- public static class Poller implements Closeable {
+ // GC closes selector handle
+ protected static class ReuseableSelector {
+
+ private Selector selector;
+
+ protected ReuseableSelector () {
+ selector = null;
+ }
+
+ public Selector open () throws IOException
+ {
+ selector = Selector.open();
+ return selector;
+ }
+
+ public Selector get ()
+ {
+ assert (selector != null);
+ assert (selector.isOpen ());
+ return selector;
+ }
+
+ @Override
+ public void finalize ()
+ {
+ try {
+ selector.close ();
+ } catch (IOException e) {
+ }
+ }
+
+ }
+
+ public static class Poller {
/**
* These values can be ORed to specify what we want to poll for.
@@ -1145,11 +1177,11 @@ public void setFlags(int flags) {
private static final int SIZE_DEFAULT = 32;
private static final int SIZE_INCREMENT = 16;
+ private static final ThreadLocal <ReuseableSelector> holder = new ThreadLocal <ReuseableSelector> ();
private Selector selector;
private zmq.PollItem items[];
private long timeout;
private int next;
- private boolean autoClose;
/**
* Class constructor.
@@ -1163,7 +1195,7 @@ protected Poller (Context context, int size) {
items = new zmq.PollItem[size];
timeout = -1L;
next = 0;
- autoClose = true;
+ open ();
}
/**
@@ -1177,30 +1209,22 @@ protected Poller (Context context) {
}
private void open () {
- if (selector != null)
- return;
-
- try {
- selector = Selector.open();
- } catch (IOException e) {
- throw new ZError.IOException(e);
- }
- }
-
- @Override
- public void close () {
- try {
- if (selector != null) {
- selector.close();
- selector = null;
+ if (holder.get () == null) {
+ synchronized (holder) {
+ try {
+ if (selector == null) {
+ ReuseableSelector s = new ReuseableSelector ();
+ selector = s.open ();
+ holder.set (s);
+ }
+ } catch (IOException e) {
+ throw new ZError.IOException(e);
+ }
}
- } catch (IOException e) {
}
+ selector = holder.get ().get ();
}
- public void setAutoClose (boolean autoClose) {
- this.autoClose = autoClose;
- }
/**
* Register a Socket for polling on all events.
*
@@ -1224,25 +1248,35 @@ public int register (Socket socket) {
* @return the index identifying this Socket in the poll set.
*/
public int register (Socket socket, int events) {
-
- int pos = 0;
- for (pos = 0; pos < items.length ;pos++) {
- zmq.PollItem item = items[pos];
- if (item == null) {
- items[pos] = new zmq.PollItem(socket.base, events);
- break;
- }
- }
+
+ return insert (new zmq.PollItem (socket.base, events));
+ }
+
+ /**
+ * Register a Socket for polling on the specified events.
+ *
+ * Automatically grow the internal representation if needed.
+ *
+ * @param channel
+ * the Channel we are registering.
+ * @param events
+ * a mask composed by XORing POLLIN, POLLOUT and POLLERR.
+ * @return the index identifying this Channel in the poll set.
+ */
+ public int register (SelectableChannel channel, int events) {
+
+ return insert (new zmq.PollItem (channel, events));
+ }
+
+ private int insert (zmq.PollItem item)
+ {
+ int pos = next++;
if (pos == items.length) {
zmq.PollItem[] nitems = new zmq.PollItem[items.length + SIZE_INCREMENT];
- for (pos = 0; pos < items.length ;pos++) {
- nitems[pos] = items[pos];
- }
- nitems[pos] = new zmq.PollItem(socket.base, events);
+ System.arraycopy (items, 0, nitems, 0, items.length);
items = nitems;
}
- if (pos >= next)
- next = pos + 1;
+ items [pos] = item;
return pos;
}
@@ -1255,12 +1289,36 @@ public int register (Socket socket, int events) {
public void unregister (Socket socket) {
for (int pos = 0; pos < items.length ;pos++) {
zmq.PollItem item = items[pos];
- if (item.socket() == socket.base) {
- items[pos] = null;
+ if (item.socket () == socket.base) {
+ remove (pos);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Unregister a Socket for polling on the specified events.
+ *
+ * @param channel
+ * the Socket to be unregistered
+ */
+ public void unregister (SelectableChannel channel) {
+ for (int pos = 0; pos < items.length ;pos++) {
+ zmq.PollItem item = items[pos];
+ if (item.getChannel () == channel) {
+ remove (pos);
break;
}
}
}
+
+ private void remove (int pos)
+ {
+ next--;
+ if (pos != next)
+ items [pos] = items [next];
+ items [next] = null;
+ }
/**
* Get the socket associated with an index.
@@ -1322,7 +1380,7 @@ public int getNext () {
* has been set, use that value as timeout; otherwise, block
* indefinitely.
*
- * @return how many objects where signalled by poll ().
+ * @return how many objects where signaled by poll ().
*/
public int poll () {
long tout = -1L;
@@ -1347,59 +1405,54 @@ public int poll () {
*
* @see http://api.zeromq.org/3-0:zmq-poll
*
- * @return how many objects where signalled by poll ()
+ * @return how many objects where signaled by poll ()
*/
public int poll(long tout) {
-
- open();
- int ret = zmq.ZMQ.zmq_poll (selector, items, tout);
- if (autoClose)
- close();
- return ret;
+ return zmq.ZMQ.zmq_poll (selector, items, tout);
}
/**
- * Check whether the specified element in the poll set was signalled for input.
+ * Check whether the specified element in the poll set was signaled for input.
*
* @param index
*
- * @return true if the element was signalled.
+ * @return true if the element was signaled.
*/
public boolean pollin (int index) {
- zmq.PollItem item ;
- if (index < 0 || index >= this.next || (item = items[index]) == null)
+ if (index < 0 || index >= this.next)
return false;
- return item.isReadable();
+
+ return items [index].isReadable();
}
/**
- * Check whether the specified element in the poll set was signalled for output.
+ * Check whether the specified element in the poll set was signaled for output.
*
* @param index
*
- * @return true if the element was signalled.
+ * @return true if the element was signaled.
*/
public boolean pollout (int index) {
- zmq.PollItem item ;
- if (index < 0 || index >= this.next || (item = items[index]) == null)
+
+ if (index < 0 || index >= this.next)
return false;
- return item.isWriteable();
-
+
+ return items [index].isWriteable();
}
/**
- * Check whether the specified element in the poll set was signalled for error.
+ * Check whether the specified element in the poll set was signaled for error.
*
* @param index
*
- * @return true if the element was signalled.
+ * @return true if the element was signaled.
*/
public boolean pollerr (int index) {
- zmq.PollItem item ;
- if (index < 0 || index >= this.next || (item = items[index]) == null)
+
+ if (index < 0 || index >= this.next)
return false;
- return item.isError();
-
+
+ return items [index].isError();
}
}
@@ -1407,10 +1460,14 @@ public boolean pollerr (int index) {
private final zmq.PollItem base;
- public PollItem(Socket s, int ops) {
- base = new zmq.PollItem(s.base, ops);
+ public PollItem (Socket s, int ops) {
+ base = new zmq.PollItem (s.base, ops);
}
-
+
+ public PollItem (SelectableChannel s, int ops) {
+ base = new zmq.PollItem (s, ops);
+ }
+
public final zmq.PollItem base() {
return base;
}
View
13 src/main/java/zmq/ZMQ.java
@@ -593,25 +593,26 @@ public static int zmq_poll(Selector selector, PollItem[] items_, long timeout_ )
long now = 0;
long end = 0;
- HashMap<PollItem, SelectionKey> saved = new HashMap<PollItem,SelectionKey>();
+ HashMap<SelectableChannel, SelectionKey> saved = new HashMap<SelectableChannel, SelectionKey>();
for (SelectionKey key: selector.keys()) {
- saved.put((PollItem)key.attachment(), key);
+ saved.put(key.channel (), key);
}
for (PollItem item: items_) {
if (item == null)
- continue;
+ break;
SelectableChannel ch = item.getChannel(); // mailbox channel if ZMQ socket
- SelectionKey key = saved.remove(item);
+ SelectionKey key = saved.remove(ch);
if (key != null) {
- if (item.interestOps() != item.interestOps()) {
+ if (key.interestOps() != item.interestOps()) {
key.interestOps(item.interestOps());
}
+ key.attach (item);
} else {
try {
- key = ch.register(selector, item.interestOps(),item);
+ key = ch.register(selector, item.interestOps(), item);
} catch (ClosedChannelException e) {
throw new ZError.IOException(e);
}
View
11 src/test/java/guide/tripping.java
@@ -25,12 +25,11 @@ public void run() {
frontend.bind("tcp://*:5555");
backend.bind("tcp://*:5556");
- ZMQ.Poller items = ctx.getContext().poller();
- items.setAutoClose (false);
- items.register(frontend, ZMQ.Poller.POLLIN);
- items.register(backend, ZMQ.Poller.POLLIN);
-
while (!Thread.currentThread().isInterrupted()) {
+ ZMQ.Poller items = ctx.getContext().poller();
+ items.register(frontend, ZMQ.Poller.POLLIN);
+ items.register(backend, ZMQ.Poller.POLLIN);
+
if (items.poll() == -1)
break; // Interrupted
if (items.pollin(0)) {
@@ -43,6 +42,7 @@ public void run() {
msg.send(backend);
}
if (items.pollin(1)) {
+
ZMsg msg = ZMsg.recvMsg(backend);
if (msg == null)
break; // Interrupted
@@ -52,7 +52,6 @@ public void run() {
msg.send(frontend);
}
}
- items.close ();
ctx.destroy();
}
Please sign in to comment.
Something went wrong with that request. Please try again.