diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/LeaderProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/LeaderProvider.java new file mode 100644 index 0000000000000..75134522152e7 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/LeaderProvider.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.core.replication; + +import org.neo4j.causalclustering.identity.MemberId; + +class LeaderProvider +{ + private MemberId currentLeader; + + synchronized MemberId awaitLeader() throws InterruptedException + { + if ( currentLeader == null ) + { + while ( currentLeader == null ) + { + wait( ); + } + } + return currentLeader; + } + + synchronized void setLeader( MemberId leader ) + { + this.currentLeader = leader; + if ( currentLeader != null ) + { + notifyAll(); + } + } + + MemberId currentLeader() + { + return currentLeader; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java index 11d14c56b8aed..9484a44aa68c7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/replication/RaftReplicator.java @@ -55,7 +55,7 @@ public class RaftReplicator implements Replicator, LeaderListener private final Log log; private final ReplicationMonitor replicationMonitor; private final long availabilityTimeoutMillis; - private volatile MemberId lastKnownLeader; + private final LeaderProvider leaderProvider; public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound outbound, LocalSessionPool sessionPool, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard, @@ -70,20 +70,22 @@ public RaftReplicator( LeaderLocator leaderLocator, MemberId me, Outbound replicate( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException { - if ( lastKnownLeader == null ) + MemberId currentLeader = leaderProvider.currentLeader(); + if ( currentLeader == null ) { throw new ReplicationFailureException( "Replication aborted since no leader was available" ); } - return replicate0( command, trackResult ); + return replicate0( command, trackResult, currentLeader ); } - private Future replicate0( ReplicatedContent command, boolean trackResult ) throws ReplicationFailureException + private Future replicate0( ReplicatedContent command, boolean trackResult, MemberId leader ) throws ReplicationFailureException { replicationMonitor.startReplication(); try @@ -107,13 +109,14 @@ private Future replicate0( ReplicatedContent command, boolean trackResul replicationMonitor.replicationAttempt(); assertDatabaseAvailable(); // blocking at least until the send has succeeded or failed before retrying - outbound.send( lastKnownLeader, new RaftMessages.NewEntry.Request( me, operation ), true ); + outbound.send( leader, new RaftMessages.NewEntry.Request( me, operation ), true ); progress.awaitReplication( progressTimeout.getMillis() ); if ( progress.isReplicated() ) { break; } progressTimeout.increment(); + leader = leaderProvider.awaitLeader(); } } catch ( InterruptedException e ) @@ -147,10 +150,17 @@ private Future replicate0( ReplicatedContent command, boolean trackResul public void onLeaderSwitch( LeaderInfo leaderInfo ) { progressTracker.triggerReplicationEvent(); - if ( leaderInfo.memberId() != null ) + MemberId newLeader = leaderInfo.memberId(); + MemberId oldLeader = leaderProvider.currentLeader(); + if ( newLeader == null && oldLeader != null ) { - lastKnownLeader = leaderInfo.memberId(); + log.info( "Lost previous leader '%s'. Currently no available leader", oldLeader ); } + else if ( newLeader != null && oldLeader == null ) + { + log.info( "A new leader has been detected: '%s'", newLeader ); + } + leaderProvider.setLeader( newLeader ); } private void assertDatabaseAvailable() throws ReplicationFailureException diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/LeaderProviderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/LeaderProviderTest.java new file mode 100644 index 0000000000000..b3dcd13ab0373 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/LeaderProviderTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.core.replication; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.util.concurrent.Futures; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class LeaderProviderTest +{ + + private static final MemberId MEMBER_ID = new MemberId( UUID.randomUUID() ); + private final ExecutorService executorService = Executors.newCachedThreadPool(); + private final LeaderProvider leaderProvider = new LeaderProvider(); + + @Before + public void before() + { + leaderProvider.setLeader( null ); + } + + @Test + public void shouldGiveCurrentLeaderIfAvailable() throws InterruptedException + { + leaderProvider.setLeader( MEMBER_ID ); + assertEquals( leaderProvider.currentLeader(), MEMBER_ID ); + assertEquals( leaderProvider.awaitLeader(), MEMBER_ID ); + } + + @Test + public void shouldWaitForNonNullValue() throws InterruptedException, ExecutionException, TimeoutException + { + // given + int threads = 3; + assertNull( leaderProvider.currentLeader() ); + + // when + List> futures = new ArrayList<>(); + for ( int i = 0; i < threads; i++ ) + { + Future interrupted = executorService.submit( getCurrentLeader() ); + futures.add( interrupted ); + } + + // then + Future> combine = Futures.combine( futures ); + Thread.sleep( 100 ); + assertFalse( combine.isDone() ); + + // when + leaderProvider.setLeader( MEMBER_ID ); + + List memberIds = combine.get( 5, TimeUnit.SECONDS ); + + // then + assertTrue( memberIds.stream().allMatch( memberId -> memberId.equals( MEMBER_ID ) ) ); + } + + private Callable getCurrentLeader() + { + return () -> + { + try + { + return leaderProvider.awaitLeader(); + } + catch ( InterruptedException e ) + { + throw new RuntimeException( "Interrupted" ); + } + }; + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java index 168cbe8c2de58..e484c23cf7cc4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/replication/RaftReplicatorTest.java @@ -250,9 +250,10 @@ public void shouldFailIfNoLeaderIsAvailable() @Test public void shouldListenToLeaderUpdates() throws ReplicationFailureException { - CompleteProgressTracker completeProgressTracker = new CompleteProgressTracker(); + OneProgressTracker oneProgressTracker = new OneProgressTracker(); + oneProgressTracker.last.setReplicated(); CapturingOutbound outbound = new CapturingOutbound<>(); - RaftReplicator replicator = getReplicator( outbound, completeProgressTracker, new Monitors() ); + RaftReplicator replicator = getReplicator( outbound, oneProgressTracker, new Monitors() ); ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); LeaderInfo lastLeader = leaderInfo; @@ -267,11 +268,30 @@ public void shouldListenToLeaderUpdates() throws ReplicationFailureException replicator.onLeaderSwitch( lastLeader ); replicator.replicate( content, false ); assertEquals( outbound.lastTo, lastLeader.memberId() ); + } + + @Test + public void shouldSuccefulltSendIfLeaderIsLostAndFound() throws InterruptedException + { + OneProgressTracker capturedProgress = new OneProgressTracker(); + CapturingOutbound outbound = new CapturingOutbound<>(); + + RaftReplicator replicator = getReplicator( outbound, capturedProgress, new Monitors() ); + replicator.onLeaderSwitch( leaderInfo ); + + ReplicatedInteger content = ReplicatedInteger.valueOf( 5 ); + ReplicatingThread replicatingThread = replicatingThread( replicator, content, false ); - // update with invalid null leader, still send to previous leader + // when + replicatingThread.start(); + + // then + assertEventually( "send count", () -> outbound.count, greaterThan( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); replicator.onLeaderSwitch( new LeaderInfo( null, 1 ) ); - replicator.replicate( content, false ); - assertEquals( outbound.lastTo, lastLeader.memberId() ); + capturedProgress.last.setReplicated(); + replicator.onLeaderSwitch( leaderInfo ); + + replicatingThread.join( DEFAULT_TIMEOUT_MS ); } private RaftReplicator getReplicator( CapturingOutbound outbound, ProgressTracker progressTracker, Monitors monitors ) @@ -331,12 +351,11 @@ Exception getReplicationException() } } - private class CompleteProgressTracker extends ProgressTrackerAdaptor + private class OneProgressTracker extends ProgressTrackerAdaptor { - CompleteProgressTracker() + OneProgressTracker() { last = new Progress(); - last.setReplicated(); } @Override