Skip to content

Commit

Permalink
clear the leader if we're stepping down
Browse files: browse the repository at this point in the history
  • Loading branch information
Mark Needham committed Jan 25, 2017
1 parent b4bc6a0 commit 9e9c52f
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 22 deletions.
Expand Up @@ -69,7 +69,7 @@ public ReplicationModule( MemberId myself, PlatformModule platformModule, Config

ExponentialBackoffStrategy retryStrategy = new ExponentialBackoffStrategy( 10, 60, SECONDS );
replicator = life.add( new RaftReplicator( consensusModule.raftMachine(), myself, outbound, sessionPool,
progressTracker, retryStrategy ) );
progressTracker, retryStrategy, logProvider ) );
}

public RaftReplicator getReplicator()
Expand Down
Expand Up @@ -281,5 +281,6 @@ private void stepDownToFollower( Outcome outcome )
receivedHeartbeats = false;
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
outcome.setLeader( null );
}
}
Expand Up @@ -32,6 +32,8 @@
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
* A replicator implementation suitable for use in a RAFT context. Handles resending due to timeouts and leader switches.
Expand All @@ -43,33 +45,29 @@ public class RaftReplicator extends LifecycleAdapter implements Replicator, List
private final ProgressTracker progressTracker;
private final LocalSessionPool sessionPool;
private final RetryStrategy retryStrategy;
private final LeaderLocator leaderLocator;
private final Log log;

private MemberId leader;
private volatile boolean shutdown;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, RetryStrategy retryStrategy )
Outbound<MemberId, RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, RetryStrategy retryStrategy, LogProvider logProvider )
{
this.me = me;
this.outbound = outbound;
this.progressTracker = progressTracker;
this.sessionPool = sessionPool;
this.retryStrategy = retryStrategy;

try
{
this.leader = leaderLocator.getLeader();
}
catch ( NoLeaderFoundException e )
{
this.leader = null;
}
this.leaderLocator = leaderLocator;
leaderLocator.registerListener( this );
log = logProvider.getLog( getClass() );
}

@Override
public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException
public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws InterruptedException, NoLeaderFoundException

{
OperationContext session = sessionPool.acquireSession();

Expand All @@ -80,9 +78,9 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
do
{
assertDatabaseNotShutdown();
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ) );
try
{
outbound.send( leaderLocator.getLeader(), new RaftMessages.NewEntry.Request( me, operation ) );
progress.awaitReplication( timeout.getMillis() );
timeout.increment();
}
Expand All @@ -91,6 +89,10 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
progressTracker.abort( operation );
throw e;
}
catch ( NoLeaderFoundException e )
{
log.debug( "Could not replicate operation " + operation + " because no leader was found. Retrying.", e );
}
} while( !progress.isReplicated() );

