From b4a03bf933f0084e54925029e43dbd9f0208d0af Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 11 Aug 2016 16:24:33 +0200 Subject: [PATCH] core-edge: increase robustness of core snapshot IT --- .../scenarios/CoreToCoreCopySnapshotIT.java | 87 ++++++++++++------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index f0287bafb8416..bfd382d8bf88d 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -19,12 +19,14 @@ */ package org.neo4j.coreedge.scenarios; -import java.util.Map; -import java.util.concurrent.TimeoutException; - import org.junit.Rule; import org.junit.Test; +import java.time.Clock; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.neo4j.coreedge.catchup.storecopy.StoreFiles; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.CoreGraphDatabase; @@ -36,12 +38,12 @@ import org.neo4j.test.DbRepresentation; import org.neo4j.test.coreedge.ClusterRule; -import static org.hamcrest.Matchers.greaterThan; +import static java.time.Clock.systemUTC; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_frequency; import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_strategy; +import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually; import static org.neo4j.coreedge.scenarios.SampleData.createData; import static org.neo4j.helpers.collection.MapUtil.stringMap; @@ -117,39 +119,60 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except coreParams.put( raft_log_pruning_strategy.name(), "keep_none" ); coreParams.put( raft_log_pruning_frequency.name(), "100ms" ); int numberOfTransactions = 100; + Timeout timeout = new Timeout( systemUTC(), 60, SECONDS ); - //Start the cluster and accumulate some log files. + // start the cluster Cluster cluster = clusterRule.withSharedCoreParams( coreParams ).startCluster(); - CoreClusterMember leader = cluster.awaitCoreMemberWithRole( TIMEOUT_MS, Role.LEADER ); - int followersLastLog = getMostRecentLogIdOn( leader ); - while ( followersLastLog < 2 ) + // accumulate some log files + int firstServerLogFileCount; + CoreClusterMember firstServer; + do + { + timeout.assertNotTimedOut(); + firstServer = doSomeTransactions( cluster, numberOfTransactions ); + firstServerLogFileCount = getMostRecentLogIdOn( firstServer ); + } + while ( firstServerLogFileCount < 5 ); + firstServer.shutdown(); + + /* After shutdown we wait until we accumulate enough logs, and so that enough of the old ones + * have been pruned, so that the rejoined instance won't be able to catch up to without a snapshot. */ + int oldestLogOnSecondServer; + CoreClusterMember secondServer; + do { - doSomeTransactions( cluster, numberOfTransactions ); - followersLastLog = getMostRecentLogIdOn( leader ); + timeout.assertNotTimedOut(); + secondServer = doSomeTransactions( cluster, numberOfTransactions ); + oldestLogOnSecondServer = getOldestLogIdOn( secondServer ); } + while ( oldestLogOnSecondServer < firstServerLogFileCount + 5 ); - CoreClusterMember follower = cluster.awaitCoreMemberWithRole( TIMEOUT_MS, Role.FOLLOWER ); - follower.shutdown(); + // when + firstServer.start(); + + // then + dataMatchesEventually( firstServer, cluster.coreMembers() ); + } - /* - * After a follower is shutdown, wait until we accumulate some logs such that the oldest log is older than - * the most recent log when the follower was removed. We can then be sure that the follower won't be able - * to catch up to the leader without a snapshot. - */ + private class Timeout + { + private final Clock clock; + private final long absoluteTimeoutMillis; - //when - int leadersOldestLog = getOldestLogIdOn( leader ); - while ( leadersOldestLog < followersLastLog + 10 ) + Timeout( Clock clock, long time, TimeUnit unit ) { - doSomeTransactions( cluster, numberOfTransactions ); - leadersOldestLog = getOldestLogIdOn( leader ); + this.clock = clock; + this.absoluteTimeoutMillis = clock.millis() + unit.toMillis( time ); } - //then - assertThat( leadersOldestLog, greaterThan( followersLastLog ) ); - //The cluster member should join. Otherwise this line will hang forever. - follower.start(); + void assertNotTimedOut() + { + if ( clock.millis() > absoluteTimeoutMillis ) + { + throw new AssertionError( "Timed out" ); + } + } } private int getOldestLogIdOn( CoreClusterMember clusterMember ) throws TimeoutException @@ -162,19 +185,21 @@ private int getMostRecentLogIdOn( CoreClusterMember clusterMember ) throws Timeo return clusterMember.getLogFileNames().lastKey().intValue(); } - private void doSomeTransactions( Cluster cluster, int count ) + private CoreClusterMember doSomeTransactions( Cluster cluster, int count ) { try { + CoreClusterMember last = null; for ( int i = 0; i < count; i++ ) { - cluster.coreTx( ( db, tx ) -> { + last = cluster.coreTx( ( db, tx ) -> + { Node node = db.createNode(); node.setProperty( "that's a bam", string( 1024 ) ); tx.success(); } ); - } + return last; } catch ( Exception e ) {