Skip to content

Commit

Permalink
Fix a shutdown issue in causal clustering
Browse files Browse the repository at this point in the history
Sometimes when you shut down the cluster while the leader is trying to
replicate a transaction, the replication effort can keep failing because
the followers are already gone, leaving the transaction thread stuck in
the replication loop forever.

This commit fixes that by making the replication code aware of when
the database has been shut down.
  • Loading branch information
davidegrohmann committed Nov 10, 2016
1 parent 22cbf9e commit 6a64c99
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 46 deletions.
Expand Up @@ -67,8 +67,8 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
progressTracker = new ProgressTrackerImpl( myGlobalSession );

replicator = new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool, progressTracker,
new ExponentialBackoffStrategy( 10, SECONDS ) );
replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ) ) );

}

Expand Down
Expand Up @@ -31,11 +31,12 @@
import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

/**
* A replicator implementation suitable in a RAFT context. Will handle resending due to timeouts and leader switches.
*/
public class RaftReplicator implements Replicator, Listener<MemberId>
public class RaftReplicator extends LifecycleAdapter implements Replicator, Listener<MemberId>
{
private final MemberId me;
private final Outbound<MemberId,RaftMessages.RaftMessage> outbound;
Expand All @@ -44,6 +45,7 @@ public class RaftReplicator implements Replicator, Listener<MemberId>
private final RetryStrategy retryStrategy;

private MemberId leader;
private volatile boolean shutdown;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
Expand Down Expand Up @@ -77,6 +79,7 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
do
{
assertDatabaseNotShutdown();
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ) );
try
{
Expand Down Expand Up @@ -110,4 +113,18 @@ public void receive( MemberId leader )
this.leader = leader;
progressTracker.triggerReplicationEvent();
}

/**
 * Lifecycle callback that marks this replicator as shut down by raising the {@code shutdown} flag.
 * The flag is volatile, so the write is visible to threads that read it afterwards.
 */
@Override
public void shutdown()
{
    this.shutdown = true;
}

/**
 * Guard used by the replication loop: does nothing while the replicator is running and aborts the
 * caller once the {@code shutdown} flag has been set.
 *
 * @throws InterruptedException if the {@code shutdown} flag is set
 */
private void assertDatabaseNotShutdown() throws InterruptedException
{
    if ( !shutdown )
    {
        return;
    }
    throw new InterruptedException( "Database has been shutdown, transaction cannot be replicated." );
}
}
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -60,6 +61,7 @@

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.neo4j.concurrent.Futures.combine;
import static org.neo4j.function.Predicates.await;
import static org.neo4j.function.Predicates.awaitEx;
Expand Down Expand Up @@ -254,9 +256,15 @@ public ReadReplica findAnyReadReplica()

/**
 * Single-role convenience lookup; delegates to {@link #getDbWithAnyRole(Role...)}.
 *
 * @param role the role to look for
 * @return whatever {@link #getDbWithAnyRole(Role...)} returns when given only {@code role}
 */
public CoreClusterMember getDbWithRole( Role role )
{
    CoreClusterMember match = getDbWithAnyRole( role );
    return match;
}

public CoreClusterMember getDbWithAnyRole( Role... roles )
{
Set<Role> roleSet = Arrays.stream( roles ).collect( toSet() );
for ( CoreClusterMember coreClusterMember : coreMembers.values() )
{
if ( coreClusterMember.database() != null && coreClusterMember.database().getRole().equals( role ) )
if ( coreClusterMember.database() != null && roleSet.contains( coreClusterMember.database().getRole() ) )
{
return coreClusterMember;
}
Expand Down
Expand Up @@ -24,10 +24,12 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.discovery.Cluster;
Expand Down Expand Up @@ -153,48 +155,6 @@ public void shouldNotAllowTokenCreationFromAFollowerWithNoInitialTokens() throws
}
}

/**
 * Creates a "Person" node through a core transaction, waits for it to be applied and visible on the
 * core members, then attempts to set a property on that node through a follower and expects the
 * write to be rejected with a {@link WriteOperationsNotAllowedException}.
 */
