Skip to content

Commit

Permalink
Introduce wait for getting Leader in RaftReplicator.
Browse files Browse the repository at this point in the history
Without a wait, new TimeoutException and NoLeaderFoundException instances
would be generated constantly until a leader has been elected.
  • Loading branch information
andrewkerr9000 committed Nov 14, 2017
1 parent 7206285 commit fc2f3b7
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 21 deletions.
Expand Up @@ -30,8 +30,10 @@
import org.neo4j.causalclustering.core.replication.session.GlobalSession;
import org.neo4j.causalclustering.core.replication.session.GlobalSessionTrackerState;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.helper.ConstantTimeRetryStrategy;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.io.fs.FileSystemAbstraction;
Expand All @@ -40,6 +42,7 @@
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class ReplicationModule
Expand Down Expand Up @@ -67,9 +70,10 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config
LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession );
progressTracker = new ProgressTrackerImpl( myGlobalSession );

ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 10, 60, SECONDS );
RetryStrategy progressRetryStrategy = new ExponentialBackoffStrategy( 10, 60, SECONDS );
RetryStrategy leaderRetryStrategy = new ConstantTimeRetryStrategy( 500, MILLISECONDS );
replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
progressTracker, retryStrategy, logProvider ) );
progressTracker, progressRetryStrategy, leaderRetryStrategy, logProvider ) );
}

public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand Down Expand Up @@ -202,7 +203,7 @@ public synchronized void setTargetMembershipSet( Set<MemberId> targetMembers )
@Override
public MemberId getLeader() throws NoLeaderFoundException
{
return waitForLeader( 0, member -> member != null );
return waitForLeader( 0, Objects::nonNull );
}

private MemberId waitForLeader( long timeoutMillis, Predicate<MemberId> predicate ) throws NoLeaderFoundException
Expand Down
Expand Up @@ -44,45 +44,49 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List
private final Outbound<MemberId,RaftMessages.RaftMessage> outbound;
private final ProgressTracker progressTracker;
private final LocalSessionPool sessionPool;
private final RetryStrategy retryStrategy;
private final RetryStrategy progressRetryStrategy;
private final LeaderLocator leaderLocator;
private final RetryStrategy leaderRetryStrategy;
private final Log log;

private volatile boolean shutdown;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me,
Outbound<MemberId, RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, RetryStrategy retryStrategy, LogProvider logProvider )
public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, RetryStrategy progressRetryStrategy, RetryStrategy leaderRetryStrategy, LogProvider logProvider )
{
this.me = me;
this.outbound = outbound;
this.progressTracker = progressTracker;
this.sessionPool = sessionPool;
this.retryStrategy = retryStrategy;
this.progressRetryStrategy = progressRetryStrategy;
this.leaderRetryStrategy = leaderRetryStrategy;

this.leaderLocator = leaderLocator;
leaderLocator.registerListener( this );
log = logProvider.getLog( getClass() );
}

@Override
public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException, NoLeaderFoundException

public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException
{
OperationContext session = sessionPool.acquireSession();

DistributedOperation operation = new DistributedOperation( command, session.globalSession(), session.localOperationId() );
Progress progress = progressTracker.start( operation );

RetryStrategy.Timeout timeout = retryStrategy.newTimeout();
RetryStrategy.Timeout progressTimeout = progressRetryStrategy.newTimeout();
RetryStrategy.Timeout leaderTimeout = leaderRetryStrategy.newTimeout();
do
{
assertDatabaseNotShutdown();
try
{
outbound.send( leaderLocator.getLeader(), new RaftMessages.NewEntry.Request( me, operation ) );
progress.awaitReplication( timeout.getMillis() );
timeout.increment();

leaderTimeout = leaderRetryStrategy.newTimeout();

progress.awaitReplication( progressTimeout.getMillis() );
progressTimeout.increment();
}
catch ( InterruptedException e )
{
Expand All @@ -92,6 +96,8 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
catch ( NoLeaderFoundException e )
{
log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e );
Thread.sleep( leaderTimeout.getMillis() );
leaderTimeout.increment();
}
} while( !progress.isReplicated() );

Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.messaging.Message;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class RaftReplicatorTest
private MemberId leader = new MemberId( UUID.randomUUID() );
private GlobalSession session = new GlobalSession( UUID.randomUUID(), myself );
private LocalSessionPool sessionPool = new LocalSessionPool( session );
private RetryStrategy retryStrategy = new ConstantTimeRetryStrategy( 0, MILLISECONDS );
private RetryStrategy noWaitRetryStrategy = new ConstantTimeRetryStrategy( 0, MILLISECONDS );

@Test
public void shouldSendReplicatedContentToLeader() throws Exception
Expand All @@ -69,7 +70,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception

RaftReplicator replicator =
new RaftReplicator( leaderLocator, myself, outbound, sessionPool,
capturedProgress, retryStrategy, NullLogProvider.getInstance() );
capturedProgress, noWaitRetryStrategy, noWaitRetryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false );
Expand All @@ -93,10 +94,10 @@ public void shouldResendAfterTimeout() throws Exception
// given
when( leaderLocator.getLeader() ).thenReturn( leader );
CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
CapturingOutbound outbound = new CapturingOutbound();
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();

RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() );
sessionPool, capturedProgress, noWaitRetryStrategy, noWaitRetryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false );
Expand All @@ -111,16 +112,46 @@ public void shouldResendAfterTimeout() throws Exception
replicatingThread.join( DEFAULT_TIMEOUT_MS );
}

/**
 * Checks that replication survives a failed leader lookup: the first call to the leader
 * locator throws {@link NoLeaderFoundException}, later calls return a leader, and the
 * test expects messages to be sent anyway while the leader timeout is incremented
 * exactly once.
 */
@Test
public void shouldRetryGettingLeader() throws Exception
{
    // given: the first leader lookup fails, every later lookup succeeds
    when( leaderLocator.getLeader() )
            .thenThrow( new NoLeaderFoundException( ) )
            .thenReturn( leader );

    // counter shared with the spy strategy so the test can observe increment() calls
    AtomicInteger leaderRetryCount = new AtomicInteger();
    RetryStrategy spyingLeaderStrategy = new SpyRetryStrategy( leaderRetryCount );

    CapturingProgressTracker progressCapture = new CapturingProgressTracker();
    CapturingOutbound<RaftMessages.RaftMessage> sentMessages = new CapturingOutbound<>();

    RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, sentMessages, sessionPool,
            progressCapture, noWaitRetryStrategy, spyingLeaderStrategy, NullLogProvider.getInstance() );
    Thread worker = replicatingThread( replicator, ReplicatedInteger.valueOf( 5 ), false );

    // when
    worker.start();

    // then: sends keep happening and only the single failed lookup bumped the leader timeout
    assertEventually( "send count", () -> sentMessages.count, greaterThan( 2 ), DEFAULT_TIMEOUT_MS, MILLISECONDS );
    assertEquals( 1, leaderRetryCount.get() );

    // cleanup: mark the operation replicated so the worker's loop can finish
    progressCapture.last.setReplicated();
    worker.join( DEFAULT_TIMEOUT_MS );
}

@Test
public void shouldReleaseSessionWhenFinished() throws Exception
{
// given
when( leaderLocator.getLeader() ).thenReturn( leader );
CapturingProgressTracker capturedProgress = new CapturingProgressTracker();
CapturingOutbound outbound = new CapturingOutbound();
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();

RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() );
sessionPool, capturedProgress, noWaitRetryStrategy, noWaitRetryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, true );
Expand Down Expand Up @@ -160,7 +191,7 @@ private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger c
}
}
}
catch ( InterruptedException | NoLeaderFoundException e )
catch ( InterruptedException e )
{
throw new IllegalStateException();
}
Expand Down Expand Up @@ -209,7 +240,7 @@ public int inProgressCount()
}
}

private class CapturingOutbound<MESSAGE extends Message> implements Outbound<MemberId, MESSAGE>
private static class CapturingOutbound<MESSAGE extends Message> implements Outbound<MemberId, MESSAGE>
{
private MemberId lastTo;
private int count;
Expand All @@ -222,4 +253,33 @@ public void send( MemberId to, MESSAGE message )
}

}

/**
 * Test double for {@link RetryStrategy}. Every timeout it hands out reports a delay of
 * zero milliseconds and records each {@code increment()} call in the counter supplied
 * at construction, so a test can check how often a timeout was incremented.
 */
private static class SpyRetryStrategy implements RetryStrategy
{
    /** Counter owned by the test; bumped once per {@code increment()} on any timeout. */
    private final AtomicInteger incrementCounter;

    SpyRetryStrategy( AtomicInteger increments )
    {
        incrementCounter = increments;
    }

    @Override
    public RetryStrategy.Timeout newTimeout()
    {
        RetryStrategy.Timeout countingTimeout = new RetryStrategy.Timeout()
        {
            @Override
            public long getMillis()
            {
                // never makes the caller wait
                return 0;
            }

            @Override
            public void increment()
            {
                incrementCounter.incrementAndGet();
            }
        };
        return countingTimeout;
    }
}
}

0 comments on commit fc2f3b7

Please sign in to comment.