Skip to content

Commit

Permalink
Fix LossToleranceTest
Browse files Browse the repository at this point in the history
Because of the behaviour change introduced in
hazelcast#16303, when the requested
sequence is larger than the largest sequence (tailSequence) + 1, we
don't listen from the oldest sequence (headSequence) but rather from the
tailSequence + 1. Both approaches are fine and both approaches work
better in some scenarios. Since the listener is loss tolerant, we can
skip items from headSequence..tailSequence+1 anyway.

Fixed the test to adhere to the new behaviour. We assume that eventually
as we publish an item, it will reach the listener.

A better fix would be to introduce unique IDs per ringbuffer, where we
would then be able to distinguish between a completely lost ringbuffer
and a ringbuffer which has not received the last few items and
appropriately reset the requested sequence to the headSequence or
tailSequence.

Fixes: hazelcast#16430
  • Loading branch information
Matko Medenjak committed Jan 8, 2020
1 parent 0a19592 commit 9584a09
Showing 1 changed file with 20 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package com.hazelcast.topic.impl.reliable;

import com.hazelcast.cluster.Member;
import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cluster.Member;
import com.hazelcast.instance.impl.TestUtil;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelJVMTest;
Expand All @@ -32,6 +31,8 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.UUID;

import static com.hazelcast.ringbuffer.impl.RingbufferService.TOPIC_RB_PREFIX;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -80,20 +81,12 @@ public void whenNotLossTolerant_thenTerminate() {
topic.publish("foo");

// we add so many items that the items the listener wants to listen to, doesn't exist anymore
for (; ; ) {
do {
topic.publish("item");
if (ringbuffer.headSequence() > listener.initialSequence) {
break;
}
}
} while (ringbuffer.headSequence() <= listener.initialSequence);
topic.addMessageListener(listener);

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertTrue(topic.runnersMap.isEmpty());
}
});
assertTrueEventually(() -> assertTrue(topic.runnersMap.isEmpty()));
}

@Test
Expand All @@ -103,22 +96,16 @@ public void whenLossTolerant_thenContinue() {
listener.isLossTolerant = true;

// we add so many items that the items the listener wants to listen to, doesn't exist anymore
for (; ; ) {
do {
topic.publish("item");
if (ringbuffer.headSequence() > listener.initialSequence) {
break;
}
}
} while (ringbuffer.headSequence() <= listener.initialSequence);

topic.addMessageListener(listener);
topic.publish("newItem");

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertContains(listener.objects, "newItem");
assertFalse(topic.runnersMap.isEmpty());
}
assertTrueEventually(() -> {
assertContains(listener.objects, "newItem");
assertFalse(topic.runnersMap.isEmpty());
});
}

Expand All @@ -130,24 +117,19 @@ public void whenLossTolerant_andOwnerCrashes_thenContinue() {
topic.publish("item1");
topic.publish("item2");

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertContains(listener.objects, "item1");
assertContains(listener.objects, "item2");
}
assertTrueEventually(() -> {
assertContains(listener.objects, "item1");
assertContains(listener.objects, "item2");
});
TestUtil.terminateInstance(topicOwnerInstance);

topic.publish("newItem");


assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertContains(listener.objects, "newItem");
assertTrueEventually(() -> {
String item = "newItem " + UUID.randomUUID();
topic.publish(item);
assertTrueEventually(() -> {
assertContains(listener.objects, item);
assertFalse(topic.runnersMap.isEmpty());
}
}, 5);
});
}
}

0 comments on commit 9584a09

Please sign in to comment.