From 693322ca1a6a419a8d27378e65829600b00badd4 Mon Sep 17 00:00:00 2001 From: Max Sumrall Date: Tue, 31 May 2016 16:37:55 +0200 Subject: [PATCH] Improved test for catching up rejoined follower. The improved test explicitly checks the logs that have accumulated and also the logs which have been pruned and coordinates the shutdown and startup of the follower accordingly. --- .../neo4j/coreedge/raft/roles/Appending.java | 2 +- .../scenarios/CoreToCoreCopySnapshotIT.java | 127 ++++++++++++------ 2 files changed, 84 insertions(+), 45 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java index 95ef8281712fa..4d92a72c11b9b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -127,7 +127,7 @@ public static void appendNewEntry( ReadableRaftState ctx, Outco } public static void appendNewEntries( ReadableRaftState ctx, Outcome outcome, - List contents ) throws IOException, RaftLogCompactedException + List contents ) throws IOException { long prevLogIndex = ctx.entryLog().appendIndex(); long prevLogTerm = prevLogIndex == -1 ? 
-1 : diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index 8448cf33f9869..4a08268e4333e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -25,11 +25,13 @@ import java.io.File; import java.util.Map; +import java.util.SortedMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeoutException; import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.TestOnlyDiscoveryServiceFactory; +import org.neo4j.coreedge.raft.log.segmented.FileNames; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.core.CoreGraphDatabase; @@ -37,6 +39,7 @@ import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Relationship; import org.neo4j.graphdb.RelationshipType; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.test.DbRepresentation; @@ -44,8 +47,12 @@ import static java.util.Collections.emptyMap; import static junit.framework.TestCase.assertEquals; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; +import static org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog.SEGMENTED_LOG_DIRECTORY_NAME; import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_frequency; import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_strategy; +import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; import static 
org.neo4j.helpers.collection.MapUtil.stringMap; public class CoreToCoreCopySnapshotIT @@ -76,7 +83,7 @@ public void shouldBeAbleToDownloadFreshEmptySnapshot() throws Exception CoreGraphDatabase leader = cluster.awaitLeader( TIMEOUT_MS ); // when - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( 5000, Role.FOLLOWER ); + CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); follower.downloadSnapshot( leader.id().getCoreAddress() ); // then @@ -161,62 +168,47 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except File dbDir = dir.directory(); Map coreParams = stringMap(); - int pruneFrequencyMs = 100; coreParams.put( raft_log_pruning_strategy.name(), "keep_none" ); - coreParams.put( raft_log_pruning_frequency.name(), pruneFrequencyMs + "ms" ); + coreParams.put( raft_log_pruning_frequency.name(), "100ms" ); + int numberOfTransactions = 100; + //Start the cluster and accumulate some log files. try ( Cluster cluster = Cluster .start( dbDir, 3, 0, new TestOnlyDiscoveryServiceFactory(), coreParams, emptyMap(), StandardV3_0.NAME ) ) { - AtomicBoolean running = new AtomicBoolean( true ); - Thread thread = new Thread() + CoreGraphDatabase leader = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.LEADER ); + int followersLastLog = getMostRecentLogIdOn( leader ); + while ( followersLastLog < 2 ) { - @Override - public synchronized void run() - { - while ( running.get() ) - { - try - { - cluster.coreTx( ( db, tx ) -> { - Node node = db.createNode(); - node.setProperty( "that's a bam", string( 1024 ) ); - tx.success(); - } ); - } - catch ( Exception e ) - { - throw new RuntimeException( e ); - } - } - } - }; - - thread.start(); - - Thread.sleep( pruneFrequencyMs * 100 ); - CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( 5_000, Role.FOLLOWER ); + doSomeTransactions( cluster, numberOfTransactions ); + followersLastLog = getMostRecentLogIdOn( leader ); + } + + 
CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER ); follower.shutdown(); Config config = follower.getDependencyResolver().resolveDependency( Config.class ); - Thread.sleep( pruneFrequencyMs * 100 ); + /** + * After a follower is shut down, wait until we accumulate some logs such that the leader's oldest log is newer than + * the most recent log when the follower was removed. We can then be sure that the follower won't be able + * to catch up to the leader without a snapshot. + */ - cluster.addCoreServerWithServerId( config.get( CoreEdgeClusterSettings.server_id ), 3 ); - running.set( false ); - thread.join(); - } - } + //when + int leadersOldestLog = getOldestLogIdOn( leader ); + while ( leadersOldestLog < followersLastLog + 10 ) + { + doSomeTransactions( cluster, numberOfTransactions ); + leadersOldestLog = getOldestLogIdOn( leader ); + } - private String string( int numberOfCharacters ) - { - StringBuffer s = new StringBuffer(); - for ( int i = 0; i < numberOfCharacters; i++ ) - { - s.append( String.valueOf( i ) ); + //then + assertThat( leadersOldestLog, greaterThan( followersLastLog ) ); + //The cluster member should join. Otherwise this line will hang forever. 
+ cluster.addCoreServerWithServerId( config.get( CoreEdgeClusterSettings.server_id ), 3 ); } - return s.toString(); } static void createData( GraphDatabaseService db, int size ) @@ -233,4 +225,51 @@ static void createData( GraphDatabaseService db, int size ) rel.setProperty( "this", "that" ); } } + + private Integer getOldestLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException + { + return getLogFileNames( clusterMember ).firstKey().intValue(); + } + + private Integer getMostRecentLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException + { + return getLogFileNames( clusterMember ).lastKey().intValue(); + } + + private SortedMap getLogFileNames( CoreGraphDatabase clusterMember ) + { + File clusterDir = new File( clusterMember.getStoreDir(), CLUSTER_STATE_DIRECTORY_NAME ); + File logFilesDir = new File( clusterDir, SEGMENTED_LOG_DIRECTORY_NAME ); + return new FileNames( logFilesDir ).getAllFiles( new DefaultFileSystemAbstraction(), null ); + } + + private void doSomeTransactions( Cluster cluster, int count ) + { + try + { + for ( int i = 0; i < count; i++ ) + { + cluster.coreTx( ( db, tx ) -> { + Node node = db.createNode(); + node.setProperty( "that's a bam", string( 1024 ) ); + tx.success(); + } ); + + } + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } + } + + private String string( int numberOfCharacters ) + { + StringBuffer s = new StringBuffer(); + for ( int i = 0; i < numberOfCharacters; i++ ) + { + s.append( String.valueOf( i ) ); + } + return s.toString(); + } }