Skip to content

Commit

Permalink
core-edge: stop raft log-shipper earlier
Browse files Browse the repository at this point in the history
Was previously destroyed in shutdown phase for unknown reason.

Also renames start/stop to pause/resume in log shipper to not
confuse it with the lifecycle methods.
  • Loading branch information
martinfurmanski committed Jul 7, 2016
1 parent e661f96 commit 812c036
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 29 deletions.
Expand Up @@ -263,11 +263,11 @@ private void handleLogShipping( Outcome outcome ) throws IOException
LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() ); LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() );
if ( outcome.isElectedLeader() ) if ( outcome.isElectedLeader() )
{ {
logShipping.start( leaderContext ); logShipping.resume( leaderContext );
} }
else if ( outcome.isSteppingDown() ) else if ( outcome.isSteppingDown() )
{ {
logShipping.stop(); logShipping.pause();
} }


if ( outcome.getRole() == LEADER ) if ( outcome.getRole() == LEADER )
Expand Down
Expand Up @@ -169,7 +169,7 @@ public synchronized void stop()
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
log.error( "Failed to start log shipper " + statusAsString(), e ); log.error( "Failed to stop log shipper " + statusAsString(), e );
} }
abortTimeout(); abortTimeout();
} }
Expand Down
Expand Up @@ -25,8 +25,6 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;


import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.LeaderContext; import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftLogEntry;
Expand All @@ -36,11 +34,12 @@
import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.lang.String.format; import static java.lang.String.format;


public class RaftLogShippingManager implements RaftMembership.Listener public class RaftLogShippingManager extends LifecycleAdapter implements RaftMembership.Listener
{ {
private final Outbound<CoreMember, RaftMessages.RaftMessage> outbound; private final Outbound<CoreMember, RaftMessages.RaftMessage> outbound;
private final LogProvider logProvider; private final LogProvider logProvider;
Expand All @@ -58,7 +57,7 @@ public class RaftLogShippingManager implements RaftMembership.Listener
private LeaderContext lastLeaderContext; private LeaderContext lastLeaderContext;


private boolean running; private boolean running;
private boolean destroyed = false; private boolean stopped = false;


public RaftLogShippingManager( Outbound<CoreMember,RaftMessages.RaftMessage> outbound, LogProvider logProvider, public RaftLogShippingManager( Outbound<CoreMember,RaftMessages.RaftMessage> outbound, LogProvider logProvider,
ReadableRaftLog raftLog, ReadableRaftLog raftLog,
Expand All @@ -79,9 +78,23 @@ public RaftLogShippingManager( Outbound<CoreMember,RaftMessages.RaftMessage> out
membership.registerListener( this ); membership.registerListener( this );
} }


public synchronized void start( LeaderContext initialLeaderContext ) /**
* Paused when stepping down from leader role.
*/
public synchronized void pause()
{ {
if( destroyed ) running = false;

logShippers.values().forEach( RaftLogShipper::stop );
logShippers.clear();
}

/**
* Resumed when becoming leader.
*/
public synchronized void resume( LeaderContext initialLeaderContext )
{
if( stopped )
{ {
return; return;
} }
Expand All @@ -96,18 +109,11 @@ public synchronized void start( LeaderContext initialLeaderContext )
lastLeaderContext = initialLeaderContext; lastLeaderContext = initialLeaderContext;
} }


public synchronized void destroy() @Override
{
stop();
destroyed = true;
}

public synchronized void stop() public synchronized void stop()
{ {
running = false; pause();

stopped = true;
logShippers.values().forEach( RaftLogShipper::stop );
logShippers.clear();
} }


private RaftLogShipper ensureLogShipperRunning( CoreMember member, LeaderContext leaderContext ) private RaftLogShipper ensureLogShipperRunning( CoreMember member, LeaderContext leaderContext )
Expand Down
Expand Up @@ -575,6 +575,7 @@ expectedClusterSize, electionTimeout, systemUTC(),
myself, raftMembershipManager, electionTimeout, myself, raftMembershipManager, electionTimeout,
config.get( CoreEdgeClusterSettings.catchup_batch_size ), config.get( CoreEdgeClusterSettings.catchup_batch_size ),
config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap );
life.add( logShipping );


RaftInstance raftInstance = RaftInstance raftInstance =
new RaftInstance( myself, termState, voteState, raftLog, raftStateMachine, electionTimeout, new RaftInstance( myself, termState, voteState, raftLog, raftStateMachine, electionTimeout,
Expand All @@ -593,15 +594,6 @@ expectedClusterSize, electionTimeout, systemUTC(),


life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) );


life.add( new LifecycleAdapter()
{
@Override
public void shutdown() throws Throwable
{
logShipping.destroy();
}
} );

return raftInstance; return raftInstance;
} }


Expand Down
Expand Up @@ -109,7 +109,7 @@ public void teardown() throws InterruptedException
} }
for ( RaftInstance raft : rafts ) for ( RaftInstance raft : rafts )
{ {
raft.logShippingManager().destroy(); raft.logShippingManager().stop();
} }
} }
} }

0 comments on commit 812c036

Please sign in to comment.