@Test
public void shouldNotAllowTokenCreationFromAFollower() throws Exception
{
    // given: a node carrying the label exists and has been replicated
    Label person = Label.label( "Person" );

    CoreClusterMember leaderMember = cluster.coreTx( ( db, tx ) ->
    {
        db.createNode( person );
        tx.success();
    } );

    awaitForDataToBeApplied( leaderMember );
    dataMatchesEventually( leaderMember, cluster.coreMembers() );

    CoreGraphDatabase followerDb = cluster.getDbWithRole( Role.FOLLOWER ).database();

    // Diagnostic dump of what the follower currently sees; this transaction is read-only.
    try ( Transaction readTx = followerDb.beginTx() )
    {
        System.out.println( "all labels = " + asList( followerDb.getAllLabels() ) );

        for ( Node node : followerDb.getAllNodes() )
        {
            String prefix = "node = " + node + ", ";
            System.out.println( prefix + node.getLabels().toString() );
        }
    }

    // when: a write is attempted on the follower
    try ( Transaction writeTx = followerDb.beginTx() )
    {
        Node existing = followerDb.findNodes( person ).next();
        existing.setProperty( "name", "Mark" );
        writeTx.success();
        fail( "Should have thrown exception" );
    }
    catch ( WriteOperationsNotAllowedException expected )
    {
        // then: the follower rejects the write
        assertThat( expected.getMessage(), containsString( "No write operations are allowed" ) );
    }
}

private void awaitForDataToBeApplied( CoreClusterMember leader ) throws InterruptedException, TimeoutException
{
await( () -> countNodes(leader) > 0, 10, TimeUnit.SECONDS);
Expand Down Expand Up @@ -302,6 +262,54 @@ public void shouldReplicateTransactionsToReplacementCoreMembers() throws Excepti
dataMatchesEventually( leader, cluster.coreMembers() );
}

/**
 * Verifies that the cluster can be shut down while a transaction is still being replicated, and that
 * the thread running that transaction terminates instead of staying stuck.
 *
 * A background thread opens a core transaction and, before the transaction closes, removes two core
 * members that are in the FOLLOWER or CANDIDATE role (presumably leaving the leader unable to
 * replicate the commit; this depends on the cluster size configured elsewhere in the test class).
 * The main thread then shuts the cluster down and checks that the background thread has finished.
 */
@Test
public void shouldBeAbleToShutdownWhenTheLeaderIsTryingToReplicateTransaction() throws Exception
{
    // given
    cluster.coreTx( ( db, tx ) ->
    {
        Node node = db.createNode( label( "boo" ) );
        node.setProperty( "foobar", "baz_bat" );
        tx.success();
    } );

    CountDownLatch latch = new CountDownLatch( 1 );

    // when
    Thread thread = new Thread()
    {
        @Override
        public void run()
        {
            try
            {
                cluster.coreTx( ( db, tx ) ->
                {
                    db.createNode();
                    tx.success();

                    // Take two non-leader members away before the transaction closes.
                    cluster.removeCoreMember( cluster.getDbWithAnyRole( Role.FOLLOWER, Role.CANDIDATE ) );
                    cluster.removeCoreMember( cluster.getDbWithAnyRole( Role.FOLLOWER, Role.CANDIDATE ) );
                    // Signal the main thread that the members are gone and shutdown may begin.
                    latch.countDown();
                } );
            }
            catch ( Exception e )
            {
                // Ends this thread; the exception is not propagated to the test thread.
                throw new RuntimeException( e );
            }
        }
    };

    thread.start();

    latch.await();

    // then the cluster can shutdown...
    cluster.shutdown();
    // ... and the thread running the tx does not get stuck
    thread.join( TimeUnit.MINUTES.toMillis( 1 ) );
    // join( millis ) returns silently when the timeout elapses, so the outcome must be checked
    // explicitly; otherwise the test would pass even if the thread were still stuck.
    if ( thread.isAlive() )
    {
        throw new AssertionError( "Transaction thread is still running one minute after cluster shutdown" );
    }
}

private long countNodes( CoreClusterMember member )
{
CoreGraphDatabase db = member.database();
Expand Down

0 comments on commit 6a64c99

Please sign in to comment.