diff --git a/community/graphdb-api/src/main/java/org/neo4j/graphdb/DatabaseShutdownException.java b/community/graphdb-api/src/main/java/org/neo4j/graphdb/DatabaseShutdownException.java index d77f26451d904..1c0cbd73000e9 100644 --- a/community/graphdb-api/src/main/java/org/neo4j/graphdb/DatabaseShutdownException.java +++ b/community/graphdb-api/src/main/java/org/neo4j/graphdb/DatabaseShutdownException.java @@ -25,4 +25,9 @@ public DatabaseShutdownException( ) { super( "This database is shutdown." ); } + + public DatabaseShutdownException( String message ) + { + super( message ); + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index e4a6f70a5b39e..15f6f5b1d8dc2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -68,7 +68,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config progressTracker = new ProgressTrackerImpl( myGlobalSession ); replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool, - progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ) ) ); + progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ), platformModule.availabilityGuard ) ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 7507af2719eed..478e363b02169 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -25,11 +25,13 @@ import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.RaftMessages; -import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; import org.neo4j.causalclustering.core.replication.session.OperationContext; import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.graphdb.DatabaseShutdownException; +import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -43,19 +45,20 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List private final ProgressTracker progressTracker; private final LocalSessionPool sessionPool; private final RetryStrategy retryStrategy; + private final AvailabilityGuard availabilityGuard; private MemberId leader; - private volatile boolean shutdown; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, - ProgressTracker progressTracker, RetryStrategy retryStrategy ) + ProgressTracker progressTracker, RetryStrategy retryStrategy, AvailabilityGuard availabilityGuard ) { this.me = me; this.outbound = outbound; this.progressTracker = progressTracker; this.sessionPool = sessionPool; this.retryStrategy = retryStrategy; + this.availabilityGuard = availabilityGuard; try { @@ -114,17 +117,11 @@ public void receive( MemberId leader ) progressTracker.triggerReplicationEvent(); } - @Override - public void shutdown() - { - shutdown = true; - } - private void assertDatabaseNotShutdown() throws InterruptedException { - if ( shutdown ) + if ( availabilityGuard.isShutdown() ) { - throw new InterruptedException( "Database has been shutdown, transaction cannot be replicated." ); + throw new DatabaseShutdownException( "Database has been shutdown, transaction cannot be replicated." ); } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index a6fbfe96d9eb6..3c254848fb9d7 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -19,23 +19,31 @@ */ package org.neo4j.causalclustering.core.replication; +import org.hamcrest.Matchers; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.neo4j.causalclustering.messaging.Message; import org.neo4j.causalclustering.core.consensus.LeaderLocator; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.ReplicatedInteger; -import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.core.replication.session.GlobalSession; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; +import org.neo4j.causalclustering.core.state.Result; import org.neo4j.causalclustering.core.state.machines.tx.ConstantTimeRetryStrategy; import org.neo4j.causalclustering.core.state.machines.tx.RetryStrategy; -import org.neo4j.causalclustering.core.state.Result; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.causalclustering.messaging.Message; +import org.neo4j.causalclustering.messaging.Outbound; +import org.neo4j.graphdb.DatabaseShutdownException; +import org.neo4j.kernel.AvailabilityGuard; +import org.neo4j.logging.NullLog; +import org.neo4j.time.Clocks; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -43,12 +51,16 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.test.assertion.Assert.assertEventually; public class RaftReplicatorTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private static final int DEFAULT_TIMEOUT_MS = 15_000; private LeaderLocator leaderLocator = mock( LeaderLocator.class ); @@ -57,6 +69,7 @@ public class RaftReplicatorTest private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself ); private LocalSessionPool sessionPool = new LocalSessionPool( session ); private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 1, SECONDS ); + private AvailabilityGuard availabilityGuard = new AvailabilityGuard( Clocks.systemClock(), NullLog.getInstance() ); @Test public void shouldSendReplicatedContentToLeader() throws Exception @@ -68,7 +81,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, - capturedProgress, retryStrategy ); + capturedProgress, retryStrategy, availabilityGuard ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -96,7 +109,7 @@ public void shouldResendAfterTimeout() throws Exception ConstantTimeRetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 100, MILLISECONDS ); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy ); + sessionPool, capturedProgress, retryStrategy, availabilityGuard ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -120,7 +133,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy ); + sessionPool, capturedProgress, retryStrategy, availabilityGuard ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, true ); @@ -142,13 +155,55 @@ public void shouldReleaseSessionWhenFinished() throws Exception assertEquals( 0, sessionPool.openSessionCount() ); } - private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) + @Test + public void stopReplicationOnShutdown() throws NoLeaderFoundException, InterruptedException + { + when( leaderLocator.getLeader() ).thenReturn( leader ); + CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); + CapturingOutbound outbound = new CapturingOutbound(); + + RaftReplicator replicator = + new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, retryStrategy, + availabilityGuard ); + + ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); + ReplicatingThread replicatingThread = replicatingThread( replicator, content, true ); + + // when + replicatingThread.start(); + + availabilityGuard.shutdown(); + replicatingThread.join(); + assertThat( replicatingThread.getReplicationException(), Matchers.instanceOf( DatabaseShutdownException.class ) ); + } + + private ReplicatingThread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) { - return new Thread( () -> { + return new ReplicatingThread( replicator, content, trackResult ); + } + + private class ReplicatingThread extends Thread + { + + private final RaftReplicator replicator; + private final ReplicatedInteger content; + private final boolean trackResult; + private volatile Exception replicationException; + + ReplicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) + { + this.replicator = replicator; + this.content = content; + this.trackResult = trackResult; + } + + @Override + public void run() + { try { Future futureResult = replicator.replicate( content, trackResult ); - if( trackResult ) + if ( trackResult ) { try { @@ -156,15 +211,21 @@ private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger c } catch ( ExecutionException e ) { + replicationException = e; throw new IllegalStateException(); } } } - catch ( InterruptedException e ) + catch ( Exception e ) { - throw new IllegalStateException(); + replicationException = e; } - } ); + } + + public Exception getReplicationException() + { + return replicationException; + } } private class CapturingProgressTracker implements ProgressTracker diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java index b49b3651df168..73bf909fb1b3b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java @@ -36,6 +36,7 @@ import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.security.WriteOperationsNotAllowedException; import org.neo4j.test.causalclustering.ClusterRule; +import org.neo4j.test.rule.SuppressOutput; import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; @@ -51,6 +52,8 @@ public class CoreReplicationIT @Rule public final ClusterRule clusterRule = new ClusterRule( getClass() ).withNumberOfCoreMembers( 3 ).withNumberOfReadReplicas( 0 ); + @Rule + public SuppressOutput suppressOutput = SuppressOutput.suppressAll(); private Cluster cluster;