BiConsumer<Object,Throwable> cleanup = ( ignored1, ignored2 ) -> sessionPool.releaseSession( session );
Expand All @@ -110,7 +112,6 @@ public Future<Object> replicate( ReplicatedContent command, boolean trackResult
/**
 * Listener callback that receives a {@link MemberId} identifying a leader.
 * This replicator registers itself via {@code leaderLocator.registerListener( this )}
 * in its constructor; presumably that registration is what drives this callback when
 * the leader changes (confirm against LeaderLocator).
 *
 * @param leader the member handed to this listener by the notifier
 */
@Override
public void receive( MemberId leader )
{
// NOTE(review): the hunk header shrinks this region from 7 to 6 lines, so this assignment looks like the line the commit removes — confirm.
this.leader = leader;
// Triggers a replication event on the progress tracker; presumably this wakes threads waiting in replicate() so they retry — verify.
progressTracker.triggerReplicationEvent();
}

Expand Down
Expand Up @@ -21,6 +21,8 @@

import java.util.concurrent.Future;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;

/**
* Replicate content across a cluster of servers.
*/
Expand All @@ -36,5 +38,5 @@ public interface Replicator
*
* @return A future that will receive the result when available. Only valid if trackResult is set.
*/
Future<Object> replicate( ReplicatedContent content, boolean trackResult ) throws InterruptedException;
Future<Object> replicate( ReplicatedContent content, boolean trackResult ) throws InterruptedException, NoLeaderFoundException;
}
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.replication.Replicator;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.kernel.impl.store.id.IdRange;
Expand Down Expand Up @@ -82,7 +83,7 @@ private boolean replicateIdAllocationRequest( IdType idType, ReplicatedIdAllocat
{
return (Boolean) replicator.replicate( idAllocationRequest, true ).get();
}
catch ( InterruptedException | ExecutionException e )
catch ( InterruptedException | ExecutionException | NoLeaderFoundException e )
{
log.error( format( "Failed to acquire id range for idType %s", idType ), e );
throw new IdGenerationException( e );
Expand Down
Expand Up @@ -112,6 +112,10 @@ private synchronized int acquireTokenOrThrow()
{
throw new AcquireLockTimeoutException( e, "Interrupted acquiring token.", Interrupted );
}
catch ( NoLeaderFoundException e )
{
throw new AcquireLockTimeoutException( e, "Could not acquire lock token because no leader was found.", NoLeaderAvailable );
}

try
{
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.replication.Replicator;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException;
Expand Down Expand Up @@ -98,7 +99,7 @@ private int requestToken( String tokenName )
Future<Object> future = replicator.replicate( tokenRequest, true );
return (int) future.get();
}
catch ( InterruptedException e )
catch ( InterruptedException | NoLeaderFoundException e )
{
throw new org.neo4j.graphdb.TransactionFailureException( "Could not create token", e );
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.replication.Replicator;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
Expand Down Expand Up @@ -55,6 +56,10 @@ public long commit( final TransactionToApply tx,
{
throw new TransactionFailureException( "Interrupted replicating transaction.", e );
}
catch ( NoLeaderFoundException e )
{
throw new TransactionFailureException( "No leader found while replicating transaction.", e );
}

try
{
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.causalclustering.core.consensus;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

Expand All @@ -35,11 +36,13 @@
import org.neo4j.causalclustering.identity.RaftTestMemberSetBuilder;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.KernelEventHandlers;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLog;

import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down
Expand Up @@ -49,6 +49,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -372,6 +373,7 @@ public void leaderShouldStepDownWhenLackingHeartbeatResponses() throws Exception
// given
RaftState state = raftState()
.votingMembers( asSet( myself, member1, member2 ) )
.leader( myself )
.build();

Leader leader = new Leader();
Expand All @@ -383,6 +385,7 @@ public void leaderShouldStepDownWhenLackingHeartbeatResponses() throws Exception

// then
assertThat( outcome.getRole(), not( LEADER ) );
assertNull( outcome.getLeader() );
}

@Test
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
Expand All @@ -36,6 +37,7 @@
import org.neo4j.causalclustering.helper.RetryStrategy;
import org.neo4j.causalclustering.core.state.Result;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.logging.NullLogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static junit.framework.TestCase.assertEquals;
Expand Down Expand Up @@ -67,7 +69,7 @@ public void shouldSendReplicatedContentToLeader() throws Exception

RaftReplicator replicator =
new RaftReplicator( leaderLocator, myself, outbound, sessionPool,
capturedProgress, retryStrategy );
capturedProgress, retryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false );
Expand All @@ -94,7 +96,7 @@ public void shouldResendAfterTimeout() throws Exception
CapturingOutbound outbound = new CapturingOutbound();

RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, retryStrategy );
sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, false );
Expand All @@ -118,7 +120,7 @@ public void shouldReleaseSessionWhenFinished() throws Exception
CapturingOutbound outbound = new CapturingOutbound();

RaftReplicator replicator = new RaftReplicator( leaderLocator, myself, outbound,
sessionPool, capturedProgress, retryStrategy );
sessionPool, capturedProgress, retryStrategy, NullLogProvider.getInstance() );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
Thread replicatingThread = replicatingThread( replicator, content, true );
Expand Down Expand Up @@ -158,7 +160,7 @@ private Thread replicatingThread( RaftReplicator replicator, ReplicatedInteger c
}
}
}
catch ( InterruptedException e )
catch ( InterruptedException | NoLeaderFoundException e )
{
throw new IllegalStateException();
}
Expand Down

0 comments on commit 9e9c52f

Please sign in to comment.