From 551165b6bc3afadfcdd9c8b3b6b374445739e0de Mon Sep 17 00:00:00 2001
From: Mark Needham
Date: Thu, 7 Jul 2016 11:52:45 +0100
Subject: [PATCH] Moving StoreId checking down a layer so that we can verify
 messages sequentially

We previously had a race condition: we would start downloading a store, and
if we processed another message before the download had finished, we could
end up thinking we had a mismatching StoreId on a non-empty store, simply
because our own StoreId had not been updated yet.

Also make the batching handler handle raft messages.
---
 .../coreedge/raft/BatchingMessageHandler.java |  77 ++++++--
 .../raft/MismatchedStoreIdService.java        |  41 +++++
 .../org/neo4j/coreedge/raft/RaftServer.java   |  53 +-----
 .../raft/membership/MembershipWaiter.java     |  17 +-
 .../org/neo4j/coreedge/raft/net/Inbound.java  |   1 +
 .../core/EnterpriseCoreEditionModule.java     | 168 ++++++++----------
 .../org/neo4j/coreedge/ClusterIdentityIT.java |   3 +-
 .../raft/BatchingMessageHandlerTest.java      |  36 ++--
 .../raft/membership/MembershipWaiterTest.java |   7 +-
 9 files changed, 225 insertions(+), 178 deletions(-)
 create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/MismatchedStoreIdService.java

diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java
index 08528de6b68c5..0d4ac14b1d042 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/BatchingMessageHandler.java
@@ -24,26 +24,35 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
 import org.neo4j.coreedge.raft.RaftMessages.RaftMessage;
 import org.neo4j.coreedge.raft.net.Inbound.MessageHandler;
+import org.neo4j.coreedge.server.StoreId;
 import org.neo4j.logging.Log;
 import org.neo4j.logging.LogProvider;
 import static java.util.concurrent.TimeUnit.SECONDS;
