Skip to content

Commit

Permalink
Clean up on migration / partition listener tests
Browse files Browse the repository at this point in the history
* Hold the migrations until all nodes join so that there will be no retries / failed migrations etc.

Fixes hazelcast#10346
Fixes hazelcast#10626
Fixes hazelcast#10625
  • Loading branch information
metanet committed May 22, 2017
1 parent bf88e92 commit dee2e82
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 50 deletions.
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.partition.MigrationInfo;
import com.hazelcast.internal.partition.impl.InternalMigrationListener.MigrationParticipant;
import com.hazelcast.internal.partition.impl.MigrationCommitTest.DelayMigrationStart;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
Expand All @@ -32,6 +33,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;

Expand All @@ -46,6 +48,10 @@ public void shouldInvokeInternalMigrationListenerOnSuccessfulMigration() {
final Config config1 = new Config();
config1.setProperty(GroupProperty.PARTITION_COUNT.getName(), String.valueOf(PARTITION_COUNT));

// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
final CountDownLatch migrationStartLatch = new CountDownLatch(1);
config1.addListenerConfig(new ListenerConfig(new DelayMigrationStart(migrationStartLatch)));

final TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2);
final HazelcastInstance hz1 = factory.newHazelcastInstance(config1);
warmUpPartitions(hz1);
Expand All @@ -56,6 +62,8 @@ public void shouldInvokeInternalMigrationListenerOnSuccessfulMigration() {
config2.addListenerConfig(new ListenerConfig(listener));
final HazelcastInstance hz2 = factory.newHazelcastInstance(config2);

migrationStartLatch.countDown();

waitAllForSafeState(hz1, hz2);

final List<Integer> hz2PartitionIds = getNodeEngineImpl(hz2).getPartitionService().getMemberPartitions(getAddress(hz2));
Expand Down
Expand Up @@ -143,9 +143,12 @@ public void shouldCommitMigrationWhenMasterIsNotMigrationEndpoint() {

@Test
public void shouldRollbackMigrationWhenMasterCrashesBeforeCommit() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
DelayMigrationStart masterListener = new DelayMigrationStart(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));

HazelcastInstance hz1 = factory.newHazelcastInstance(config1);

Expand All @@ -154,17 +157,17 @@ public void shouldRollbackMigrationWhenMasterCrashesBeforeCommit() {
warmUpPartitions(hz1, hz2);
waitAllForSafeState(hz1, hz2);

Config config3 = createConfig();
final TerminateOtherMemberOnMigrationComplete listener3
= new TerminateOtherMemberOnMigrationComplete(migrationStartLatch);
listener3.other = hz1;
migrationStartLatch.countDown();

Config config3 = createConfig();
config3.addListenerConfig(new ListenerConfig(listener3));
HazelcastInstance hz3 = factory.newHazelcastInstance(config3);

assertClusterSizeEventually(2, hz2);
assertClusterSize(2, hz3);
assertClusterSizeEventually(3, hz2);
assertClusterSize(3, hz1, hz3);

migrationStartLatch.countDown();

assertTrueEventually(new AssertTask() {
@Override
Expand All @@ -181,9 +184,10 @@ public void run()

@Test
public void shouldRollbackMigrationWhenDestinationCrashesBeforeCommit() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
TerminateOtherMemberOnMigrationComplete masterListener = new TerminateOtherMemberOnMigrationComplete(
migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));
Expand All @@ -196,15 +200,13 @@ public void shouldRollbackMigrationWhenDestinationCrashesBeforeCommit() {
waitAllForSafeState(hz1, hz2);

HazelcastInstance hz3 = factory.newHazelcastInstance(createConfig());
masterListener.other = hz3;

assertClusterSize(3, hz1, hz3);
assertClusterSizeEventually(3, hz2);

masterListener.other = hz3;
migrationStartLatch.countDown();

sleepAtLeastSeconds(10);

waitAllForSafeState(hz1, hz2);

InternalPartition partition0 = getPartitionService(hz2).getPartition(0);
Expand All @@ -219,23 +221,26 @@ public void shouldRollbackMigrationWhenDestinationCrashesBeforeCommit() {

@Test
public void shouldCommitMigrationWhenMasterCrashesAfterDestinationCommit() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
DelayMigrationStart masterListener = new DelayMigrationStart(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));

HazelcastInstance hz1 = factory.newHazelcastInstance(config1);

Config config2 = createConfig();
final CollectMigrationTaskOnCommit sourceListener = new CollectMigrationTaskOnCommit();
final CollectMigrationTaskOnCommit sourceListener = new CollectMigrationTaskOnCommit(migrationStartLatch);
config2.addListenerConfig(new ListenerConfig(sourceListener));
HazelcastInstance hz2 = factory.newHazelcastInstance(config2);

warmUpPartitions(hz1, hz2);
waitAllForSafeState(hz1, hz2);

Config config3 = createConfig();
TerminateOtherMemberOnMigrationCommit destinationListener = new TerminateOtherMemberOnMigrationCommit(
migrationStartLatch);

TerminateOtherMemberOnMigrationCommit destinationListener = new TerminateOtherMemberOnMigrationCommit();
destinationListener.other = hz1;
config3.addListenerConfig(new ListenerConfig(destinationListener));
HazelcastInstance hz3 = factory.newHazelcastInstance(config3);
Expand Down Expand Up @@ -266,9 +271,10 @@ public void run()

@Test
public void shouldCommitMigrationWhenSourceFailsDuringCommit() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
TerminateOtherMemberOnMigrationComplete masterListener = new TerminateOtherMemberOnMigrationComplete(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));

Expand Down Expand Up @@ -308,9 +314,10 @@ public void shouldCommitMigrationWhenSourceFailsDuringCommit() {

@Test
public void shouldRollbackMigrationWhenDestinationCrashesDuringCommit() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
DelayMigrationStartOnMaster masterListener = new DelayMigrationStartOnMaster(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));

Expand Down Expand Up @@ -350,9 +357,10 @@ public void shouldRollbackMigrationWhenDestinationCrashesDuringCommit() {

@Test
public void shouldRetryMigrationIfParticipantPartitionTableVersionFallsBehind() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
final IncrementPartitionTableOnMigrationStart masterListener
= new IncrementPartitionTableOnMigrationStart(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));
Expand Down Expand Up @@ -387,10 +395,11 @@ public void run()

@Test
public void shouldEvictCompletedMigrationsWhenAllMembersAckPublishedPartitionTableAfterSuccessfulMigration() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
final CollectMigrationTaskOnCommit masterListener = new CollectMigrationTaskOnCommit();
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
final CollectMigrationTaskOnCommit masterListener = new CollectMigrationTaskOnCommit(migrationStartLatch);
config1.addListenerConfig(new ListenerConfig(masterListener));

HazelcastInstance hz1 = factory.newHazelcastInstance(config1);
Expand All @@ -403,11 +412,7 @@ public void shouldEvictCompletedMigrationsWhenAllMembersAckPublishedPartitionTab
warmUpPartitions(hz1, hz2);
waitAllForSafeState(hz1, hz2);

Config config3 = createConfig();
DelayMigrationStart destinationListener = new DelayMigrationStart(migrationStartLatch);

config3.addListenerConfig(new ListenerConfig(destinationListener));
HazelcastInstance hz3 = factory.newHazelcastInstance(config3);
HazelcastInstance hz3 = factory.newHazelcastInstance(createConfig());

assertClusterSize(3, hz1, hz3);
assertClusterSizeEventually(3, hz2);
Expand All @@ -427,15 +432,16 @@ public void run() throws Exception {

@Test
public void shouldNotEvictCompletedMigrationsWhenSomeMembersDoNotAckPublishedPartitionTableAfterSuccessfulMigration() {
CountDownLatch migrationStartLatch = new CountDownLatch(1);
CountDownLatch migrationCommitLatch = new CountDownLatch(1);
Config config1 = createConfig();
config1.setLiteMember(true);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
CountDownLatch migrationStartLatch = new CountDownLatch(1);
config1.addListenerConfig(new ListenerConfig(new DelayMigrationStart(migrationStartLatch)));

final HazelcastInstance hz1 = factory.newHazelcastInstance(config1);

Config config2 = createConfig();
CountDownLatch migrationCommitLatch = new CountDownLatch(1);
config2.addListenerConfig(new ListenerConfig(new DelayMigrationCommit(migrationCommitLatch)));
HazelcastInstance hz2 = factory.newHazelcastInstance(config2);

Expand Down Expand Up @@ -581,13 +587,13 @@ public void setHazelcastInstance(HazelcastInstance instance) {
}
}

private static class DelayMigrationStart extends InternalMigrationListener implements HazelcastInstanceAware {
public static class DelayMigrationStart extends InternalMigrationListener implements HazelcastInstanceAware {

private final CountDownLatch migrationStartLatch;

private volatile HazelcastInstance instance;

DelayMigrationStart(CountDownLatch migrationStartLatch) {
public DelayMigrationStart(CountDownLatch migrationStartLatch) {
this.migrationStartLatch = migrationStartLatch;
}

Expand Down Expand Up @@ -688,20 +694,9 @@ public void setHazelcastInstance(HazelcastInstance instance) {
private static class TerminateOtherMemberOnMigrationCommit
extends InternalMigrationListener implements HazelcastInstanceAware {

private final CountDownLatch migrationStartLatch;

private volatile HazelcastInstance instance;
private volatile HazelcastInstance other;

TerminateOtherMemberOnMigrationCommit(CountDownLatch migrationStartLatch) {
this.migrationStartLatch = migrationStartLatch;
}

@Override
public void onMigrationStart(MigrationParticipant participant, MigrationInfo migrationInfo) {
assertOpenEventually(migrationStartLatch);
}

@Override
public void onMigrationCommit(MigrationParticipant participant, MigrationInfo migrationInfo) {
int memberCount = instance.getCluster().getMembers().size();
Expand Down Expand Up @@ -753,13 +748,19 @@ public void setHazelcastInstance(HazelcastInstance instance) {

private static class CollectMigrationTaskOnCommit extends InternalMigrationListener implements HazelcastInstanceAware {

private final CountDownLatch migrationStartLatch;
private final AtomicReference<MigrationInfo> migrationInfoRef = new AtomicReference<MigrationInfo>();

private volatile boolean commit;
private volatile HazelcastInstance instance;

public CollectMigrationTaskOnCommit(CountDownLatch migrationStartLatch) {
this.migrationStartLatch = migrationStartLatch;
}

@Override
public void onMigrationStart(MigrationParticipant participant, MigrationInfo migrationInfo) {
assertOpenEventually(migrationStartLatch);
if (commit) {
System.err.println("Ignoring new migration start: " + migrationInfo + " as participant: " + participant
+ " since expected migration is already committed");
Expand Down
Expand Up @@ -17,10 +17,12 @@
package com.hazelcast.partition;

import com.hazelcast.config.Config;
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MigrationEvent;
import com.hazelcast.core.MigrationListener;
import com.hazelcast.core.PartitionService;
import com.hazelcast.internal.partition.impl.MigrationCommitTest.DelayMigrationStart;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
Expand Down Expand Up @@ -56,19 +58,18 @@ public void testMigrationListenerCalledOnlyOnceWhenMigrationHappens() throws Exc
final int partitionCount = 10;
config.setProperty(GroupProperty.PARTITION_COUNT.getName(), String.valueOf(partitionCount));

HazelcastInstance instance = factory.newHazelcastInstance(config);
// hold the migrations until all nodes join so that there will be no retries / failed migrations etc.
final CountDownLatch migrationStartLatch = new CountDownLatch(1);
config.addListenerConfig(new ListenerConfig(new DelayMigrationStart(migrationStartLatch)));

CountDownLatch migrationStartLatch = new CountDownLatch(1);
HazelcastInstance instance = factory.newHazelcastInstance(config);
warmUpPartitions(instance);

final CountingMigrationListener migrationListener = new CountingMigrationListener(migrationStartLatch, partitionCount);
instance.getPartitionService().addMigrationListener(migrationListener);

warmUpPartitions(instance);

final HazelcastInstance instance2 = factory.newHazelcastInstance(config);

assertNodeStartedEventually(instance2);

migrationStartLatch.countDown();

waitAllForSafeState(instance2, instance);
Expand Down Expand Up @@ -167,14 +168,11 @@ private void assertAllLessThanOrEqual(AtomicInteger[] integers, int expected) {

private static class CountingMigrationListener implements MigrationListener {

CountDownLatch migrationStartLatch;

AtomicInteger[] migrationStarted;
AtomicInteger[] migrationCompleted;
AtomicInteger[] migrationFailed;

CountingMigrationListener(CountDownLatch migrationStartLatch, int partitionCount) {
this.migrationStartLatch = migrationStartLatch;
migrationStarted = new AtomicInteger[partitionCount];
migrationCompleted = new AtomicInteger[partitionCount];
migrationFailed = new AtomicInteger[partitionCount];
Expand All @@ -187,7 +185,6 @@ private static class CountingMigrationListener implements MigrationListener {

@Override
public void migrationStarted(MigrationEvent migrationEvent) {
assertOpenEventually(migrationStartLatch);
migrationStarted[migrationEvent.getPartitionId()].incrementAndGet();
}

Expand Down
Expand Up @@ -151,7 +151,7 @@ private void createInstances(Config config) {
h4 = factory.newHazelcastInstance(config);
h5 = factory.newHazelcastInstance(config);

assertClusterSize(5, h1, h4);
assertClusterSize(5, h1, h5);
assertClusterSizeEventually(5, h2, h3, h4);
}

Expand Down

0 comments on commit dee2e82

Please sign in to comment.