Attempt to fix a lifecycle issue
Stop the batching message handler from forwarding new messages to the
raft server on shutdown. This is necessary to allow the raft server to
shut down cleanly instead of waiting for new messages to be processed.
Indeed, such new messages may never be consumed, since we stop the
thread processing them.
davidegrohmann committed Sep 26, 2016
1 parent 4d044c7 commit 582401d
Showing 5 changed files with 60 additions and 24 deletions.
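The pattern this commit introduces is small: a volatile stopped flag set by the lifecycle's stop() hook and checked in handle(), so that messages arriving after shutdown are logged and dropped instead of being queued for a consumer thread that is about to go away. Below is a minimal, self-contained sketch of that idea; SimpleBatchingHandler and the String message type are illustrative stand-ins, not the actual Neo4j types (the real BatchingMessageHandler works with RaftMessages.ClusterIdAwareMessage and Neo4j's LifecycleAdapter, as shown in the diff that follows).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative stand-in for the real BatchingMessageHandler.
class SimpleBatchingHandler implements Runnable
{
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>( 64 );
    private volatile boolean stopped;

    // Called on shutdown: from here on, new messages are dropped so nothing
    // piles up for a consumer thread that is itself about to be stopped.
    public void stop()
    {
        stopped = true;
    }

    public void handle( String message )
    {
        if ( stopped )
        {
            System.out.printf( "Handler stopped, dropping message: %s%n", message );
            return;
        }
        try
        {
            queue.put( message );
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run()
    {
        // Drain whatever is already queued into one batch and forward it.
        List<String> batch = new ArrayList<>();
        queue.drainTo( batch );
        batch.forEach( m -> System.out.println( "forwarding " + m ) );
    }
}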
@@ -19,7 +19,6 @@
*/
package org.neo4j.coreedge.core.consensus;

import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
@@ -44,7 +43,7 @@ public ContinuousJob( JobScheduler scheduler, JobScheduler.Group group, Runnable
this.scheduler = scheduler;
this.group = group;
this.task = task;
log = logProvider.getLog( getClass() );
this.log = logProvider.getLog( getClass() );
}

@Override
@@ -28,35 +28,48 @@
import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage;
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.messaging.Inbound.MessageHandler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.ClusterIdAwareMessage>
public class BatchingMessageHandler extends LifecycleAdapter
implements Runnable, MessageHandler<RaftMessages.ClusterIdAwareMessage>
{
private final MessageHandler<RaftMessages.ClusterIdAwareMessage> handler;
private final Log log;
private final BlockingQueue<RaftMessages.ClusterIdAwareMessage> messageQueue;

private final int maxBatch;
private final List<RaftMessages.ClusterIdAwareMessage> batch;
private final BlockingQueue<RaftMessages.ClusterIdAwareMessage> messageQueue;

private MessageHandler<RaftMessages.ClusterIdAwareMessage> handler;
private volatile boolean stopped;

public BatchingMessageHandler( MessageHandler<RaftMessages.ClusterIdAwareMessage> handler,
int queueSize, int maxBatch, LogProvider logProvider )
BatchingMessageHandler( MessageHandler<RaftMessages.ClusterIdAwareMessage> handler, int queueSize, int maxBatch,
LogProvider logProvider )
{
this.handler = handler;
this.log = logProvider.getLog( getClass() );
this.maxBatch = maxBatch;

this.batch = new ArrayList<>( maxBatch );
this.messageQueue = new ArrayBlockingQueue<>( queueSize );
}

@Override
public void stop()
{
stopped = true;
}

@Override
public void handle( RaftMessages.ClusterIdAwareMessage message )
{
if ( stopped )
{
log.warn( "This handler has been stopped, dropping the message: %s", message );
return;
}

try
{
messageQueue.put( message );
@@ -92,6 +92,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
final FileSystemAbstraction fileSystem = platformModule.fileSystem;
final LifeSupport life = platformModule.life;
final Monitors monitors = platformModule.monitors;
final JobScheduler jobScheduler = platformModule.jobScheduler;

LogProvider logProvider = logging.getInternalLogProvider();
LogProvider userLogProvider = logging.getUserLogProvider();
@@ -153,20 +154,20 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
} );
}

CoreState coreState = new CoreState(
consensusModule.raftMachine(), localDatabase, clusteringModule.clusterIdentity(),
logProvider,
downloader,
CommandApplicationProcess commandApplicationProcess =
new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
databaseHealthSupplier, logProvider, replicationModule.getProgressTracker(),
lastFlushedStorage, replicationModule.getSessionTracker(), coreStateApplier,
consensusModule.inFlightMap(), platformModule.monitors ) );
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier,
logProvider, replicationModule.getProgressTracker(), lastFlushedStorage,
replicationModule.getSessionTracker(), coreStateApplier, consensusModule.inFlightMap(),
platformModule.monitors );
CoreState coreState =
new CoreState( consensusModule.raftMachine(), localDatabase, clusteringModule.clusterIdentity(),
logProvider, downloader, commandApplicationProcess );

dependencies.satisfyDependency( coreState );

life.add( new PruningScheduler( coreState, platformModule.jobScheduler,
life.add( new PruningScheduler( coreState, jobScheduler,
config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ), logProvider ) );

int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size );
@@ -178,8 +179,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data
long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );

MembershipWaiter membershipWaiter =
new MembershipWaiter( identityModule.myself(), platformModule.jobScheduler, dbHealthSupplier,
electionTimeout * 4, logProvider );
new MembershipWaiter( identityModule.myself(), jobScheduler, dbHealthSupplier, electionTimeout * 4,
logProvider );
long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
joinCatchupTimeout, consensusModule.raftMachine(), logProvider );
Expand All @@ -196,7 +197,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data

life.add( raftServer );
life.add( catchupServer );
life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ),
life.add( batchingMessageHandler );
life.add( new ContinuousJob( jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ),
batchingMessageHandler, logProvider ) );
life.add( coreState );
}
@@ -25,11 +25,10 @@
public class LoggingInbound<M extends Message> implements Inbound<M>
{
private final Inbound<M> inbound;
private final MessageLogger messageLogger;
private final MessageLogger<MemberId> messageLogger;
private final MemberId me;

public LoggingInbound( Inbound<M> inbound, MessageLogger messageLogger,
MemberId me )
public LoggingInbound( Inbound<M> inbound, MessageLogger<MemberId> messageLogger, MemberId me )
{
this.inbound = inbound;
this.messageLogger = messageLogger;
@@ -32,6 +32,7 @@
import org.neo4j.coreedge.identity.ClusterId;
import org.neo4j.coreedge.identity.ClusterIdentity;
import org.neo4j.coreedge.messaging.Inbound.MessageHandler;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.NullLogProvider;

import static org.mockito.Mockito.mock;
@@ -160,4 +161,26 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep
verify( raftStateMachine ).handle( messageB );
verify( raftStateMachine ).handle( messageD );
}

@Test
public void shouldDropMessagesAfterBeingStopped() throws Exception
{
// given
AssertableLogProvider logProvider = new AssertableLogProvider();
BatchingMessageHandler batchHandler = new BatchingMessageHandler(
raftStateMachine, QUEUE_SIZE, MAX_BATCH, logProvider );

RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage(
localClusterId, new RaftMessages.NewEntry.Request( null, null ) );
batchHandler.stop();

// when
batchHandler.handle( message );
batchHandler.run();

// then
verifyZeroInteractions( raftStateMachine );
logProvider.assertAtLeastOnce( AssertableLogProvider.inLog( BatchingMessageHandler.class )
.warn( "This handler has been stopped, dropping the message: %s", message ) );
}
}
