From fa2ab5e7796261884b6d7d76bb94d50680347d22 Mon Sep 17 00:00:00 2001 From: Alistair Jones Date: Sat, 17 Sep 2016 16:52:12 +0100 Subject: [PATCH] Filter messages by clusterId. Avoids processing of rogue messages from another cluster. Replaces previous implementation based on storeId. Also rename BindingService to ClusterIdentity, owned by ClusteringModule. --- .../core/EnterpriseCoreEditionModule.java | 16 ++++--- .../neo4j/coreedge/core/IdentityModule.java | 3 +- .../coreedge/core/consensus/RaftMessages.java | 22 +++++----- .../coreedge/core/consensus/RaftServer.java | 15 +++---- .../core/server/BatchingMessageHandler.java | 44 +++++++++---------- .../core/server/CoreServerModule.java | 42 ++++++------------ .../coreedge/core/state/ClusteringModule.java | 31 ++++++++++++- .../coreedge/core/state/CoreBootstrapper.java | 2 +- .../neo4j/coreedge/core/state/CoreState.java | 44 +++++++++++-------- .../state => identity}/BindingException.java | 2 +- .../neo4j/coreedge/identity/ClusterId.java | 3 ++ .../ClusterIdentity.java} | 31 ++++++++----- .../coreedge/messaging/RaftOutbound.java | 18 ++++---- .../marshalling/RaftMessageDecoder.java | 7 ++- .../marshalling/RaftMessageEncoder.java | 11 +++-- .../server/BatchingMessageHandlerTest.java | 35 ++++++++------- .../ClusterIdentityTest.java} | 28 ++++++------ .../RaftMessageEncodingDecodingTest.java | 11 +++-- .../coreedge/scenarios/ClusterIdentityIT.java | 16 ++++++- 19 files changed, 214 insertions(+), 167 deletions(-) rename enterprise/core-edge/src/main/java/org/neo4j/coreedge/{core/state => identity}/BindingException.java (96%) rename enterprise/core-edge/src/main/java/org/neo4j/coreedge/{core/state/BindingService.java => identity/ClusterIdentity.java} (83%) rename enterprise/core-edge/src/test/java/org/neo4j/coreedge/{core/state/BindingServiceTest.java => identity/ClusterIdentityTest.java} (86%) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java index 3a98a9fb8219b..21d0bd11ad83e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/EnterpriseCoreEditionModule.java @@ -151,7 +151,7 @@ protected Log authManagerLog() IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory ); ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(), - platformModule ); + platformModule, clusterStateDirectory ); topologyService = clusteringModule.topologyService(); long logThresholdMillis = config.get( CoreEdgeClusterSettings.unknown_address_logging_throttle ); @@ -164,16 +164,18 @@ protected Log authManagerLog() final MessageLogger messageLogger = createMessageLogger( config, life, identityModule.myself() ); - Outbound raftOutbound = new LoggingOutbound<>( - new RaftOutbound( topologyService, raftSender, localDatabase, logProvider, logThresholdMillis ), + RaftOutbound raftOutbound = new RaftOutbound( topologyService, raftSender, clusteringModule.clusterIdentity(), + logProvider, logThresholdMillis ); + Outbound loggingOutbound = new LoggingOutbound<>( raftOutbound, identityModule.myself(), messageLogger ); - consensusModule = new ConsensusModule( identityModule.myself(), platformModule, raftOutbound, clusterStateDirectory, topologyService ); + consensusModule = new ConsensusModule( identityModule.myself(), platformModule, loggingOutbound, + clusterStateDirectory, topologyService ); dependencies.satisfyDependency( consensusModule.raftMachine() ); ReplicationModule replicationModule = new ReplicationModule( identityModule.myself(), platformModule, config, consensusModule, - raftOutbound, clusterStateDirectory, fileSystem, logProvider ); + loggingOutbound, clusterStateDirectory, fileSystem, logProvider ); CoreStateMachinesModule coreStateMachinesModule = new CoreStateMachinesModule( identityModule.myself(), platformModule, clusterStateDirectory, config, replicationModule.getReplicator(), consensusModule.raftMachine(), dependencies, localDatabase ); @@ -187,8 +189,8 @@ protected Log authManagerLog() this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory; this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() ); - CoreServerModule coreServerModule = new CoreServerModule( identityModule.myself(), platformModule, consensusModule, - coreStateMachinesModule, replicationModule, clusterStateDirectory, topologyService, localDatabase, + CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, + coreStateMachinesModule, replicationModule, clusterStateDirectory, clusteringModule, localDatabase, messageLogger, databaseHealthSupplier ); editionInvariants( platformModule, dependencies, config, logging, life ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java index 2596f14883ea0..485f45a9e4cf9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/IdentityModule.java @@ -34,6 +34,7 @@ public class IdentityModule { public static final String CORE_MEMBER_ID_NAME = "core-member-id"; + private MemberId myself; IdentityModule( PlatformModule platformModule, File clusterStateDirectory ) @@ -71,7 +72,7 @@ public class IdentityModule } } - MemberId myself() + public MemberId myself() { return myself; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMessages.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMessages.java index c08fea6491f59..dbb29a580a2c0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMessages.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftMessages.java @@ -25,11 +25,11 @@ import java.util.List; import java.util.Objects; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.messaging.Message; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; import org.neo4j.coreedge.core.replication.ReplicatedContent; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.coreedge.identity.StoreId; import static java.lang.String.format; @@ -633,21 +633,21 @@ public List contents() } } - class StoreIdAwareMessage implements Message + class ClusterIdAwareMessage implements Message { - private final StoreId storeId; + private final ClusterId clusterId; private final RaftMessage message; - public StoreIdAwareMessage( StoreId storeId, RaftMessage message ) + public ClusterIdAwareMessage( ClusterId clusterId, RaftMessage message ) { Objects.requireNonNull( message ); - this.storeId = storeId; + this.clusterId = clusterId; this.message = message; } - public StoreId storeId() + public ClusterId clusterId() { - return storeId; + return clusterId; } public RaftMessage message() @@ -666,20 +666,20 @@ public boolean equals( Object o ) { return false; } - StoreIdAwareMessage that = (StoreIdAwareMessage) o; - return Objects.equals( storeId, that.storeId ) && Objects.equals( message, that.message ); + ClusterIdAwareMessage that = (ClusterIdAwareMessage) o; + return Objects.equals( clusterId, that.clusterId ) && Objects.equals( message, that.message ); } @Override public int hashCode() { - return Objects.hash( storeId, message ); + return Objects.hash( clusterId, message ); } @Override public String toString() { - return format( "{storeId: %s, message: %s}", storeId, message ); + return format( "{clusterId: %s, message: %s}", clusterId, message ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftServer.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftServer.java index 7418bb5a3bf09..f5b81fb6fcb34 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftServer.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/consensus/RaftServer.java @@ -38,7 +38,6 @@ import org.neo4j.coreedge.VersionDecoder; import org.neo4j.coreedge.VersionPrepender; -import org.neo4j.coreedge.catchup.CatchupServer; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; import org.neo4j.coreedge.core.replication.ReplicatedContent; import org.neo4j.coreedge.handlers.ExceptionLoggingHandler; @@ -58,7 +57,7 @@ import static java.lang.String.format; -public class RaftServer extends LifecycleAdapter implements Inbound +public class RaftServer extends LifecycleAdapter implements Inbound { private static final Setting setting = CoreEdgeClusterSettings.raft_listen_address; private final ChannelMarshal marshal; @@ -68,7 +67,7 @@ public class RaftServer extends LifecycleAdapter implements Inbound messageHandler; + private MessageHandler messageHandler; private EventLoopGroup workerGroup; private Channel channel; @@ -163,24 +162,24 @@ protected void initChannel( SocketChannel ch ) throws Exception } @Override - public void registerHandler( Inbound.MessageHandler handler ) + public void registerHandler( Inbound.MessageHandler handler ) { this.messageHandler = handler; } - private class RaftMessageHandler extends SimpleChannelInboundHandler + private class RaftMessageHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, - RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) throws Exception + RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage ) throws Exception { try { - messageHandler.handle( storeIdAwareMessage ); + messageHandler.handle( clusterIdAwareMessage ); } catch ( Exception e ) { - log.error( format( "Failed to process message %s", storeIdAwareMessage ), e ); + log.error( format( "Failed to process message %s", clusterIdAwareMessage ), e ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java index 4687665b76e90..50ece54ad2541 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/BatchingMessageHandler.java @@ -26,24 +26,24 @@ import org.neo4j.coreedge.core.consensus.RaftMessages; import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.messaging.Inbound.MessageHandler; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.SECONDS; -public class BatchingMessageHandler implements Runnable, MessageHandler +public class BatchingMessageHandler implements Runnable, MessageHandler { private final Log log; - private final BlockingQueue messageQueue; + private final BlockingQueue messageQueue; private final int maxBatch; - private final List batch; + private final List batch; - private MessageHandler handler; + private MessageHandler handler; - public BatchingMessageHandler( MessageHandler handler, + public BatchingMessageHandler( MessageHandler handler, int queueSize, int maxBatch, LogProvider logProvider ) { this.handler = handler; @@ -55,7 +55,7 @@ public BatchingMessageHandler( MessageHandler } @Override - public void handle( RaftMessages.StoreIdAwareMessage message ) + public void handle( RaftMessages.ClusterIdAwareMessage message ) { try { @@ -70,7 +70,7 @@ public void handle( RaftMessages.StoreIdAwareMessage message ) @Override public void run() { - RaftMessages.StoreIdAwareMessage message = null; + RaftMessages.ClusterIdAwareMessage message = null; try { message = messageQueue.poll( 1, SECONDS ); @@ -96,32 +96,32 @@ public void run() } } - private void drain( BlockingQueue messageQueue, - List batch, int maxElements ) + private void drain( BlockingQueue messageQueue, + List batch, int maxElements ) { - List tempDraining = new ArrayList<>(); + List tempDraining = new ArrayList<>(); messageQueue.drainTo( tempDraining, maxElements ); - for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : tempDraining ) + for ( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage : tempDraining ) { - batch.add( storeIdAwareMessage ); + batch.add( clusterIdAwareMessage ); } } - private void collateAndHandleBatch( List batch ) + private void collateAndHandleBatch( List batch ) { RaftMessages.NewEntry.BatchRequest batchRequest = null; - StoreId storeId = batch.get( 0 ).storeId(); + ClusterId clusterId = batch.get( 0 ).clusterId(); - for ( RaftMessages.StoreIdAwareMessage storeIdAwareMessage : batch ) + for ( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage : batch ) { - if ( batchRequest != null && !storeIdAwareMessage.storeId().equals( storeId )) + if ( batchRequest != null && !clusterIdAwareMessage.clusterId().equals( clusterId )) { - handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) ); + handler.handle( new RaftMessages.ClusterIdAwareMessage( clusterId, batchRequest ) ); batchRequest = null; } - storeId = storeIdAwareMessage.storeId(); - RaftMessage message = storeIdAwareMessage.message(); + clusterId = clusterIdAwareMessage.clusterId(); + RaftMessage message = clusterIdAwareMessage.message(); if ( message instanceof RaftMessages.NewEntry.Request ) { RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry.Request) message; @@ -134,13 +134,13 @@ private void collateAndHandleBatch( List batch } else { - handler.handle( storeIdAwareMessage ); + handler.handle( clusterIdAwareMessage ); } } if ( batchRequest != null ) { - handler.handle( new RaftMessages.StoreIdAwareMessage( storeId, batchRequest ) ); + handler.handle( new RaftMessages.ClusterIdAwareMessage( clusterId, batchRequest ) ); } } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java index 91e8874747597..d435c210f5179 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/server/CoreServerModule.java @@ -34,6 +34,7 @@ import org.neo4j.coreedge.catchup.tx.TransactionLogCatchUpFactory; import org.neo4j.coreedge.catchup.tx.TxPullClient; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; +import org.neo4j.coreedge.core.IdentityModule; import org.neo4j.coreedge.core.consensus.ConsensusModule; import org.neo4j.coreedge.core.consensus.ContinuousJob; import org.neo4j.coreedge.core.consensus.RaftMessages; @@ -41,20 +42,15 @@ import org.neo4j.coreedge.core.consensus.log.pruning.PruningScheduler; import org.neo4j.coreedge.core.consensus.membership.MembershipWaiter; import org.neo4j.coreedge.core.consensus.membership.MembershipWaiterLifecycle; -import org.neo4j.coreedge.core.state.BindingService; +import org.neo4j.coreedge.core.state.ClusteringModule; import org.neo4j.coreedge.core.state.CommandApplicationProcess; -import org.neo4j.coreedge.core.state.CoreBootstrapper; import org.neo4j.coreedge.core.state.CoreState; import org.neo4j.coreedge.core.state.CoreStateApplier; import org.neo4j.coreedge.core.state.LongIndexMarshal; import org.neo4j.coreedge.core.state.machines.CoreStateMachinesModule; import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader; import org.neo4j.coreedge.core.state.storage.DurableStateStorage; -import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; -import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.core.state.storage.StateStorage; -import org.neo4j.coreedge.discovery.CoreTopologyService; -import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.logging.MessageLogger; import org.neo4j.coreedge.messaging.CoreReplicatedContentMarshal; @@ -73,21 +69,19 @@ import org.neo4j.logging.LogProvider; import org.neo4j.time.Clocks; -import static java.lang.Thread.sleep; - import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD; public class CoreServerModule { - private static final String CLUSTER_ID_NAME = "cluster-id"; + public static final String CLUSTER_ID_NAME = "cluster-id"; public static final String LAST_FLUSHED_NAME = "last-flushed"; public final MembershipWaiterLifecycle membershipWaiterLifecycle; - public CoreServerModule( MemberId myself, final PlatformModule platformModule, ConsensusModule consensusModule, - CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule, - File clusterStateDirectory, CoreTopologyService discoveryService, - LocalDatabase localDatabase, MessageLogger messageLogger, Supplier dbHealthSupplier ) + public CoreServerModule( IdentityModule identityModule, final PlatformModule platformModule, ConsensusModule consensusModule, + CoreStateMachinesModule coreStateMachinesModule, ReplicationModule replicationModule, + File clusterStateDirectory, ClusteringModule clusteringModule, + LocalDatabase localDatabase, MessageLogger messageLogger, Supplier dbHealthSupplier ) { final Dependencies dependencies = platformModule.dependencies; final Config config = platformModule.config; @@ -113,11 +107,11 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C RaftServer raftServer = new RaftServer( new CoreReplicatedContentMarshal(), config, logProvider, userLogProvider, monitors ); - LoggingInbound loggingRaftInbound = - new LoggingInbound<>( raftServer, messageLogger, myself ); + LoggingInbound loggingRaftInbound = + new LoggingInbound<>( raftServer, messageLogger, identityModule.myself() ); - CatchUpClient catchUpClient = - life.add( new CatchUpClient( discoveryService, logProvider, Clocks.systemClock(), monitors ) ); + CatchUpClient catchUpClient = life.add( new CatchUpClient( clusteringModule.topologyService(), logProvider, + Clocks.systemClock(), monitors ) ); StoreFetcher storeFetcher = new StoreFetcher( logProvider, fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient ), new TxPullClient( catchUpClient, platformModule.monitors ), @@ -130,20 +124,10 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, catchUpClient, logProvider, copiedStoreRecovery ); - SimpleStorage clusterIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory, - CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider ); - - CoreBootstrapper coreBootstrapper = new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, - fileSystem, config ); - - BindingService bindingService = new BindingService( clusterIdStorage, discoveryService, logProvider, - Clocks.systemClock(), () -> sleep( 100 ), 300_000, coreBootstrapper ); - CoreState coreState = new CoreState( - consensusModule.raftMachine(), localDatabase, + consensusModule.raftMachine(), localDatabase, clusteringModule.clusterIdentity(), logProvider, downloader, - bindingService, new CommandApplicationProcess( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), @@ -165,7 +149,7 @@ public CoreServerModule( MemberId myself, final PlatformModule platformModule, C long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); MembershipWaiter membershipWaiter = - new MembershipWaiter( myself, platformModule.jobScheduler, dbHealthSupplier, + new MembershipWaiter( identityModule.myself(), platformModule.jobScheduler, dbHealthSupplier, electionTimeout * 4, logProvider ); long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout ); membershipWaiterLifecycle = new MembershipWaiterLifecycle( membershipWaiter, diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java index 40efa93f7ca3c..4de7554ddc424 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/ClusteringModule.java @@ -19,38 +19,65 @@ */ package org.neo4j.coreedge.core.state; +import java.io.File; + +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; +import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.DiscoveryServiceFactory; +import org.neo4j.coreedge.identity.ClusterId; +import org.neo4j.coreedge.identity.ClusterIdentity; import org.neo4j.coreedge.identity.MemberId; +import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.LogProvider; +import org.neo4j.time.Clocks; + +import static java.lang.Thread.sleep; + +import static org.neo4j.coreedge.core.server.CoreServerModule.CLUSTER_ID_NAME; public class ClusteringModule { private final CoreTopologyService topologyService; + private final ClusterIdentity clusterIdentity; public ClusteringModule( DiscoveryServiceFactory discoveryServiceFactory, MemberId myself, - PlatformModule platformModule ) + PlatformModule platformModule, File clusterStateDirectory ) { LifeSupport life = platformModule.life; Config config = platformModule.config; LogProvider logProvider = platformModule.logging.getInternalLogProvider(); LogProvider userLogProvider = platformModule.logging.getUserLogProvider(); Dependencies dependencies = platformModule.dependencies; + FileSystemAbstraction fileSystem = platformModule.fileSystem; topologyService = discoveryServiceFactory.coreTopologyService( config, myself, logProvider, userLogProvider ); life.add( topologyService ); dependencies.satisfyDependency( topologyService ); // for tests + + SimpleStorage clusterIdStorage = new SimpleFileStorage<>( fileSystem, clusterStateDirectory, + CLUSTER_ID_NAME, new ClusterId.Marshal(), logProvider ); + + CoreBootstrapper coreBootstrapper = new CoreBootstrapper( platformModule.storeDir, platformModule.pageCache, + fileSystem, config ); + + clusterIdentity = new ClusterIdentity( clusterIdStorage, topologyService, logProvider, + Clocks.systemClock(), () -> sleep( 100 ), 300_000, coreBootstrapper ); } public CoreTopologyService topologyService() { return topologyService; } + + public ClusterIdentity clusterIdentity() + { + return clusterIdentity; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java index 9e16959fd092e..1abecfcf4f0b0 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreBootstrapper.java @@ -106,7 +106,7 @@ public CoreBootstrapper( File storeDir, PageCache pageCache, FileSystemAbstracti this.config = config; } - CoreSnapshot bootstrap( Set members ) throws IOException + public CoreSnapshot bootstrap( Set members ) throws IOException { StoreFactory factory = new StoreFactory( storeDir, config, new DefaultIdGeneratorFactory( fs ), pageCache, fs, NullLogProvider.getInstance() ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java index 1669c560036a3..7500653e20a31 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/CoreState.java @@ -31,6 +31,7 @@ import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.snapshot.CoreStateDownloader; import org.neo4j.coreedge.identity.ClusterId; +import org.neo4j.coreedge.identity.ClusterIdentity; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.coreedge.messaging.Inbound.MessageHandler; import org.neo4j.kernel.lifecycle.Lifecycle; @@ -39,52 +40,60 @@ import static java.util.concurrent.TimeUnit.MINUTES; -public class CoreState implements MessageHandler, LogPruner, Lifecycle +public class CoreState implements MessageHandler, LogPruner, Lifecycle { private final RaftMachine raftMachine; private final LocalDatabase localDatabase; private final Log log; + private final ClusterIdentity clusterIdentity; private final CoreStateDownloader downloader; - private final BindingService bindingService; private final CommandApplicationProcess applicationProcess; private final CountDownLatch bootstrapLatch = new CountDownLatch( 1 ); - private ClusterId boundClusterId; // TODO: Use for network message filtering. - public CoreState( RaftMachine raftMachine, LocalDatabase localDatabase, + ClusterIdentity clusterIdentity, LogProvider logProvider, CoreStateDownloader downloader, - BindingService bindingService, CommandApplicationProcess commandApplicationProcess ) { this.raftMachine = raftMachine; this.localDatabase = localDatabase; + this.clusterIdentity = clusterIdentity; this.downloader = downloader; - this.bindingService = bindingService; this.log = logProvider.getLog( getClass() ); this.applicationProcess = commandApplicationProcess; } - public void handle( RaftMessages.StoreIdAwareMessage storeIdAwareMessage ) + public void handle( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage ) { - try + ClusterId clusterId = clusterIdAwareMessage.clusterId(); + if ( clusterId.equals( clusterIdentity.clusterId() ) ) { - ConsensusOutcome outcome = raftMachine.handle( storeIdAwareMessage.message() ); - if ( outcome.needsFreshSnapshot() ) + try { - downloadSnapshot( storeIdAwareMessage.message().from() ); + ConsensusOutcome outcome = raftMachine.handle( clusterIdAwareMessage.message() ); + if ( outcome.needsFreshSnapshot() ) + { + downloadSnapshot( clusterIdAwareMessage.message().from() ); + } + else + { + notifyCommitted( outcome.getCommitIndex() ); + } } - else + catch ( Throwable e ) { - notifyCommitted( outcome.getCommitIndex() ); + raftMachine.stopTimers(); + localDatabase.panic( e ); } } - catch ( Throwable e ) + else { - raftMachine.stopTimers(); - localDatabase.panic( e ); + log.info( "Discarding message[%s] owing to mismatched storeId and non-empty store. " + + "Expected: %s, Encountered: %s", + clusterIdAwareMessage.message(), clusterId, clusterIdentity.clusterId() ); } } @@ -149,8 +158,7 @@ public void start() throws Throwable // 2. Bootstrap (single selected server) // 3. Download from someone else (others) - // TODO: Binding service can return whether or not we are allowed to bootstrap. ClusterId can be exposed at the interface. - boundClusterId = bindingService.bindToCluster( this::installSnapshot ); + clusterIdentity.bindToCluster( this::installSnapshot ); // TODO: Move haveState and CoreBootstrapper into CommandApplicationProcess, which perhaps needs a better name. // TODO: Include the None/Partial/Full in the move. diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingException.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/BindingException.java similarity index 96% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingException.java rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/BindingException.java index fd22275c79d35..27494bcaee9e4 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingException.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/BindingException.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.core.state; +package org.neo4j.coreedge.identity; class BindingException extends Exception { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterId.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterId.java index 29a23d3ea1016..0f8776d75fb0e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterId.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterId.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.UUID; +import org.neo4j.coreedge.core.consensus.membership.RaftMembershipState; import org.neo4j.coreedge.core.state.storage.SafeChannelMarshal; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -68,6 +69,8 @@ public String toString() public static class Marshal extends SafeChannelMarshal { + public static Marshal INSTANCE = new Marshal(); + @Override public void marshal( ClusterId clusterId, WritableChannel channel ) throws IOException { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterIdentity.java similarity index 83% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterIdentity.java index 3b6c6ebf5c59c..d8d59283c8207 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/core/state/BindingService.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/identity/ClusterIdentity.java @@ -17,24 +17,24 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.core.state; +package org.neo4j.coreedge.identity; import java.io.IOException; import java.time.Clock; import java.util.UUID; import java.util.concurrent.TimeoutException; +import org.neo4j.coreedge.core.state.CoreBootstrapper; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; -import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.function.ThrowingAction; import org.neo4j.function.ThrowingConsumer; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -public class BindingService +public class ClusterIdentity { private final SimpleStorage clusterIdStorage; private final CoreTopologyService topologyService; @@ -44,9 +44,11 @@ public class BindingService private final ThrowingAction retryWaiter; private final long timeoutMillis; - public BindingService( SimpleStorage clusterIdStorage, CoreTopologyService topologyService, - LogProvider logProvider, Clock clock, ThrowingAction retryWaiter, - long timeoutMillis, CoreBootstrapper coreBootstrapper ) + private ClusterId clusterId; + + public ClusterIdentity( SimpleStorage clusterIdStorage, CoreTopologyService topologyService, + LogProvider logProvider, Clock clock, ThrowingAction retryWaiter, + long timeoutMillis, CoreBootstrapper coreBootstrapper ) { this.clusterIdStorage = clusterIdStorage; this.topologyService = topologyService; @@ -61,18 +63,17 @@ public BindingService( SimpleStorage clusterIdStorage, CoreTopologySe * The cluster binding process tries to establish a common cluster ID. If there is no common cluster ID * then a single instance will eventually create one and publish it through the underlying topology service. * - * @return The common cluster ID. * @throws IOException If there is an issue with I/O. * @throws InterruptedException If the process gets interrupted. * @throws TimeoutException If the process times out. */ - ClusterId bindToCluster( ThrowingConsumer snapshotInstaller ) throws Throwable + public void bindToCluster( ThrowingConsumer snapshotInstaller ) throws Throwable { if ( clusterIdStorage.exists() ) { ClusterId localClusterId = clusterIdStorage.readState(); publishClusterId( localClusterId ); - return localClusterId; + clusterId = localClusterId; } else { @@ -110,10 +111,19 @@ ClusterId bindToCluster( ThrowingConsumer snapshotInsta clusterIdStorage.writeState( commonClusterId ); - return commonClusterId; + clusterId = commonClusterId; } } + public ClusterId clusterId() + { + if ( clusterId == null ) + { + throw new IllegalStateException( "Cannot ask for cluster id before bound to a cluster" ); + } + return clusterId; + } + private void publishClusterId( ClusterId localClusterId ) throws BindingException { boolean success = topologyService.casClusterId( localClusterId ); @@ -126,4 +136,5 @@ private void publishClusterId( ClusterId localClusterId ) throws BindingExceptio log.info( "Published: " + localClusterId ); } } + } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/RaftOutbound.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/RaftOutbound.java index 82d8724de9b21..c9a96579a77c2 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/RaftOutbound.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/RaftOutbound.java @@ -21,12 +21,12 @@ import java.util.Collection; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.core.consensus.RaftMessages.RaftMessage; -import org.neo4j.coreedge.core.consensus.RaftMessages.StoreIdAwareMessage; +import org.neo4j.coreedge.core.consensus.RaftMessages.ClusterIdAwareMessage; import org.neo4j.coreedge.discovery.CoreAddresses; import org.neo4j.coreedge.discovery.CoreTopologyService; import org.neo4j.coreedge.discovery.NoKnownAddressesException; +import org.neo4j.coreedge.identity.ClusterIdentity; import org.neo4j.coreedge.identity.MemberId; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.coreedge.messaging.address.UnknownAddressMonitor; @@ -39,15 +39,15 @@ public class RaftOutbound implements Outbound { private final CoreTopologyService discoveryService; private final Outbound outbound; - private final LocalDatabase localDatabase; + private final ClusterIdentity clusterIdentity; private final UnknownAddressMonitor unknownAddressMonitor; public RaftOutbound( CoreTopologyService discoveryService, Outbound outbound, - LocalDatabase localDatabase, LogProvider logProvider, long logThresholdMillis ) + ClusterIdentity clusterIdentity, LogProvider logProvider, long logThresholdMillis ) { this.discoveryService = discoveryService; this.outbound = outbound; - this.localDatabase = localDatabase; + this.clusterIdentity = clusterIdentity; this.unknownAddressMonitor = new UnknownAddressMonitor( logProvider.getLog( this.getClass() ), Clocks.systemClock(), logThresholdMillis ); } @@ -58,7 +58,7 @@ public void send( MemberId to, RaftMessage message ) try { CoreAddresses coreAddresses = discoveryService.coreServers().find( to ); - outbound.send( coreAddresses.getRaftServer(), decorateWithStoreId( message ) ); + outbound.send( coreAddresses.getRaftServer(), decorateWithClusterId( message ) ); } catch ( NoKnownAddressesException e ) { @@ -73,7 +73,7 @@ public void send( MemberId to, Collection messages ) { CoreAddresses coreAddresses = discoveryService.coreServers().find( to ); outbound.send( coreAddresses.getRaftServer(), - messages.stream().map( this::decorateWithStoreId ).collect( toList() ) ); + messages.stream().map( this::decorateWithClusterId ).collect( toList() ) ); } catch ( NoKnownAddressesException e ) { @@ -81,8 +81,8 @@ public void send( MemberId to, Collection messages ) } } - private StoreIdAwareMessage decorateWithStoreId( RaftMessage m ) + private ClusterIdAwareMessage decorateWithClusterId( RaftMessage m ) { - return new StoreIdAwareMessage( localDatabase.storeId(), m ); + return new ClusterIdAwareMessage( clusterIdentity.clusterId(), m ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageDecoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageDecoder.java index 96d334b539889..4dca00be1fdba 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageDecoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageDecoder.java @@ -29,11 +29,10 @@ import org.neo4j.coreedge.core.consensus.RaftMessages; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; import org.neo4j.coreedge.core.replication.ReplicatedContent; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.EndOfStreamException; import org.neo4j.coreedge.messaging.NetworkReadableClosableChannelNetty4; -import org.neo4j.coreedge.messaging.marshalling.storeid.StoreIdMarshal; import org.neo4j.storageengine.api.ReadableChannel; import static org.neo4j.coreedge.core.consensus.RaftMessages.Type.APPEND_ENTRIES_REQUEST; @@ -57,7 +56,7 @@ public RaftMessageDecoder( ChannelMarshal marshal ) protected void decode( ChannelHandlerContext ctx, ByteBuf buffer, List list ) throws Exception { ReadableChannel channel = new NetworkReadableClosableChannelNetty4( buffer ); - StoreId storeId = StoreIdMarshal.INSTANCE.unmarshal( channel ); + ClusterId clusterId = ClusterId.Marshal.INSTANCE.unmarshal( channel ); int messageTypeWire = channel.getInt(); RaftMessages.Type[] values = RaftMessages.Type.values(); @@ -140,7 +139,7 @@ else if ( messageType.equals( LOG_COMPACTION_INFO ) ) throw new IllegalArgumentException( "Unknown message type" ); } - list.add( new RaftMessages.StoreIdAwareMessage( storeId, result ) ); + list.add( new RaftMessages.ClusterIdAwareMessage( clusterId, result ) ); } private MemberId retrieveMember( ReadableChannel buffer ) throws IOException, EndOfStreamException diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncoder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncoder.java index 3df0d2dc3ce75..35e8ecb565376 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncoder.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncoder.java @@ -27,12 +27,11 @@ import org.neo4j.coreedge.core.consensus.RaftMessages; import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; import org.neo4j.coreedge.core.replication.ReplicatedContent; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.coreedge.messaging.NetworkFlushableByteBuf; -import org.neo4j.coreedge.messaging.marshalling.storeid.StoreIdMarshal; -public class RaftMessageEncoder extends MessageToMessageEncoder +public class RaftMessageEncoder extends MessageToMessageEncoder { private final ChannelMarshal marshal; @@ -43,15 +42,15 @@ public RaftMessageEncoder( ChannelMarshal marshal ) @Override protected synchronized void encode( ChannelHandlerContext ctx, - RaftMessages.StoreIdAwareMessage decoratedMessage, + RaftMessages.ClusterIdAwareMessage decoratedMessage, List list ) throws Exception { RaftMessages.RaftMessage message = decoratedMessage.message(); - StoreId storeId = decoratedMessage.storeId(); + ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( ctx.alloc().buffer() ); - StoreIdMarshal.INSTANCE.marshal( storeId, channel ); + ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); memberMarshal.marshal( message.from(), channel ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/server/BatchingMessageHandlerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/server/BatchingMessageHandlerTest.java index 0c191d3610a80..55ebe6a0108c1 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/server/BatchingMessageHandlerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/server/BatchingMessageHandlerTest.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.core.server; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -26,10 +27,10 @@ import org.junit.Before; import org.junit.Test; -import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.core.consensus.RaftMessages; import org.neo4j.coreedge.core.consensus.ReplicatedString; -import org.neo4j.coreedge.identity.StoreId; +import org.neo4j.coreedge.identity.ClusterId; +import org.neo4j.coreedge.identity.ClusterIdentity; import org.neo4j.coreedge.messaging.Inbound.MessageHandler; import org.neo4j.logging.NullLogProvider; @@ -42,14 +43,14 @@ public class BatchingMessageHandlerTest { private static final int MAX_BATCH = 16; private static final int QUEUE_SIZE = 64; - private LocalDatabase localDatabase = mock( LocalDatabase.class ); - private MessageHandler raftStateMachine = mock( MessageHandler.class ); - private StoreId localStoreId = new StoreId( 1, 2, 3, 4 ); + private ClusterIdentity clusterIdentity = mock( ClusterIdentity.class ); + private MessageHandler raftStateMachine = mock( MessageHandler.class ); + private ClusterId localClusterId = new ClusterId( UUID.randomUUID() ); @Before public void setup() { - when( localDatabase.storeId() ).thenReturn( localStoreId ); + when( clusterIdentity.clusterId() ).thenReturn( localClusterId ); } @Test @@ -59,8 +60,8 @@ public void shouldInvokeInnerHandlerWhenRun() throws Exception BatchingMessageHandler batchHandler = new BatchingMessageHandler( raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); - RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage( - localStoreId, new RaftMessages.NewEntry.Request( null, null ) ); + RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage( + localClusterId, new RaftMessages.NewEntry.Request( null, null ) ); batchHandler.handle( message ); verifyZeroInteractions( raftStateMachine ); @@ -77,7 +78,7 @@ public void shouldInvokeHandlerOnQueuedMessage() throws Exception // given BatchingMessageHandler batchHandler = new BatchingMessageHandler( raftStateMachine, QUEUE_SIZE, MAX_BATCH, NullLogProvider.getInstance() ); - RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage( localStoreId, + RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage( localClusterId, new RaftMessages.NewEntry.Request( null, null ) ); ExecutorService executor = Executors.newCachedThreadPool(); @@ -108,8 +109,8 @@ public void shouldBatchRequests() throws Exception RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request( null, contentA ); RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request( null, contentB ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageA ) ); - batchHandler.handle( new RaftMessages.StoreIdAwareMessage( localStoreId, messageB ) ); + batchHandler.handle( new RaftMessages.ClusterIdAwareMessage( localClusterId, messageA ) ); + batchHandler.handle( new RaftMessages.ClusterIdAwareMessage( localClusterId, messageB ) ); verifyZeroInteractions( raftStateMachine ); // when @@ -119,7 +120,7 @@ public void shouldBatchRequests() throws Exception RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest( 2 ); batchRequest.add( contentA ); batchRequest.add( contentB ); - verify( raftStateMachine ).handle( new RaftMessages.StoreIdAwareMessage( localStoreId, batchRequest ) ); + verify( raftStateMachine ).handle( new RaftMessages.ClusterIdAwareMessage( localClusterId, batchRequest ) ); } @Test @@ -132,13 +133,13 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep ReplicatedString contentA = new ReplicatedString( "A" ); ReplicatedString contentC = new ReplicatedString( "C" ); - RaftMessages.StoreIdAwareMessage messageA = new RaftMessages.StoreIdAwareMessage( localStoreId, + RaftMessages.ClusterIdAwareMessage messageA = new RaftMessages.ClusterIdAwareMessage( localClusterId, new RaftMessages.NewEntry.Request( null, contentA ) ); - RaftMessages.StoreIdAwareMessage messageB = new RaftMessages.StoreIdAwareMessage( localStoreId, + RaftMessages.ClusterIdAwareMessage messageB = new RaftMessages.ClusterIdAwareMessage( localClusterId, new RaftMessages.Heartbeat( null, 0, 0, 0 ) ); - RaftMessages.StoreIdAwareMessage messageC = new RaftMessages.StoreIdAwareMessage( localStoreId, + RaftMessages.ClusterIdAwareMessage messageC = new RaftMessages.ClusterIdAwareMessage( localClusterId, new RaftMessages.NewEntry.Request( null, contentC ) ); - RaftMessages.StoreIdAwareMessage messageD = new RaftMessages.StoreIdAwareMessage( localStoreId, + RaftMessages.ClusterIdAwareMessage messageD = new RaftMessages.ClusterIdAwareMessage( localClusterId, new RaftMessages.Heartbeat( null, 1, 1, 1 ) ); batchHandler.handle( messageA ); @@ -155,7 +156,7 @@ public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() throws Excep batchRequest.add( contentA ); batchRequest.add( contentC ); - verify( raftStateMachine ).handle( new RaftMessages.StoreIdAwareMessage( localStoreId, batchRequest ) ); + verify( raftStateMachine ).handle( new RaftMessages.ClusterIdAwareMessage( localClusterId, batchRequest ) ); verify( raftStateMachine ).handle( messageB ); verify( raftStateMachine ).handle( messageD ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/identity/ClusterIdentityTest.java similarity index 86% rename from enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java rename to enterprise/core-edge/src/test/java/org/neo4j/coreedge/identity/ClusterIdentityTest.java index bc82b7d0591ca..db4acdad628c8 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/core/state/BindingServiceTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/identity/ClusterIdentityTest.java @@ -17,7 +17,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.core.state; +package org.neo4j.coreedge.identity; import java.io.IOException; import java.util.UUID; @@ -26,11 +26,11 @@ import org.junit.Test; +import org.neo4j.coreedge.core.state.CoreBootstrapper; import org.neo4j.coreedge.core.state.snapshot.CoreSnapshot; import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.CoreTopology; import org.neo4j.coreedge.discovery.CoreTopologyService; -import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.function.ThrowingConsumer; import org.neo4j.logging.NullLogProvider; import org.neo4j.time.Clocks; @@ -46,7 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class BindingServiceTest +public class ClusterIdentityTest { private final CoreBootstrapper coreBootstrapper = mock( CoreBootstrapper.class ); private final FakeClock clock = Clocks.fakeClock(); @@ -59,7 +59,7 @@ public void shouldTimeoutWhenNotBootrappableAndNobodyElsePublishesClusterId() th CoreTopologyService topologyService = mock( CoreTopologyService.class ); when( topologyService.coreServers() ).thenReturn( unboundTopology ); - BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + ClusterIdentity binder = new ClusterIdentity( new StubClusterIdStorage(), topologyService, NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, coreBootstrapper ); @@ -89,15 +89,15 @@ public void shouldBindToClusterIdPublishedByAnotherMember() throws Throwable CoreTopologyService topologyService = mock( CoreTopologyService.class ); when( topologyService.coreServers() ).thenReturn( unboundTopology ).thenReturn( boundTopology ); - BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + ClusterIdentity binder = new ClusterIdentity( new StubClusterIdStorage(), topologyService, NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, coreBootstrapper ); // when - ClusterId boundClusterId = binder.bindToCluster( null ); + binder.bindToCluster( null ); // then - assertEquals( publishedClusterId, boundClusterId ); + assertEquals( publishedClusterId, binder.clusterId() ); verify( topologyService, atLeast( 2 ) ).coreServers(); } @@ -113,16 +113,16 @@ public void shouldPublishStoredClusterIdIfPreviouslyBound() throws Throwable StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage(); clusterIdStorage.writeState( previouslyBoundClusterId ); - BindingService binder = new BindingService( clusterIdStorage, topologyService, + ClusterIdentity binder = new ClusterIdentity( clusterIdStorage, topologyService, NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, coreBootstrapper ); // when - ClusterId boundClusterId = binder.bindToCluster( null ); + binder.bindToCluster( null ); // then verify( topologyService ).casClusterId( previouslyBoundClusterId ); - assertEquals( previouslyBoundClusterId, boundClusterId ); + assertEquals( previouslyBoundClusterId, binder.clusterId() ); } @Test @@ -137,7 +137,7 @@ public void shouldFailToPublishMismatchingStoredClusterId() throws Throwable StubClusterIdStorage clusterIdStorage = new StubClusterIdStorage(); clusterIdStorage.writeState( previouslyBoundClusterId ); - BindingService binder = new BindingService( clusterIdStorage, topologyService, + ClusterIdentity binder = new ClusterIdentity( clusterIdStorage, topologyService, NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, coreBootstrapper ); @@ -163,18 +163,18 @@ public void shouldBootstrapWhenBootstrappable() throws Throwable when( topologyService.coreServers() ).thenReturn( bootstrappableTopology ); when( topologyService.casClusterId( any() ) ).thenReturn( true ); - BindingService binder = new BindingService( new StubClusterIdStorage(), topologyService, + ClusterIdentity binder = new ClusterIdentity( new StubClusterIdStorage(), topologyService, NullLogProvider.getInstance(), clock, () -> clock.forward( 1, TimeUnit.SECONDS ), 3_000, coreBootstrapper ); ThrowingConsumer snapshotInstaller = mock( ThrowingConsumer.class ); // when - ClusterId boundClusterId = binder.bindToCluster( snapshotInstaller ); + binder.bindToCluster( snapshotInstaller ); // then verify( coreBootstrapper ).bootstrap( any() ); - verify( topologyService ).casClusterId( boundClusterId ); + verify( topologyService ).casClusterId( binder.clusterId() ); verify( snapshotInstaller ).accept( any() ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncodingDecodingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncodingDecodingTest.java index 5841dd17d061b..2ce7198ea4f66 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncodingDecodingTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/messaging/marshalling/RaftMessageEncodingDecodingTest.java @@ -38,8 +38,8 @@ import org.neo4j.coreedge.core.consensus.log.RaftLogEntry; import org.neo4j.coreedge.core.replication.ReplicatedContent; import org.neo4j.coreedge.core.state.storage.SafeChannelMarshal; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.coreedge.identity.MemberId; -import org.neo4j.coreedge.identity.StoreId; import org.neo4j.storageengine.api.ReadableChannel; import org.neo4j.storageengine.api.WritableChannel; @@ -49,8 +49,7 @@ public class RaftMessageEncodingDecodingTest { - - private StoreId storeId = new StoreId( 1, 2, 3, 4 ); + private ClusterId clusterId = new ClusterId( UUID.randomUUID() ); @Test public void shouldSerializeAppendRequestWithMultipleEntries() throws Exception @@ -104,7 +103,7 @@ public void shouldSerializeHeartbeats() throws Exception // When MemberId sender = new MemberId( UUID.randomUUID() ); - RaftMessages.StoreIdAwareMessage message = new RaftMessages.StoreIdAwareMessage( storeId, + RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage( clusterId, new RaftMessages.Heartbeat( sender, 1, 2, 3 ) ); encoder.encode( setupContext(), message, resultingBuffers ); @@ -157,8 +156,8 @@ private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message ArrayList thingsRead = new ArrayList<>( 1 ); // When - RaftMessages.StoreIdAwareMessage decoratedMessage = - new RaftMessages.StoreIdAwareMessage( storeId, message ); + RaftMessages.ClusterIdAwareMessage decoratedMessage = + new RaftMessages.ClusterIdAwareMessage( clusterId, message ); encoder.encode( setupContext(), decoratedMessage, resultingBuffers ); // Then diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java index 8351834262f70..2365f2e7fc2d4 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/ClusterIdentityIT.java @@ -28,17 +28,22 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import org.neo4j.coreedge.core.CoreEdgeClusterSettings; +import org.neo4j.coreedge.core.state.storage.SimpleFileStorage; +import org.neo4j.coreedge.core.state.storage.SimpleStorage; import org.neo4j.coreedge.discovery.Cluster; import org.neo4j.coreedge.discovery.CoreClusterMember; +import org.neo4j.coreedge.identity.ClusterId; import org.neo4j.graphdb.Node; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory; import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.lifecycle.LifecycleException; +import org.neo4j.logging.NullLogProvider; import org.neo4j.test.coreedge.ClusterRule; import org.neo4j.test.rule.SuppressOutput; @@ -47,6 +52,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId; +import static org.neo4j.coreedge.core.EnterpriseCoreEditionModule.CLUSTER_STATE_DIRECTORY_NAME; +import static org.neo4j.coreedge.core.server.CoreServerModule.CLUSTER_ID_NAME; import static org.neo4j.graphdb.Label.label; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.RANDOM_NUMBER; import static org.neo4j.test.rule.SuppressOutput.suppress; @@ -196,7 +203,7 @@ public void badFollowerShouldNotJoinCluster() throws Exception File storeDir = cluster.getCoreMemberById( 0 ).storeDir(); cluster.removeCoreMemberWithMemberId( 0 ); - changeStoreId( storeDir ); + changeClusterId( storeDir ); SampleData.createSomeData( 100, cluster ); @@ -253,6 +260,13 @@ private List storeDirs( Collection dbs ) return dbs.stream().map( CoreClusterMember::storeDir ).collect( Collectors.toList() ); } + private void changeClusterId( File storeDir ) throws IOException + { + SimpleStorage clusterIdStorage = new SimpleFileStorage<>( fs, new File( storeDir, CLUSTER_STATE_DIRECTORY_NAME ), + CLUSTER_ID_NAME, new ClusterId.Marshal(), NullLogProvider.getInstance() ); + clusterIdStorage.writeState( new ClusterId( UUID.randomUUID() ) ); + } + private void changeStoreId( File storeDir ) throws IOException { File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME );