Use AvailabilityGuard to track when db is shutting down instead of lifecycle.

Use the availability guard to check the shutdown state, so that the check is independent of the order of components in the lifecycle, and shut down raft replication as soon as shutdown is detected.
MishaDemianenko committed Dec 13, 2016
1 parent b6728f7 commit de51914
Showing 5 changed files with 90 additions and 24 deletions.
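At a glance, the change replaces the replicator's privately tracked shutdown flag with a check against the shared AvailabilityGuard, so shutdown detection no longer depends on where the replicator sits in the lifecycle ordering. A minimal before/after sketch of RaftReplicator, condensed from the diff below (not a complete class):

// Before: the replicator tracked shutdown itself via its lifecycle hook.
private volatile boolean shutdown;

@Override
public void shutdown()
{
    shutdown = true;
}

private void assertDatabaseNotShutdown() throws InterruptedException
{
    if ( shutdown )
    {
        throw new InterruptedException( "Database has been shutdown, transaction cannot be replicated." );
    }
}

// After: the kernel's AvailabilityGuard is consulted instead, and a
// DatabaseShutdownException is thrown so replication stops immediately.
private final AvailabilityGuard availabilityGuard;

private void assertDatabaseNotShutdown() throws InterruptedException
{
    if ( availabilityGuard.isShutdown() )
    {
        throw new DatabaseShutdownException( "Database has been shutdown, transaction cannot be replicated." );
    }
}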
DatabaseShutdownException.java

@@ -25,4 +25,9 @@ public DatabaseShutdownException( )
     {
         super( "This database is shutdown." );
     }
+
+    public DatabaseShutdownException( String message )
+    {
+        super( message );
+    }
 }
ReplicationModule.java

@@ -68,7 +68,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
         progressTracker = new ProgressTrackerImpl( myGlobalSession );

         replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
-                progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ) ) );
+                progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ), platformModule.availabilityGuard ) );

     }
RaftReplicator.java

@@ -25,11 +25,13 @@
 import org.neo4j.causalclustering.core.consensus.LeaderLocator;
 import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
 import org.neo4j.causalclustering.core.consensus.RaftMessages;
-import org.neo4j.causalclustering.messaging.Outbound;
 import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
 import org.neo4j.causalclustering.core.replication.session.OperationContext;
 import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
 import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.causalclustering.messaging.Outbound;
+import org.neo4j.graphdb.DatabaseShutdownException;
+import org.neo4j.kernel.AvailabilityGuard;
 import org.neo4j.kernel.impl.util.Listener;
 import org.neo4j.kernel.lifecycle.LifecycleAdapter;
@@ -43,19 +45,20 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List
     private final ProgressTracker progressTracker;
     private final LocalSessionPool sessionPool;
     private final RetryStrategy retryStrategy;
+    private final AvailabilityGuard availabilityGuard;

     private MemberId leader;
-    private volatile boolean shutdown;

     public RaftReplicator( LeaderLocator leaderLocator, MemberId me,
             Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
-            ProgressTracker progressTracker, RetryStrategy retryStrategy )
+            ProgressTracker progressTracker, RetryStrategy retryStrategy, AvailabilityGuard availabilityGuard )
     {
         this.me = me;
         this.outbound = outbound;
         this.progressTracker = progressTracker;
         this.sessionPool = sessionPool;
         this.retryStrategy = retryStrategy;
+        this.availabilityGuard = availabilityGuard;

         try
         {
@@ -114,17 +117,11 @@ public void receive( MemberId leader )
         progressTracker.triggerReplicationEvent();
     }

-    @Override
-    public void shutdown()
-    {
-        shutdown = true;
-    }
-
     private void assertDatabaseNotShutdown() throws InterruptedException
     {
-        if ( shutdown )
+        if ( availabilityGuard.isShutdown() )
         {
-            throw new InterruptedException( "Database has been shutdown, transaction cannot be replicated." );
+            throw new DatabaseShutdownException( "Database has been shutdown, transaction cannot be replicated." );
         }
     }
 }
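For context, the AvailabilityGuard consulted above is the same component the new test constructs directly. A small usage sketch of only the calls that appear in this commit (the constructor, shutdown() and isShutdown() as used in RaftReplicatorTest below; nothing beyond that is assumed):

import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.logging.NullLog;
import org.neo4j.time.Clocks;

class AvailabilityGuardShutdownSketch
{
    public static void main( String[] args )
    {
        // Constructed exactly as in RaftReplicatorTest below.
        AvailabilityGuard guard = new AvailabilityGuard( Clocks.systemClock(), NullLog.getInstance() );

        System.out.println( guard.isShutdown() );   // false: database still available
        guard.shutdown();                           // what the database lifecycle does on shutdown
        System.out.println( guard.isShutdown() );   // true: replication now fails fast with DatabaseShutdownException
    }
}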
RaftReplicatorTest.java

@@ -19,36 +19,48 @@
  */
 package org.neo4j.causalclustering.core.replication;

+import org.hamcrest.Matchers;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;

 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

-import org.neo4j.causalclustering.messaging.Message;
 import org.neo4j.causalclustering.core.consensus.LeaderLocator;
+import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
 import org.neo4j.causalclustering.core.consensus.RaftMessages;
 import org.neo4j.causalclustering.core.consensus.ReplicatedInteger;
-import org.neo4j.causalclustering.messaging.Outbound;
 import org.neo4j.causalclustering.core.replication.session.GlobalSession;
 import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
+import org.neo4j.causalclustering.core.state.Result;
 import org.neo4j.causalclustering.core.state.machines.tx.ConstantTimeRetryStrategy;
 import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy;
-import org.neo4j.causalclustering.core.state.Result;
 import org.neo4j.causalclustering.identity.MemberId;
+import org.neo4j.causalclustering.messaging.Message;
+import org.neo4j.causalclustering.messaging.Outbound;
+import org.neo4j.graphdb.DatabaseShutdownException;
+import org.neo4j.kernel.AvailabilityGuard;
+import org.neo4j.logging.NullLog;
+import org.neo4j.time.Clocks;

 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.neo4j.test.assertion.Assert.assertEventually;

 public class RaftReplicatorTest
 {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
     private static final int DEFAULT_TIMEOUT_MS = 15_000;

     private LeaderLocator leaderLocator = mock( LeaderLocator.class );
@@ -57,6 +69,7 @@ public class RaftReplicatorTest
     private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself );
     private LocalSessionPool sessionPool = new LocalSessionPool( session );
     private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, SECONDS );
+    private AvailabilityGuard availabilityGuard = new AvailabilityGuard( Clocks.systemClock(), NullLog.getInstance() );

     @Test
     public void shouldSendReplicatedContentToLeader() throws Exception
@@ -68,7 +81,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception

         RaftReplicator replicator =
                 new RaftReplicator( leaderLocator, myself, outbound, sessionPool,
-                        capturedProgress, retryStrategy );
+                        capturedProgress, retryStrategy, availabilityGuard );

         ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
         Thread replicatingThread = replicatingThread( replicator, content, false );
@@ -96,7 +109,7 @@ public void shouldResendAfterTimeout() throws Exception

         ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 100, MILLISECONDS );
         RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
-                sessionPool, capturedProgress, retryStrategy );
+                sessionPool, capturedProgress, retryStrategy, availabilityGuard );

         ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
         Thread replicatingThread = replicatingThread( replicator, content, false );
@@ -120,7 +133,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception
         CapturingOutbound outbound = new CapturingOutbound();

         RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
-                sessionPool, capturedProgress, retryStrategy );
+                sessionPool, capturedProgress, retryStrategy, availabilityGuard );

         ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
         Thread replicatingThread = replicatingThread( replicator, content, true );
@@ -142,29 +155,77 @@ public void shouldReleaseSessionWhenFinished() throws Exception
         assertEquals( 0, sessionPool.openSessionCount() );
     }

-    private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult )
+    @Test
+    public void stopReplicationOnShutdown() throws NoLeaderFoundException, InterruptedException
+    {
+        when( leaderLocator.getLeader() ).thenReturn( leader );
+        CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
+        CapturingOutbound outbound = new CapturingOutbound();
+
+        RaftReplicator replicator =
+                new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, retryStrategy,
+                        availabilityGuard );
+
+        ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
+        ReplicatingThread replicatingThread = replicatingThread( replicator, content, true );
+
+        // when
+        replicatingThread.start();
+
+        availabilityGuard.shutdown();
+        replicatingThread.join();
+        assertThat( replicatingThread.getReplicationException(), Matchers.instanceOf( DatabaseShutdownException.class ) );
+    }
+
+    private ReplicatingThread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult )
     {
-        return new Thread( () -> {
+        return new ReplicatingThread( replicator, content, trackResult );
+    }
+
+    private class ReplicatingThread extends Thread
+    {
+
+        private final RaftReplicator replicator;
+        private final ReplicatedInteger content;
+        private final boolean trackResult;
+        private volatile Exception replicationException;
+
+        ReplicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult )
+        {
+            this.replicator = replicator;
+            this.content = content;
+            this.trackResult = trackResult;
+        }
+
+        @Override
+        public void run()
+        {
             try
             {
                 Future<Object> futureResult = replicator.replicate( content, trackResult );
-                if( trackResult )
+                if ( trackResult )
                 {
                     try
                     {
                         futureResult.get();
                     }
                     catch ( ExecutionException e )
                     {
+                        replicationException = e;
                         throw new IllegalStateException();
                     }
                 }
             }
-            catch ( InterruptedException e )
+            catch ( Exception e )
             {
-                throw new IllegalStateException();
+                replicationException = e;
             }
-        } );
+        }
+
+        public Exception getReplicationException()
+        {
+            return replicationException;
+        }
     }

     private class CapturingProgressTracker implements ProgressTracker
CoreReplicationIT.java

@@ -36,6 +36,7 @@
 import org.neo4j.graphdb.Transaction;
 import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
 import org.neo4j.test.causalclustering.ClusterRule;
+import org.neo4j.test.rule.SuppressOutput;

 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;

@@ -51,6 +52,8 @@ public class CoreReplicationIT
     @Rule
     public final ClusterRule clusterRule =
             new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 );
+    @Rule
+    public SuppressOutput suppressOutput = SuppressOutput.suppressAll();

     private Cluster cluster;
