Skip to content

Commit

Permalink
Merge cf1270b into 63ed0b7
Browse files Browse the repository at this point in the history
  • Loading branch information
fredoboulo committed Dec 11, 2019
2 parents 63ed0b7 + cf1270b commit 2ad36bd
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 11 deletions.
5 changes: 5 additions & 0 deletions src/main/java/zmq/socket/pubsub/Dist.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,9 @@ int eligible()
{
return eligible;
}

int matching()
{
return matching;
}
}
80 changes: 69 additions & 11 deletions src/test/java/zmq/socket/pubsub/DistTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ private static final class Parent extends ZObject
public void setup()
{
dist = new Dist();
Pipe[] pipes = Pipe.pair(new ZObject[] {new Parent(), new Parent()}, new int[2], new boolean[2]);
Pipe[] pipes = Pipe.pair(new ZObject[] {new Parent(), new Parent()}, new int[] {0, 1}, new boolean[2]);
first = pipes[0];
second = pipes[1];
assertThat(dist.eligible(), is(0));
assertThat(dist.active(), is(0));
}

@Test
public void testActivated()
{
dist.attach(first);
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));
}

@Test
public void testActivatedWhileActive()
{
// check that there is no modification when activating a pipe straight away
dist.activated(first);
assertThat(dist.eligible(), is(1));
Expand All @@ -55,35 +55,93 @@ public void testActivated()
@Test
public void testAttachedWhenSendingMultipartMessage()
{
testActivated();

Msg msg = new Msg();
msg.setFlags(Msg.MORE);
dist.sendToMatching(msg);
dist.sendToAll(msg);
// no change in eligible/active states
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));

dist.attach(second);
assertThat(dist.eligible(), is(2));
// active should not have changed, as we are in the middle a sending a multi-part message
assertThat(dist.active(), is(1));

// no change in eligible/active states
dist.activated(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(1));

dist.sendToAll(new Msg());
// end of multi-part message
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));
}

@Test
public void testAttachedWhenSendingNoMoreMessage()
{
testActivated();

Msg msg = new Msg();
dist.sendToMatching(msg);
dist.sendToAll(msg);
// no change in eligible/active states
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));

dist.attach(second);
assertThat(dist.eligible(), is(2));
// active should have changed, as we are NOT in the middle a sending a multi-part message
assertThat(dist.active(), is(2));

dist.sendToAll(msg);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));

// no change in eligible/active states
dist.activated(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));
}

@Test
public void testActivatedAfterReachingHWM()
{
dist.attach(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));

// simulate HWM
second.write(new Msg());

// mark the pipe as not active and not eligible
dist.sendToAll(new Msg());

// now, second pipe has been put out of the active and eligible ones
assertThat(dist.eligible(), is(1));
assertThat(dist.active(), is(1));

// reactivate pipe having reached HWM
dist.activated(second);
assertThat(dist.eligible(), is(2));
assertThat(dist.active(), is(2));
}

@Test
public void testMatch()
{
dist.attach(second);
assertThat(dist.matching(), is(0));

// simulate HWM
second.write(new Msg());

// mark the pipe as not active and not eligible
dist.sendToAll(new Msg());

dist.match(first);
assertThat(dist.matching(), is(1));

dist.match(second);
// second pipe is not a matching one
assertThat(dist.matching(), is(1));
}
}

0 comments on commit 2ad36bd

Please sign in to comment.