diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java index f5190657eb0e..fb56b3d546cb 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java @@ -27,6 +27,7 @@ import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.RaftMessages.RaftMessage; import org.neo4j.coreedge.raft.net.Inbound.MessageHandler; +import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.server.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -36,7 +37,7 @@ public class BatchingMessageHandler implements Runnable, MessageHandler, MismatchedStoreIdService { private final Log log; - private final MessageHandler innerHandler; + private final RaftInstance raftInstance; private final BlockingQueue messageQueue; private final int maxBatch; @@ -46,11 +47,11 @@ public class BatchingMessageHandler implements Runnable, MessageHandler listeners = new ArrayList<>( ); - public BatchingMessageHandler( MessageHandler innerHandler, LogProvider logProvider, + public BatchingMessageHandler( RaftInstance raftInstance, LogProvider logProvider, int queueSize, int maxBatch, LocalDatabase localDatabase, RaftStateMachine raftStateMachine ) { - this.innerHandler = innerHandler; + this.raftInstance = raftInstance; this.localDatabase = localDatabase; this.raftStateMachine = raftStateMachine; this.log = logProvider.getLog( getClass() ); @@ -95,7 +96,7 @@ public void run() { if ( messageQueue.isEmpty() ) { - innerHandler.handle( message.message() ); + innerHandle( message.message() ); } else { @@ -127,6 +128,32 @@ public void run() } } + /** + * Hands a single message to the raft instance and relays the resulting outcome to the raft state machine. + * Any failure is logged together with the offending message before the timers are stopped and the database panics. + */ + private void innerHandle( RaftMessage raftMessage ) + { + try + { + ConsensusOutcome outcome = raftInstance.handle( raftMessage ); + 
if ( outcome.needsFreshSnapshot() ) + { + raftStateMachine.notifyNeedFreshSnapshot(); + } + else + { + raftStateMachine.notifyCommitted( outcome.getCommitIndex() ); + } + } + catch ( Throwable e ) + { + log.error( "Failed to process Raft message " + raftMessage, e ); + raftInstance.stopTimers(); + localDatabase.panic( e ); + } + } + private void drain( BlockingQueue messageQueue, List batch, int maxElements ) { @@ -162,13 +189,13 @@ private void collateAndHandleBatch( List batch ) } else { - innerHandler.handle( message ); + innerHandle( message ); } } if ( batchRequest != null ) { - innerHandler.handle( batchRequest ); + innerHandle( batchRequest ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusModule.java new file mode 100644 index 000000000000..61465f1d7951 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusModule.java @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +package org.neo4j.coreedge.raft; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.function.Supplier; + +import org.neo4j.coreedge.discovery.CoreTopologyService; +import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector; +import org.neo4j.coreedge.raft.log.InMemoryRaftLog; +import org.neo4j.coreedge.raft.log.MonitoredRaftLog; +import org.neo4j.coreedge.raft.log.RaftLog; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.segmented.InFlightMap; +import org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog; +import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; +import org.neo4j.coreedge.raft.membership.RaftMembershipManager; +import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal; +import org.neo4j.coreedge.raft.net.LoggingOutbound; +import org.neo4j.coreedge.raft.net.Outbound; +import org.neo4j.coreedge.raft.net.RaftChannelInitializer; +import org.neo4j.coreedge.raft.net.RaftOutbound; +import org.neo4j.coreedge.raft.replication.SendToMyself; +import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager; +import org.neo4j.coreedge.raft.state.DurableStateStorage; +import org.neo4j.coreedge.raft.state.StateStorage; +import org.neo4j.coreedge.raft.state.membership.RaftMembershipState; +import org.neo4j.coreedge.raft.state.term.MonitoredTermStateStorage; +import org.neo4j.coreedge.raft.state.term.TermState; +import org.neo4j.coreedge.raft.state.vote.VoteState; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.NonBlockingChannels; +import org.neo4j.coreedge.server.SenderService; +import org.neo4j.coreedge.server.core.EnterpriseCoreEditionModule; +import org.neo4j.coreedge.server.logging.BetterMessageLogger; +import org.neo4j.coreedge.server.logging.MessageLogger; +import 
org.neo4j.coreedge.server.logging.NullMessageLogger; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.factory.PlatformModule; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.LogProvider; + +import static java.time.Clock.systemUTC; + +public class ConsensusModule +{ + private final MonitoredRaftLog raftLog; + private final RaftInstance raftInstance; + + public ConsensusModule( CoreMember myself, final PlatformModule platformModule, + RaftOutbound raftOutbound, File clusterStateDirectory, + DelayedRenewableTimeoutService raftTimeoutService, + CoreTopologyService discoveryService, long recoverFromIndex ) + { + final Dependencies dependencies = platformModule.dependencies; + final Config config = platformModule.config; + final LogService logging = platformModule.logging; + final FileSystemAbstraction fileSystem = platformModule.fileSystem; + final LifeSupport life = platformModule.life; + + LogProvider logProvider = logging.getInternalLogProvider(); + final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); + + final CoreReplicatedContentMarshal marshal = new CoreReplicatedContentMarshal(); + int maxQueueSize = config.get( CoreEdgeClusterSettings.outgoing_queue_size ); + final SenderService senderService = + new SenderService( new RaftChannelInitializer( marshal, logProvider ), logProvider, platformModule.monitors, + maxQueueSize, new NonBlockingChannels() ); + life.add( senderService ); + + final MessageLogger messageLogger; + if ( config.get( CoreEdgeClusterSettings.raft_messages_log_enable ) ) + { + File logsDir = config.get( GraphDatabaseSettings.logs_directory ); + messageLogger = life.add( new BetterMessageLogger<>( myself, 
raftMessagesLog( logsDir ) ) ); + } + else + { + messageLogger = new NullMessageLogger<>(); + } + + RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider ); + + raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors ); + + Outbound loggingOutbound = new LoggingOutbound<>( + raftOutbound, myself, messageLogger ); + + InFlightMap inFlightMap = new InFlightMap<>(); + + StateStorage termState; + StateStorage voteState; + StateStorage raftMembershipStorage; + + try + { + StateStorage durableTermState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ), + "term-state", new TermState.Marshal(), + config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier, + logProvider ) ); + + termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors ); + + voteState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ), + "vote-state", new VoteState.Marshal( new CoreMember.CoreMemberMarshal() ), + config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier, + logProvider ) ); + + raftMembershipStorage = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ), + "membership-state", new RaftMembershipState.Marshal(), + config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier, + logProvider ) ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + + long electionTimeout1 = config.get( CoreEdgeClusterSettings.leader_election_timeout ); + long heartbeatInterval = electionTimeout1 / 3; + + Integer expectedClusterSize = config.get( CoreEdgeClusterSettings.expected_core_cluster_size ); + + CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder(); + + SendToMyself leaderOnlyReplicator = + new SendToMyself( myself, loggingOutbound ); + + RaftMembershipManager 
raftMembershipManager = + new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, + expectedClusterSize, electionTimeout1, systemUTC(), + config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage, + recoverFromIndex ); + + life.add( raftMembershipManager ); + + RaftLogShippingManager logShipping = + new RaftLogShippingManager( loggingOutbound, logProvider, raftLog, systemUTC(), + myself, raftMembershipManager, electionTimeout1, + config.get( CoreEdgeClusterSettings.catchup_batch_size ), + config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); + + raftInstance = + new RaftInstance( myself, termState, voteState, raftLog, electionTimeout1, + heartbeatInterval, raftTimeoutService, loggingOutbound, logProvider, raftMembershipManager, + logShipping, inFlightMap, platformModule.monitors ); + + life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); + + life.add(logShipping); + } + + private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, + File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider ) + { + EnterpriseCoreEditionModule.RaftLogImplementation raftLogImplementation = + EnterpriseCoreEditionModule.RaftLogImplementation.valueOf( config.get( CoreEdgeClusterSettings.raft_log_implementation ) ); + switch ( raftLogImplementation ) + { + case IN_MEMORY: + { + return new InMemoryRaftLog(); + } + + case SEGMENTED: + { + long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); + int readerPoolSize = config.get( CoreEdgeClusterSettings.raft_log_reader_pool_size ); + + String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); + + return life.add( new SegmentedRaftLog( + fileSystem, + new File( clusterStateDirectory, RaftLog.PHYSICAL_LOG_DIRECTORY_NAME ), + rotateAtSize, + marshal, + logProvider, + pruningStrategyConfig, + readerPoolSize, 
systemUTC() ) ); + } + default: + throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation ); + } + } + + private static PrintWriter raftMessagesLog( File logsDir ) + { + //noinspection ResultOfMethodCallIgnored + logsDir.mkdirs(); + try + { + + return new PrintWriter( new FileOutputStream( new File( logsDir, "raft-messages.log" ), true ) ); + } + catch ( FileNotFoundException e ) + { + throw new RuntimeException( e ); + } + } + + public RaftLog raftLog() + { + return raftLog; + } + + public RaftInstance raftInstance() + { + return raftInstance; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 3cffbf4589a6..edd9c3502ec6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -25,7 +25,6 @@ import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.function.Predicate; -import java.util.function.Supplier; import org.neo4j.coreedge.helper.VolatileFuture; import org.neo4j.coreedge.raft.log.RaftLog; @@ -33,9 +32,9 @@ import org.neo4j.coreedge.raft.log.segmented.InFlightMap; import org.neo4j.coreedge.raft.membership.RaftGroup; import org.neo4j.coreedge.raft.membership.RaftMembershipManager; -import org.neo4j.coreedge.raft.net.Inbound; import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.outcome.AppendLogEntry; +import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager; import org.neo4j.coreedge.raft.roles.Role; @@ -46,7 +45,6 @@ import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.kernel.impl.util.Listener; -import org.neo4j.kernel.internal.DatabaseHealth; 
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -73,10 +71,10 @@ * the leader will have replicated it safely, and at a later point in time the commit() function * of the entry log will be called. */ -public class RaftInstance implements LeaderLocator, - Inbound.MessageHandler, CoreMetaData +public class RaftInstance implements LeaderLocator, CoreMetaData { private final LeaderNotFoundMonitor leaderNotFoundMonitor; + private RenewableTimeoutService.RenewableTimeout heartbeatTimer; public enum Timeouts implements RenewableTimeoutService.TimeoutName { @@ -92,10 +90,8 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private RenewableTimeoutService.RenewableTimeout electionTimer; private RaftMembershipManager membershipManager; - private final RaftStateMachine raftStateMachine; private final long electionTimeout; - private final Supplier databaseHealthSupplier; private final VolatileFuture volatileLeader = new VolatileFuture<>( null ); private final Outbound outbound; @@ -105,19 +101,17 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private RaftLogShippingManager logShipping; public RaftInstance( CoreMember myself, StateStorage termStorage, - StateStorage voteStorage, RaftLog entryLog, - RaftStateMachine raftStateMachine, long electionTimeout, long heartbeatInterval, - RenewableTimeoutService renewableTimeoutService, - Outbound outbound, - LogProvider logProvider, RaftMembershipManager membershipManager, - RaftLogShippingManager logShipping, - Supplier databaseHealthSupplier, - InFlightMap inFlightMap, - Monitors monitors ) + StateStorage voteStorage, RaftLog entryLog, + long electionTimeout, long heartbeatInterval, + RenewableTimeoutService renewableTimeoutService, + Outbound outbound, + LogProvider logProvider, RaftMembershipManager membershipManager, + RaftLogShippingManager logShipping, + InFlightMap inFlightMap, + Monitors monitors ) { this.myself = 
myself; this.entryLog = entryLog; - this.raftStateMachine = raftStateMachine; this.electionTimeout = electionTimeout; this.heartbeatInterval = heartbeatInterval; @@ -125,7 +119,6 @@ public RaftInstance( CoreMember myself, StateStorage termStorage, this.outbound = outbound; this.logShipping = logShipping; - this.databaseHealthSupplier = databaseHealthSupplier; this.log = logProvider.getLog( getClass() ); this.membershipManager = membershipManager; @@ -139,19 +132,39 @@ public RaftInstance( CoreMember myself, StateStorage termStorage, private void initTimers() { - electionTimer = renewableTimeoutService.create( - Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> { + electionTimer = renewableTimeoutService.create( Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), + timeout -> { log.info( "Election timeout triggered, base timeout value is %d", electionTimeout ); - handle( new RaftMessages.Timeout.Election( myself ) ); + try + { + handle( new RaftMessages.Timeout.Election( myself ) ); + } + catch ( IOException e ) + { + log.error( "Failed to process election timeout.", e ); + } timeout.renew(); } ); - renewableTimeoutService.create( - Timeouts.HEARTBEAT, heartbeatInterval, 0, timeout -> { - handle( new RaftMessages.Timeout.Heartbeat( myself ) ); + heartbeatTimer = renewableTimeoutService.create( Timeouts.HEARTBEAT, heartbeatInterval, 0, + timeout -> { + try + { + handle( new RaftMessages.Timeout.Heartbeat( myself ) ); + } + catch ( IOException e ) + { + log.error( "Failed to process heartbeat timeout.", e ); + } timeout.renew(); } ); } + public void stopTimers() + { + heartbeatTimer.cancel(); + electionTimer.cancel(); + } + /** * All members must be bootstrapped with the exact same set of initial members. Bootstrapping * requires an empty log as input and will seed it with the initial group entry in term 0. 
@@ -160,8 +173,10 @@ Timeouts.ELECTION, electionTimeout, randomTimeoutRange(), timeout -> { */ public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) throws BootstrapException { + log.info( "Attempting to bootstrap with initial member set %s", memberSet ); if ( entryLog.appendIndex() >= 0 ) { + log.info( "Ignoring bootstrap attempt because the raft log is not empty." ); return; } @@ -181,7 +196,6 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup memberSet ) thro } catch ( IOException e ) { - databaseHealthSupplier.get().panic( e ); throw new BootstrapException( e ); } } @@ -242,14 +256,6 @@ public ReadableRaftState state() return state; } - private void checkForSnapshotNeed( Outcome outcome ) - { - if ( outcome.needsFreshSnapshot() ) - { - raftStateMachine.notifyNeedFreshSnapshot(); - } - } - private void notifyLeaderChanges( Outcome outcome ) { for ( Listener listener : leaderListeners ) @@ -290,47 +296,27 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) ) return false; } - private void panicAndStop( RaftMessages.RaftMessage incomingMessage, Throwable e ) + public synchronized ConsensusOutcome handle( RaftMessages.RaftMessage incomingMessage ) throws IOException { - log.error( "Failed to process Raft message " + incomingMessage, e ); - databaseHealthSupplier.get().panic( e ); - electionTimer.cancel(); - } + Outcome outcome = currentRole.handler.handle( incomingMessage, state, log ); - public synchronized void handle( RaftMessages.RaftMessage incomingMessage ) - { - try - { - Outcome outcome = currentRole.handler.handle( incomingMessage, state, log ); - - boolean newLeaderWasElected = leaderChanged( outcome, state.leader() ); - boolean newCommittedEntry = outcome.getCommitIndex() > state.commitIndex(); - - state.update( outcome ); // updates to raft log happen within - sendMessages( outcome ); + boolean newLeaderWasElected = leaderChanged( outcome, state.leader() ); - handleTimers( outcome ); - 
handleLogShipping( outcome ); + state.update( outcome ); // updates to raft log happen within + sendMessages( outcome ); - driveMembership( outcome ); + handleTimers( outcome ); + handleLogShipping( outcome ); - volatileLeader.set( outcome.getLeader() ); + driveMembership( outcome ); - if ( newCommittedEntry ) - { - raftStateMachine.notifyCommitted( state.commitIndex() ); - } - if ( newLeaderWasElected ) - { - notifyLeaderChanges( outcome ); - } + volatileLeader.set( outcome.getLeader() ); - checkForSnapshotNeed( outcome ); - } - catch ( Throwable e ) + if ( newLeaderWasElected ) { - panicAndStop( incomingMessage, e ); + notifyLeaderChanges( outcome ); } + return outcome; } private void driveMembership( Outcome outcome ) throws IOException diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/ConsensusOutcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/ConsensusOutcome.java new file mode 100644 index 000000000000..bbd65c5ecc35 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/ConsensusOutcome.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.coreedge.raft.outcome; + +public interface ConsensusOutcome +{ + boolean needsFreshSnapshot(); + + long getCommitIndex(); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java index 1f33745b65b3..09e602b1e1ce 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java @@ -39,7 +39,7 @@ * A state update could be to change role, change term, etc. * A command could be to append to the RAFT log, tell the log shipper that there was a mismatch, etc. */ -public class Outcome implements Message +public class Outcome implements Message, ConsensusOutcome { /* Common */ private Role nextRole; @@ -256,6 +256,7 @@ public boolean electionTimeoutRenewed() return renewElectionTimeout; } + @Override public boolean needsFreshSnapshot() { return needsFreshSnapshot; @@ -291,6 +292,7 @@ public boolean isSteppingDown() return steppingDown; } + @Override public long getCommitIndex() { return commitIndex; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index 61589599826b..03b5e08880a7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -24,7 +24,7 @@ import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; -import org.neo4j.coreedge.raft.log.MonitoredRaftLog; +import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; import 
org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequest; @@ -52,7 +52,7 @@ public class CoreStateMachines private final ReplicatedIdAllocationStateMachine idAllocationStateMachine; private final CoreState coreState; private final RecoverTransactionLogState txLogState; - private final MonitoredRaftLog raftLog; + private final RaftLog raftLog; private final LocalDatabase localDatabase; private final CommandDispatcher currentBatch = new StateMachineCommandDispatcher(); @@ -67,7 +67,7 @@ public CoreStateMachines( ReplicatedIdAllocationStateMachine idAllocationStateMachine, CoreState coreState, RecoverTransactionLogState txLogState, - MonitoredRaftLog raftLog, + RaftLog raftLog, LocalDatabase localDatabase ) { this.replicatedTxStateMachine = replicatedTxStateMachine; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java index 91d0c17998a3..0c27f5221845 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java @@ -54,7 +54,8 @@ public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager services.add( raftServer ); services.add( catchupServer ); services.add( raftTimeoutService ); - services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, raftServer, logProvider ) ); + services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, logProvider ) ); + return services; } @@ -63,16 +64,14 @@ private static class MembershipWaiterLifecycle extends LifecycleAdapter private final MembershipWaiter membershipWaiter; private final Long joinCatchupTimeout; private final RaftInstance raft; - private final RaftServer raftServer; private final Log log; private MembershipWaiterLifecycle( 
MembershipWaiter membershipWaiter, Long joinCatchupTimeout, - RaftInstance raft, RaftServer raftServer, LogProvider logProvider ) + RaftInstance raft, LogProvider logProvider ) { this.membershipWaiter = membershipWaiter; this.joinCatchupTimeout = joinCatchupTimeout; this.raft = raft; - this.raftServer = raftServer; this.log = logProvider.getLog( getClass() ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 8c5bf277bb8c..425fa159dad3 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -40,24 +40,18 @@ import org.neo4j.coreedge.catchup.tx.edge.TxPullClient; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; -import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector; import org.neo4j.coreedge.raft.BatchingMessageHandler; +import org.neo4j.coreedge.raft.ConsensusModule; import org.neo4j.coreedge.raft.ContinuousJob; import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService; import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftServer; -import org.neo4j.coreedge.raft.log.InMemoryRaftLog; -import org.neo4j.coreedge.raft.log.MonitoredRaftLog; -import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.pruning.PruningScheduler; import org.neo4j.coreedge.raft.log.segmented.InFlightMap; -import org.neo4j.coreedge.raft.log.segmented.SegmentedRaftLog; -import org.neo4j.coreedge.raft.membership.CoreMemberSetBuilder; import org.neo4j.coreedge.raft.membership.MembershipWaiter; -import 
org.neo4j.coreedge.raft.membership.RaftMembershipManager; import org.neo4j.coreedge.raft.net.CoreReplicatedContentMarshal; import org.neo4j.coreedge.raft.net.LoggingInbound; import org.neo4j.coreedge.raft.net.LoggingOutbound; @@ -67,14 +61,12 @@ import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; import org.neo4j.coreedge.raft.replication.RaftReplicator; import org.neo4j.coreedge.raft.replication.Replicator; -import org.neo4j.coreedge.raft.replication.SendToMyself; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdRangeAcquirer; import org.neo4j.coreedge.raft.replication.session.GlobalSession; import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; -import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager; import org.neo4j.coreedge.raft.replication.token.ReplicatedLabelTokenHolder; import org.neo4j.coreedge.raft.replication.token.ReplicatedPropertyKeyTokenHolder; import org.neo4j.coreedge.raft.replication.token.ReplicatedRelationshipTypeTokenHolder; @@ -92,10 +84,6 @@ import org.neo4j.coreedge.raft.state.LongIndexMarshal; import org.neo4j.coreedge.raft.state.StateStorage; import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState; -import org.neo4j.coreedge.raft.state.membership.RaftMembershipState; -import org.neo4j.coreedge.raft.state.term.MonitoredTermStateStorage; -import org.neo4j.coreedge.raft.state.term.TermState; -import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember.CoreMemberMarshal; @@ -125,8 +113,8 @@ import org.neo4j.kernel.impl.core.RelationshipTypeToken; import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard; 
import org.neo4j.kernel.impl.enterprise.EnterpriseConstraintSemantics; -import org.neo4j.kernel.impl.enterprise.id.EnterpriseIdTypeConfigurationProvider; import org.neo4j.kernel.impl.enterprise.StandardSessionTracker; +import org.neo4j.kernel.impl.enterprise.id.EnterpriseIdTypeConfigurationProvider; import org.neo4j.kernel.impl.enterprise.transaction.log.checkpoint.ConfigurableIOLimiter; import org.neo4j.kernel.impl.factory.CommunityEditionModule; import org.neo4j.kernel.impl.factory.DatabaseInfo; @@ -156,6 +144,7 @@ import static java.time.Clock.systemUTC; import static java.util.concurrent.TimeUnit.SECONDS; + import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD; /** @@ -166,7 +155,7 @@ public class EnterpriseCoreEditionModule extends EditionModule { public static final String CLUSTER_STATE_DIRECTORY_NAME = "cluster-state"; - private final RaftInstance raft; + private final ConsensusModule consensusModule; private final CoreTopologyService discoveryService; private final LogProvider logProvider; @@ -181,8 +170,8 @@ public void registerProcedures( Procedures procedures ) try { procedures.register( new DiscoverMembersProcedure( discoveryService, logProvider ) ); - procedures.register( new AcquireEndpointsProcedure( discoveryService, raft, logProvider ) ); - procedures.register( new ClusterOverviewProcedure( discoveryService, raft, logProvider ) ); + procedures.register( new AcquireEndpointsProcedure( discoveryService, consensusModule.raftInstance(), logProvider ) ); + procedures.register( new ClusterOverviewProcedure( discoveryService, consensusModule.raftInstance(), logProvider ) ); } catch ( ProcedureException e ) { @@ -291,10 +280,6 @@ public void registerProcedures( Procedures procedures ) final DelayedRenewableTimeoutService raftTimeoutService = new DelayedRenewableTimeoutService( systemUTC(), logProvider ); - RaftLog underlyingLog = createRaftLog( config, life, fileSystem, clusterStateDirectory, marshal, logProvider ); - - 
MonitoredRaftLog raftLog = new MonitoredRaftLog( underlyingLog, platformModule.monitors ); - NonBlockingChannels nonBlockingChannels = new NonBlockingChannels(); CoreToCoreClient.ChannelInitializer channelInitializer = @@ -327,98 +312,38 @@ public void registerProcedures( Procedures procedures ) NotMyselfSelectionStrategy someoneElse = new NotMyselfSelectionStrategy( discoveryService, myself ); + consensusModule = + new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, raftTimeoutService, + discoveryService, lastFlushedStorage.getInitialState() ); + coreState = dependencies.satisfyDependency( new CoreState( - raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), + consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, sessionTrackerStorage, someoneElse, coreStateApplier, downloader, inFlightMap, platformModule.monitors ) ); raftServer = new RaftServer( marshal, raftListenAddress, logProvider ); - StateStorage termState; - StateStorage voteState; - StateStorage raftMembershipStorage; - - try - { - StateStorage durableTermState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ), - "term-state", new TermState.Marshal(), - config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier, - logProvider ) ); - - termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors ); - - voteState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ), - "vote-state", new VoteState.Marshal( new CoreMemberMarshal() ), - config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier, - logProvider ) ); - - raftMembershipStorage = life.add( - new DurableStateStorage<>( fileSystem, new File( 
clusterStateDirectory, "membership-state" ), - "membership-state", new RaftMembershipState.Marshal(), - config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier, - logProvider ) ); - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - LoggingInbound loggingRaftInbound = new LoggingInbound<>( raftServer, messageLogger, myself ); - long electionTimeout1 = config.get( CoreEdgeClusterSettings.leader_election_timeout ); - long heartbeatInterval = electionTimeout1 / 3; - - Integer expectedClusterSize = config.get( CoreEdgeClusterSettings.expected_core_cluster_size ); - - CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder(); - - SendToMyself leaderOnlyReplicator = - new SendToMyself( myself, loggingOutbound ); - - RaftMembershipManager raftMembershipManager = - new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, - expectedClusterSize, electionTimeout1, systemUTC(), - config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage, - lastFlushedStorage.getInitialState() ); - - life.add( raftMembershipManager ); - - RaftLogShippingManager logShipping = - new RaftLogShippingManager( loggingOutbound, logProvider, raftLog, systemUTC(), - myself, raftMembershipManager, electionTimeout1, - config.get( CoreEdgeClusterSettings.catchup_batch_size ), - config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); - - RaftInstance raftInstance = - new RaftInstance( myself, termState, voteState, raftLog, coreState, electionTimeout1, - heartbeatInterval, raftTimeoutService, loggingOutbound, logProvider, raftMembershipManager, - logShipping, databaseHealthSupplier, inFlightMap, platformModule.monitors ); - int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); BatchingMessageHandler batchingMessageHandler = - new BatchingMessageHandler( raftInstance, 
logProvider, queueSize, maxBatch, localDatabase, coreState ); + new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState ); life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), batchingMessageHandler ) ); loggingRaftInbound.registerHandler( batchingMessageHandler ); - life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); - - life.add(logShipping); - - raft = dependencies.satisfyDependency( raftInstance ); + dependencies.satisfyDependency( consensusModule.raftInstance() ); life.add( new PruningScheduler( coreState, platformModule.jobScheduler, config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) ); RaftReplicator replicator = - new RaftReplicator( raft, myself, + new RaftReplicator( consensusModule.raftInstance(), myself, loggingOutbound, sessionPool, progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ) ); @@ -483,7 +408,7 @@ expectedClusterSize, electionTimeout1, systemUTC(), CoreStateMachines coreStateMachines = new CoreStateMachines( replicatedTxStateMachine, labelTokenStateMachine, relationshipTypeTokenStateMachine, propertyKeyTokenStateMachine, replicatedLockTokenStateMachine, - idAllocationStateMachine, coreState, txLogState, raftLog, localDatabase ); + idAllocationStateMachine, coreState, txLogState, consensusModule.raftLog(), localDatabase ); commitProcessFactory = ( appender, applier, ignored ) -> { TransactionRepresentationCommitProcess localCommit = @@ -517,7 +442,7 @@ expectedClusterSize, electionTimeout1, systemUTC(), publishEditionInfo( dependencies.resolveDependency( UsageData.class ), platformModule.databaseInfo, config ); long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout ); - Locks lockManager = createLockManager( config, logging, replicator, myself, raft, leaderLockTokenTimeout, + Locks lockManager = createLockManager( config, 
logging, replicator, myself, consensusModule.raftInstance(), leaderLockTokenTimeout, replicatedLockTokenStateMachine ); this.lockManager = dependencies.satisfyDependency( lockManager ); @@ -531,48 +456,15 @@ expectedClusterSize, electionTimeout1, systemUTC(), long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout ); life.add( CoreServerStartupProcess.createLifeSupport( - platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft, coreState, raftServer, + platformModule.dataSourceManager, replicatedIdGeneratorFactory, consensusModule.raftInstance(), coreState, raftServer, catchupServer, raftTimeoutService, membershipWaiter, joinCatchupTimeout, logProvider ) ); dependencies.satisfyDependency( createSessionTracker() ); } - private RaftLog createRaftLog( Config config, LifeSupport life, FileSystemAbstraction fileSystem, - File clusterStateDirectory, CoreReplicatedContentMarshal marshal, LogProvider logProvider ) - { - RaftLogImplementation raftLogImplementation = - RaftLogImplementation.valueOf( config.get( CoreEdgeClusterSettings.raft_log_implementation ) ); - switch ( raftLogImplementation ) - { - case IN_MEMORY: - { - return new InMemoryRaftLog(); - } - - case SEGMENTED: - { - long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); - int readerPoolSize = config.get( CoreEdgeClusterSettings.raft_log_reader_pool_size ); - - String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); - - return life.add( new SegmentedRaftLog( - fileSystem, - new File( clusterStateDirectory, RaftLog.PHYSICAL_LOG_DIRECTORY_NAME ), - rotateAtSize, - marshal, - logProvider, - pruningStrategyConfig, - readerPoolSize, systemUTC() ) ); - } - default: - throw new IllegalStateException( "Unknown raft log implementation: " + raftLogImplementation ); - } - } - public boolean isLeader() { - return raft.currentRole() == Role.LEADER; + return consensusModule.raftInstance().currentRole() == 
Role.LEADER; } private File createClusterStateDirectory( File dir, FileSystemAbstraction fileSystem ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java index f162846d7347..c586087d1721 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/AppendEntriesMessageFlowTest.java @@ -68,7 +68,7 @@ public void setup() } @Test - public void shouldReturnFalseOnAppendRequestFromOlderTerm() + public void shouldReturnFalseOnAppendRequestFromOlderTerm() throws Exception { // when raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( -1 ).prevLogIndex( 0 ) @@ -81,7 +81,7 @@ public void shouldReturnFalseOnAppendRequestFromOlderTerm() } @Test - public void shouldReturnTrueOnAppendRequestWithFirstLogEntry() + public void shouldReturnTrueOnAppendRequestWithFirstLogEntry() throws Exception { // when raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( 0 ).prevLogIndex( -1 ) @@ -93,7 +93,7 @@ public void shouldReturnTrueOnAppendRequestWithFirstLogEntry() } @Test - public void shouldReturnTrueOnAppendRequestWithFirstLogEntryAndIgnorePrevTerm() + public void shouldReturnTrueOnAppendRequestWithFirstLogEntryAndIgnorePrevTerm() throws Exception { // when raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( 0 ).prevLogIndex( -1 ) @@ -106,7 +106,7 @@ public void shouldReturnTrueOnAppendRequestWithFirstLogEntryAndIgnorePrevTerm() } @Test - public void shouldReturnFalseOnAppendRequestWhenPrevLogEntryNotMatched() + public void shouldReturnFalseOnAppendRequestWhenPrevLogEntryNotMatched() throws Exception { // when raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( 0 ).prevLogIndex( 0 ) @@ -118,7 +118,7 @@ public void 
shouldReturnFalseOnAppendRequestWhenPrevLogEntryNotMatched() } @Test - public void shouldAcceptSequenceOfAppendEntries() + public void shouldAcceptSequenceOfAppendEntries() throws Exception { // when raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( 0 ).prevLogIndex( -1 ) @@ -153,7 +153,7 @@ public void shouldAcceptSequenceOfAppendEntries() } @Test - public void shouldReturnFalseIfLogHistoryDoesNotMatch() + public void shouldReturnFalseIfLogHistoryDoesNotMatch() throws Exception { raft.handle( appendEntriesRequest().from( otherMember ).leaderTerm( 0 ).prevLogIndex( -1 ) .prevLogTerm( -1 ).logEntry( new RaftLogEntry( 0, data( 1 ) ) ).build() ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java index 0cb83d11bd27..022686cc2dd9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java @@ -28,12 +28,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.raft.net.Inbound; +import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.server.StoreId; import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.NullLogProvider; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -58,8 +59,8 @@ public void setup() public void shouldDownloadSnapshotOnStoreIdMismatch() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( 
innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); when( localDatabase.isEmpty() ).thenReturn( true ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( @@ -82,8 +83,8 @@ public void shouldDownloadSnapshotOnStoreIdMismatch() throws Exception public void shouldLogOnStoreIdMismatchAndNonEmptyStore() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); when( localDatabase.isEmpty() ).thenReturn( false ); AssertableLogProvider logProvider = new AssertableLogProvider(); @@ -107,8 +108,8 @@ public void shouldLogOnStoreIdMismatchAndNonEmptyStore() throws Exception public void shouldInformListenersOnStoreIdMismatch() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); when( localDatabase.isEmpty() ).thenReturn( false ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( @@ -133,8 +134,8 @@ public void shouldInformListenersOnStoreIdMismatch() throws Exception public void shouldInvokeInnerHandlerWhenRun() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); @@ -154,8 +155,8 @@ public void shouldInvokeInnerHandlerWhenRun() throws Exception public void 
shouldInvokeHandlerOnQueuedMessage() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); @@ -182,8 +183,8 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception public void shouldBatchRequests() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); @@ -210,8 +211,8 @@ public void shouldBatchRequests() throws Exception public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Exception { // given - @SuppressWarnings("unchecked") - Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); + RaftInstance innerHandler = mock( RaftInstance.class ); + when( innerHandler.handle( any() ) ).thenReturn( mock( ConsensusOutcome.class ) ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java index 7499b9580694..b67cb3180363 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java +++ 
b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java @@ -19,9 +19,9 @@ */ package org.neo4j.coreedge.raft; +import java.io.IOException; import java.time.Clock; import java.util.Collection; -import java.util.function.Supplier; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; @@ -31,6 +31,7 @@ import org.neo4j.coreedge.raft.membership.RaftMembershipManager; import org.neo4j.coreedge.raft.net.Inbound; import org.neo4j.coreedge.raft.net.Outbound; +import org.neo4j.coreedge.raft.outcome.ConsensusOutcome; import org.neo4j.coreedge.raft.replication.SendToMyself; import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager; import org.neo4j.coreedge.raft.state.InMemoryStateStorage; @@ -39,7 +40,6 @@ import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.VoteState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; @@ -81,7 +81,6 @@ public void send( CoreMember to, Collection raftMessag private long retryTimeMillis = electionTimeout / 2; private int catchupBatchSize = 64; private int maxAllowedShippingLag = 256; - private Supplier databaseHealthSupplier; private StateStorage raftMembership = new InMemoryStateStorage<>( new RaftMembershipState() ); private Monitors monitors = new Monitors(); @@ -105,10 +104,27 @@ public RaftInstance build() RaftLogShippingManager logShipping = new RaftLogShippingManager( outbound, logProvider, raftLog, clock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightMap ); - RaftInstance raft = new RaftInstance( member, termState, voteState, raftLog, raftStateMachine, electionTimeout, + RaftInstance raft = new RaftInstance( member, termState, voteState, raftLog, electionTimeout, heartbeatInterval, 
renewableTimeoutService, outbound, logProvider, - membershipManager, logShipping, databaseHealthSupplier, inFlightMap, monitors ); - inbound.registerHandler( raft ); + membershipManager, logShipping, inFlightMap, monitors ); + inbound.registerHandler( ( incomingMessage ) -> { + try + { + ConsensusOutcome outcome = raft.handle( incomingMessage ); + if ( outcome.needsFreshSnapshot() ) + { + raftStateMachine.notifyNeedFreshSnapshot(); + } + else + { + raftStateMachine.notifyCommitted( outcome.getCommitIndex()); + } + } + catch ( IOException e ) + { + e.printStackTrace(); + } + } ); return raft; } @@ -154,12 +170,6 @@ public RaftInstanceBuilder stateMachine( RaftStateMachine raftStateMachine ) return this; } - RaftInstanceBuilder databaseHealth( final DatabaseHealth databaseHealth ) - { - this.databaseHealthSupplier = () -> databaseHealth; - return this; - } - RaftInstanceBuilder monitors( Monitors monitors ) { this.monitors = monitors; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java index de6a71f742ae..81278d3c76da 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceTest.java @@ -409,16 +409,13 @@ public void handle( RaftMessages.RaftMessage message ) } @Test - public void shouldPanicWhenFailingToHandleMessageAtBootstrapTime() throws Throwable + public void shouldThrowBootstrapExceptionIfUnableToBootstrap() throws Throwable { // given - TestDatabaseHealth databaseHealth = new TestDatabaseHealth(); - ExplodingRaftLog explodingLog = new ExplodingRaftLog(); explodingLog.startExploding(); RaftInstance raft = new RaftInstanceBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) .raftLog( explodingLog ) - .databaseHealth( databaseHealth ) .build(); try { @@ -428,35 +425,10 @@ public void 
shouldPanicWhenFailingToHandleMessageAtBootstrapTime() throws Throwa } catch ( RaftInstance.BootstrapException e ) { - // then - assertTrue( databaseHealth.hasPanicked() ); + // expected } } - @Test - public void shouldPanicWhenFailingToHandleMessageUnderNormalConditions() throws Throwable - { - // given - TestDatabaseHealth databaseHealth = new TestDatabaseHealth(); - - ExplodingRaftLog explodingLog = new ExplodingRaftLog(); - - RaftInstance raft = new RaftInstanceBuilder( myself, 3, RaftTestMemberSetBuilder.INSTANCE ) - .raftLog( explodingLog ) - .databaseHealth( databaseHealth ) - .build(); - - raft.bootstrapWithInitialMembers( new RaftTestGroup( asSet( myself, member1, member2 ) ) ); - explodingLog.startExploding(); - - // when - raft.handle( new RaftMessages.AppendEntries.Request( member1, 0, -1, -1, - new RaftLogEntry[]{new RaftLogEntry( 0, new ReplicatedString( "hello" ) )}, 0 ) ); - - // then - assertTrue( databaseHealth.hasPanicked() ); - } - @Test public void shouldMonitorLeaderNotFound() throws Exception {