Skip to content

Commit

Permalink
Merge pull request #759 from fredoboulo/fix/dist
Browse files Browse the repository at this point in the history
Problem: IndexOutOfBoundsException occurs when subscriptions count ex…
  • Loading branch information
daveyarwood committed Dec 10, 2019
2 parents e382fa6 + 1a9de2c commit 63ed0b7
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 8 deletions.
21 changes: 16 additions & 5 deletions src/main/java/zmq/socket/pubsub/Dist.java
Expand Up @@ -10,7 +10,6 @@
class Dist
{
// List of outbound pipes.
//typedef array_t <zmq::pipe_t, 2> pipes_t;
private final List<Pipe> pipes;

// Number of all the pipes to send the next message to.
Expand Down Expand Up @@ -109,12 +108,14 @@ public void terminated(Pipe pipe)
public void activated(Pipe pipe)
{
// Move the pipe from passive to eligible state.
Collections.swap(pipes, pipes.indexOf(pipe), eligible);
eligible++;
if (eligible < pipes.size()) {
Collections.swap(pipes, pipes.indexOf(pipe), eligible);
eligible++;
}

// If there's no message being sent at the moment, move it to
// the active state.
if (!more) {
if (!more && active < pipes.size()) {
Collections.swap(pipes, eligible - 1, active);
active++;
}
Expand All @@ -136,7 +137,7 @@ public boolean sendToMatching(Msg msg)
// Push the message to matching pipes.
distribute(msg);

// If mutlipart message is fully sent, activate all the eligible pipes.
// If multipart message is fully sent, activate all the eligible pipes.
if (!msgMore) {
active = eligible;
}
Expand Down Expand Up @@ -197,4 +198,14 @@ public boolean checkHwm()
}
return true;
}

int active()
{
return active;
}

int eligible()
{
return eligible;
}
}
89 changes: 89 additions & 0 deletions src/test/java/zmq/socket/pubsub/DistTest.java
@@ -0,0 +1,89 @@
package zmq.socket.pubsub;

import org.junit.Before;
import org.junit.Test;
import zmq.Msg;
import zmq.ZObject;
import zmq.pipe.Pipe;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

public class DistTest
{
private Dist dist;
private Pipe first;
private Pipe second;

private static final class Parent extends ZObject
{
Parent()
{
super(null, 0);
}
}

@Before
public void setup()
{
dist = new Dist();
Pipe[] pipes = Pipe.pair(new ZObject[] {new Parent(), new Parent()}, new int[2], new boolean[2]);
first = pipes[0];
second = pipes[1];
assertThat(dist.eligible(), is(0));
assertThat(dist.active(), is(0));
}

@Test
public void testActivated()
{
dist.attach(first);
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));

// check that there is no modification when activating a pipe straight away
dist.activated(first);
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));

// check re-entrance: no modification
dist.activated(first);
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));
}

@Test
public void testAttachedWhenSendingMultipartMessage()
{
testActivated();

Msg msg = new Msg();
msg.setFlags(Msg.MORE);
dist.sendToMatching(msg);

dist.attach(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(1));

dist.activated(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(1));
}

@Test
public void testAttachedWhenSendingNoMoreMessage()
{
testActivated();

Msg msg = new Msg();
dist.sendToMatching(msg);

dist.attach(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));

dist.activated(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));
}
}
80 changes: 77 additions & 3 deletions src/test/java/zmq/socket/pubsub/PubSubHwmTest.java
@@ -1,17 +1,18 @@
package zmq.socket.pubsub;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;

import java.io.IOException;

import org.junit.Test;

import zmq.Ctx;
import zmq.Msg;
import zmq.SocketBase;
import zmq.ZError;
import zmq.ZMQ;
import zmq.util.Wire;

public class PubSubHwmTest
{
Expand Down Expand Up @@ -135,7 +136,7 @@ else if (ret == -1) {
}

@Test
public void testResetHwm() throws IOException
public void testResetHwm()
{
// hwm should apply to the messages that have already been received
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
Expand Down Expand Up @@ -209,4 +210,77 @@ public void testResetHwm() throws IOException
ZMQ.close(pub);
ZMQ.term(ctx);
}

@Test
public void testSubscriptionsHwmTcp()
{
assertHwmSubscription(999, "tcp://*:*");
}

@Test
public void testSubscriptionsHwmInproc()
{
assertHwmSubscription(2000, "inproc://pubsub");
}

private void assertHwmSubscription(int subscribed, String endpoint)
{
int allSubscriptions = 30_000;
int subscriptionNotSent = allSubscriptions - 1;
Ctx ctx = ZMQ.createContext();

SocketBase pub = ctx.createSocket(ZMQ.ZMQ_PUB);
assertThat(pub, notNullValue());

boolean rc = ZMQ.bind(pub, endpoint);
assertThat(rc, is(true));

String host = (String) ZMQ.getSocketOptionExt(pub, ZMQ.ZMQ_LAST_ENDPOINT);
assertThat(host, notNullValue());

// Set up connect socket
SocketBase sub = ctx.createSocket(ZMQ.ZMQ_SUB);
assertThat(sub, notNullValue());

rc = ZMQ.setSocketOption(sub, ZMQ.ZMQ_RCVTIMEO, 100);
assertThat(rc, is(true));

// send a lot of subscriptions, far beyond the HWM
int idx = 0;
for (; idx < allSubscriptions; ++idx) {
rc = ZMQ.setSocketOption(sub, ZMQ.ZMQ_SUBSCRIBE, Wire.putUInt32(idx));
// at some point, sending will trigger the HWM and subscription will be dropped
assertThat(rc, is(true));
}
rc = ZMQ.connect(sub, host);
assertThat(rc, is(true));

ZMQ.msleep(500);

// Send messages
int sent = ZMQ.send(pub, Wire.putUInt32(1), 0);
assertThat(sent, is(4));
sent = ZMQ.send(pub, Wire.putUInt32(subscribed), 0);
assertThat(sent, is(4));
sent = ZMQ.send(pub, Wire.putUInt32(subscriptionNotSent), 0);
assertThat(sent, is(4));

ZMQ.msleep(500);

Msg msg = ZMQ.recv(sub, 0);
assertThat(msg, notNullValue());
assertThat(msg.data(), is(Wire.putUInt32(1)));

msg = ZMQ.recv(sub, 0);
assertThat(msg, notNullValue());
assertThat(msg.data(), is(Wire.putUInt32(subscribed)));

msg = ZMQ.recv(sub, 0);
assertThat(msg, nullValue());

// Clean up
ZMQ.close(sub);
ZMQ.close(pub);
ZMQ.term(ctx);
}
}

0 comments on commit 63ed0b7

Please sign in to comment.