Skip to content

Commit

Permalink
Improved test for catching up rejoined follower.
Browse files Browse the repository at this point in the history
The improved test explicitly checks the logs that have accumulated
and also the logs which have been pruned and coordinates the
shutdown and startup of the follower accordingly.
  • Loading branch information
Max Sumrall committed Jun 1, 2016
1 parent 7d5af57 commit 693322c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 45 deletions.
Expand Up @@ -127,7 +127,7 @@ public static <MEMBER> void appendNewEntry( ReadableRaftState<MEMBER> ctx, Outco
}

public static <MEMBER> void appendNewEntries( ReadableRaftState<MEMBER> ctx, Outcome<MEMBER> outcome,
List<ReplicatedContent> contents ) throws IOException, RaftLogCompactedException
List<ReplicatedContent> contents ) throws IOException
{
long prevLogIndex = ctx.entryLog().appendIndex();
long prevLogTerm = prevLogIndex == -1 ? -1 :
Expand Down
Expand Up @@ -25,27 +25,34 @@

import java.io.File;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeoutException;

import org.neo4j.coreedge.discovery.Cluster;
import org.neo4j.coreedge.discovery.TestOnlyDiscoveryServiceFactory;
import org.neo4j.coreedge.raft.log.segmented.FileNames;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.server.CoreEdgeClusterSettings;
import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.rule.TargetDirectory;

import static java.util.Collections.emptyMap;
import static junit.framework.TestCase.assertEquals;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertThat;
import static org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog.SEGMENTED_LOG_DIRECTORY_NAME;
import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_frequency;
import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning_strategy;
import static org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME;
import static org.neo4j.helpers.collection.MapUtil.stringMap;

public class CoreToCoreCopySnapshotIT
Expand Down Expand Up @@ -76,7 +83,7 @@ public void shouldBeAbleToDownloadFreshEmptySnapshot() throws Exception
CoreGraphDatabase leader = cluster.awaitLeader( TIMEOUT_MS );

// when
CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( 5000, Role.FOLLOWER );
CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER );
follower.downloadSnapshot( leader.id().getCoreAddress() );

// then
Expand Down Expand Up @@ -161,62 +168,47 @@ public void shouldBeAbleToDownloadToRejoinedInstanceAfterPruning() throws Except
File dbDir = dir.directory();

Map<String,String> coreParams = stringMap();
int pruneFrequencyMs = 100;
coreParams.put( raft_log_pruning_strategy.name(), "keep_none" );
coreParams.put( raft_log_pruning_frequency.name(), pruneFrequencyMs + "ms" );
coreParams.put( raft_log_pruning_frequency.name(), "100ms" );
int numberOfTransactions = 100;

//Start the cluster and accumulate some log files.
try ( Cluster cluster = Cluster
.start( dbDir, 3, 0, new TestOnlyDiscoveryServiceFactory(), coreParams, emptyMap(),
StandardV3_0.NAME ) )
{

AtomicBoolean running = new AtomicBoolean( true );
Thread thread = new Thread()
CoreGraphDatabase leader = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.LEADER );
int followersLastLog = getMostRecentLogIdOn( leader );
while ( followersLastLog < 2 )
{
@Override
public synchronized void run()
{
while ( running.get() )
{
try
{
cluster.coreTx( ( db, tx ) -> {
Node node = db.createNode();
node.setProperty( "that's a bam", string( 1024 ) );
tx.success();
} );
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
}
}
};

thread.start();

Thread.sleep( pruneFrequencyMs * 100 );
CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( 5_000, Role.FOLLOWER );
doSomeTransactions( cluster, numberOfTransactions );
followersLastLog = getMostRecentLogIdOn( leader );
}

CoreGraphDatabase follower = cluster.awaitCoreGraphDatabaseWithRole( TIMEOUT_MS, Role.FOLLOWER );
follower.shutdown();
Config config = follower.getDependencyResolver().resolveDependency( Config.class );

Thread.sleep( pruneFrequencyMs * 100 );
/**
* After a follower is shutdown, wait until we accumulate some logs such that the oldest log is older than
* the most recent log when the follower was removed. We can then be sure that the follower won't be able
* to catch up to the leader without a snapshot.
*/

cluster.addCoreServerWithServerId( config.get( CoreEdgeClusterSettings.server_id ), 3 );
running.set( false );
thread.join();
}
}
//when
int leadersOldestLog = getOldestLogIdOn( leader );
while ( leadersOldestLog < followersLastLog + 10 )
{
doSomeTransactions( cluster, numberOfTransactions );
leadersOldestLog = getOldestLogIdOn( leader );
}

