From 812c036dd7e442c8a751025755b74f38364efcb8 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 7 Jul 2016 10:35:37 +0200 Subject: [PATCH] core-edge: stop raft log-shipper earlier Was previously destroyed in shutdown phase for unknown reason. Also renames start/stop to pause/resume in log shipper to not confuse it with the lifecycle methods. --- .../org/neo4j/coreedge/raft/RaftInstance.java | 4 +- .../replication/shipping/RaftLogShipper.java | 2 +- .../shipping/RaftLogShippingManager.java | 38 +++++++++++-------- .../core/EnterpriseCoreEditionModule.java | 10 +---- .../coreedge/raft/elections/Fixture.java | 2 +- 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index e39260d0ef539..7889b2c2300e1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -263,11 +263,11 @@ private void handleLogShipping( Outcome outcome ) throws IOException LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() ); if ( outcome.isElectedLeader() ) { - logShipping.start( leaderContext ); + logShipping.resume( leaderContext ); } else if ( outcome.isSteppingDown() ) { - logShipping.stop(); + logShipping.pause(); } if ( outcome.getRole() == LEADER ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java index 78b7f9d5f3ee5..46c18fa7575f2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.java @@ -169,7 +169,7 @@ public synchronized void stop() } catch ( Throwable e ) { - log.error( "Failed to start log shipper " + statusAsString(), e ); + log.error( "Failed to stop log shipper " + statusAsString(), e ); } abortTimeout(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java index c479c2088a8f1..fc414b2a0ba92 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.java @@ -25,8 +25,6 @@ import java.util.HashSet; import java.util.Map; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.network.Message; import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -36,11 +34,12 @@ import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.LogProvider; import static java.lang.String.format; -public class RaftLogShippingManager implements RaftMembership.Listener +public class RaftLogShippingManager extends LifecycleAdapter implements RaftMembership.Listener { private final Outbound outbound; private final LogProvider logProvider; @@ -58,7 +57,7 @@ public class RaftLogShippingManager implements RaftMembership.Listener private LeaderContext lastLeaderContext; private boolean running; - private boolean destroyed = false; + private boolean stopped = false; public RaftLogShippingManager( Outbound outbound, LogProvider logProvider, ReadableRaftLog raftLog, @@ -79,9 +78,23 @@ public RaftLogShippingManager( Outbound out membership.registerListener( this ); } - public synchronized void start( LeaderContext initialLeaderContext ) + /** + * Paused when stepping down from leader role. + */ + public synchronized void pause() { - if( destroyed ) + running = false; + + logShippers.values().forEach( RaftLogShipper::stop ); + logShippers.clear(); + } + + /** + * Resumed when becoming leader. + */ + public synchronized void resume( LeaderContext initialLeaderContext ) + { + if( stopped ) { return; } @@ -96,18 +109,11 @@ public synchronized void start( LeaderContext initialLeaderContext ) lastLeaderContext = initialLeaderContext; } - public synchronized void destroy() - { - stop(); - destroyed = true; - } - + @Override public synchronized void stop() { - running = false; - - logShippers.values().forEach( RaftLogShipper::stop ); - logShippers.clear(); + pause(); + stopped = true; } private RaftLogShipper ensureLogShipperRunning( CoreMember member, LeaderContext leaderContext ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index ba13c38fe7652..e886ff240ded8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -575,6 +575,7 @@ expectedClusterSize, electionTimeout, systemUTC(), myself, raftMembershipManager, electionTimeout, config.get( CoreEdgeClusterSettings.catchup_batch_size ), config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); + life.add( logShipping ); RaftInstance raftInstance = new RaftInstance( myself, termState, voteState, raftLog, raftStateMachine, electionTimeout, @@ -593,15 +594,6 @@ expectedClusterSize, electionTimeout, systemUTC(), life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); - life.add( new LifecycleAdapter() - { - @Override - public void shutdown() throws Throwable - { - logShipping.destroy(); - } - } ); - return raftInstance; } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java index 2bc9b34afaa5b..27f352c8bb429 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java @@ -109,7 +109,7 @@ public void teardown() throws InterruptedException } for ( RaftInstance raft : rafts ) { - raft.logShippingManager().destroy(); + raft.logShippingManager().stop(); } } }