Skip to content

Commit

Permalink
core-edge: increase robustness of core snapshot IT
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Aug 11, 2016
1 parent 8ae3804 commit b4a03bf
Showing 1 changed file with 56 additions and 31 deletions.
Expand Up @@ -19,12 +19,14 @@
*/
package org.neo4j.coreedge.scenarios;

import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.junit.Rule;
import org.junit.Test;

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.catchup.storecopy.StoreFiles;
import org.neo4j.coreedge.core.CoreEdgeClusterSettings;
import org.neo4j.coreedge.core.CoreGraphDatabase;
Expand All @@ -36,12 +38,12 @@
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.coreedge.ClusterRule;

import static org.hamcrest.Matchers.greaterThan;
import static java.time.Clock.systemUTC;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_frequency;
import static org.neo4j.coreedge.core.CoreEdgeClusterSettings.raft_log_pruning_strategy;
import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.coreedge.scenarios.SampleData.createData;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

Expand Down Expand Up @@ -117,39 +119,60 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except
coreParams.put( raft_log_pruning_strategy.name(), "keep_none" );
coreParams.put( raft_log_pruning_frequency.name(), "100ms" );
int numberOfTransactions = 100;
Timeout timeout = new Timeout( systemUTC(), 60, SECONDS );

//Start the cluster and accumulate some log files.
// start the cluster
Cluster cluster = clusterRule.withSharedCoreParams( coreParams ).startCluster();

CoreClusterMember leader = cluster.awaitCoreMemberWithRole( TIMEOUT_MS, Role.LEADER );
int followersLastLog = getMostRecentLogIdOn( leader );
while ( followersLastLog < 2 )
// accumulate some log files
int firstServerLogFileCount;
CoreClusterMember firstServer;
do
{
timeout.assertNotTimedOut();
firstServer = doSomeTransactions( cluster, numberOfTransactions );
firstServerLogFileCount = getMostRecentLogIdOn( firstServer );
}
while ( firstServerLogFileCount < 5 );
firstServer.shutdown();

/* After shutdown we wait until we accumulate enough logs, and until enough of the old ones
* have been pruned, so that the rejoined instance won't be able to catch up without a snapshot. */
int oldestLogOnSecondServer;
CoreClusterMember secondServer;
do
{
doSomeTransactions( cluster, numberOfTransactions );
followersLastLog = getMostRecentLogIdOn( leader );
timeout.assertNotTimedOut();
secondServer = doSomeTransactions( cluster, numberOfTransactions );
oldestLogOnSecondServer = getOldestLogIdOn( secondServer );
}
while ( oldestLogOnSecondServer < firstServerLogFileCount + 5 );

CoreClusterMember follower = cluster.awaitCoreMemberWithRole( TIMEOUT_MS, Role.FOLLOWER );
follower.shutdown();
// when
firstServer.start();

// then
dataMatchesEventually( firstServer, cluster.coreMembers() );
}

/*
* After a follower is shut down, wait until we accumulate some logs such that the oldest log is older than
* the most recent log when the follower was removed. We can then be sure that the follower won't be able
* to catch up to the leader without a snapshot.
*/
/**
 * Deadline helper for polling loops in this test: the deadline is fixed once, at construction,
 * and {@link #assertNotTimedOut()} fails with an {@link AssertionError} once the clock has passed it.
 */
private class Timeout
{
private final Clock clock;
// Absolute deadline on the same scale as clock.millis(); computed once in the constructor.
private final long absoluteTimeoutMillis;

// NOTE(review): the statements below that reference leader/follower/leadersOldestLog look like
// removed (pre-change) lines of the diff interleaved into this class by the page extraction
// - confirm against the repository before relying on them.
//when
int leadersOldestLog = getOldestLogIdOn( leader );
while ( leadersOldestLog < followersLastLog + 10 )
/**
 * Starts the countdown immediately: the deadline is the clock's current millis plus the given duration.
 *
 * @param clock source of the current time, read here and again on every check
 * @param time length of the allowed period, expressed in {@code unit}
 * @param unit unit of {@code time}; converted to milliseconds
 */
Timeout( Clock clock, long time, TimeUnit unit )
{
doSomeTransactions( cluster, numberOfTransactions );
leadersOldestLog = getOldestLogIdOn( leader );
this.clock = clock;
this.absoluteTimeoutMillis = clock.millis() + unit.toMillis( time );
}

//then
assertThat( leadersOldestLog, greaterThan( followersLastLog ) );
//The cluster member should join. Otherwise this line will hang forever.
follower.start();
/**
 * Throws an {@link AssertionError} with message "Timed out" if the clock is strictly past the deadline;
 * otherwise returns normally.
 */
void assertNotTimedOut()
{
if ( clock.millis() > absoluteTimeoutMillis )
{
throw new AssertionError( "Timed out" );
}
}
}

private int getOldestLogIdOn( CoreClusterMember clusterMember ) throws TimeoutException
Expand All @@ -162,19 +185,21 @@ private int getMostRecentLogIdOn( CoreClusterMember clusterMember ) throws Timeo
return clusterMember.getLogFileNames().lastKey().intValue();
}

private void doSomeTransactions( Cluster cluster, int count )
private CoreClusterMember doSomeTransactions( Cluster cluster, int count )
{
try
{
CoreClusterMember last = null;
for ( int i = 0; i < count; i++ )
{
cluster.coreTx( ( db, tx ) -> {
last = cluster.coreTx( ( db, tx ) ->
{
Node node = db.createNode();
node.setProperty( "that's a bam", string( 1024 ) );
tx.success();
} );

}
return last;
}
catch ( Exception e )
{
Expand Down

0 comments on commit b4a03bf

Please sign in to comment.