Skip to content

Commit

Permalink
Debug log when distributed operation is skipped and not applied
Browse files Browse the repository at this point in the history
  • Loading branch information
lutovich committed Dec 14, 2018
1 parent a6bbb88 commit 6cda7b6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ private long handleOperations( long commandIndex, List<DistributedOperation> ope
{
if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) )
{
if ( log.isDebugEnabled() )
{
log.debug( "Skipped an invalid distributed operation: " + operation + ". Session tracker state: " + sessionTracker.snapshot() );
}
commandIndex++;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongSupplier;

import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.consensus.RaftServer;
import org.neo4j.causalclustering.discovery.Cluster;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
Expand All @@ -45,6 +47,10 @@
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.impl.api.TokenAccess;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.test.causalclustering.ClusterRule;

import static java.util.concurrent.CompletableFuture.allOf;
Expand All @@ -60,24 +66,6 @@ public class TokenReplicationStressIT
{
private static final int EXECUTION_TIME_SECONDS = Integer.getInteger( "TokenReplicationStressTestExecutionTimeSeconds", 30 );

private static final String LIST_LABELS_QUERY = "CALL db.labels() " +
"YIELD label " +
"WITH label " +
"ORDER BY label " +
"RETURN collect(label) AS result";

private static final String LIST_PROPERTY_KEYS_QUERY = "CALL db.propertyKeys() " +
"YIELD propertyKey " +
"WITH propertyKey " +
"ORDER BY propertyKey " +
"RETURN collect(propertyKey) AS result";

private static final String LIST_RELATIONSHIP_TYPES_QUERY = "CALL db.relationshipTypes() " +
"YIELD relationshipType " +
"WITH relationshipType " +
"ORDER BY relationshipType " +
"RETURN collect(relationshipType) AS result";

@Rule
public final ClusterRule clusterRule = new ClusterRule( getClass() )
.withNumberOfCoreMembers( 3 )
Expand Down Expand Up @@ -178,7 +166,7 @@ private static void triggerElections( Cluster cluster, AtomicBoolean stop )
CoreClusterMember leaderBefore = awaitLeader( cluster );
CoreClusterMember follower = randomClusterMember( cluster, leaderBefore );

RaftServer leaderRaftServer = raftServerOf( leaderBefore );
RaftServer leaderRaftServer = raftServer( leaderBefore );
leaderRaftServer.stop();

follower.raft().triggerElection( Clock.systemUTC() );
Expand Down Expand Up @@ -225,11 +213,6 @@ private static CoreClusterMember awaitLeader( Cluster cluster )
}
}

private static RaftServer raftServerOf( CoreClusterMember leader )
{
return leader.database().getDependencyResolver().resolveDependency( RaftServer.class );
}

private static CoreClusterMember randomClusterMember( Cluster cluster, CoreClusterMember except )
{
CoreClusterMember[] members = cluster.coreMembers()
Expand All @@ -240,30 +223,26 @@ private static CoreClusterMember randomClusterMember( Cluster cluster, CoreClust
return members[ThreadLocalRandom.current().nextInt( members.length )];
}

private static void verifyLabelTokens( Cluster cluster )
private void verifyLabelTokens( Cluster cluster )
{
verifyTokens( "Labels", cluster, LIST_LABELS_QUERY );
verifyTokens( "Labels", cluster, this::allLabels );
}

private static void verifyPropertyKeyTokens( Cluster cluster )
private void verifyPropertyKeyTokens( Cluster cluster )
{
verifyTokens( "Property keys", cluster, LIST_PROPERTY_KEYS_QUERY );
verifyTokens( "Property keys", cluster, this::allPropertyKeys );
}

private static void verifyRelationshipTypeTokens( Cluster cluster )
private void verifyRelationshipTypeTokens( Cluster cluster )
{
verifyTokens( "Relationship types", cluster, LIST_RELATIONSHIP_TYPES_QUERY );
verifyTokens( "Relationship types", cluster, this::allRelationshipTypes );
}

@SuppressWarnings( "unchecked" )
private static void verifyTokens( String tokenType, Cluster cluster, String listTokensQuery )
private static void verifyTokens( String tokenType, Cluster cluster, Function<CoreClusterMember,List<String>> tokensExtractor )
{
List<List<String>> tokensFromAllMembers = cluster.coreMembers()
.stream()
.map( member -> member.database().execute( listTokensQuery ) )
.map( Iterators::single )
.map( record -> record.get( "result" ) )
.map( value -> (List<String>) value )
.map( tokensExtractor )
.collect( toList() );

for ( List<String> tokens1 : tokensFromAllMembers )
Expand All @@ -282,6 +261,37 @@ private static void verifyTokens( String tokenType, Cluster cluster, String list
}
}

private List<String> allLabels( CoreClusterMember member )
{
return allTokens( member, TokenAccess.LABELS )
.stream()
.map( Label::name )
.collect( toList() );
}

private List<String> allPropertyKeys( CoreClusterMember member )
{
return allTokens( member, TokenAccess.PROPERTY_KEYS );
}

private List<String> allRelationshipTypes( CoreClusterMember member )
{
return allTokens( member, TokenAccess.RELATIONSHIP_TYPES )
.stream()
.map( RelationshipType::name )
.collect( toList() );
}

private static <T> List<T> allTokens( CoreClusterMember member, TokenAccess<T> tokenAccess )
{
CoreGraphDatabase db = member.database();
try ( Transaction ignore = db.beginTx();
Statement statement = currentKernelTx( member ).acquireStatement() )
{
return Iterators.asList( tokenAccess.all( statement ) );
}
}

private static LongSupplier evenTokenIdsSupplier()
{
MutableInt id = new MutableInt();
Expand All @@ -293,4 +303,15 @@ private static LongSupplier oddTokenIdsSupplier()
MutableInt id = new MutableInt( 1 );
return () -> id.getAndAdd( 2 );
}

private static RaftServer raftServer( CoreClusterMember member )
{
return member.database().getDependencyResolver().resolveDependency( RaftServer.class );
}

private static KernelTransaction currentKernelTx( CoreClusterMember member )
{
ThreadToStatementContextBridge bridge = member.database().getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class );
return bridge.getKernelTransactionBoundToThisThread( true );
}
}

0 comments on commit 6cda7b6

Please sign in to comment.