-public class BatchingMessageHandler implements Runnable, MessageHandler
+public class BatchingMessageHandler implements Runnable, MessageHandler, MismatchedStoreIdService
 {
     private final Log log;
     private final MessageHandler innerHandler;
+    private final BlockingQueue messageQueue;
-    private final BlockingQueue messageQueue;
     private final int maxBatch;
-    private final List batch;
+    private final List batch;
+
+    private final LocalDatabase localDatabase;
+    private RaftStateMachine raftStateMachine;
+    private final List listeners = new ArrayList<>( );
     public BatchingMessageHandler( MessageHandler innerHandler, LogProvider logProvider,
-            int queueSize, int maxBatch )
+            int queueSize, int maxBatch, LocalDatabase localDatabase,
+            RaftStateMachine raftStateMachine )
     {
         this.innerHandler = innerHandler;
+        this.localDatabase = localDatabase;
+        this.raftStateMachine = raftStateMachine;
         this.log = logProvider.getLog( getClass() );
         this.maxBatch = maxBatch;
@@ -52,7 +61,7 @@ public BatchingMessageHandler( MessageHandler innerHandler, LogProv
     }
     @Override
-    public void handle( RaftMessage message )
+    public void handle( RaftMessages.StoreIdAwareMessage message )
     {
         try
         {
@@ -67,7 +76,7 @@ public void handle( RaftMessage message )
     @Override
     public void run()
     {
-        RaftMessage message = null;
+        RaftMessages.StoreIdAwareMessage message = null;
         try
         {
             message = messageQueue.poll( 1, SECONDS );
@@ -79,26 +88,66 @@ public void run()
         if ( message != null )
         {
-            if ( messageQueue.isEmpty() )
+
// do the check here + RaftMessages.RaftMessage innerMessage = message.message(); + StoreId storeId = message.storeId(); + + if ( message.storeId().equals( localDatabase.storeId() ) ) { - innerHandler.handle( message ); + if ( messageQueue.isEmpty() ) + { + innerHandler.handle( message.message() ); + } + else + { + batch.clear(); + batch.add( innerMessage ); + drain( messageQueue, batch, maxBatch - 1 ); + collateAndHandleBatch( batch ); + } } else { - batch.clear(); - batch.add( message ); - messageQueue.drainTo( batch, maxBatch - 1 ); + if ( localDatabase.isEmpty() ) + { + raftStateMachine.downloadSnapshot( innerMessage.from() ); + } + else + { + log.info( "Discarding message owing to mismatched storeId and non-empty store. " + + "Expected: %s, Encountered: %s", storeId, localDatabase.storeId() ); + listeners.forEach( l -> { + MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() ); + l.onMismatchedStore( ex ); + } ); + } - collateAndHandleBatch( batch ); } } } - private void collateAndHandleBatch( List batch ) + private void drain( BlockingQueue messageQueue, + List batch, int maxElements ) + { + List tempDraining = new ArrayList<>(); + messageQueue.drainTo( tempDraining, maxElements ); + + for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining ) + { + batch.add( storeIdAwareMessage.message() ); + } + } + + public void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener ) + { + listeners.add(listener); + } + + private void collateAndHandleBatch( List batch ) { RaftMessages.NewEntry.Batch batchRequest = null; - for ( RaftMessage message : batch ) + for ( RaftMessages.RaftMessage message : batch ) { if ( message instanceof RaftMessages.NewEntry.Request ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/MismatchedStoreIdService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/MismatchedStoreIdService.java new file mode 100644 index 0000000000000..5474304a5916e --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/MismatchedStoreIdService.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.coreedge.raft; + +import org.neo4j.coreedge.server.StoreId; +import org.neo4j.kernel.impl.store.StoreFailureException; + +public interface MismatchedStoreIdService +{ + void addMismatchedStoreListener( BatchingMessageHandler.MismatchedStoreListener listener ); + + interface MismatchedStoreListener + { + void onMismatchedStore( BatchingMessageHandler.MismatchedStoreIdException ex ); + } + + class MismatchedStoreIdException extends StoreFailureException + { + public MismatchedStoreIdException( StoreId expected, StoreId encountered ) + { + super( "Expected:" + expected + ", encountered:" + encountered ); + } + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java index 367a26e55d4c6..c5ac03de5eec6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftServer.java @@ -38,34 +38,30 @@ import java.util.concurrent.TimeUnit; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; -import org.neo4j.coreedge.raft.membership.MembershipWaiter; import org.neo4j.coreedge.raft.net.Inbound; import org.neo4j.coreedge.raft.net.codecs.RaftMessageDecoder; import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.ChannelMarshal; import org.neo4j.coreedge.server.ListenSocketAddress; -import org.neo4j.coreedge.server.StoreId; import org.neo4j.coreedge.server.logging.ExceptionLoggingHandler; import org.neo4j.helpers.NamedThreadFactory; -import org.neo4j.kernel.impl.store.MismatchingStoreIdException; -import org.neo4j.kernel.impl.store.StoreFailureException; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.lang.String.format; -public class RaftServer extends LifecycleAdapter implements Inbound +public class RaftServer extends LifecycleAdapter implements Inbound { private final ListenSocketAddress listenAddress; private final LocalDatabase localDatabase; private final RaftStateMachine raftStateMachine; private final Log log; private final ChannelMarshal marshal; - private MessageHandler messageHandler; + private MessageHandler messageHandler; private EventLoopGroup workerGroup; private Channel channel; - private final List listeners = new ArrayList<>(); + private final List listeners = new ArrayList<>(); private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" ); @@ -134,16 +130,11 @@ protected void initChannel( SocketChannel ch ) throws Exception } @Override - public void registerHandler( Inbound.MessageHandler handler ) + public void registerHandler( Inbound.MessageHandler handler ) { this.messageHandler = handler; } - public void addMismatchedStoreListener( MismatchedStoreListener listener ) - { - listeners.add( listener ); - } - private class RaftMessageHandler extends SimpleChannelInboundHandler { @Override @@ -152,29 +143,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, { try { - RaftMessages.RaftMessage message = storeIdAwareMessage.message(); - StoreId storeId = storeIdAwareMessage.storeId(); - - if ( storeId.equals( localDatabase.storeId() ) ) - { - messageHandler.handle( message ); - } - else - { - if ( localDatabase.isEmpty() ) - { - raftStateMachine.downloadSnapshot( message.from() ); - } - else - { - log.info( "Discarding message owing to mismatched storeId and non-empty 
store. Expected: %s, " + - "Encountered: %s", storeId, localDatabase.storeId() ); - listeners.forEach( l -> { - MismatchedStoreIdException ex = new MismatchedStoreIdException( storeId, localDatabase.storeId() ); - l.onMismatchedStore( ex ); - } ); - } - } + messageHandler.handle( storeIdAwareMessage ); } catch ( Exception e ) { @@ -183,16 +152,4 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, } } - public interface MismatchedStoreListener - { - void onMismatchedStore(MismatchedStoreIdException ex); - } - - public class MismatchedStoreIdException extends StoreFailureException - { - public MismatchedStoreIdException( StoreId expected, StoreId encountered ) - { - super( "Expected:" + expected + ", encountered:" + encountered ); - } - } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java index 0504683a94ea8..8ff90a7ea1edc 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipWaiter.java @@ -22,10 +22,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import org.neo4j.coreedge.raft.RaftServer; +import org.neo4j.coreedge.raft.BatchingMessageHandler; +import org.neo4j.coreedge.raft.MismatchedStoreIdService; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.kernel.impl.store.MismatchingStoreIdException; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -53,15 +53,16 @@ public class MembershipWaiter private final CoreMember myself; private final JobScheduler jobScheduler; private final long maxCatchupLag; - private final RaftServer raftServer; + private final MismatchedStoreIdService mismatchedStoreIdService; private final Log log; - public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, RaftServer raftServer, LogProvider logProvider ) + public MembershipWaiter( CoreMember myself, JobScheduler jobScheduler, long maxCatchupLag, + MismatchedStoreIdService mismatchedStoreIdService, LogProvider logProvider ) { this.myself = myself; this.jobScheduler = jobScheduler; this.maxCatchupLag = maxCatchupLag; - this.raftServer = raftServer; + this.mismatchedStoreIdService = mismatchedStoreIdService; this.log = logProvider.getLog( getClass() ); } @@ -70,7 +71,7 @@ public CompletableFuture waitUntilCaughtUpMember( ReadableRaftState raf CompletableFuture catchUpFuture = new CompletableFuture<>(); Evaluator evaluator = new Evaluator( raftState, catchUpFuture ); - raftServer.addMismatchedStoreListener( evaluator ); + mismatchedStoreIdService.addMismatchedStoreListener( evaluator ); JobScheduler.JobHandle jobHandle = jobScheduler.scheduleRecurring( new JobScheduler.Group( getClass().toString(), POOLED ), @@ -81,7 +82,7 @@ public CompletableFuture waitUntilCaughtUpMember( ReadableRaftState raf return catchUpFuture; } - private class Evaluator implements Runnable, RaftServer.MismatchedStoreListener + private class Evaluator implements Runnable, BatchingMessageHandler.MismatchedStoreListener { private final ReadableRaftState raftState; private final CompletableFuture catchUpFuture; @@ -138,7 +139,7 @@ private boolean caughtUpWithLeader() } @Override - public void 
onMismatchedStore(RaftServer.MismatchedStoreIdException ex) + public void onMismatchedStore(BatchingMessageHandler.MismatchedStoreIdException ex) { catchUpFuture.completeExceptionally( ex ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java index 4cd286a3d16c2..e8d2c6b7ae2b8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/net/Inbound.java @@ -20,6 +20,7 @@ package org.neo4j.coreedge.raft.net; import org.neo4j.coreedge.network.Message; +import org.neo4j.coreedge.raft.BatchingMessageHandler; public interface Inbound { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index e886ff240ded8..1d04a4c69f0ec 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -48,7 +48,6 @@ import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftServer; -import org.neo4j.coreedge.raft.RaftStateMachine; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.RaftLog; @@ -148,7 +147,6 @@ import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleStatus; -import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.Token; import org.neo4j.udc.UsageData; @@ -334,10 +332,81 @@ public void registerProcedures( Procedures procedures ) raftServer = new RaftServer( marshal, raftListenAddress, localDatabase, logProvider, coreState ); - raft = dependencies.satisfyDependency( createRaft( life, loggingOutbound, discoveryService, config, - messageLogger, raftLog, coreState, fileSystem, clusterStateDirectory, myself, logProvider, - raftServer, raftTimeoutService, databaseHealthSupplier, inFlightMap, platformModule.monitors, - platformModule.jobScheduler ) ); + StateStorage termState; + StateStorage voteState; + StateStorage raftMembershipStorage; + + try + { + StateStorage durableTermState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ), + "term-state", new TermState.Marshal(), + config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier, + logProvider ) ); + + termState = new MonitoredTermStateStorage( durableTermState, platformModule.monitors ); + + voteState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ), + "vote-state", new VoteState.Marshal( new CoreMemberMarshal() ), + config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier, + logProvider ) ); + + raftMembershipStorage = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ), + "membership-state", new RaftMembershipState.Marshal( new CoreMemberMarshal() ), + config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier, + logProvider ) ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + + 
LoggingInbound loggingRaftInbound = + new LoggingInbound<>( raftServer, messageLogger, myself ); + + long electionTimeout1 = config.get( CoreEdgeClusterSettings.leader_election_timeout ); + long heartbeatInterval = electionTimeout1 / 3; + + Integer expectedClusterSize = config.get( CoreEdgeClusterSettings.expected_core_cluster_size ); + + CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder(); + + SendToMyself leaderOnlyReplicator = + new SendToMyself( myself, loggingOutbound ); + + RaftMembershipManager raftMembershipManager = + new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, + expectedClusterSize, electionTimeout1, systemUTC(), + config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage ); + + RaftLogShippingManager logShipping = + new RaftLogShippingManager( loggingOutbound, logProvider, raftLog, systemUTC(), + myself, raftMembershipManager, electionTimeout1, + config.get( CoreEdgeClusterSettings.catchup_batch_size ), + config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); + + RaftInstance raftInstance = + new RaftInstance( myself, termState, voteState, raftLog, coreState, electionTimeout1, + heartbeatInterval, raftTimeoutService, loggingOutbound, logProvider, raftMembershipManager, + logShipping, databaseHealthSupplier, inFlightMap, platformModule.monitors ); + + int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); + int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); + BatchingMessageHandler batchingMessageHandler = + new BatchingMessageHandler( raftInstance, logProvider, queueSize, maxBatch, localDatabase, coreState ); + + life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), + batchingMessageHandler ) ); + + loggingRaftInbound.registerHandler( batchingMessageHandler ); + + life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); + + life.add(logShipping); + + raft = dependencies.satisfyDependency( raftInstance ); life.add( new PruningScheduler( coreState, platformModule.jobScheduler, config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) ); @@ -358,7 +427,7 @@ public void registerProcedures( Procedures procedures ) long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); MembershipWaiter membershipWaiter = - new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, raftServer, logProvider ); + new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider ); ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory = createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider ); @@ -510,91 +579,6 @@ private File createClusterStateDirectory( File dir, FileSystemAbstraction fileSy { throw new RuntimeException( e ); } - - } - - private static RaftInstance createRaft( LifeSupport life, - Outbound raftOutbound, CoreTopologyService discoveryService, - Config config, MessageLogger messageLogger, RaftLog raftLog, RaftStateMachine raftStateMachine, - FileSystemAbstraction fileSystem, File clusterStateDirectory, CoreMember myself, LogProvider logProvider, - RaftServer raftServer, DelayedRenewableTimeoutService raftTimeoutService, - Supplier databaseHealthSupplier, InFlightMap inFlightMap, - Monitors monitors, JobScheduler jobScheduler ) - { - StateStorage termState; - StateStorage voteState; - StateStorage raftMembershipStorage; - - try - { - 
StateStorage durableTermState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "term-state" ), - "term-state", new TermState.Marshal(), - config.get( CoreEdgeClusterSettings.term_state_size ), databaseHealthSupplier, - logProvider ) ); - - termState = new MonitoredTermStateStorage( durableTermState, monitors ); - - voteState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "vote-state" ), - "vote-state", new VoteState.Marshal( new CoreMemberMarshal() ), - config.get( CoreEdgeClusterSettings.vote_state_size ), databaseHealthSupplier, - logProvider ) ); - - raftMembershipStorage = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ), - "membership-state", new RaftMembershipState.Marshal( new CoreMemberMarshal() ), - config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier, - logProvider ) ); - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - - LoggingInbound loggingRaftInbound = - new LoggingInbound<>( raftServer, messageLogger, myself ); - - long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); - long heartbeatInterval = electionTimeout / 3; - - Integer expectedClusterSize = config.get( CoreEdgeClusterSettings.expected_core_cluster_size ); - - CoreMemberSetBuilder memberSetBuilder = new CoreMemberSetBuilder(); - - SendToMyself leaderOnlyReplicator = - new SendToMyself( myself, raftOutbound ); - - RaftMembershipManager raftMembershipManager = - new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider, - expectedClusterSize, electionTimeout, systemUTC(), - config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage ); - - RaftLogShippingManager logShipping = - new RaftLogShippingManager( raftOutbound, logProvider, raftLog, systemUTC(), - myself, raftMembershipManager, electionTimeout, - config.get( CoreEdgeClusterSettings.catchup_batch_size ), - config.get( CoreEdgeClusterSettings.log_shipping_max_lag ), inFlightMap ); - life.add( logShipping ); - - RaftInstance raftInstance = - new RaftInstance( myself, termState, voteState, raftLog, raftStateMachine, electionTimeout, - heartbeatInterval, raftTimeoutService, raftOutbound, logProvider, raftMembershipManager, - logShipping, databaseHealthSupplier, inFlightMap, monitors ); - - int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); - int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); - BatchingMessageHandler batchingMessageHandler = - new BatchingMessageHandler( raftInstance, logProvider, queueSize, maxBatch ); - - life.add( new ContinuousJob( jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), - batchingMessageHandler ) ); - - loggingRaftInbound.registerHandler( batchingMessageHandler ); - - life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) ); - - return raftInstance; } private static PrintWriter raftMessagesLog( File storeDir ) diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java index 2014f75165c0c..357b99c30da4e 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java @@ -53,6 +53,7 @@ import static 
org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId; import static org.neo4j.graphdb.Label.label; +import static org.neo4j.kernel.impl.store.MetaDataStore.Position.RANDOM_NUMBER; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.TIME; import static org.neo4j.test.rule.SuppressOutput.suppress; @@ -268,7 +269,7 @@ private void changeStoreId( File storeDir ) throws IOException File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); try ( PageCache pageCache = StandalonePageCacheFactory.createPageCache( fs ) ) { - MetaDataStore.setRecord( pageCache, neoStoreFile, TIME, System.currentTimeMillis() ); + MetaDataStore.setRecord( pageCache, neoStoreFile, RANDOM_NUMBER, System.currentTimeMillis() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java index 0de67d51b07c6..2bc48267570c8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BatchingMessageHandlerTest.java @@ -19,23 +19,35 @@ */ package org.neo4j.coreedge.raft; +import org.junit.Before; import org.junit.Test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.raft.net.Inbound; +import org.neo4j.coreedge.server.StoreId; import org.neo4j.logging.NullLogProvider; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; public class BatchingMessageHandlerTest { private static final int MAX_BATCH = 16; private static final int QUEUE_SIZE = 64; + private LocalDatabase localDatabase = mock( LocalDatabase.class ); + private RaftStateMachine raftStateMachine = mock( RaftStateMachine.class ); + + @Before + public void setup() + { + when(localDatabase.storeId()).thenReturn( new StoreId( 1,2,3,4 ) ); + } @Test public void shouldInvokeInnerHandlerWhenRun() throws Exception @@ -45,10 +57,10 @@ public void shouldInvokeInnerHandlerWhenRun() throws Exception Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); - batchHandler.handle( message ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), message ) ); verifyZeroInteractions( innerHandler ); // when @@ -66,7 +78,7 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); RaftMessages.NewEntry.Request message = new RaftMessages.NewEntry.Request( null, null ); ExecutorService executor = Executors.newCachedThreadPool(); @@ -79,7 +91,7 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception Thread.sleep( 50 ); 
// when - batchHandler.handle( message ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), message ) ); // then future.get(); @@ -94,14 +106,14 @@ public void shouldBatchRequests() throws Exception Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentB = new ReplicatedString( "B" ); RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request( null, contentA ); RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request( null, contentB ); - batchHandler.handle( messageA ); - batchHandler.handle( messageB ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageA ) ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageB ) ); verifyZeroInteractions( innerHandler ); // when @@ -122,7 +134,7 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep Inbound.MessageHandler innerHandler = mock( Inbound.MessageHandler.class ); BatchingMessageHandler batchHandler = new BatchingMessageHandler( - innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH ); + innerHandler, NullLogProvider.getInstance(), QUEUE_SIZE, MAX_BATCH, localDatabase, raftStateMachine ); ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentC = new ReplicatedString( "C" ); @@ -131,10 +143,10 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep RaftMessages.NewEntry.Request messageC = new RaftMessages.NewEntry.Request( null, contentC ); RaftMessages.Heartbeat messageD = new RaftMessages.Heartbeat( null, 1, 1, 1 ); - batchHandler.handle( messageA ); - batchHandler.handle( messageB ); - batchHandler.handle( messageC ); - batchHandler.handle( messageD ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageA ) ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageB ) ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageC ) ); + batchHandler.handle( new RaftMessages.StoreIdAwareMessage( new StoreId( 1,2,3,4 ), messageD ) ); verifyZeroInteractions( innerHandler ); // when diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java index 7b3e719f24e74..1cc28b4517440 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/MembershipWaiterTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import org.neo4j.coreedge.raft.BatchingMessageHandler; import org.neo4j.coreedge.raft.RaftServer; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -47,7 +48,7 @@ public void shouldReturnImmediatelyIfMemberAndCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), 
jobScheduler, 500, - mock(RaftServer.class), NullLogProvider.getInstance() ); + mock(BatchingMessageHandler.class), NullLogProvider.getInstance() ); InMemoryRaftLog raftLog = new InMemoryRaftLog(); raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) ); @@ -69,7 +70,7 @@ public void shouldTimeoutIfCaughtUpButNotMember() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - mock(RaftServer.class), NullLogProvider.getInstance()); + mock(BatchingMessageHandler.class), NullLogProvider.getInstance()); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 1 ) ) @@ -96,7 +97,7 @@ public void shouldTimeoutIfMemberButNotCaughtUp() throws Exception { OnDemandJobScheduler jobScheduler = new OnDemandJobScheduler(); MembershipWaiter waiter = new MembershipWaiter( member( 0 ), jobScheduler, 1, - mock(RaftServer.class), NullLogProvider.getInstance() ); + mock(BatchingMessageHandler.class), NullLogProvider.getInstance() ); RaftState raftState = RaftStateBuilder.raftState() .votingMembers( member( 0 ), member( 1 ) )