Skip to content

Commit

Permalink
Wait for current leader to become available in RaftReplicator
Browse files Browse the repository at this point in the history
Previously, requests were always sent to the last known leader.
With this commit, RaftReplicator waits indefinitely for a valid leader
and does not try to resend the request to an out-of-date member.
  • Loading branch information
RagnarW committed Sep 18, 2018
1 parent f331c97 commit 1594e96
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 15 deletions.
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.core.replication;

import org.neo4j.causalclustering.identity.MemberId;

/**
 * Holds the most recently announced leader and lets callers either peek at it
 * without blocking or block until a non-null leader has been set.
 */
class LeaderProvider
{
    // volatile because currentLeader() reads this field without holding the monitor;
    // without it a write made inside setLeader() is not guaranteed to become visible
    // to the unsynchronized reader.
    private volatile MemberId currentLeader;

    /**
     * Blocks the calling thread until a non-null leader is available.
     *
     * @return the current leader, never {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    synchronized MemberId awaitLeader() throws InterruptedException
    {
        // Loop rather than a single check so that spurious wake-ups, or a leader
        // that was cleared again before this thread re-acquired the monitor,
        // send the thread back to waiting.
        while ( currentLeader == null )
        {
            wait();
        }
        return currentLeader;
    }

    /**
     * Records a new leader and, if it is non-null, wakes every thread blocked in
     * {@link #awaitLeader()}.
     *
     * @param leader the new leader, or {@code null} when no leader is currently known
     */
    synchronized void setLeader( MemberId leader )
    {
        this.currentLeader = leader;
        if ( leader != null )
        {
            notifyAll();
        }
    }

    /**
     * Returns the current leader without blocking.
     *
     * @return the current leader, or {@code null} if none is set
     */
    MemberId currentLeader()
    {
        return currentLeader;
    }
}
Expand Up @@ -55,7 +55,7 @@ public class RaftReplicator implements Replicator, LeaderListener
private final Log log;
private final ReplicationMonitor replicationMonitor;
private final long availabilityTimeoutMillis;
private volatile MemberId lastKnownLeader;
private final LeaderProvider leaderProvider;

public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<MemberId,RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool,
ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard,
Expand All @@ -70,20 +70,22 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound<Member
this.availabilityGuard = availabilityGuard;
this.log = logProvider.getLog( getClass() );
this.replicationMonitor = monitors.newMonitor( ReplicationMonitor.class );
this.leaderProvider = new LeaderProvider();
leaderLocator.registerListener( this );
}

@Override
public Future<Object> replicate( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException
{
if ( lastKnownLeader == null )
MemberId currentLeader = leaderProvider.currentLeader();
if ( currentLeader == null )
{
throw new ReplicationFailureException( "Replication aborted since no leader was available" );
}
return replicate0( command, trackResult );
return replicate0( command, trackResult, currentLeader );
}

private Future<Object> replicate0( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException
private Future<Object> replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException
{
replicationMonitor.startReplication();
try
Expand All @@ -107,13 +109,14 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
replicationMonitor.replicationAttempt();
assertDatabaseAvailable();
// blocking at least until the send has succeeded or failed before retrying
outbound.send( lastKnownLeader, new RaftMessages.NewEntry.Request( me, operation ), true );
outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true );
progress.awaitReplication( progressTimeout.getMillis() );
if ( progress.isReplicated() )
{
break;
}
progressTimeout.increment();
leader = leaderProvider.awaitLeader();
}
}
catch ( InterruptedException e )
Expand Down Expand Up @@ -147,10 +150,17 @@ private Future<Object> replicate0( ReplicatedContent command, boolean trackResul
public void onLeaderSwitch( LeaderInfo leaderInfo )
{
progressTracker.triggerReplicationEvent();
if ( leaderInfo.memberId() != null )
MemberId newLeader = leaderInfo.memberId();
MemberId oldLeader = leaderProvider.currentLeader();
if ( newLeader == null && oldLeader != null )
{
lastKnownLeader = leaderInfo.memberId();
log.info( "Lost previous leader '%s'. Currently no available leader", oldLeader );
}
else if ( newLeader != null && oldLeader == null )
{
log.info( "A new leader has been detected: '%s'", newLeader );
}
leaderProvider.setLeader( newLeader );
}

private void assertDatabaseAvailable() throws ReplicationFailureException
Expand Down
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.core.replication;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.util.concurrent.Futures;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link LeaderProvider}: the non-blocking read of the current leader
 * and the blocking {@code awaitLeader()} call.
 */
public class LeaderProviderTest
{

    private static final MemberId MEMBER_ID = new MemberId( UUID.randomUUID() );
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final LeaderProvider leaderProvider = new LeaderProvider();

