Use leaderListener logic in RaftReplicator
This commit keeps the previous behaviour of RaftReplicator sending to the last known leader, but instead of looking the leader up through the LeaderLocator on every attempt, it now tracks the leader via the LeaderListener callback.
RagnarW committed Sep 18, 2018
1 parent 87da4e7 commit f331c97
Showing 5 changed files with 88 additions and 75 deletions.
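
The idea behind the change can be sketched in isolation: rather than asking a locator for the current leader (and handling NoLeaderFoundException) on every replication attempt, the replicator caches whatever leader the listener callback last reported and keeps sending to that cached value, failing fast only if no leader has ever been observed. The sketch below is a minimal, self-contained illustration of that pattern; the type names and the println stand-in for sending are invented for the example and are not the Neo4j classes touched in the diff that follows.

import java.util.concurrent.TimeUnit;

// Minimal stand-ins, invented for illustration; not the Neo4j types changed in this commit.
interface LeaderObserver
{
    // A null argument means "no leader known right now", e.g. during an election.
    void onLeaderSwitch( String newLeader );
}

class CachingReplicatorSketch implements LeaderObserver
{
    // Written only from the listener callback, read by replicate(); volatile mirrors
    // the lastKnownLeader field introduced in this commit.
    private volatile String lastKnownLeader;

    @Override
    public void onLeaderSwitch( String newLeader )
    {
        // Keep the previous value on a null switch so retries that are already in
        // flight still have a last known leader to target.
        if ( newLeader != null )
        {
            lastKnownLeader = newLeader;
        }
    }

    void replicate( String operation ) throws InterruptedException
    {
        if ( lastKnownLeader == null )
        {
            // Fail fast: no leader has ever been observed by this replicator.
            throw new IllegalStateException( "Replication aborted since no leader was available" );
        }
        for ( int attempt = 1; attempt <= 3; attempt++ )
        {
            // Stand-in for sending the entry to the cached leader and awaiting progress;
            // a fresh read of the volatile field picks up any leader switch between attempts.
            System.out.println( "attempt " + attempt + ": sending '" + operation + "' to " + lastKnownLeader );
            TimeUnit.MILLISECONDS.sleep( 10 );
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        CachingReplicatorSketch replicator = new CachingReplicatorSketch();
        replicator.onLeaderSwitch( "core-1" );   // listener reports a leader
        replicator.onLeaderSwitch( null );       // election in progress: cached value is kept
        replicator.replicate( "create-node" );   // still targets core-1
    }
}

The design point mirrored from the diff: a null leader switch does not clear the cache, so retries keep targeting the last known leader during elections, while a replicator that has never seen a leader rejects the request immediately.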
@@ -27,15 +27,14 @@
 import java.util.UUID;

 import org.neo4j.causalclustering.core.CausalClusteringSettings;
-import org.neo4j.causalclustering.core.consensus.ConsensusModule;
+import org.neo4j.causalclustering.core.consensus.RaftMachine;
 import org.neo4j.causalclustering.core.consensus.RaftMessages;
 import org.neo4j.causalclustering.core.replication.ProgressTrackerImpl;
 import org.neo4j.causalclustering.core.replication.RaftReplicator;
 import org.neo4j.causalclustering.core.replication.session.GlobalSession;
 import org.neo4j.causalclustering.core.replication.session.GlobalSessionTrackerState;
 import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
 import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
-import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy;
 import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
 import org.neo4j.causalclustering.helper.TimeoutStrategy;
 import org.neo4j.causalclustering.identity.MemberId;
@@ -55,8 +54,8 @@ public class ReplicationModule
     private final ProgressTrackerImpl progressTracker;
     private final SessionTracker sessionTracker;

-    public ReplicationModule( MemberId myself, PlatformModule platformModule, Config config,
-            ConsensusModule consensusModule, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
+    public ReplicationModule( RaftMachine raftMachine, MemberId myself, PlatformModule platformModule, Config config,
+            Outbound<MemberId,RaftMessages.RaftMessage> outbound,
             File clusterStateDirectory, FileSystemAbstraction fileSystem, LogProvider logProvider, AvailabilityGuard globalAvailabilityGuard )
     {
         LifeSupport life = platformModule.life;
@@ -74,20 +73,15 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config

         Duration initialBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_base );
         Duration upperBoundBackoff = config.get( CausalClusteringSettings.replication_retry_timeout_limit );
-        Duration leaderBackoff = config.get( CausalClusteringSettings.replication_leader_retry_timeout );

         TimeoutStrategy progressRetryStrategy = new ExponentialBackoffStrategy( initialBackoff, upperBoundBackoff );
-        TimeoutStrategy leaderRetryStrategy = new ConstantTimeTimeoutStrategy( leaderBackoff );
         long availabilityTimeoutMillis = config.get( CausalClusteringSettings.replication_retry_timeout_base ).toMillis();
         replicator = new RaftReplicator(
-                consensusModule.raftMachine(),
+                raftMachine,
                 myself,
                 outbound,
                 sessionPool,
-                progressTracker,
-                progressRetryStrategy,
-                leaderRetryStrategy,
-                availabilityTimeoutMillis,
+                progressTracker, progressRetryStrategy, availabilityTimeoutMillis,
                 globalAvailabilityGuard, logProvider,
                 platformModule.monitors );
     }

@@ -350,10 +350,6 @@
     public static final Setting<Duration> replication_retry_timeout_limit =
             setting( "causal_clustering.replication_retry_timeout_limit", DURATION, "60s" );

-    @Description( "The retry timeout for finding a leader for replication. Relevant during leader elections." )
-    public static final Setting<Duration> replication_leader_retry_timeout =
-            setting( "causal_clustering.replication_leader", DURATION, "500ms" );
-
     @Description( "The number of operations to be processed before the state machines flush to disk" )
     public static final Setting<Integer> state_machine_flush_window_size =
             setting( "causal_clustering.state_machine_flush_window_size", INTEGER, "4096" );

@@ -297,9 +297,8 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

         dependencies.satisfyDependency( consensusModule.raftMachine() );

-        replicationModule =
-                new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule,
-                        loggingOutbound, clusterStateDirectory.get(), fileSystem, logProvider, globalGuard );
+        replicationModule = new ReplicationModule( consensusModule.raftMachine(), identityModule.myself(), platformModule, config, loggingOutbound,
+                clusterStateDirectory.get(), fileSystem, logProvider, globalGuard );

         coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(),
                 platformModule, clusterStateDirectory.get(), config, replicationModule.getReplicator(),

@@ -28,7 +28,6 @@
 import org.neo4j.causalclustering.core.consensus.LeaderInfo;
 import org.neo4j.causalclustering.core.consensus.LeaderListener;
 import org.neo4j.causalclustering.core.consensus.LeaderLocator;
-import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
 import org.neo4j.causalclustering.core.consensus.RaftMessages;
 import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
 import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
@@ -53,46 +52,38 @@ public class RaftReplicator implements Replicator, LeaderListener
     private final LocalSessionPool sessionPool;
     private final TimeoutStrategy progressTimeoutStrategy;
     private final AvailabilityGuard availabilityGuard;
-    private final LeaderLocator leaderLocator;
-    private final TimeoutStrategy leaderTimeoutStrategy;
     private final Log log;
     private final ReplicationMonitor replicationMonitor;
     private final long availabilityTimeoutMillis;
+    private volatile MemberId lastKnownLeader;

     public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
-            ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, long availabilityTimeoutMillis,
-            AvailabilityGuard availabilityGuard, LogProvider logProvider, Monitors monitors )
+            ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard,
+            LogProvider logProvider, Monitors monitors )
     {
         this.me = me;
         this.outbound = outbound;
         this.progressTracker = progressTracker;
         this.sessionPool = sessionPool;
         this.progressTimeoutStrategy = progressTimeoutStrategy;
-        this.leaderTimeoutStrategy = leaderTimeoutStrategy;
         this.availabilityTimeoutMillis = availabilityTimeoutMillis;
         this.availabilityGuard = availabilityGuard;
-        this.leaderLocator = leaderLocator;
-        leaderLocator.registerListener( this );
-        log = logProvider.getLog( getClass() );
+        this.log = logProvider.getLog( getClass() );
         this.replicationMonitor = monitors.newMonitor( ReplicationMonitor.class );
+        leaderLocator.registerListener( this );
     }

     @Override
     public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException
     {
-        MemberId originalLeader;
-        try
-        {
-            originalLeader = leaderLocator.getLeader();
-        }
-        catch ( NoLeaderFoundException e )
+        if ( lastKnownLeader == null )
         {
-            throw new ReplicationFailureException( "Replication aborted since no leader was available", e );
+            throw new ReplicationFailureException( "Replication aborted since no leader was available" );
         }
-        return replicate0( command, trackResult, originalLeader );
+        return replicate0( command, trackResult );
     }

-    private Future<Object> replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException
+    private Future<Object> replicate0( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException
     {
         replicationMonitor.startReplication();
         try
@@ -103,7 +94,6 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResult )
             Progress progress = progressTracker.start( operation );

             TimeoutStrategy.Timeout progressTimeout = progressTimeoutStrategy.newTimeout();
-            TimeoutStrategy.Timeout leaderTimeout = leaderTimeoutStrategy.newTimeout();
             int attempts = 0;
             try
             {
@@ -116,24 +106,14 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResult )
                 }
                 replicationMonitor.replicationAttempt();
                 assertDatabaseAvailable();
-                try
-                {
-                    // blocking at least until the send has succeeded or failed before retrying
-                    outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true );
-                    progress.awaitReplication( progressTimeout.getMillis() );
-                    if ( progress.isReplicated() )
-                    {
-                        break;
-                    }
-                    progressTimeout.increment();
-                    leader = leaderLocator.getLeader();
-                }
-                catch ( NoLeaderFoundException e )
-                {
-                    log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e );
-                    Thread.sleep( leaderTimeout.getMillis() );
-                    leaderTimeout.increment();
-                }
+                // blocking at least until the send has succeeded or failed before retrying
+                outbound.send( lastKnownLeader, new RaftMessages.NewEntry.Request( me, operation ), true );
+                progress.awaitReplication( progressTimeout.getMillis() );
+                if ( progress.isReplicated() )
+                {
+                    break;
+                }
+                progressTimeout.increment();
             }
         }
         catch ( InterruptedException e )
@@ -167,6 +147,10 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResult )
     public void onLeaderSwitch( LeaderInfo leaderInfo )
     {
         progressTracker.triggerReplicationEvent();
+        if ( leaderInfo.memberId() != null )
+        {
+            lastKnownLeader = leaderInfo.memberId();
+        }
     }

     private void assertDatabaseAvailable() throws ReplicationFailureException
