diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java index 5f888136b7447..e3eec8c423857 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/ReplicationModule.java @@ -27,7 +27,7 @@ import java.util.UUID; import org.neo4j.causalclustering.core.CausalClusteringSettings; -import org.neo4j.causalclustering.core.consensus.ConsensusModule; +import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.replication.ProgressTrackerImpl; import org.neo4j.causalclustering.core.replication.RaftReplicator; @@ -35,7 +35,6 @@ import org.neo4j.causalclustering.core.replication.session.GlobalSessionTrackerState; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; import org.neo4j.causalclustering.core.state.storage.DurableStateStorage; -import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.TimeoutStrategy; import org.neo4j.causalclustering.identity.MemberId; @@ -55,8 +54,8 @@ public class ReplicationModule private final ProgressTrackerImpl progressTracker; private final SessionTracker sessionTracker; - public ReplicationModule( MemberId myself, PlatformModule platformModule, Config config, - ConsensusModule consensusModule, Outbound outbound, + public ReplicationModule( RaftMachine raftMachine, MemberId myself, PlatformModule platformModule, Config config, + Outbound outbound, File clusterStateDirectory, FileSystemAbstraction fileSystem, LogProvider logProvider, AvailabilityGuard globalAvailabilityGuard ) { LifeSupport life = platformModule.life; @@ -74,20 +73,15 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config Duration initialBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_base ); Duration upperBoundBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_limit ); - Duration leaderBackoff = config.get( CausalClusteringSettings.replication_leader_retry_timeout ); TimeoutStrategy progressRetryStrategy = new ExponentialBackoffStrategy( initialBackoff, upperBoundBackoff ); - TimeoutStrategy leaderRetryStrategy = new ConstantTimeTimeoutStrategy( leaderBackoff ); long availabilityTimeoutMillis = config.get( CausalClusteringSettings.replication_retry_timeout_base ).toMillis(); replicator = new RaftReplicator( - consensusModule.raftMachine(), + raftMachine, myself, outbound, sessionPool, - progressTracker, - progressRetryStrategy, - leaderRetryStrategy, - availabilityTimeoutMillis, + progressTracker, progressRetryStrategy, availabilityTimeoutMillis, globalAvailabilityGuard, logProvider, platformModule.monitors ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index d307d9c5d3450..713373b01dc2d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -350,10 +350,6 @@ public HostnameResolver getHostnameResolver( LogProvider logProvider, LogProvide public static final Setting replication_retry_timeout_limit = setting( "causal_clustering.replication_retry_timeout_limit", DURATION, "60s" ); - @Description( "The retry timeout for finding a leader for replication. Relevant during leader elections." ) - public static final Setting replication_leader_retry_timeout = - setting( "causal_clustering.replication_leader", DURATION, "500ms" ); - @Description( "The number of operations to be processed before the state machines flush to disk" ) public static final Setting state_machine_flush_window_size = setting( "causal_clustering.state_machine_flush_window_size", INTEGER, "4096" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 254019ad5b3e0..ff0f1b9ea0c9a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -297,9 +297,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, dependencies.satisfyDependency( consensusModule.raftMachine() ); - replicationModule = - new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule, - loggingOutbound, clusterStateDirectory.get(), fileSystem, logProvider, globalGuard ); + replicationModule = new ReplicationModule( consensusModule.raftMachine(), identityModule.myself(), platformModule, config, loggingOutbound, + clusterStateDirectory.get(), fileSystem, logProvider, globalGuard ); coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(), platformModule, clusterStateDirectory.get(), config, replicationModule.getReplicator(), diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 2007fa7672d03..11d14c56b8aed 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -28,7 +28,6 @@ import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.LeaderListener; import org.neo4j.causalclustering.core.consensus.LeaderLocator; -import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor; import org.neo4j.causalclustering.core.replication.session.LocalSessionPool; @@ -53,46 +52,38 @@ public class RaftReplicator implements Replicator, LeaderListener private final LocalSessionPool sessionPool; private final TimeoutStrategy progressTimeoutStrategy; private final AvailabilityGuard availabilityGuard; - private final LeaderLocator leaderLocator; - private final TimeoutStrategy leaderTimeoutStrategy; private final Log log; private final ReplicationMonitor replicationMonitor; private final long availabilityTimeoutMillis; + private volatile MemberId lastKnownLeader; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, - ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, long availabilityTimeoutMillis, - AvailabilityGuard availabilityGuard, LogProvider logProvider, Monitors monitors ) + ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard, + LogProvider logProvider, Monitors monitors ) { this.me = me; this.outbound = outbound; this.progressTracker = progressTracker; this.sessionPool = sessionPool; this.progressTimeoutStrategy = progressTimeoutStrategy; - this.leaderTimeoutStrategy = leaderTimeoutStrategy; this.availabilityTimeoutMillis = availabilityTimeoutMillis; this.availabilityGuard = availabilityGuard; - this.leaderLocator = leaderLocator; - leaderLocator.registerListener( this ); - log = logProvider.getLog( getClass() ); + this.log = logProvider.getLog( getClass() ); this.replicationMonitor = monitors.newMonitor( ReplicationMonitor.class ); + leaderLocator.registerListener( this ); } @Override public Future replicate( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException { - MemberId originalLeader; - try - { - originalLeader = leaderLocator.getLeader(); - } - catch ( NoLeaderFoundException e ) + if ( lastKnownLeader == null ) { - throw new ReplicationFailureException( "Replication aborted since no leader was available", e ); + throw new ReplicationFailureException( "Replication aborted since no leader was available" ); } - return replicate0( command, trackResult, originalLeader ); + return replicate0( command, trackResult ); } - private Future replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException + private Future replicate0( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException { replicationMonitor.startReplication(); try @@ -103,7 +94,6 @@ private Future replicate0( ReplicatedContent command, boolean trackResul Progress progress = progressTracker.start( operation ); TimeoutStrategy.Timeout progressTimeout = progressTimeoutStrategy.newTimeout(); - TimeoutStrategy.Timeout leaderTimeout = leaderTimeoutStrategy.newTimeout(); int attempts = 0; try { @@ -116,24 +106,14 @@ private Future replicate0( ReplicatedContent command, boolean trackResul } replicationMonitor.replicationAttempt(); assertDatabaseAvailable(); - try + // blocking at least until the send has succeeded or failed before retrying + outbound.send( lastKnownLeader, new RaftMessages.NewEntry.Request( me, operation ), true ); + progress.awaitReplication( progressTimeout.getMillis() ); + if ( progress.isReplicated() ) { - // blocking at least until the send has succeeded or failed before retrying - outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true ); - progress.awaitReplication( progressTimeout.getMillis() ); - if ( progress.isReplicated() ) - { - break; - } - progressTimeout.increment(); - leader = leaderLocator.getLeader(); - } - catch ( NoLeaderFoundException e ) - { - log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e ); - Thread.sleep( leaderTimeout.getMillis() ); - leaderTimeout.increment(); + break; } + progressTimeout.increment(); } } catch ( InterruptedException e ) @@ -167,6 +147,10 @@ private Future replicate0( ReplicatedContent command, boolean trackResul public void onLeaderSwitch( LeaderInfo leaderInfo ) { progressTracker.triggerReplicationEvent(); + if ( leaderInfo.memberId() != null ) + { + lastKnownLeader = leaderInfo.memberId(); + } } private void assertDatabaseAvailable() throws ReplicationFailureException diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index bda310f6a3c1a..168cbe8c2de58 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -31,8 +31,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.neo4j.causalclustering.core.consensus.LeaderInfo; import org.neo4j.causalclustering.core.consensus.LeaderLocator; -import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.ReplicatedInteger; import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor; @@ -64,7 +64,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.DEFAULT_DATABASE_NAME; import static org.neo4j.test.assertion.Assert.assertEventually; @@ -74,12 +73,10 @@ public class RaftReplicatorTest public ExpectedException expectedException = ExpectedException.none(); private static final int DEFAULT_TIMEOUT_MS = 15_000; - private static final long REPLICATION_LIMIT = 1000; private LeaderLocator leaderLocator = mock( LeaderLocator.class ); private MemberId myself = new MemberId( UUID.randomUUID() ); - private MemberId leader = new MemberId( UUID.randomUUID() ); - private MemberId anotherLeader = new MemberId( UUID.randomUUID() ); + private LeaderInfo leaderInfo = new LeaderInfo( new MemberId( UUID.randomUUID() ), 1 ); private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself ); private LocalSessionPool sessionPool = new LocalSessionPool( session ); private TimeoutStrategy noWaitTimeoutStrategy = new ConstantTimeTimeoutStrategy( 0, MILLISECONDS ); @@ -93,11 +90,11 @@ public void shouldSendReplicatedContentToLeader() throws Exception Monitors monitors = new Monitors(); ReplicationMonitor replicationMonitor = mock( ReplicationMonitor.class ); monitors.addMonitorListener( replicationMonitor ); - when( leaderLocator.getLeader() ).thenReturn( leader ); CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, monitors ); + replicator.onLeaderSwitch( leaderInfo ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -112,7 +109,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception // then replicatingThread.join( DEFAULT_TIMEOUT_MS ); - assertEquals( leader, outbound.lastTo ); + assertEquals( leaderInfo.memberId(), outbound.lastTo ); verify( replicationMonitor, times( 1 ) ).startReplication(); verify( replicationMonitor, atLeast( 1 ) ).replicationAttempt(); @@ -127,11 +124,11 @@ public void shouldResendAfterTimeout() throws Exception Monitors monitors = new Monitors(); ReplicationMonitor replicationMonitor = mock( ReplicationMonitor.class ); monitors.addMonitorListener( replicationMonitor ); - when( leaderLocator.getLeader() ).thenReturn( leader ); CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, monitors ); + replicator.onLeaderSwitch( leaderInfo ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, false ); @@ -155,11 +152,11 @@ public void shouldResendAfterTimeout() throws Exception public void shouldReleaseSessionWhenFinished() throws Exception { // given - when( leaderLocator.getLeader() ).thenReturn( leader ); CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); + replicator.onLeaderSwitch( leaderInfo ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); Thread replicatingThread = replicatingThread( replicator, content, true ); @@ -181,17 +178,17 @@ public void shouldReleaseSessionWhenFinished() throws Exception } @Test - public void stopReplicationOnShutdown() throws NoLeaderFoundException, InterruptedException + public void stopReplicationOnShutdown() throws InterruptedException { // given Monitors monitors = new Monitors(); ReplicationMonitor replicationMonitor = mock( ReplicationMonitor.class ); monitors.addMonitorListener( replicationMonitor ); - when( leaderLocator.getLeader() ).thenReturn( leader ); CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, monitors ); + replicator.onLeaderSwitch( leaderInfo ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatingThread replicatingThread = replicatingThread( replicator, content, true ); @@ -209,13 +206,13 @@ public void stopReplicationOnShutdown() throws NoLeaderFoundException, Interrupt } @Test - public void stopReplicationWhenUnavailable() throws NoLeaderFoundException, InterruptedException + public void stopReplicationWhenUnavailable() throws InterruptedException { - when( leaderLocator.getLeader() ).thenReturn( leader ); CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); + replicator.onLeaderSwitch( leaderInfo ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); ReplicatingThread replicatingThread = replicatingThread( replicator, content, true ); @@ -229,11 +226,9 @@ public void stopReplicationWhenUnavailable() throws NoLeaderFoundException, Inte } @Test - public void shouldFailIfNoLeaderIsAvailable() throws NoLeaderFoundException + public void shouldFailIfNoLeaderIsAvailable() { // given - when( leaderLocator.getLeader() ).thenThrow( NoLeaderFoundException.class ); - CapturingProgressTracker capturedProgress = new CapturingProgressTracker(); CapturingOutbound outbound = new CapturingOutbound<>(); @@ -252,10 +247,37 @@ public void shouldFailIfNoLeaderIsAvailable() throws NoLeaderFoundException } } - private RaftReplicator getReplicator( CapturingOutbound outbound, CapturingProgressTracker capturedProgress, Monitors monitors ) + @Test + public void shouldListenToLeaderUpdates() throws ReplicationFailureException { - return new RaftReplicator( leaderLocator, myself, outbound, sessionPool, capturedProgress, noWaitTimeoutStrategy, noWaitTimeoutStrategy, - 10, databaseAvailabilityGuard, NullLogProvider.getInstance(), monitors ); + CompleteProgressTracker completeProgressTracker = new CompleteProgressTracker(); + CapturingOutbound outbound = new CapturingOutbound<>(); + RaftReplicator replicator = getReplicator( outbound, completeProgressTracker, new Monitors() ); + ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); + + LeaderInfo lastLeader = leaderInfo; + + // set initial leader, sens to that leader + replicator.onLeaderSwitch( lastLeader ); + replicator.replicate( content, false ); + assertEquals( outbound.lastTo, lastLeader.memberId() ); + + // update with valid new leader, sends to new leader + lastLeader = new LeaderInfo( new MemberId( UUID.randomUUID() ), 1 ); + replicator.onLeaderSwitch( lastLeader ); + replicator.replicate( content, false ); + assertEquals( outbound.lastTo, lastLeader.memberId() ); + + // update with invalid null leader, still send to previous leader + replicator.onLeaderSwitch( new LeaderInfo( null, 1 ) ); + replicator.replicate( content, false ); + assertEquals( outbound.lastTo, lastLeader.memberId() ); + } + + private RaftReplicator getReplicator( CapturingOutbound outbound, ProgressTracker progressTracker, Monitors monitors ) + { + return new RaftReplicator( leaderLocator, myself, outbound, sessionPool, progressTracker, noWaitTimeoutStrategy, 10, databaseAvailabilityGuard, + NullLogProvider.getInstance(), monitors ); } private ReplicatingThread replicatingThread( RaftReplicator replicator, ReplicatedInteger content, boolean trackResult ) @@ -309,16 +331,34 @@ Exception getReplicationException() } } - private class CapturingProgressTracker implements ProgressTracker + private class CompleteProgressTracker extends ProgressTrackerAdaptor { - private Progress last; + CompleteProgressTracker() + { + last = new Progress(); + last.setReplicated(); + } + @Override + public Progress start( DistributedOperation operation ) + { + return last; + } + } + + private class CapturingProgressTracker extends ProgressTrackerAdaptor + { @Override public Progress start( DistributedOperation operation ) { last = new Progress(); return last; } + } + + private abstract class ProgressTrackerAdaptor implements ProgressTracker + { + protected Progress last; @Override public void trackReplication( DistributedOperation operation ) @@ -341,7 +381,7 @@ public void abort( DistributedOperation operation ) @Override public void triggerReplicationEvent() { - throw new UnsupportedOperationException(); + // do nothing } @Override