diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreServerModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreServerModule.java index 7b5e116d65ccd..eb73df5da4c91 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreServerModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreServerModule.java @@ -44,6 +44,7 @@ import org.neo4j.coreedge.raft.membership.MembershipWaiter; import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal; import org.neo4j.coreedge.raft.net.LoggingInbound; +import org.neo4j.coreedge.raft.state.CommandApplicationProcess; import org.neo4j.coreedge.raft.state.CoreState; import org.neo4j.coreedge.raft.state.CoreStateApplier; import org.neo4j.coreedge.raft.state.CoreStateDownloader; @@ -135,12 +136,14 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself ); - CoreState coreState = new CoreState( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), - config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), - config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, - logProvider, replicationModule.getProgressTracker(), lastFlushedStorage, - replicationModule.getSessionTracker(), someoneElse, coreStateApplier, downloader, inFlightMap, - platformModule.monitors ); + CoreState coreState = new CoreState( + consensusModule.raftInstance(), localDatabase, + logProvider, + someoneElse, downloader, + new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), + + config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, logProvider, replicationModule.getProgressTracker(), lastFlushedStorage, replicationModule.getSessionTracker(), coreStateApplier, + inFlightMap, platformModule.monitors ) ); dependencies.satisfyDependency( coreState ); @@ -151,12 +154,12 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); BatchingMessageHandler batchingMessageHandler = - new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState ); + new BatchingMessageHandler( coreState, queueSize, maxBatch, logProvider ); long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); MembershipWaiter membershipWaiter = - new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider ); + new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, coreState, logProvider ); long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout ); membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, consensusModule.raftInstance(), logProvider ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java index fb56b3d546cb0..891e9b939a73e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java @@ -24,36 +24,28 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.net.Inbound.MessageHandler; -import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.server.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.SECONDS; -public class BatchingMessageHandler implements Runnable, MessageHandler, MismatchedStoreIdService +public class BatchingMessageHandler implements Runnable, MessageHandler { private final Log log; - private final RaftInstance raftInstance; private final BlockingQueue messageQueue; private final int maxBatch; - private final List batch; + private final List batch; - private final LocalDatabase localDatabase; - private RaftStateMachine raftStateMachine; - private final List listeners = new ArrayList<>( ); + private MessageHandler handler; - public BatchingMessageHandler( RaftInstance raftInstance, LogProvider logProvider, - int queueSize, int maxBatch, LocalDatabase localDatabase, - RaftStateMachine raftStateMachine ) + public BatchingMessageHandler( MessageHandler handler, + int queueSize, int maxBatch, LogProvider logProvider ) { - this.raftInstance = raftInstance; - this.localDatabase = localDatabase; - this.raftStateMachine = raftStateMachine; + this.handler = handler; this.log = logProvider.getLog( getClass() ); this.maxBatch = maxBatch; @@ -89,89 +81,46 @@ public void run() if ( message != null ) { - RaftMessages.RaftMessage innerMessage = message.message(); - StoreId storeId = message.storeId(); - - if ( message.storeId().equals( localDatabase.storeId() ) ) - { - if ( messageQueue.isEmpty() ) - { - innerHandle( message.message() ); - } - else - { - batch.clear(); - batch.add( innerMessage ); - drain( messageQueue, batch, maxBatch - 1 ); - collateAndHandleBatch( batch ); - } - } - else - { - if ( localDatabase.isEmpty() ) - { - log.info( "StoreId mismatch but store was empty so downloading new store from %s. Expected: " + - "%s, Encountered: %s. ", innerMessage.from(), storeId, localDatabase.storeId() ); - raftStateMachine.downloadSnapshot( innerMessage.from() ); - } - else - { - log.info( "Discarding message[%s] owing to mismatched storeId and non-empty store. " + - "Expected: %s, Encountered: %s", innerMessage, storeId, localDatabase.storeId() ); - listeners.forEach( l -> { - MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() ); - l.onMismatchedStore( ex ); - } ); - } - - } - } - } - - private void innerHandle( RaftMessage raftMessage ) - { - try - { - ConsensusOutcome outcome = raftInstance.handle( raftMessage ); - if ( outcome.needsFreshSnapshot() ) + if ( messageQueue.isEmpty() ) { - raftStateMachine.notifyNeedFreshSnapshot(); + handler.handle( message ); } else { - raftStateMachine.notifyCommitted( outcome.getCommitIndex()); + batch.clear(); + batch.add( message ); + drain( messageQueue, batch, maxBatch - 1 ); + collateAndHandleBatch( batch ); } } - catch ( Throwable e ) - { - raftInstance.stopTimers(); - localDatabase.panic( e ); - } } private void drain( BlockingQueue messageQueue, - List batch, int maxElements ) + List batch, int maxElements ) { List tempDraining = new ArrayList<>(); messageQueue.drainTo( tempDraining, maxElements ); for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining ) { - batch.add( storeIdAwareMessage.message() ); + batch.add( storeIdAwareMessage ); } } - public void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener ) - { - listeners.add(listener); - } - - private void collateAndHandleBatch( List batch ) + private void collateAndHandleBatch( List batch ) { RaftMessages.NewEntry.Batch batchRequest = null; + StoreId storeId = batch.get( 0 ).storeId(); - for ( RaftMessages.RaftMessage message : batch ) + for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : batch ) { + if ( batchRequest != null && !storeIdAwareMessage.storeId().equals( storeId )) + { + handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) ); + batchRequest = null; + } + storeId = storeIdAwareMessage.storeId(); + RaftMessage message = storeIdAwareMessage.message(); if ( message instanceof RaftMessages.NewEntry.Request ) { RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message; @@ -184,13 +133,13 @@ private void collateAndHandleBatch( List batch ) } else { - innerHandle( message ); + handler.handle( storeIdAwareMessage ); } } if ( batchRequest != null ) { - innerHandle( batchRequest ); + handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java index bf866daf19582..b6d55357adc1d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftStateMachine.java @@ -41,4 +41,6 @@ public interface RaftStateMachine void notifyNeedFreshSnapshot(); void downloadSnapshot( MemberId from ); + + void innerHandle( RaftMessages.StoreIdAwareMessage raftMessage ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java index a8ced62d74542..97cd81f521dd4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java @@ -22,7 +22,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import org.neo4j.coreedge.raft.BatchingMessageHandler; import org.neo4j.coreedge.raft.MismatchedStoreIdService; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.MemberId; @@ -31,6 +30,7 @@ import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.MILLISECONDS; + import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED; /** @@ -139,7 +139,7 @@ private boolean caughtUpWithLeader() } @Override - public void onMismatchedStore(BatchingMessageHandler.MismatchedStoreIdException ex) + public void onMismatchedStore( MismatchedStoreIdService.MismatchedStoreIdException ex ) { catchUpFuture.completeExceptionally( ex ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CommandApplicationProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CommandApplicationProcess.java new file mode 100644 index 0000000000000..f52df2f643a48 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CommandApplicationProcess.java @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.state; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +import org.neo4j.coreedge.SessionTracker; +import org.neo4j.coreedge.raft.log.RaftLog; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.coreedge.raft.log.segmented.InFlightMap; +import org.neo4j.coreedge.raft.replication.DistributedOperation; +import org.neo4j.coreedge.raft.replication.ProgressTracker; +import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.Log; +import org.neo4j.logging.LogProvider; + +import static java.lang.Math.max; +import static java.lang.String.format; + +public class CommandApplicationProcess extends LifecycleAdapter +{ + private static final long NOTHING = -1; + private final RaftLog raftLog; + private final StateStorage lastFlushedStorage; + private final int flushEvery; + private final ProgressTracker progressTracker; + private final SessionTracker sessionTracker; + private final Supplier dbHealth; + private final InFlightMap inFlightMap; + private final Log log; + private final CoreStateApplier applier; + private final RaftLogCommitIndexMonitor commitIndexMonitor; + private final OperationBatcher batcher; + + private CoreStateMachines coreStateMachines; + + private long lastApplied = NOTHING; + private long lastSeenCommitIndex = NOTHING; + private long lastFlushed = NOTHING; + + public CommandApplicationProcess( + CoreStateMachines coreStateMachines, + RaftLog raftLog, + int maxBatchSize, + int flushEvery, + Supplier dbHealth, + LogProvider logProvider, + ProgressTracker progressTracker, + StateStorage lastFlushedStorage, + SessionTracker sessionTracker, + CoreStateApplier applier, + InFlightMap inFlightMap, + Monitors monitors ) + { + this.coreStateMachines = coreStateMachines; + this.raftLog = raftLog; + this.lastFlushedStorage = lastFlushedStorage; + this.flushEvery = flushEvery; + this.progressTracker = progressTracker; + this.sessionTracker = sessionTracker; + this.applier = applier; + this.log = logProvider.getLog( getClass() ); + this.dbHealth = dbHealth; + this.inFlightMap = inFlightMap; + this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); + this.batcher = new OperationBatcher( maxBatchSize ); + } + + public synchronized void notifyCommitted( long commitIndex ) + { + assert this.lastSeenCommitIndex <= commitIndex; + if ( this.lastSeenCommitIndex < commitIndex ) + { + this.lastSeenCommitIndex = commitIndex; + submitApplyJob( commitIndex ); + commitIndexMonitor.commitIndex( commitIndex ); + } + } + + private void submitApplyJob( long lastToApply ) + { + applier.submit( ( status ) -> () -> { + try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightMap, true ) ) + { + for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) + { + RaftLogEntry entry = logEntrySupplier.get( logIndex ); + if ( entry == null ) + { + throw new IllegalStateException( "Committed log entry must exist." ); + } + + if ( entry.content() instanceof DistributedOperation ) + { + DistributedOperation distributedOperation = (DistributedOperation) entry.content(); + progressTracker.trackReplication( distributedOperation ); + batcher.add( logIndex, distributedOperation ); + } + else + { + batcher.flush(); + lastApplied = logIndex; + } + } + batcher.flush(); + } + catch ( Throwable e ) + { + log.error( "Failed to apply up to index " + lastToApply, e ); + dbHealth.get().panic( e ); + } + } ); + } + + public synchronized long lastApplied() + { + return lastApplied; + } + + public void sync() throws InterruptedException + { + applier.sync( true ); + } + + private class OperationBatcher + { + private List batch; + private int maxBatchSize; + private long lastIndex; + + OperationBatcher( int maxBatchSize ) + { + this.batch = new ArrayList<>( maxBatchSize ); + this.maxBatchSize = maxBatchSize; + } + + private void add( long index, DistributedOperation operation ) throws Exception + { + if ( batch.size() > 0 ) + { + assert index == (lastIndex + 1); + } + + batch.add( operation ); + lastIndex = index; + + if ( batch.size() == maxBatchSize ) + { + flush(); + } + } + + private void flush() throws Exception + { + if ( batch.size() == 0 ) + { + return; + } + + long startIndex = lastIndex - batch.size() + 1; + handleOperations( startIndex, batch ); + lastApplied = lastIndex; + + batch.clear(); + maybeFlush(); + } + } + + void prune() throws IOException + { + raftLog.prune( lastFlushed ); + } + + private void handleOperations( long commandIndex, List operations ) + { + try ( CommandDispatcher dispatcher = coreStateMachines.commandDispatcher() ) + { + for ( DistributedOperation operation : operations ) + { + if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) ) + { + commandIndex++; + continue; + } + + CoreReplicatedContent command = (CoreReplicatedContent) operation.content(); + command.dispatch( dispatcher, commandIndex, + result -> progressTracker.trackResult( operation, result ) ); + + sessionTracker.update( operation.globalSession(), operation.operationId(), commandIndex ); + commandIndex++; + } + } + } + + private void maybeFlush() throws IOException + { + if ( (lastApplied - lastFlushed) > flushEvery ) + { + flush(); + } + } + + private void flush() throws IOException + { + coreStateMachines.flush(); + sessionTracker.flush(); + lastFlushedStorage.persistStoreData( lastApplied ); + lastFlushed = lastApplied; + } + + @Override + public synchronized void start() throws IOException, InterruptedException + { + lastFlushed = lastApplied = lastFlushedStorage.getInitialState(); + log.info( format( "Restoring last applied index to %d", lastApplied ) ); + sessionTracker.start(); + + /* Considering the order in which state is flushed, the state machines will + * always be furthest ahead and indicate the furthest possible state to + * which we must replay to reach a consistent state. */ + long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionTracker.getLastAppliedIndex() ); + + if ( lastPossiblyApplying > lastApplied ) + { + log.info( "Recovering up to: " + lastPossiblyApplying ); + submitApplyJob( lastPossiblyApplying ); + applier.sync( false ); + } + } + + @Override + public synchronized void stop() throws InterruptedException, IOException + { + applier.sync( true ); + flush(); + } + + public synchronized CoreSnapshot snapshot() throws IOException, InterruptedException + { + applier.sync( false ); + + long prevIndex = lastApplied; + long prevTerm = raftLog.readEntryTerm( prevIndex ); + CoreSnapshot coreSnapshot = new CoreSnapshot( prevIndex, prevTerm ); + + coreStateMachines.addSnapshots( coreSnapshot ); + sessionTracker.addSnapshots( coreSnapshot ); + + return coreSnapshot; + } + + synchronized void installSnapshot( CoreSnapshot coreSnapshot ) + { + coreStateMachines.installSnapshots( coreSnapshot ); + long snapshotPrevIndex = coreSnapshot.prevIndex(); + try + { + if ( snapshotPrevIndex > 1 ) + { + raftLog.skip( snapshotPrevIndex, coreSnapshot.prevTerm() ); + } + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + this.lastApplied = this.lastFlushed = snapshotPrevIndex; + log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) ); + + sessionTracker.installSnapshots( coreSnapshot ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index ce2b05fc5599e..686b8fd259343 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -22,183 +22,107 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; -import org.neo4j.coreedge.SessionTracker; +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; import org.neo4j.coreedge.discovery.CoreMemberSelectionException; -import org.neo4j.coreedge.raft.RaftStateMachine; -import org.neo4j.coreedge.raft.log.RaftLog; -import org.neo4j.coreedge.raft.log.RaftLogEntry; -import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.coreedge.raft.MismatchedStoreIdService; +import org.neo4j.coreedge.raft.RaftInstance; +import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.pruning.LogPruner; -import org.neo4j.coreedge.raft.log.segmented.InFlightMap; -import org.neo4j.coreedge.raft.replication.DistributedOperation; -import org.neo4j.coreedge.raft.replication.ProgressTracker; -import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; +import org.neo4j.coreedge.raft.net.Inbound.MessageHandler; +import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.server.MemberId; +import org.neo4j.coreedge.server.StoreId; import org.neo4j.coreedge.server.edge.CoreMemberSelectionStrategy; -import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.kernel.lifecycle.LifecycleAdapter; -import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import static java.lang.Math.max; -import static java.lang.String.format; - -public class CoreState extends LifecycleAdapter implements RaftStateMachine, LogPruner +public class CoreState implements MessageHandler, LogPruner, MismatchedStoreIdService, Lifecycle { - private static final long NOTHING = -1; - private final RaftLog raftLog; - private final StateStorage lastFlushedStorage; - private final int flushEvery; - private final ProgressTracker progressTracker; - private final SessionTracker sessionTracker; - private final Supplier dbHealth; - private final InFlightMap inFlightMap; + private final RaftInstance raftInstance; + private final LocalDatabase localDatabase; private final Log log; - private final CoreStateApplier applier; private final CoreMemberSelectionStrategy someoneElse; private final CoreStateDownloader downloader; - private final RaftLogCommitIndexMonitor commitIndexMonitor; - private final OperationBatcher batcher; - - private CoreStateMachines coreStateMachines; - - private long lastApplied = NOTHING; - private long lastSeenCommitIndex = NOTHING; - private long lastFlushed = NOTHING; + private final List listeners = new ArrayList<>( ); + private final CommandApplicationProcess applicationProcess; public CoreState( - CoreStateMachines coreStateMachines, RaftLog raftLog, - int maxBatchSize, - int flushEvery, - Supplier dbHealth, + RaftInstance raftInstance, + LocalDatabase localDatabase, LogProvider logProvider, - ProgressTracker progressTracker, - StateStorage lastFlushedStorage, - SessionTracker sessionTracker, CoreMemberSelectionStrategy someoneElse, - CoreStateApplier applier, CoreStateDownloader downloader, - InFlightMap inFlightMap, - Monitors monitors ) + CommandApplicationProcess commandApplicationProcess ) { - this.coreStateMachines = coreStateMachines; - this.raftLog = raftLog; - this.lastFlushedStorage = lastFlushedStorage; - this.flushEvery = flushEvery; - this.progressTracker = progressTracker; - this.sessionTracker = sessionTracker; + this.raftInstance = raftInstance; + this.localDatabase = localDatabase; this.someoneElse = someoneElse; - this.applier = applier; this.downloader = downloader; this.log = logProvider.getLog( getClass() ); - this.dbHealth = dbHealth; - this.inFlightMap = inFlightMap; - this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); - this.batcher = new OperationBatcher( maxBatchSize ); + this.applicationProcess = commandApplicationProcess; } - @Override - public synchronized void notifyCommitted( long commitIndex ) + public void handle( RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) { - assert this.lastSeenCommitIndex <= commitIndex; - if ( this.lastSeenCommitIndex < commitIndex ) + // Break out each if branch into a new CoreState instance + StoreId storeId = storeIdAwareMessage.storeId(); + if ( storeId.equals( localDatabase.storeId() ) ) { - this.lastSeenCommitIndex = commitIndex; - submitApplyJob( commitIndex ); - commitIndexMonitor.commitIndex( commitIndex ); - } - } - - private void submitApplyJob( long lastToApply ) - { - applier.submit( ( status ) -> () -> { - try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightMap, true ) ) + try { - for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) + ConsensusOutcome outcome = raftInstance.handle( storeIdAwareMessage.message() ); + if ( outcome.needsFreshSnapshot() ) { - RaftLogEntry entry = logEntrySupplier.get( logIndex ); - if ( entry == null ) - { - throw new IllegalStateException( "Committed log entry must exist." ); - } - - if ( entry.content() instanceof DistributedOperation ) - { - DistributedOperation distributedOperation = (DistributedOperation) entry.content(); - progressTracker.trackReplication( distributedOperation ); - batcher.add( logIndex, distributedOperation ); - } - else - { - batcher.flush(); - lastApplied = logIndex; - } + notifyNeedFreshSnapshot(); + } + else + { + notifyCommitted( outcome.getCommitIndex()); } - batcher.flush(); } catch ( Throwable e ) { - log.error( "Failed to apply up to index " + lastToApply, e ); - dbHealth.get().panic( e ); + raftInstance.stopTimers(); + localDatabase.panic( e ); } - } ); - } - public synchronized long lastApplied() - { - return lastApplied; - } - - private class OperationBatcher - { - private List batch; - private int maxBatchSize; - private long lastIndex; - - OperationBatcher( int maxBatchSize ) - { - this.batch = new ArrayList<>( maxBatchSize ); - this.maxBatchSize = maxBatchSize; } - - private void add( long index, DistributedOperation operation ) throws Exception + else { - if ( batch.size() > 0 ) + RaftMessages.RaftMessage message = storeIdAwareMessage.message(); + if ( localDatabase.isEmpty() ) { - assert index == (lastIndex + 1); + log.info( "StoreId mismatch but store was empty so downloading new store from %s. Expected: " + + "%s, Encountered: %s. ", message.from(), storeId, localDatabase.storeId() ); + downloadSnapshot( message.from() ); } - - batch.add( operation ); - lastIndex = index; - - if ( batch.size() == maxBatchSize ) + else { - flush(); + log.info( "Discarding message[%s] owing to mismatched storeId and non-empty store. " + + "Expected: %s, Encountered: %s", message, storeId, localDatabase.storeId() ); + listeners.forEach( l -> { + MismatchedStoreIdService.MismatchedStoreIdException ex = new MismatchedStoreIdService.MismatchedStoreIdException( storeId, localDatabase.storeId() ); + l.onMismatchedStore( ex ); + } ); } - } - private void flush() throws Exception - { - if ( batch.size() == 0 ) - { - return; - } + } + } - long startIndex = lastIndex - batch.size() + 1; - handleOperations( startIndex, batch ); - lastApplied = lastIndex; + public void addMismatchedStoreListener( MismatchedStoreListener listener ) + { + listeners.add(listener); + } - batch.clear(); - maybeFlush(); - } + private synchronized void notifyCommitted( long commitIndex ) + { + applicationProcess.notifyCommitted( commitIndex ); } - @Override - public synchronized void notifyNeedFreshSnapshot() + private synchronized void notifyNeedFreshSnapshot() { try { @@ -210,11 +134,6 @@ public synchronized void notifyNeedFreshSnapshot() } } - public void compact() throws IOException - { - raftLog.prune( lastFlushed ); - } - /** * Attempts to download a fresh snapshot from another core instance. * @@ -224,7 +143,7 @@ public synchronized void downloadSnapshot( MemberId source ) { try { - applier.sync( true ); + applicationProcess.sync(); downloader.downloadSnapshot( source, this ); } catch ( InterruptedException | StoreCopyFailedException e ) @@ -233,109 +152,43 @@ public synchronized void downloadSnapshot( MemberId source ) } } - private void handleOperations( long commandIndex, List operations ) - { - try ( CommandDispatcher dispatcher = coreStateMachines.commandDispatcher() ) - { - for ( DistributedOperation operation : operations ) - { - if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) ) - { - commandIndex++; - continue; - } - - CoreReplicatedContent command = (CoreReplicatedContent) operation.content(); - command.dispatch( dispatcher, commandIndex, - result -> progressTracker.trackResult( operation, result ) ); - - sessionTracker.update( operation.globalSession(), operation.operationId(), commandIndex ); - commandIndex++; - } - } - } - - private void maybeFlush() throws IOException + public synchronized CoreSnapshot snapshot() throws IOException, InterruptedException { - if ( (lastApplied - lastFlushed) > flushEvery ) - { - flush(); - } + return applicationProcess.snapshot(); } - private void flush() throws IOException + synchronized void installSnapshot( CoreSnapshot coreSnapshot ) { - coreStateMachines.flush(); - sessionTracker.flush(); - lastFlushedStorage.persistStoreData( lastApplied ); - lastFlushed = lastApplied; + applicationProcess.installSnapshot( coreSnapshot ); } @Override - public synchronized void start() throws IOException, InterruptedException + public void prune() throws IOException { - lastFlushed = lastApplied = lastFlushedStorage.getInitialState(); - log.info( format( "Restoring last applied index to %d", lastApplied ) ); - sessionTracker.start(); - - /* Considering the order in which state is flushed, the state machines will - * always be furthest ahead and indicate the furthest possible state to - * which we must replay to reach a consistent state. */ - long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionTracker.getLastAppliedIndex() ); - - if ( lastPossiblyApplying > lastApplied ) - { - log.info( "Recovering up to: " + lastPossiblyApplying ); - submitApplyJob( lastPossiblyApplying ); - applier.sync( false ); - } + applicationProcess.prune(); } @Override - public synchronized void stop() throws Throwable + public void start() throws IOException, InterruptedException { - applier.sync( true ); - flush(); + applicationProcess.start(); } - public synchronized CoreSnapshot snapshot() throws IOException, InterruptedException + @Override + public void stop() throws IOException, InterruptedException { - applier.sync( false ); - - long prevIndex = lastApplied; - long prevTerm = raftLog.readEntryTerm( prevIndex ); - CoreSnapshot coreSnapshot = new CoreSnapshot( prevIndex, prevTerm ); - - coreStateMachines.addSnapshots( coreSnapshot ); - sessionTracker.addSnapshots( coreSnapshot ); - - return coreSnapshot; + applicationProcess.stop(); } - synchronized void installSnapshot( CoreSnapshot coreSnapshot ) + @Override + public void init() throws Throwable { - coreStateMachines.installSnapshots( coreSnapshot ); - long snapshotPrevIndex = coreSnapshot.prevIndex(); - try - { - if ( snapshotPrevIndex > 1 ) - { - raftLog.skip( snapshotPrevIndex, coreSnapshot.prevTerm() ); - } - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - this.lastApplied = this.lastFlushed = snapshotPrevIndex; - log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) ); - - sessionTracker.installSnapshots( coreSnapshot ); + applicationProcess.init(); } @Override - public void prune() throws IOException + public void shutdown() throws Throwable { - compact(); + applicationProcess.shutdown(); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java index e121300307f58..d05a3f1b4e134 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java @@ -166,7 +166,7 @@ public void laggingFollowerShouldDownloadSnapshot() throws Exception for ( CoreClusterMember db : cluster.coreMembers() ) { - db.coreState().compact(); + db.coreState().prune(); } // WHEN @@ -201,7 +201,7 @@ public void badFollowerShouldNotJoinCluster() throws Exception for ( CoreClusterMember db : cluster.coreMembers() ) { - db.coreState().compact(); + db.coreState().prune(); } // WHEN @@ -231,7 +231,7 @@ public void aNewServerShouldJoinTheClusterByDownloadingASnapshot() throws Except for ( CoreClusterMember db : cluster.coreMembers() ) { - db.coreState().compact(); + db.coreState().prune(); } // WHEN diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java index 022686cc2dd9e..77b95ee9e4ffc 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java @@ -19,25 +19,20 @@ */ package org.neo4j.coreedge.raft; -import org.junit.Before; -import org.junit.Test; - import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; +import org.neo4j.coreedge.raft.net.Inbound.MessageHandler; import org.neo4j.coreedge.server.StoreId; -import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.NullLogProvider; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -46,7 +41,7 @@ public class BatchingMessageHandlerTest private static final int MAX_BATCH = 16; private static final int QUEUE_SIZE = 64; private LocalDatabase localDatabase = mock( LocalDatabase.class ); - private RaftStateMachine raftStateMachine = mock( RaftStateMachine.class ); + private MessageHandler raftStateMachine = mock( MessageHandler.class ); private StoreId localStoreId = new StoreId( 1, 2, 3, 4 ); @Before @@ -55,112 +50,33 @@ public void setup() when( localDatabase.storeId() ).thenReturn( localStoreId ); } - @Test - public void shouldDownloadSnapshotOnStoreIdMismatch() throws Exception - { - // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - - when( localDatabase.isEmpty() ).thenReturn( true ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); - - StoreId otherStoreId = new StoreId( 5, 6, 7, 8 ); - - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) ); - - // when - batchHandler.run(); - - // then - verifyNoMoreInteractions( innerHandler ); - verify( raftStateMachine ).downloadSnapshot( message.from() ); - } - - @Test - public void shouldLogOnStoreIdMismatchAndNonEmptyStore() throws Exception - { - // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - - when( localDatabase.isEmpty() ).thenReturn( false ); - AssertableLogProvider logProvider = new AssertableLogProvider(); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, logProvider, QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); - - StoreId otherStoreId = new StoreId( 5, 6, 7, 8 ); - - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) ); - - // when - batchHandler.run(); - - // then - verifyNoMoreInteractions( innerHandler ); - logProvider.assertContainsLogCallContaining( "Discarding message" ); - } - - @Test - public void shouldInformListenersOnStoreIdMismatch() throws Exception - { - // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - - when( localDatabase.isEmpty() ).thenReturn( false ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); - - StoreId otherStoreId = new StoreId( 5, 6, 7, 8 ); - - AtomicBoolean listenerInvoked = new AtomicBoolean( false ); - batchHandler.addMismatchedStoreListener( ex -> listenerInvoked.set( true ) ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) ); - - // when - batchHandler.run(); - - // then - verifyNoMoreInteractions( innerHandler ); - assertTrue(listenerInvoked.get()); - } - @Test public void shouldInvokeInnerHandlerWhenRun() throws Exception { // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); + raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, message ) ); - verifyZeroInteractions( innerHandler ); + RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage( + localStoreId, new RaftMessages.NewEntry.Request( null, null ) ); + batchHandler.handle( message ); + verifyZeroInteractions( raftStateMachine ); // when batchHandler.run(); // then - verify( innerHandler ).handle( message ); + verify( raftStateMachine ).handle( message ); } @Test public void shouldInvokeHandlerOnQueuedMessage() throws Exception { // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); - RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); + raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); + RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage( localStoreId, + new RaftMessages.NewEntry.Request( null, null ) ); ExecutorService executor = Executors.newCachedThreadPool(); Future future = executor.submit( batchHandler ); @@ -172,22 +88,19 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception Thread.sleep( 50 ); // when - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, message ) ); + batchHandler.handle( message ); // then future.get(); - verify( innerHandler ).handle( message ); + verify( raftStateMachine ).handle( message ); } @Test public void shouldBatchRequests() throws Exception { // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); + raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentB = new ReplicatedString( "B" ); RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request( null, contentA ); @@ -195,7 +108,7 @@ public void shouldBatchRequests() throws Exception batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageA ) ); batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageB ) ); - verifyZeroInteractions( innerHandler ); + verifyZeroInteractions( raftStateMachine ); // when batchHandler.run(); @@ -204,31 +117,33 @@ public void shouldBatchRequests() throws Exception RaftMessages.NewEntry.Batch batch = new RaftMessages.NewEntry.Batch( 2 ); batch.add( contentA ); batch.add( contentB ); - verify( innerHandler ).handle( batch ); + verify( raftStateMachine ).handle( new RaftMessages.StoreIdAwareMessage( localStoreId, batch ) ); } @Test public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Exception { // given - RaftInstance innerHandler = mock( RaftInstance.class ); - when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); - BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); + raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); + ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentC = new ReplicatedString( "C" ); - RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request( null, contentA ); - RaftMessages.Heartbeat messageB = new RaftMessages.Heartbeat( null, 0, 0, 0 ); - RaftMessages.NewEntry.Request messageC = new RaftMessages.NewEntry.Request( null, contentC ); - RaftMessages.Heartbeat messageD = new RaftMessages.Heartbeat( null, 1, 1, 1 ); - - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageA ) ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageB ) ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageC ) ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageD ) ); - verifyZeroInteractions( innerHandler ); + RaftMessages.StoreIdAwareMessage messageA = new RaftMessages.StoreIdAwareMessage( localStoreId, + new RaftMessages.NewEntry.Request( null, contentA ) ); + RaftMessages.StoreIdAwareMessage messageB = new RaftMessages.StoreIdAwareMessage( localStoreId, + new RaftMessages.Heartbeat( null, 0, 0, 0 ) ); + RaftMessages.StoreIdAwareMessage messageC = new RaftMessages.StoreIdAwareMessage( localStoreId, + new RaftMessages.NewEntry.Request( null, contentC ) ); + RaftMessages.StoreIdAwareMessage messageD = new RaftMessages.StoreIdAwareMessage( localStoreId, + new RaftMessages.Heartbeat( null, 1, 1, 1 ) ); + + batchHandler.handle( messageA ); + batchHandler.handle( messageB ); + batchHandler.handle( messageC ); + batchHandler.handle( messageD ); + verifyZeroInteractions( raftStateMachine ); // when batchHandler.run(); @@ -238,8 +153,8 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep batch.add( contentA ); batch.add( contentC ); - verify( innerHandler ).handle( batch ); - verify( innerHandler ).handle( messageB ); - verify( innerHandler ).handle( messageD ); + verify( raftStateMachine ).handle( new RaftMessages.StoreIdAwareMessage( localStoreId, batch ) ); + verify( raftStateMachine ).handle( messageB ); + verify( raftStateMachine ).handle( messageD ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/EmptyStateMachine.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/EmptyStateMachine.java index bb649b41e5321..c3749c0b8cb2b 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/EmptyStateMachine.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/EmptyStateMachine.java @@ -37,4 +37,9 @@ public void notifyNeedFreshSnapshot() public void downloadSnapshot( MemberId from ) { } + + @Override + public void innerHandle( RaftMessages.StoreIdAwareMessage raftMessage ) + { + } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java index 218672139bd3c..ec2174488bc92 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/elections/Fixture.java @@ -31,6 +31,7 @@ import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftInstance.BootstrapException; import org.neo4j.coreedge.raft.RaftInstanceBuilder; +import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftStateMachine; import org.neo4j.coreedge.raft.RaftTestNetwork; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; @@ -153,6 +154,11 @@ public void downloadSnapshot( MemberId from ) { } + @Override + public void innerHandle( RaftMessages.StoreIdAwareMessage raftMessage ) + { + } + } private void awaitBootstrapped() throws InterruptedException, TimeoutException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java index e38ec8af14b4e..8f8a223e8665f 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java @@ -19,12 +19,13 @@ */ package org.neo4j.coreedge.raft.membership; -import org.junit.Test; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import org.junit.Test; + import org.neo4j.coreedge.raft.BatchingMessageHandler; +import org.neo4j.coreedge.raft.MismatchedStoreIdService; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.state.RaftState; @@ -34,6 +35,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; + import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -47,7 +49,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 500, - mock(BatchingMessageHandler.class), NullLogProvider.getInstance() ); + mock( MismatchedStoreIdService.class ), NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -69,7 +71,7 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - mock(BatchingMessageHandler.class), NullLogProvider.getInstance()); + mock( MismatchedStoreIdService.class ), NullLogProvider.getInstance() ); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) @@ -96,7 +98,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - mock(BatchingMessageHandler.class), NullLogProvider.getInstance() ); + mock( MismatchedStoreIdService.class ), NullLogProvider.getInstance() ); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CommandApplicationProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CommandApplicationProcessTest.java new file mode 100644 index 0000000000000..3f7a236f9d5e1 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CommandApplicationProcessTest.java @@ -0,0 +1,407 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.state; + +import org.junit.Test; +import org.mockito.InOrder; + +import java.util.Arrays; +import java.util.UUID; +import java.util.function.Consumer; + +import org.neo4j.coreedge.SessionTracker; +import org.neo4j.coreedge.raft.NewLeaderBarrier; +import org.neo4j.coreedge.raft.log.InMemoryRaftLog; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; +import org.neo4j.coreedge.raft.log.segmented.InFlightMap; +import org.neo4j.coreedge.raft.replication.DistributedOperation; +import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.replication.session.GlobalSession; +import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; +import org.neo4j.coreedge.raft.replication.session.LocalOperationId; +import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; +import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; +import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.logging.NullLogProvider; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class CommandApplicationProcessTest +{ + private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() ); + + private final InMemoryStateStorage lastFlushedStorage = new InMemoryStateStorage<>( -1L ); + private final SessionTracker sessionStorage = new SessionTracker( + new InMemoryStateStorage<>( new GlobalSessionTrackerState() ) ); + + private final DatabaseHealth dbHealth = new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), + NullLogProvider.getInstance().getLog( getClass() ) ); + + private final GlobalSession globalSession = new GlobalSession( UUID.randomUUID(), null ); + private final int flushEvery = 10; + private final int batchSize = 16; + + private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); + private InFlightMap inFlightMap = spy( new InFlightMap<>() ); + private final Monitors monitors = new Monitors(); + private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); + private final CommandApplicationProcess applicationProcess = new CommandApplicationProcess( + coreStateMachines, raftLog, batchSize, flushEvery, () -> dbHealth, + NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, + sessionStorage, applier, inFlightMap, monitors ); + + private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); + + private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class ); + + { + when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher ); + when( coreStateMachines.getLastAppliedIndex() ).thenReturn( -1L ); + } + + private ReplicatedTransaction tx( byte dataValue ) + { + byte[] dataArray = new byte[30]; + Arrays.fill( dataArray, dataValue ); + return new ReplicatedTransaction( dataArray ); + } + + private int sequenceNumber = 0; + private synchronized ReplicatedContent operation( CoreReplicatedContent tx ) + { + return new DistributedOperation( tx, globalSession, new LocalOperationId( 0, sequenceNumber++ ) ); + } + + @Test + public void shouldApplyCommittedCommand() throws Throwable + { + // given + RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); + monitors.addMonitorListener( listener ); + applicationProcess.start(); + + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); + + // when + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + applicationProcess.notifyCommitted( 2 ); + applier.sync( false ); + + // then + inOrder.verify( coreStateMachines ).commandDispatcher(); + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); + inOrder.verify( commandDispatcher ).close(); + + verify( listener).commitIndex( 2 ); + } + + @Test + public void shouldNotApplyUncommittedCommands() throws Throwable + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + applicationProcess.notifyCommitted( -1 ); + applier.sync( false ); + + // then + verifyZeroInteractions( commandDispatcher ); + } + + @Test + public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + applicationProcess.notifyCommitted( 1 ); + applier.sync( false ); + + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); + + // then + inOrder.verify( coreStateMachines ).commandDispatcher(); + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); + inOrder.verify( commandDispatcher ).close(); + } + + @Test + public void duplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 0 ) ) ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 0 ) ) ) ); // duplicate + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 1 ) ) ) ); + + applicationProcess.notifyCommitted( 3 ); + applier.sync( false ); + + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); + + // then + inOrder.verify( coreStateMachines ).commandDispatcher(); + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); + // duplicate not dispatched + inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 3L ), anyCallback() ); + inOrder.verify( commandDispatcher ).close(); + verifyNoMoreInteractions( commandDispatcher ); + } + + @Test + public void outOfOrderDuplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 100 ), globalSession, new LocalOperationId( 0, 0 ) ) ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 101 ), globalSession, new LocalOperationId( 0, 1 ) ) ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 102 ), globalSession, new LocalOperationId( 0, 2 ) ) ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 101 ), globalSession, new LocalOperationId( 0, 1 ) ) ) ); // duplicate of tx 101 + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 100 ), globalSession, new LocalOperationId( 0, 0 ) ) ) ); // duplicate of tx 100 + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 103 ), globalSession, new LocalOperationId( 0, 3 ) ) ) ); + raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 104 ), globalSession, new LocalOperationId( 0, 4 ) ) ) ); + + applicationProcess.notifyCommitted( 6 ); + applier.sync( false ); + + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); + + // then + inOrder.verify( coreStateMachines ).commandDispatcher(); + inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 100 ) ), eq( 0L ), anyCallback() ); + inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 101 ) ), eq( 1L ), anyCallback() ); + inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 102 ) ), eq( 2L ), anyCallback() ); + // duplicate of tx 101 not dispatched, at index 3 + // duplicate of tx 100 not dispatched, at index 4 + inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 103 ) ), eq( 5L ), anyCallback() ); + inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 104 ) ), eq( 6L ), anyCallback() ); + inOrder.verify( commandDispatcher ).close(); + verifyNoMoreInteractions( commandDispatcher ); + } + + // TODO: Test recovery, see CoreState#start(). + + @Test + public void shouldPeriodicallyFlushState() throws Throwable + { + // given + applicationProcess.start(); + + int interactions = flushEvery * 5; + for ( int i = 0; i < interactions; i++ ) + { + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + } + + // when + applicationProcess.notifyCommitted( interactions ); + applier.sync( false ); + + // then + verify( coreStateMachines, times( interactions / batchSize ) ).flush(); + assertEquals( interactions - ( interactions % batchSize) - 1, (long) lastFlushedStorage.getInitialState() ); + } + + @Test + public void shouldPanicIfUnableToApply() throws Throwable + { + // given + doThrow( IllegalStateException.class ).when( commandDispatcher ) + .dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() ); + applicationProcess.start(); + + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + + // when + assertEquals( true, dbHealth.isHealthy() ); + applicationProcess.notifyCommitted( 0 ); + applier.sync( false ); + + // then + assertEquals( false, dbHealth.isHealthy() ); + } + + @Test + public void shouldApplyToLogFromCache() throws Throwable + { + //given n things to apply in the cache, check that they are actually applied. + + // given + applicationProcess.start(); + + inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); + + //when + applicationProcess.notifyCommitted( 0 ); + applier.sync( false ); + + //then the cache should have had it's get method called. + verify( inFlightMap, times( 1 ) ).retrieve( 0L ); + verifyZeroInteractions( raftLog ); + } + + @Test + public void cacheEntryShouldBePurgedWhenApplied() throws Throwable + { + //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" + applicationProcess.start(); + + inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.register( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); + inFlightMap.register( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); + //when + applicationProcess.notifyCommitted( 0 ); + + applier.sync( false ); + + //then the cache should have had its get method called. + assertNull( inFlightMap.retrieve( 0L ) ); + assertNotNull( inFlightMap.retrieve( 1L ) ); + assertNotNull( inFlightMap.retrieve( 2L ) ); + } + + @Test + public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable + { + // if the cache does not contain all things to be applied, make sure we fall back to the log + // should only happen in recovery, otherwise this is probably a bug. + applicationProcess.start(); + + //given cache with missing entry + ReplicatedContent operation0 = operation( nullTx ); + ReplicatedContent operation1 = operation( nullTx ); + ReplicatedContent operation2 = operation( nullTx ); + + inFlightMap.register( 0L, new RaftLogEntry( 0, operation0 ) ); + inFlightMap.register( 2L, new RaftLogEntry( 2, operation2 ) ); + + raftLog.append( new RaftLogEntry( 0, operation0 ) ); + raftLog.append( new RaftLogEntry( 1, operation1 ) ); + raftLog.append( new RaftLogEntry( 2, operation2 ) ); + + //when + applicationProcess.notifyCommitted( 2 ); + + applier.sync( false ); + + //then the cache should have had its get method called. + verify( inFlightMap, times( 0 ) ).retrieve( 2L ); + verify( inFlightMap, times( 3 ) ).unregister( anyLong() ); //everything is cleaned up + + verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); + verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); + verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); + + verify( raftLog, times( 1 ) ).getEntryCursor( 1 ); + } + + @Test + public void shouldFailWhenCacheAndLogMiss() throws Throwable + { + //When an entry is not in the log, we must fail. + applicationProcess.start(); + + inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 1, operation( nullTx ) ) ); + + //when + applicationProcess.notifyCommitted( 2 ); + applier.sync( false ); + + //then + assertFalse( dbHealth.isHealthy() ); + } + + @Test + public void shouldIncreaseLastAppliedForStateMachineCommands() throws Exception + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); + applicationProcess.notifyCommitted( 2 ); + applier.sync( false ); + + // then + assertEquals( 2, applicationProcess.lastApplied() ); + } + + @Test + public void shouldIncreaseLastAppliedForOtherCommands() throws Exception + { + // given + applicationProcess.start(); + + // when + raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); + raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); + raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); + applicationProcess.notifyCommitted( 2 ); + applier.sync( false ); + + // then + assertEquals( 2, applicationProcess.lastApplied() ); + } + + private Consumer anyCallback() + { + @SuppressWarnings( "unchecked" ) + Consumer anyCallback = any( Consumer.class ); + return anyCallback; + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 7e6c232c2b40e..b91b4256927d8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -20,388 +20,41 @@ package org.neo4j.coreedge.raft.state; import org.junit.Test; -import org.mockito.InOrder; -import java.util.Arrays; -import java.util.UUID; -import java.util.function.Consumer; +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.server.StoreId; +import org.neo4j.logging.AssertableLogProvider; -import org.neo4j.coreedge.SessionTracker; -import org.neo4j.coreedge.raft.NewLeaderBarrier; -import org.neo4j.coreedge.raft.log.InMemoryRaftLog; -import org.neo4j.coreedge.raft.log.RaftLogEntry; -import org.neo4j.coreedge.raft.log.monitoring.RaftLogCommitIndexMonitor; -import org.neo4j.coreedge.raft.log.segmented.InFlightMap; -import org.neo4j.coreedge.raft.replication.DistributedOperation; -import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; -import org.neo4j.coreedge.raft.replication.ReplicatedContent; -import org.neo4j.coreedge.raft.replication.session.GlobalSession; -import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; -import org.neo4j.coreedge.raft.replication.session.LocalOperationId; -import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; -import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; -import org.neo4j.coreedge.server.edge.CoreMemberSelectionStrategy; -import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; -import org.neo4j.kernel.internal.DatabaseHealth; -import org.neo4j.kernel.monitoring.Monitors; -import org.neo4j.logging.NullLogProvider; - -import static junit.framework.TestCase.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.neo4j.coreedge.server.RaftTestMember.member; + public class CoreStateTest { - private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() ); - - private final InMemoryStateStorage lastFlushedStorage = new InMemoryStateStorage<>( -1L ); - private final SessionTracker sessionStorage = new SessionTracker( - new InMemoryStateStorage<>( new GlobalSessionTrackerState() ) ); - - private final DatabaseHealth dbHealth = new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), - NullLogProvider.getInstance().getLog( getClass() ) ); - - private final GlobalSession globalSession = new GlobalSession( UUID.randomUUID(), null ); - private final int flushEvery = 10; - private final int batchSize = 16; - - private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); - private InFlightMap inFlightMap = spy( new InFlightMap<>() ); - private final Monitors monitors = new Monitors(); - private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); - private final CoreState coreState = new CoreState( coreStateMachines, raftLog, batchSize, flushEvery, () -> dbHealth, - NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, - sessionStorage, mock( CoreMemberSelectionStrategy.class), applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); - - private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); - - private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class ); - - { - when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher ); - when( coreStateMachines.getLastAppliedIndex() ).thenReturn( -1L ); - } - - private ReplicatedTransaction tx( byte dataValue ) - { - byte[] dataArray = new byte[30]; - Arrays.fill( dataArray, dataValue ); - return new ReplicatedTransaction( dataArray ); - } - - private int sequenceNumber = 0; - private synchronized ReplicatedContent operation( CoreReplicatedContent tx ) - { - return new DistributedOperation( tx, globalSession, new LocalOperationId( 0, sequenceNumber++ ) ); - } - - @Test - public void shouldApplyCommittedCommand() throws Throwable - { - // given - RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); - monitors.addMonitorListener( listener ); - coreState.start(); - - InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); - - // when - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - coreState.notifyCommitted( 2 ); - applier.sync( false ); - - // then - inOrder.verify( coreStateMachines ).commandDispatcher(); - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); - inOrder.verify( commandDispatcher ).close(); - - verify( listener).commitIndex( 2 ); - } - - @Test - public void shouldNotApplyUncommittedCommands() throws Throwable - { - // given - coreState.start(); - - // when - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - coreState.notifyCommitted( -1 ); - applier.sync( false ); - - // then - verifyZeroInteractions( commandDispatcher ); - } - - @Test - public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable - { - // given - coreState.start(); - - // when - raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - coreState.notifyCommitted( 1 ); - applier.sync( false ); - - InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); - - // then - inOrder.verify( coreStateMachines ).commandDispatcher(); - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); - inOrder.verify( commandDispatcher ).close(); - } - - @Test - public void duplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception - { - // given - coreState.start(); - - // when - raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 0 ) ) ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 0 ) ) ) ); // duplicate - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( nullTx, globalSession, new LocalOperationId( 0, 1 ) ) ) ); - - coreState.notifyCommitted( 3 ); - applier.sync( false ); - - InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); - - // then - inOrder.verify( coreStateMachines ).commandDispatcher(); - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); - // duplicate not dispatched - inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 3L ), anyCallback() ); - inOrder.verify( commandDispatcher ).close(); - verifyNoMoreInteractions( commandDispatcher ); - } - - @Test - public void outOfOrderDuplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception - { - // given - coreState.start(); - - // when - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 100 ), globalSession, new LocalOperationId( 0, 0 ) ) ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 101 ), globalSession, new LocalOperationId( 0, 1 ) ) ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 102 ), globalSession, new LocalOperationId( 0, 2 ) ) ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 101 ), globalSession, new LocalOperationId( 0, 1 ) ) ) ); // duplicate of tx 101 - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 100 ), globalSession, new LocalOperationId( 0, 0 ) ) ) ); // duplicate of tx 100 - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 103 ), globalSession, new LocalOperationId( 0, 3 ) ) ) ); - raftLog.append( new RaftLogEntry( 0, new DistributedOperation( tx( (byte) 104 ), globalSession, new LocalOperationId( 0, 4 ) ) ) ); - - coreState.notifyCommitted( 6 ); - applier.sync( false ); - - InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); - - // then - inOrder.verify( coreStateMachines ).commandDispatcher(); - inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 100 ) ), eq( 0L ), anyCallback() ); - inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 101 ) ), eq( 1L ), anyCallback() ); - inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 102 ) ), eq( 2L ), anyCallback() ); - // duplicate of tx 101 not dispatched, at index 3 - // duplicate of tx 100 not dispatched, at index 4 - inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 103 ) ), eq( 5L ), anyCallback() ); - inOrder.verify( commandDispatcher ).dispatch( eq( tx( (byte) 104 ) ), eq( 6L ), anyCallback() ); - inOrder.verify( commandDispatcher ).close(); - verifyNoMoreInteractions( commandDispatcher ); - } - - // TODO: Test recovery, see CoreState#start(). - - @Test - public void shouldPeriodicallyFlushState() throws Throwable - { - // given - coreState.start(); - - int interactions = flushEvery * 5; - for ( int i = 0; i < interactions; i++ ) - { - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - } - - // when - coreState.notifyCommitted( interactions ); - applier.sync( false ); - - // then - verify( coreStateMachines, times( interactions / batchSize ) ).flush(); - assertEquals( interactions - ( interactions % batchSize) - 1, (long) lastFlushedStorage.getInitialState() ); - } - - @Test - public void shouldPanicIfUnableToApply() throws Throwable - { - // given - doThrow( IllegalStateException.class ).when( commandDispatcher ) - .dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() ); - coreState.start(); - - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - - // when - assertEquals( true, dbHealth.isHealthy() ); - coreState.notifyCommitted( 0 ); - applier.sync( false ); - - // then - assertEquals( false, dbHealth.isHealthy() ); - } - - @Test - public void shouldApplyToLogFromCache() throws Throwable - { - //given n things to apply in the cache, check that they are actually applied. - - // given - coreState.start(); - - inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); - - //when - coreState.notifyCommitted( 0 ); - applier.sync( false ); - - //then the cache should have had it's get method called. - verify( inFlightMap, times( 1 ) ).retrieve( 0L ); - verifyZeroInteractions( raftLog ); - } - - @Test - public void cacheEntryShouldBePurgedWhenApplied() throws Throwable - { - //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" - coreState.start(); - - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 1L, new RaftLogEntry( 0, operation( nullTx ) ) ); - inFlightMap.register( 2L, new RaftLogEntry( 0, operation( nullTx ) ) ); - //when - coreState.notifyCommitted( 0 ); - - applier.sync( false ); - - //then the cache should have had its get method called. - assertNull( inFlightMap.retrieve( 0L ) ); - assertNotNull( inFlightMap.retrieve( 1L ) ); - assertNotNull( inFlightMap.retrieve( 2L ) ); - } - - @Test - public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable - { - // if the cache does not contain all things to be applied, make sure we fall back to the log - // should only happen in recovery, otherwise this is probably a bug. - coreState.start(); - - //given cache with missing entry - ReplicatedContent operation0 = operation( nullTx ); - ReplicatedContent operation1 = operation( nullTx ); - ReplicatedContent operation2 = operation( nullTx ); - - inFlightMap.register( 0L, new RaftLogEntry( 0, operation0 ) ); - inFlightMap.register( 2L, new RaftLogEntry( 2, operation2 ) ); - - raftLog.append( new RaftLogEntry( 0, operation0 ) ); - raftLog.append( new RaftLogEntry( 1, operation1 ) ); - raftLog.append( new RaftLogEntry( 2, operation2 ) ); - - //when - coreState.notifyCommitted( 2 ); - - applier.sync( false ); - - //then the cache should have had its get method called. - verify( inFlightMap, times( 0 ) ).retrieve( 2L ); - verify( inFlightMap, times( 3 ) ).unregister( anyLong() ); //everything is cleaned up - - verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); - verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); - verify( commandDispatcher, times( 1 ) ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); - - verify( raftLog, times( 1 ) ).getEntryCursor( 1 ); - } - @Test - public void shouldFailWhenCacheAndLogMiss() throws Throwable - { - //When an entry is not in the log, we must fail. - coreState.start(); - - inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 1, operation( nullTx ) ) ); - - //when - coreState.notifyCommitted( 2 ); - applier.sync( false ); - - //then - assertFalse( dbHealth.isHealthy() ); - } - - @Test - public void shouldIncreaseLastAppliedForStateMachineCommands() throws Exception + public void shouldLogOnStoreIdMismatchAndNonEmptyStore() throws Exception { // given - coreState.start(); + StoreId localStoreId = new StoreId( 1, 2, 3, 4 ); + StoreId otherStoreId = new StoreId( 5, 6, 7, 8 ); - // when - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); - coreState.notifyCommitted( 2 ); - applier.sync( false ); + LocalDatabase localDatabase = mock( LocalDatabase.class ); + when( localDatabase.isEmpty() ).thenReturn( false ); + when( localDatabase.storeId() ).thenReturn( localStoreId ); + AssertableLogProvider logProvider = new AssertableLogProvider(); + CommandApplicationProcess applicationProcess = mock( CommandApplicationProcess.class ); - // then - assertEquals( 2, coreState.lastApplied() ); - } - - @Test - public void shouldIncreaseLastAppliedForOtherCommands() throws Exception - { - // given - coreState.start(); + CoreState coreState = new CoreState( null, localDatabase, logProvider, null, null, applicationProcess ); + RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( member( 0 ), null ); // when - raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); - raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); - raftLog.append( new RaftLogEntry( 0, new NewLeaderBarrier() ) ); - coreState.notifyCommitted( 2 ); - applier.sync( false ); + coreState.handle( new RaftMessages.StoreIdAwareMessage( otherStoreId, message ) ); // then - assertEquals( 2, coreState.lastApplied() ); - } - - private Consumer anyCallback() - { - @SuppressWarnings( "unchecked" ) - Consumer anyCallback = any( Consumer.class ); - return anyCallback; + verifyZeroInteractions( applicationProcess ); + logProvider.assertContainsLogCallContaining( "Discarding message" ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java index 04e2d1ffd9fd9..b3b939c33bf07 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreToCoreCopySnapshotIT.java @@ -128,7 +128,7 @@ public void shouldBeAbleToDownloadToNewInstanceAfterPruning() throws Exception // when for ( CoreClusterMember coreDb : cluster.coreMembers() ) { - coreDb.coreState().compact(); + coreDb.coreState().prune(); } cluster.removeCoreMember( leader ); // to force a change of leader