Skip to content

Commit

Permalink
Merge d2d7e97 into b405d87
Browse files Browse the repository at this point in the history
  • Loading branch information
isahkemat committed Jun 22, 2019
2 parents b405d87 + d2d7e97 commit ac76edd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
13 changes: 5 additions & 8 deletions src/main/java/org/zeromq/ZPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,13 @@ public boolean events(Socket socket, int events)
boolean first = true;
for (ItemHolder holder : holders) {
if (holder.item().hasEvent(events)) {
if (first) {
first = false;
has = true;
}
EventsHandler handler = holder.handler() == null ? globalHandler : holder.handler();
if (handler != null) {
boolean evts = handler.events(socket, events);
if (first) {
first = false;
has = evts;
}
else {
has &= evts;
}
has &= handler.events(socket, events);
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions src/test/java/org/zeromq/TestZPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
Expand Down Expand Up @@ -477,4 +478,43 @@ private final Pipe pipe(ZPoller poller, BiFunction<SelectableChannel, Integer, B

return pipe;
}

@Test
public void testIssue729() throws InterruptedException, IOException
{
int port = Utils.findOpenPort();
ZContext ctx = new ZContext();
Socket sub = ctx.createSocket(SocketType.SUB);
Socket pub = ctx.createSocket(SocketType.PUB);
sub.bind("tcp://127.0.0.1:" + port);
pub.connect("tcp://127.0.0.1:" + port);
boolean subscribe = sub.subscribe("");
assertTrue("SUB Socket could not subscribe", subscribe);
ZPoller zPoller = new ZPoller(ctx);
zPoller.register(new ZPoller.ZPollItem(sub, null, ZPoller.POLLIN));

Thread server = new Thread(new Runnable()
{
@Override
public void run()
{
while (true) {
pub.send("hello");
try {
Thread.sleep(100);
}
catch (InterruptedException ignored) {
}
}
}
});
server.start();
int rc = zPoller.poll(-1);
server.interrupt();
assertThat("ZPoller does not understand SUB socket signaled", rc, is(1));
boolean pollin = zPoller.pollin(sub);
assertTrue(pollin);
String hello = sub.recvStr();
assertThat("recieved message are not identical to what has been sent", hello, is("hello"));
}
}

0 comments on commit ac76edd

Please sign in to comment.