Skip to content

Commit

Permalink
Couple improvements in CC token replication stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
lutovich committed Dec 14, 2018
1 parent 6cda7b6 commit 3476b65
Showing 1 changed file with 63 additions and 38 deletions.
Expand Up @@ -19,21 +19,23 @@
*/
package org.neo4j.causalclustering.scenarios;

import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

import java.time.Clock;
import java.time.Duration;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.consensus.RaftServer;
Expand All @@ -55,12 +57,15 @@

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertNotEquals;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.leader_election_timeout;
import static org.neo4j.test.assertion.Assert.assertEventually;

public class TokenReplicationStressIT
{
Expand All @@ -80,13 +85,7 @@ public void tearDown()
{
if ( cluster != null )
{
try
{
cluster.shutdown();
}
catch ( Throwable ignore )
{
}
cluster.shutdown();
}
}

Expand All @@ -107,14 +106,13 @@ public void shouldReplicateTokensWithConcurrentElections() throws Throwable
stop.set( true );
allOperations.join();

verifyTokens( cluster );

// assert number of tokens on every cluster member is the same after a restart
// restart is needed to make sure tokens are persisted and not only in token caches
cluster.shutdown();
cluster.start();

verifyLabelTokens( cluster );
verifyPropertyKeyTokens( cluster );
verifyRelationshipTypeTokens( cluster );
verifyTokens( cluster );
}

private static void createTokens( Cluster cluster, LongSupplier tokenIdSupplier, AtomicBoolean stop )
Expand Down Expand Up @@ -163,20 +161,19 @@ private static void triggerElections( Cluster cluster, AtomicBoolean stop )
try
{
SECONDS.sleep( 5 );
CoreClusterMember leaderBefore = awaitLeader( cluster );
CoreClusterMember follower = randomClusterMember( cluster, leaderBefore );
CoreClusterMember leader = awaitLeader( cluster );
CoreClusterMember follower = randomClusterMember( cluster, leader );

RaftServer leaderRaftServer = raftServer( leaderBefore );
leaderRaftServer.stop();
// make the current leader unresponsive
RaftServer raftServer = raftServer( leader );
raftServer.stop();

// trigger an election and await until a new leader is elected
follower.raft().triggerElection( Clock.systemUTC() );
assertEventually( "Leader re-election did not happen", () -> awaitLeader( cluster ), not( equalTo( leader ) ), 1, MINUTES );

SECONDS.sleep( 1 );
leaderRaftServer.start();
CoreClusterMember leaderAfter = awaitLeader( cluster );

// assert that leader re-election has actually happened
assertNotEquals( leaderBefore.id(), leaderAfter.id() );
// make the previous leader responsive again
raftServer.start();
}
catch ( Throwable t )
{
Expand Down Expand Up @@ -223,6 +220,13 @@ private static CoreClusterMember randomClusterMember( Cluster cluster, CoreClust
return members[ThreadLocalRandom.current().nextInt( members.length )];
}

private void verifyTokens( Cluster cluster )
{
verifyLabelTokens( cluster );
verifyPropertyKeyTokens( cluster );
verifyRelationshipTypeTokens( cluster );
}

private void verifyLabelTokens( Cluster cluster )
{
verifyTokens( "Labels", cluster, this::allLabels );
Expand All @@ -245,22 +249,40 @@ private static void verifyTokens( String tokenType, Cluster cluster, Function<Co
.map( tokensExtractor )
.collect( toList() );

for ( List<String> tokens1 : tokensFromAllMembers )
for ( List<String> tokens : tokensFromAllMembers )
{
for ( List<String> tokens2 : tokensFromAllMembers )
{
if ( !tokens1.equals( tokens2 ) )
{
String tokensString = tokensFromAllMembers.stream()
.map( List::toString )
.collect( joining( "\n" ) );
assertTokensAreUnique( tokens );
}

fail( tokenType + " are not the same on different cluster members:\n" + tokensString );
}
}
if ( !allTokensEqual( tokensFromAllMembers ) )
{
String tokensString = tokensFromAllMembers.stream()
.map( List::toString )
.collect( joining( "\n" ) );

fail( tokenType + " are not the same on different cluster members:\n" + tokensString );
}
}

private static void assertTokensAreUnique( List<String> tokens )
{
Set<String> uniqueTokens = new HashSet<>( tokens );
if ( uniqueTokens.size() != tokens.size() )
{
fail( "Tokens contain duplicates: " + tokens );
}
}

private static boolean allTokensEqual( List<List<String>> tokensFromAllMembers )
{
long distinctSets = tokensFromAllMembers.stream()
.map( HashSet::new )
.distinct()
.count();

return distinctSets == 1;
}

private List<String> allLabels( CoreClusterMember member )
{
return allTokens( member, TokenAccess.LABELS )
Expand Down Expand Up @@ -294,14 +316,17 @@ private static <T> List<T> allTokens( CoreClusterMember member, TokenAccess<T> t

private static LongSupplier evenTokenIdsSupplier()
{
MutableInt id = new MutableInt();
return () -> id.getAndAdd( 2 );
return tokenIdsSupplier( 0 );
}

private static LongSupplier oddTokenIdsSupplier()
{
MutableInt id = new MutableInt( 1 );
return () -> id.getAndAdd( 2 );
return tokenIdsSupplier( 1 );
}

private static LongSupplier tokenIdsSupplier( long initialValue )
{
return LongStream.iterate( initialValue, i -> i + 2 ).iterator()::nextLong;
}

private static RaftServer raftServer( CoreClusterMember member )
Expand Down

0 comments on commit 3476b65

Please sign in to comment.