private String string( int numberOfCharacters )
{
StringBuffer s = new StringBuffer();
for ( int i = 0; i < numberOfCharacters; i++ )
{
s.append( String.valueOf( i ) );
//then
assertThat( leadersOldestLog, greaterThan( followersLastLog ) );
//The cluster member should join. Otherwise this line will hang forever.
cluster.addCoreServerWithServerId( config.get( CoreEdgeClusterSettings.server_id ), 3 );
}
return s.toString();
}

static void createData( GraphDatabaseService db, int size )
Expand All @@ -233,4 +225,51 @@ static void createData( GraphDatabaseService db, int size )
rel.setProperty( "this", "that" );
}
}

/**
 * Returns the index of the oldest (lowest-numbered) raft log segment file still
 * on disk for the given cluster member. Segments below this index have been pruned.
 *
 * @param clusterMember the core member whose raft log directory is inspected
 * @return the lowest segment file number present on disk
 * @throws TimeoutException declared for caller convenience; NOTE(review): nothing
 *         in this method appears to throw it — confirm before removing
 */
private Integer getOldestLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException
{
    // Math.toIntExact instead of a silent intValue() narrowing: fail loudly
    // if a segment index ever exceeds Integer.MAX_VALUE rather than returning
    // a truncated, misleading id.
    return Math.toIntExact( getLogFileNames( clusterMember ).firstKey() );
}

/**
 * Returns the index of the most recent (highest-numbered) raft log segment file
 * on disk for the given cluster member.
 *
 * @param clusterMember the core member whose raft log directory is inspected
 * @return the highest segment file number present on disk
 * @throws TimeoutException declared for caller convenience; NOTE(review): nothing
 *         in this method appears to throw it — confirm before removing
 */
private Integer getMostRecentLogIdOn( CoreGraphDatabase clusterMember ) throws TimeoutException
{
    // Math.toIntExact instead of a silent intValue() narrowing: fail loudly
    // if a segment index ever exceeds Integer.MAX_VALUE rather than returning
    // a truncated, misleading id.
    return Math.toIntExact( getLogFileNames( clusterMember ).lastKey() );
}

/**
 * Lists every segmented raft log file for the given member, keyed by segment
 * number in ascending order.
 *
 * @param clusterMember the core member whose store directory is inspected
 * @return segment number → segment file, sorted ascending
 */
private SortedMap<Long,File> getLogFileNames( CoreGraphDatabase clusterMember )
{
    // The segmented raft log lives under <store-dir>/<cluster-state>/<raft-log>.
    File raftLogDir = new File( new File( clusterMember.getStoreDir(), CLUSTER_STATE_DIRECTORY_NAME ),
            SEGMENTED_LOG_DIRECTORY_NAME );
    return new FileNames( raftLogDir ).getAllFiles( new DefaultFileSystemAbstraction(), null );
}

/**
 * Commits {@code count} write transactions against the cluster, each creating a
 * single node carrying a bulky string property, in order to grow the raft log.
 * Any failure aborts the remaining transactions by rethrowing as an unchecked
 * exception.
 *
 * @param cluster the cluster to write through
 * @param count   number of transactions to commit
 */
private void doSomeTransactions( Cluster cluster, int count )
{
    for ( int txNumber = 0; txNumber < count; txNumber++ )
    {
        try
        {
            cluster.coreTx( ( coreDb, transaction ) -> {
                Node created = coreDb.createNode();
                created.setProperty( "that's a bam", string( 1024 ) );
                transaction.success();
            } );
        }
        catch ( Exception e )
        {
            // Wrap and propagate: the first failure stops the whole batch,
            // exactly as when the try block enclosed the entire loop.
            throw new RuntimeException( e );
        }
    }
}

/**
 * Builds a throwaway payload string by concatenating the decimal representations
 * of 0, 1, 2, ... Note the result grows past {@code numberOfCharacters} characters
 * once the counter reaches two digits; callers only use it as bulk data, so the
 * exact length is not significant.
 *
 * @param numberOfCharacters number of counter values appended (not the final length)
 * @return the concatenated counter string, e.g. {@code "0123456789101112..."}
 */
static String string( int numberOfCharacters )
{
    // StringBuilder replaces the legacy synchronized StringBuffer — no thread
    // shares this builder. Declared static to match the sibling helper createData.
    StringBuilder s = new StringBuilder();
    for ( int i = 0; i < numberOfCharacters; i++ )
    {
        // append(int) renders the decimal form, identical to String.valueOf(i).
        s.append( i );
    }
    return s.toString();
}
}

0 comments on commit 693322c

Please sign in to comment.