    @Before
    public void before()
    {
        leaderProvider.setLeader( null );
    }

    @After
    public void after()
    {
        // Interrupt any workers still blocked in awaitLeader() (e.g. after a failed
        // assertion) so that they do not outlive the test.
        executorService.shutdownNow();
    }

    @Test
    public void shouldGiveCurrentLeaderIfAvailable() throws InterruptedException
    {
        leaderProvider.setLeader( MEMBER_ID );
        // expected value first, so that a failure message reads correctly
        assertEquals( MEMBER_ID, leaderProvider.currentLeader() );
        assertEquals( MEMBER_ID, leaderProvider.awaitLeader() );
    }

    @Test
    public void shouldWaitForNonNullValue() throws InterruptedException, ExecutionException, TimeoutException
    {
        // given
        int threads = 3;
        assertNull( leaderProvider.currentLeader() );

        // when
        List<Future<MemberId>> futures = new ArrayList<>();
        for ( int i = 0; i < threads; i++ )
        {
            Future<MemberId> awaitedLeader = executorService.submit( getCurrentLeader() );
            futures.add( awaitedLeader );
        }

        // then
        Future<List<MemberId>> combine = Futures.combine( futures );
        // Give the workers a moment to start; none of them may complete while no leader is set.
        Thread.sleep( 100 );
        assertFalse( combine.isDone() );

        // when
        leaderProvider.setLeader( MEMBER_ID );

        List<MemberId> memberIds = combine.get( 5, TimeUnit.SECONDS );

        // then
        assertTrue( memberIds.stream().allMatch( memberId -> memberId.equals( MEMBER_ID ) ) );
    }

    /**
     * Creates a task that blocks in {@code awaitLeader()} and returns the leader it observed.
     */
    private Callable<MemberId> getCurrentLeader()
    {
        return () ->
        {
            try
            {
                return leaderProvider.awaitLeader();
            }
            catch ( InterruptedException e )
            {
                // Restore the interrupt flag and keep the cause for diagnostics.
                Thread.currentThread().interrupt();
                throw new RuntimeException( "Interrupted", e );
            }
        };
    }
}
Expand Up @@ -250,9 +250,10 @@ public void shouldFailIfNoLeaderIsAvailable()
@Test
public void shouldListenToLeaderUpdates() throws ReplicationFailureException
{
CompleteProgressTracker completeProgressTracker = new CompleteProgressTracker();
OneProgressTracker oneProgressTracker = new OneProgressTracker();
oneProgressTracker.last.setReplicated();
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();
RaftReplicator replicator = getReplicator( outbound, completeProgressTracker, new Monitors() );
RaftReplicator replicator = getReplicator( outbound, oneProgressTracker, new Monitors() );
ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );

LeaderInfo lastLeader = leaderInfo;
Expand All @@ -267,11 +268,30 @@ public void shouldListenToLeaderUpdates() throws ReplicationFailureException
replicator.onLeaderSwitch( lastLeader );
replicator.replicate( content, false );
assertEquals( outbound.lastTo, lastLeader.memberId() );
}

@Test
public void shouldSuccefulltSendIfLeaderIsLostAndFound() throws InterruptedException
{
OneProgressTracker capturedProgress = new OneProgressTracker();
CapturingOutbound<RaftMessages.RaftMessage> outbound = new CapturingOutbound<>();

RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() );
replicator.onLeaderSwitch( leaderInfo );

ReplicatedInteger content = ReplicatedInteger.valueOf( 5 );
ReplicatingThread replicatingThread = replicatingThread( replicator, content, false );

// update with invalid null leader, still send to previous leader
// when
replicatingThread.start();

// then
assertEventually( "send count", () -> outbound.count, greaterThan( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS );
replicator.onLeaderSwitch( new LeaderInfo( null, 1 ) );
replicator.replicate( content, false );
assertEquals( outbound.lastTo, lastLeader.memberId() );
capturedProgress.last.setReplicated();
replicator.onLeaderSwitch( leaderInfo );

replicatingThread.join( DEFAULT_TIMEOUT_MS );
}

private RaftReplicator getReplicator( CapturingOutbound<RaftMessages.RaftMessage> outbound, ProgressTracker progressTracker, Monitors monitors )
Expand Down Expand Up @@ -331,12 +351,11 @@ Exception getReplicationException()
}
}

private class CompleteProgressTracker extends ProgressTrackerAdaptor
private class OneProgressTracker extends ProgressTrackerAdaptor
{
CompleteProgressTracker()
OneProgressTracker()
{
last = new Progress();
last.setReplicated();
}

@Override
Expand Down

0 comments on commit 1594e96

Please sign in to comment.