diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index 6f0ac9d1caa24..dfd41862bbd1d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -69,7 +69,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 10, 60, SECONDS ); replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool, - progressTracker, retryStrategy ) ); + progressTracker, retryStrategy, logProvider ) ); } public RaftReplicator getReplicator() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java index 3f41a167308fb..50bab3bf9ccfd 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java @@ -281,5 +281,6 @@ private void stepDownToFollower( Outcome outcome ) receivedHeartbeats = false; outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); + outcome.setLeader( null ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 8b3cee17b5708..118f000ae3102 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -32,6 +32,8 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; /** * A replicator implementation suitable in a RAFT context. Will handle resending due to timeouts and leader switches. @@ -43,13 +45,14 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List private final ProgressTracker progressTracker; private final LocalSessionPool sessionPool; private final RetryStrategy retryStrategy; + private final LeaderLocator leaderLocator; + private final Log log; - private MemberId leader; private volatile boolean shutdown; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, - Outbound outbound, LocalSessionPool sessionPool, - ProgressTracker progressTracker, RetryStrategy retryStrategy ) + Outbound outbound, LocalSessionPool sessionPool, + ProgressTracker progressTracker, RetryStrategy retryStrategy, LogProvider logProvider ) { this.me = me; this.outbound = outbound; @@ -57,19 +60,14 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, this.sessionPool = sessionPool; this.retryStrategy = retryStrategy; - try - { - this.leader = leaderLocator.getLeader(); - } - catch ( NoLeaderFoundException e ) - { - this.leader = null; - } + this.leaderLocator = leaderLocator; leaderLocator.registerListener( this ); + log = logProvider.getLog( getClass() ); } @Override - public Future replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException + public Future replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException, NoLeaderFoundException + { OperationContext session = sessionPool.acquireSession(); @@ -80,9 +78,9 @@ public Future replicate( ReplicatedContent command, boolean trackResult do { assertDatabaseNotShutdown(); - outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ) ); try { + outbound.send( leaderLocator.getLeader(), new RaftMessages.NewEntry.Request( me, operation ) ); progress.awaitReplication( timeout.getMillis() ); timeout.increment(); } @@ -91,6 +89,10 @@ public Future replicate( ReplicatedContent command, boolean trackResult progressTracker.abort( operation ); throw e; } + catch ( NoLeaderFoundException e ) + { + log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e ); + } } while( !progress.isReplicated() ); BiConsumer cleanup = ( ignored1, ignored2 ) -> sessionPool.releaseSession( session ); @@ -110,7 +112,6 @@ public Future replicate( ReplicatedContent command, boolean trackResult @Override public void receive( MemberId leader ) { - this.leader = leader; progressTracker.triggerReplicationEvent(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Replicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Replicator.java index b78a97be0f537..19b7120369206 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Replicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/Replicator.java @@ -21,6 +21,8 @@ import java.util.concurrent.Future; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; + /** * Replicate content across a cluster of servers. */ @@ -36,5 +38,5 @@ public interface Replicator * * @return A future that will receive the result when available. Only valid if trackResult is set. */ - Future replicate( ReplicatedContent content, boolean trackResult ) throws InterruptedException; + Future replicate( ReplicatedContent content, boolean trackResult ) throws InterruptedException, NoLeaderFoundException; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdRangeAcquirer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdRangeAcquirer.java index e9c316893d8a0..1f7a269cdeaf4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdRangeAcquirer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/id/ReplicatedIdRangeAcquirer.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.replication.Replicator; import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.kernel.impl.store.id.IdRange; @@ -82,7 +83,7 @@ private boolean replicateIdAllocationRequest( IdType idType, ReplicatedIdAllocat { return (Boolean) replicator.replicate( idAllocationRequest, true ).get(); } - catch ( InterruptedException | ExecutionException e ) + catch ( InterruptedException | ExecutionException | NoLeaderFoundException e ) { log.error( format( "Failed to acquire id range for idType %s", idType ), e ); throw new IdGenerationException( e ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java index 39aaad66bb59a..86063f80a36dc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java @@ -112,6 +112,10 @@ private synchronized int acquireTokenOrThrow() { throw new AcquireLockTimeoutException( e, "Interrupted acquiring token.", Interrupted ); } + catch ( NoLeaderFoundException e ) + { + throw new AcquireLockTimeoutException( e, "Could not acquire lock token because no leader was found.", NoLeaderAvailable ); + } try { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java index 568050deb177c..0e64d30a3a9e1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.replication.Replicator; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException; @@ -98,7 +99,7 @@ private int requestToken( String tokenName ) Future future = replicator.replicate( tokenRequest, true ); return (int) future.get(); } - catch ( InterruptedException e ) + catch ( InterruptedException | NoLeaderFoundException e ) { throw new org.neo4j.graphdb.TransactionFailureException( "Could not create token", e ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionCommitProcess.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionCommitProcess.java index bdb40127367d6..d52a24dfcb4e4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionCommitProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/ReplicatedTransactionCommitProcess.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.replication.Replicator; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -55,6 +56,10 @@ public long commit( final TransactionToApply tx, { throw new TransactionFailureException( "Interrupted replicating transaction.", e ); } + catch ( NoLeaderFoundException e ) + { + throw new TransactionFailureException( "No leader found while replicating transaction.", e ); + } try { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java index 056d86437b786..02e50a3830653 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineTest.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.core.consensus; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; @@ -35,11 +36,13 @@ import org.neo4j.causalclustering.identity.RaftTestMemberSetBuilder; import org.neo4j.causalclustering.messaging.Inbound; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; +import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.KernelEventHandlers; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.NullLog; +import static junit.framework.TestCase.assertFalse; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.instanceOf; diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/LeaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/LeaderTest.java index 6cd8e7a0361f0..66aea41b26159 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/LeaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/LeaderTest.java @@ -49,6 +49,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -372,6 +373,7 @@ public void leaderShouldStepDownWhenLackingHeartbeatResponses() throws Exception // given RaftState state = raftState() .votingMembers( asSet( myself, member1, member2 ) ) + .leader( myself ) .build(); Leader leader = new Leader(); @@ -383,6 +385,7 @@ public void leaderShouldStepDownWhenLackingHeartbeatResponses() throws Exception // then assertThat( outcome.getRole(), not( LEADER ) ); + assertNull( outcome.getLeader() ); } @Test diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index f8caf719d0d3f..bc0c3daac0e16 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.messaging.Message; import org.neo4j.causalclustering.core.consensus.LeaderLocator; import org.neo4j.causalclustering.core.consensus.RaftMessages; @@ -36,6 +37,7 @@ import org.neo4j.causalclustering.helper.RetryStrategy; import org.neo4j.causalclustering.core.state.Result; import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.logging.NullLogProvider; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static junit.framework.TestCase.assertEquals; @@ -67,7 +69,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, sessionPool, - capturedProgress, retryStrategy ); + capturedProgress, retryStrategy, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -94,7 +96,7 @@ public void shouldResendAfterTimeout() throws Exception CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy ); + sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -118,7 +120,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception CapturingOutbound outbound = new CapturingOutbound(); RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound, - sessionPool, capturedProgress, retryStrategy ); + sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, true ); @@ -158,7 +160,7 @@ private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger c } } } - catch ( InterruptedException e ) + catch ( InterruptedException | NoLeaderFoundException e ) { throw new IllegalStateException(); }