Extract CommandApplicationProcess from CoreState.
CoreState now takes over storeId checking from BatchingMessageHandler.
apcj committed Jul 29, 2016
1 parent b7f2f22 commit 05200ce
Showing 14 changed files with 897 additions and 805 deletions.
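
A minimal sketch of the new shape, using hypothetical simplified stand-in types rather than the actual Neo4j classes: BatchingMessageHandler no longer knows about stores and only batches and forwards to the next MessageHandler in the chain, while CoreState sits behind it, performs the storeId check, and delegates command application to the extracted CommandApplicationProcess.

    interface MessageHandler<M>
    {
        void handle( M message );
    }

    class StoreIdAwareMessage
    {
        final String storeId;  // stand-in for org.neo4j.coreedge.server.StoreId
        final String payload;  // stand-in for RaftMessages.RaftMessage

        StoreIdAwareMessage( String storeId, String payload )
        {
            this.storeId = storeId;
            this.payload = payload;
        }
    }

    class CommandApplicationProcess
    {
        // In the real commit this class owns batching and flushing of applied
        // commands; in this sketch it only records that a command reached it.
        void apply( String payload )
        {
            System.out.println( "applied: " + payload );
        }
    }

    class CoreState implements MessageHandler<StoreIdAwareMessage>
    {
        private final String localStoreId;
        private final CommandApplicationProcess applicationProcess;

        CoreState( String localStoreId, CommandApplicationProcess applicationProcess )
        {
            this.localStoreId = localStoreId;
            this.applicationProcess = applicationProcess;
        }

        @Override
        public void handle( StoreIdAwareMessage message )
        {
            // The storeId check that previously lived in BatchingMessageHandler.
            if ( localStoreId.equals( message.storeId ) )
            {
                applicationProcess.apply( message.payload );
            }
            else
            {
                System.out.println( "discarding message for store " + message.storeId );
            }
        }
    }

With this split, BatchingMessageHandler's constructor only needs the downstream MessageHandler, which is why the CoreServerModule wiring below passes coreState into it and why MembershipWaiter now observes coreState directly.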
@@ -44,6 +44,7 @@
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal;
import org.neo4j.coreedge.raft.net.LoggingInbound;
import org.neo4j.coreedge.raft.state.CommandApplicationProcess;
import org.neo4j.coreedge.raft.state.CoreState;
import org.neo4j.coreedge.raft.state.CoreStateApplier;
import org.neo4j.coreedge.raft.state.CoreStateDownloader;
@@ -135,12 +136,14 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C

NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself );

CoreState coreState = new CoreState( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier,
logProvider, replicationModule.getProgressTracker(), lastFlushedStorage,
replicationModule.getSessionTracker(), someoneElse, coreStateApplier, downloader, inFlightMap,
platformModule.monitors );
        CoreState coreState = new CoreState(
                consensusModule.raftInstance(), localDatabase, logProvider, someoneElse, downloader,
                new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(),
                        config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
                        config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
                        databaseHealthSupplier, logProvider, replicationModule.getProgressTracker(),
                        lastFlushedStorage, replicationModule.getSessionTracker(), coreStateApplier,
                        inFlightMap, platformModule.monitors ) );

dependencies.satisfyDependency( coreState );

@@ -151,12 +154,12 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C
int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch );

BatchingMessageHandler batchingMessageHandler =
new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState );
new BatchingMessageHandler( coreState, queueSize, maxBatch, logProvider );

long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout );

MembershipWaiter membershipWaiter =
new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider );
new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, coreState, logProvider );
long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout );
membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter,
joinCatchupTimeout, consensusModule.raftInstance(), logProvider );
@@ -24,36 +24,28 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
import org.neo4j.coreedge.raft.net.Inbound.MessageHandler;
import org.neo4j.coreedge.raft.outcome.ConsensusOutcome;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.SECONDS;

public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.StoreIdAwareMessage>, MismatchedStoreIdService
public class BatchingMessageHandler implements Runnable, MessageHandler<RaftMessages.StoreIdAwareMessage>
{
private final Log log;
private final RaftInstance raftInstance;
private final BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue;

private final int maxBatch;
private final List<RaftMessages.RaftMessage> batch;
private final List<RaftMessages.StoreIdAwareMessage> batch;

private final LocalDatabase localDatabase;
private RaftStateMachine raftStateMachine;
private final List<MismatchedStoreListener> listeners = new ArrayList<>( );
private MessageHandler<RaftMessages.StoreIdAwareMessage> handler;

public BatchingMessageHandler( RaftInstance raftInstance, LogProvider logProvider,
int queueSize, int maxBatch, LocalDatabase localDatabase,
RaftStateMachine raftStateMachine )
public BatchingMessageHandler( MessageHandler<RaftMessages.StoreIdAwareMessage> handler,
int queueSize, int maxBatch, LogProvider logProvider )
{
this.raftInstance = raftInstance;
this.localDatabase = localDatabase;
this.raftStateMachine = raftStateMachine;
this.handler = handler;
this.log = logProvider.getLog( getClass() );
this.maxBatch = maxBatch;

@@ -89,89 +81,46 @@ public void run()

if ( message != null )
{
RaftMessages.RaftMessage innerMessage = message.message();
StoreId storeId = message.storeId();

if ( message.storeId().equals( localDatabase.storeId() ) )
{
if ( messageQueue.isEmpty() )
{
innerHandle( message.message() );
}
else
{
batch.clear();
batch.add( innerMessage );
drain( messageQueue, batch, maxBatch - 1 );
collateAndHandleBatch( batch );
}
}
else
{
if ( localDatabase.isEmpty() )
{
log.info( "StoreId mismatch but store was empty so downloading new store from %s. Expected: " +
"%s, Encountered: %s. ", innerMessage.from(), storeId, localDatabase.storeId() );
raftStateMachine.downloadSnapshot( innerMessage.from() );
}
else
{
log.info( "Discarding message[%s] owing to mismatched storeId and non-empty store. " +
"Expected: %s, Encountered: %s", innerMessage, storeId, localDatabase.storeId() );
listeners.forEach( l -> {
MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() );
l.onMismatchedStore( ex );
} );
}

}
}
}

private void innerHandle( RaftMessage raftMessage )
{
try
{
ConsensusOutcome outcome = raftInstance.handle( raftMessage );
if ( outcome.needsFreshSnapshot() )
if ( messageQueue.isEmpty() )
{
raftStateMachine.notifyNeedFreshSnapshot();
handler.handle( message );
}
else
{
raftStateMachine.notifyCommitted( outcome.getCommitIndex());
batch.clear();
batch.add( message );
drain( messageQueue, batch, maxBatch - 1 );
collateAndHandleBatch( batch );
}
}
catch ( Throwable e )
{
raftInstance.stopTimers();
localDatabase.panic( e );
}
}

private void drain( BlockingQueue<RaftMessages.StoreIdAwareMessage> messageQueue,
List<RaftMessage> batch, int maxElements )
List<RaftMessages.StoreIdAwareMessage> batch, int maxElements )
{
List<RaftMessages.StoreIdAwareMessage> tempDraining = new ArrayList<>();
messageQueue.drainTo( tempDraining, maxElements );

for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining )
{
batch.add( storeIdAwareMessage.message() );
batch.add( storeIdAwareMessage );
}
}

public void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener )
{
listeners.add(listener);
}

private void collateAndHandleBatch( List<RaftMessages.RaftMessage> batch )
private void collateAndHandleBatch( List<RaftMessages.StoreIdAwareMessage> batch )
{
RaftMessages.NewEntry.Batch batchRequest = null;
StoreId storeId = batch.get( 0 ).storeId();

for ( RaftMessages.RaftMessage message : batch )
for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : batch )
{
if ( batchRequest != null && !storeIdAwareMessage.storeId().equals( storeId ))
{
handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) );
batchRequest = null;
}
storeId = storeIdAwareMessage.storeId();
RaftMessage message = storeIdAwareMessage.message();
if ( message instanceof RaftMessages.NewEntry.Request )
{
RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message;
@@ -184,13 +133,13 @@ private void collateAndHandleBatch( List<RaftMessages.RaftMessage> batch )
}
else
{
innerHandle( message );
handler.handle( storeIdAwareMessage );
}
}

if ( batchRequest != null )
{
innerHandle( batchRequest );
handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) );
}
}
}
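
A note on the collation above: because the queue now holds StoreIdAwareMessages rather than unwrapped RaftMessages, NewEntry requests can only be merged into a NewEntry.Batch while the storeId stays constant; any change of storeId flushes the pending batch downstream before collation continues. Below is a condensed, hypothetical rendering of just that rule (it ignores non-NewEntry messages, which the real method forwards immediately), using simple stand-in types rather than the Neo4j classes.

    import java.util.ArrayList;
    import java.util.List;

    class BatchCollationSketch
    {
        // Stand-in for handing a completed NewEntry.Batch to the next handler.
        static void flush( String storeId, List<String> entries )
        {
            System.out.println( "batch of " + entries.size() + " entries for store " + storeId );
        }

        // Merges consecutive entries that share a storeId; a storeId switch
        // flushes whatever has accumulated, mirroring collateAndHandleBatch.
        static void collate( List<String[]> batch )  // each element: {storeId, payload}
        {
            List<String> pending = new ArrayList<>();
            String currentStoreId = null;
            for ( String[] message : batch )
            {
                if ( currentStoreId != null && !message[0].equals( currentStoreId ) && !pending.isEmpty() )
                {
                    flush( currentStoreId, pending );
                    pending = new ArrayList<>();
                }
                currentStoreId = message[0];
                pending.add( message[1] );
            }
            if ( !pending.isEmpty() )
            {
                flush( currentStoreId, pending );
            }
        }

        public static void main( String[] args )
        {
            List<String[]> batch = new ArrayList<>();
            batch.add( new String[]{"store-A", "e1"} );
            batch.add( new String[]{"store-A", "e2"} );  // merged with e1
            batch.add( new String[]{"store-B", "e3"} );  // storeId changed: flush A, start B
            collate( batch );
        }
    }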
@@ -41,4 +41,6 @@ public interface RaftStateMachine
void notifyNeedFreshSnapshot();

void downloadSnapshot( MemberId from );

void innerHandle( RaftMessages.StoreIdAwareMessage raftMessage );
}
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.neo4j.coreedge.raft.BatchingMessageHandler;
import org.neo4j.coreedge.raft.MismatchedStoreIdService;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.server.MemberId;
@@ -31,6 +30,7 @@
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;

/**
@@ -139,7 +139,7 @@ private boolean caughtUpWithLeader()
}

@Override
public void onMismatchedStore(BatchingMessageHandler.MismatchedStoreIdException ex)
public void onMismatchedStore( MismatchedStoreIdService.MismatchedStoreIdException ex )
{
catchUpFuture.completeExceptionally( ex );
}
