From 3476b65ae1dee2c2ca01fa2d08734e115d2954d4 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 13 Dec 2018 12:30:20 +0100 Subject: [PATCH] Couple improvements in CC token replication stress test --- .../scenarios/TokenReplicationStressIT.java | 101 +++++++++++------- 1 file changed, 63 insertions(+), 38 deletions(-) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TokenReplicationStressIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TokenReplicationStressIT.java index c34a87f7251de..47e57ffe8ae7c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TokenReplicationStressIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/TokenReplicationStressIT.java @@ -19,7 +19,6 @@ */ package org.neo4j.causalclustering.scenarios; -import org.apache.commons.lang3.mutable.MutableInt; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -27,13 +26,16 @@ import java.time.Clock; import java.time.Duration; import java.time.LocalTime; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.stream.LongStream; import org.neo4j.causalclustering.core.CoreGraphDatabase; import org.neo4j.causalclustering.core.consensus.RaftServer; @@ -55,12 +57,15 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; -import static org.junit.Assert.assertNotEquals; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; import static org.neo4j.causalclustering.core.CausalClusteringSettings.leader_election_timeout; +import static org.neo4j.test.assertion.Assert.assertEventually; public class TokenReplicationStressIT { @@ -80,13 +85,7 @@ public void tearDown() { if ( cluster != null ) { - try - { - cluster.shutdown(); - } - catch ( Throwable ignore ) - { - } + cluster.shutdown(); } } @@ -107,14 +106,13 @@ public void shouldReplicateTokensWithConcurrentElections() throws Throwable stop.set( true ); allOperations.join(); + verifyTokens( cluster ); + // assert number of tokens on every cluster member is the same after a restart // restart is needed to make sure tokens are persisted and not only in token caches cluster.shutdown(); cluster.start(); - - verifyLabelTokens( cluster ); - verifyPropertyKeyTokens( cluster ); - verifyRelationshipTypeTokens( cluster ); + verifyTokens( cluster ); } private static void createTokens( Cluster cluster, LongSupplier tokenIdSupplier, AtomicBoolean stop ) @@ -163,20 +161,19 @@ private static void triggerElections( Cluster cluster, AtomicBoolean stop ) try { SECONDS.sleep( 5 ); - CoreClusterMember leaderBefore = awaitLeader( cluster ); - CoreClusterMember follower = randomClusterMember( cluster, leaderBefore ); + CoreClusterMember leader = awaitLeader( cluster ); + CoreClusterMember follower = randomClusterMember( cluster, leader ); - RaftServer leaderRaftServer = raftServer( leaderBefore ); - leaderRaftServer.stop(); + // make the current leader unresponsive + RaftServer raftServer = raftServer( leader ); + raftServer.stop(); + // trigger an election and await until a new leader is elected follower.raft().triggerElection( Clock.systemUTC() ); + assertEventually( "Leader re-election did not happen", () -> awaitLeader( cluster ), not( equalTo( leader ) ), 1, MINUTES ); - SECONDS.sleep( 1 ); - leaderRaftServer.start(); - CoreClusterMember leaderAfter = awaitLeader( cluster ); - - // assert that leader re-election has actually happened - assertNotEquals( leaderBefore.id(), leaderAfter.id() ); + // make the previous leader responsive again + raftServer.start(); } catch ( Throwable t ) { @@ -223,6 +220,13 @@ private static CoreClusterMember randomClusterMember( Cluster cluster, CoreClust return members[ThreadLocalRandom.current().nextInt( members.length )]; } + private void verifyTokens( Cluster cluster ) + { + verifyLabelTokens( cluster ); + verifyPropertyKeyTokens( cluster ); + verifyRelationshipTypeTokens( cluster ); + } + private void verifyLabelTokens( Cluster cluster ) { verifyTokens( "Labels", cluster, this::allLabels ); @@ -245,22 +249,40 @@ private static void verifyTokens( String tokenType, Cluster cluster, Function tokens1 : tokensFromAllMembers ) + for ( List tokens : tokensFromAllMembers ) { - for ( List tokens2 : tokensFromAllMembers ) - { - if ( !tokens1.equals( tokens2 ) ) - { - String tokensString = tokensFromAllMembers.stream() - .map( List::toString ) - .collect( joining( "\n" ) ); + assertTokensAreUnique( tokens ); + } - fail( tokenType + " are not the same on different cluster members:\n" + tokensString ); - } - } + if ( !allTokensEqual( tokensFromAllMembers ) ) + { + String tokensString = tokensFromAllMembers.stream() + .map( List::toString ) + .collect( joining( "\n" ) ); + + fail( tokenType + " are not the same on different cluster members:\n" + tokensString ); + } + } + + private static void assertTokensAreUnique( List tokens ) + { + Set uniqueTokens = new HashSet<>( tokens ); + if ( uniqueTokens.size() != tokens.size() ) + { + fail( "Tokens contain duplicates: " + tokens ); } } + private static boolean allTokensEqual( List> tokensFromAllMembers ) + { + long distinctSets = tokensFromAllMembers.stream() + .map( HashSet::new ) + .distinct() + .count(); + + return distinctSets == 1; + } + private List allLabels( CoreClusterMember member ) { return allTokens( member, TokenAccess.LABELS ) @@ -294,14 +316,17 @@ private static List allTokens( CoreClusterMember member, TokenAccess t private static LongSupplier evenTokenIdsSupplier() { - MutableInt id = new MutableInt(); - return () -> id.getAndAdd( 2 ); + return tokenIdsSupplier( 0 ); } private static LongSupplier oddTokenIdsSupplier() { - MutableInt id = new MutableInt( 1 ); - return () -> id.getAndAdd( 2 ); + return tokenIdsSupplier( 1 ); + } + + private static LongSupplier tokenIdsSupplier( long initialValue ) + { + return LongStream.iterate( initialValue, i -> i + 2 ).iterator()::nextLong; } private static RaftServer raftServer( CoreClusterMember member )