diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreStateMachinesModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreStateMachinesModule.java new file mode 100644 index 0000000000000..eb18c4e794f09 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/CoreStateMachinesModule.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge; + +import java.io.File; +import java.io.IOException; +import java.util.function.Supplier; + +import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; +import org.neo4j.coreedge.raft.LeaderLocator; +import org.neo4j.coreedge.raft.replication.RaftReplicator; +import org.neo4j.coreedge.raft.replication.Replicator; +import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; +import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory; +import org.neo4j.coreedge.raft.replication.id.ReplicatedIdRangeAcquirer; +import org.neo4j.coreedge.raft.replication.token.ReplicatedLabelTokenHolder; +import org.neo4j.coreedge.raft.replication.token.ReplicatedPropertyKeyTokenHolder; +import org.neo4j.coreedge.raft.replication.token.ReplicatedRelationshipTypeTokenHolder; +import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenStateMachine; +import org.neo4j.coreedge.raft.replication.token.TokenRegistry; +import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionCommitProcess; +import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionStateMachine; +import org.neo4j.coreedge.raft.state.CoreStateMachines; +import org.neo4j.coreedge.raft.state.DurableStateStorage; +import org.neo4j.coreedge.raft.state.StateStorage; +import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.coreedge.server.core.RecoverTransactionLogState; +import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager; +import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState; +import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenStateMachine; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.api.CommitProcessFactory; +import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; +import org.neo4j.kernel.impl.core.LabelTokenHolder; +import org.neo4j.kernel.impl.core.PropertyKeyTokenHolder; +import org.neo4j.kernel.impl.core.RelationshipTypeToken; +import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder; +import org.neo4j.kernel.impl.enterprise.id.EnterpriseIdTypeConfigurationProvider; +import org.neo4j.kernel.impl.factory.CommunityEditionModule; +import org.neo4j.kernel.impl.factory.PlatformModule; +import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; +import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider; +import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; +import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.LogProvider; +import org.neo4j.storageengine.api.Token; + +public class CoreStateMachinesModule +{ + public final IdGeneratorFactory idGeneratorFactory; + public final IdTypeConfigurationProvider idTypeConfigurationProvider; + public final LabelTokenHolder labelTokenHolder; + public final PropertyKeyTokenHolder propertyKeyTokenHolder; + public final RelationshipTypeTokenHolder relationshipTypeTokenHolder; + public final Locks lockManager; + public final CommitProcessFactory commitProcessFactory; + + public final ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory; + public final CoreStateMachines coreStateMachines; + + public CoreStateMachinesModule( CoreMember myself, PlatformModule platformModule, File clusterStateDirectory, + Supplier databaseHealthSupplier, Config config, + RaftReplicator replicator, LeaderLocator leaderLocator, + Dependencies dependencies, LocalDatabase localDatabase ) + { + StateStorage idAllocationState; + StateStorage lockTokenState; + final LifeSupport life = platformModule.life; + final FileSystemAbstraction fileSystem = platformModule.fileSystem; + LogService logging = platformModule.logging; + LogProvider logProvider = logging.getInternalLogProvider(); + + try + { + lockTokenState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "lock-token-state" ), + "lock-token", new ReplicatedLockTokenState.Marshal( new CoreMember.CoreMemberMarshal() ), + config.get( CoreEdgeClusterSettings.replicated_lock_token_state_size ), + databaseHealthSupplier, logProvider ) ); + + idAllocationState = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "id-allocation-state" ), + "id-allocation", new IdAllocationState.Marshal(), + config.get( CoreEdgeClusterSettings.id_alloc_state_size ), databaseHealthSupplier, + logProvider ) ); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + + ReplicatedIdAllocationStateMachine idAllocationStateMachine = + new ReplicatedIdAllocationStateMachine( idAllocationState ); + + int allocationChunk = 1024; // TODO: AllocationChunk should be configurable and per type. + ReplicatedIdRangeAcquirer idRangeAcquirer = + new ReplicatedIdRangeAcquirer( replicator, idAllocationStateMachine, allocationChunk, myself, + logProvider ); + + idTypeConfigurationProvider = new EnterpriseIdTypeConfigurationProvider( config ); + replicatedIdGeneratorFactory = createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider, + idTypeConfigurationProvider ); + + this.idGeneratorFactory = dependencies.satisfyDependency( replicatedIdGeneratorFactory ); + dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) ); + + Long tokenCreationTimeout = config.get( CoreEdgeClusterSettings.token_creation_timeout ); + + TokenRegistry relationshipTypeTokenRegistry = new TokenRegistry<>( "RelationshipType" ); + ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder = + new ReplicatedRelationshipTypeTokenHolder( relationshipTypeTokenRegistry, replicator, + this.idGeneratorFactory, dependencies, tokenCreationTimeout ); + + TokenRegistry propertyKeyTokenRegistry = new TokenRegistry<>( "PropertyKey" ); + ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder = + new ReplicatedPropertyKeyTokenHolder( propertyKeyTokenRegistry, replicator, this.idGeneratorFactory, + dependencies, tokenCreationTimeout ); + + TokenRegistry labelTokenRegistry = new TokenRegistry<>( "Label" ); + ReplicatedLabelTokenHolder labelTokenHolder = + new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory, dependencies, + tokenCreationTimeout ); + + ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine = + new ReplicatedLockTokenStateMachine( lockTokenState ); + + RecoverTransactionLogState txLogState = new RecoverTransactionLogState( dependencies, logProvider ); + + ReplicatedTokenStateMachine labelTokenStateMachine = + new ReplicatedTokenStateMachine<>( labelTokenRegistry, new Token.Factory(), logProvider ); + + ReplicatedTokenStateMachine propertyKeyTokenStateMachine = + new ReplicatedTokenStateMachine<>( propertyKeyTokenRegistry, new Token.Factory(), logProvider ); + + ReplicatedTokenStateMachine relationshipTypeTokenStateMachine = + new ReplicatedTokenStateMachine<>( relationshipTypeTokenRegistry, new RelationshipTypeToken.Factory(), + logProvider ); + + ReplicatedTransactionStateMachine replicatedTxStateMachine = + new ReplicatedTransactionStateMachine( replicatedLockTokenStateMachine, + config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), logProvider ); + + dependencies.satisfyDependencies( replicatedTxStateMachine ); + + long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout ); + lockManager = createLockManager( config, logging, replicator, myself, leaderLocator, leaderLockTokenTimeout, + replicatedLockTokenStateMachine ); + + coreStateMachines = new CoreStateMachines( replicatedTxStateMachine, labelTokenStateMachine, + relationshipTypeTokenStateMachine, propertyKeyTokenStateMachine, replicatedLockTokenStateMachine, + idAllocationStateMachine, txLogState, localDatabase ); + + commitProcessFactory = ( appender, applier, ignored ) -> { + TransactionRepresentationCommitProcess localCommit = + new TransactionRepresentationCommitProcess( appender, applier ); + coreStateMachines.refresh( localCommit ); // This gets called when a core-to-core download is performed. + return new ReplicatedTransactionCommitProcess( replicator ); + }; + + this.relationshipTypeTokenHolder = relationshipTypeTokenHolder; + this.propertyKeyTokenHolder = propertyKeyTokenHolder; + this.labelTokenHolder = labelTokenHolder; + } + + private ReplicatedIdGeneratorFactory createIdGeneratorFactory( + FileSystemAbstraction fileSystem, + final ReplicatedIdRangeAcquirer idRangeAcquirer, + final LogProvider logProvider, + IdTypeConfigurationProvider idTypeConfigurationProvider ) + { + return new ReplicatedIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider, + idTypeConfigurationProvider ); + } + + private Locks createLockManager( final Config config, final LogService logging, final Replicator replicator, + CoreMember myself, LeaderLocator leaderLocator, long leaderLockTokenTimeout, + ReplicatedLockTokenStateMachine lockTokenStateMachine ) + { + Locks localLocks = CommunityEditionModule.createLockManager( config, logging ); + + return new LeaderOnlyLockManager( myself, replicator, leaderLocator, localLocks, leaderLockTokenTimeout, + lockTokenStateMachine ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index d61a732a35a9c..badc0bec7a8da 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -73,7 +73,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private long lastFlushed = NOTHING; public CoreState( - RaftLog raftLog, + CoreStateMachines coreStateMachines, RaftLog raftLog, int maxBatchSize, int flushEvery, Supplier dbHealth, @@ -84,9 +84,10 @@ public CoreState( CoreServerSelectionStrategy someoneElse, CoreStateApplier applier, CoreStateDownloader downloader, - InFlightMap inFlightMap, + InFlightMap inFlightMap, Monitors monitors ) { + this.coreStateMachines = coreStateMachines; this.raftLog = raftLog; this.lastFlushedStorage = lastFlushedStorage; this.flushEvery = flushEvery; @@ -102,17 +103,6 @@ public CoreState( this.batcher = new OperationBatcher( maxBatchSize ); } - synchronized void setStateMachine( CoreStateMachines coreStateMachines ) - { - this.coreStateMachines = coreStateMachines; - } - - public void skip( long lastApplied ) - { - this.lastApplied = this.lastFlushed = lastApplied; - log.info( format( "Skipping lastApplied index forward to %d", lastApplied ) ); - } - @Override public synchronized void notifyCommitted( long commitIndex ) { @@ -332,6 +322,21 @@ public synchronized CoreSnapshot snapshot() throws IOException, InterruptedExcep synchronized void installSnapshot( CoreSnapshot coreSnapshot ) { coreStateMachines.installSnapshots( coreSnapshot ); + long snapshotPrevIndex = coreSnapshot.prevIndex(); + try + { + if ( snapshotPrevIndex > 1 ) + { + raftLog.skip( snapshotPrevIndex, coreSnapshot.prevTerm() ); + } + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + this.lastApplied = this.lastFlushed = snapshotPrevIndex; + log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) ); + sessionState = coreSnapshot.get( CoreStateType.SESSION_TRACKER ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index 03b5e08880a78..4b7c4174b731a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -50,9 +50,7 @@ public class CoreStateMachines private final ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine; private final ReplicatedIdAllocationStateMachine idAllocationStateMachine; - private final CoreState coreState; private final RecoverTransactionLogState txLogState; - private final RaftLog raftLog; private final LocalDatabase localDatabase; private final CommandDispatcher currentBatch = new StateMachineCommandDispatcher(); @@ -65,9 +63,7 @@ public CoreStateMachines( ReplicatedTokenStateMachine propertyKeyTokenStateMachine, ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine, ReplicatedIdAllocationStateMachine idAllocationStateMachine, - CoreState coreState, RecoverTransactionLogState txLogState, - RaftLog raftLog, LocalDatabase localDatabase ) { this.replicatedTxStateMachine = replicatedTxStateMachine; @@ -76,9 +72,7 @@ public CoreStateMachines( this.propertyKeyTokenStateMachine = propertyKeyTokenStateMachine; this.replicatedLockTokenStateMachine = replicatedLockTokenStateMachine; this.idAllocationStateMachine = idAllocationStateMachine; - this.coreState = coreState; this.txLogState = txLogState; - this.raftLog = raftLog; this.localDatabase = localDatabase; } @@ -120,20 +114,6 @@ void installSnapshots( CoreSnapshot coreSnapshot ) idAllocationStateMachine.installSnapshot( coreSnapshot.get( CoreStateType.ID_ALLOCATION ) ); replicatedLockTokenStateMachine.installSnapshot( coreSnapshot.get( CoreStateType.LOCK_TOKEN ) ); // transactions and tokens live in the store - - long snapshotPrevIndex = coreSnapshot.prevIndex(); - try - { - if ( snapshotPrevIndex > 1 ) - { - raftLog.skip( snapshotPrevIndex, coreSnapshot.prevTerm() ); - } - } - catch ( IOException e ) - { - throw new RuntimeException( e ); - } - coreState.skip( snapshotPrevIndex ); } public void refresh( TransactionRepresentationCommitProcess localCommit ) @@ -146,8 +126,6 @@ public void refresh( TransactionRepresentationCommitProcess localCommit ) labelTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); relationshipTypeTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); propertyKeyTokenStateMachine.installCommitProcess( localCommit, lastAppliedIndex ); - - coreState.setStateMachine( this ); } private class StateMachineCommandDispatcher implements CommandDispatcher diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index b4c21b6867477..b52e841fef4b7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.function.Supplier; +import org.neo4j.coreedge.CoreStateMachinesModule; import org.neo4j.coreedge.catchup.CatchupServer; import org.neo4j.coreedge.catchup.CheckpointerSupplier; import org.neo4j.coreedge.catchup.DataSourceSupplier; @@ -44,7 +45,6 @@ import org.neo4j.coreedge.raft.ConsensusModule; import org.neo4j.coreedge.raft.ContinuousJob; import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService; -import org.neo4j.coreedge.raft.LeaderLocator; import org.neo4j.coreedge.raft.RaftInstance; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftServer; @@ -60,39 +60,23 @@ import org.neo4j.coreedge.raft.net.RaftOutbound; import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; import org.neo4j.coreedge.raft.replication.RaftReplicator; -import org.neo4j.coreedge.raft.replication.Replicator; -import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; -import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory; -import org.neo4j.coreedge.raft.replication.id.ReplicatedIdRangeAcquirer; import org.neo4j.coreedge.raft.replication.session.GlobalSession; import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; -import org.neo4j.coreedge.raft.replication.token.ReplicatedLabelTokenHolder; -import org.neo4j.coreedge.raft.replication.token.ReplicatedPropertyKeyTokenHolder; -import org.neo4j.coreedge.raft.replication.token.ReplicatedRelationshipTypeTokenHolder; -import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenStateMachine; -import org.neo4j.coreedge.raft.replication.token.TokenRegistry; import org.neo4j.coreedge.raft.replication.tx.ExponentialBackoffStrategy; -import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionCommitProcess; -import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionStateMachine; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.raft.state.CoreState; import org.neo4j.coreedge.raft.state.CoreStateApplier; import org.neo4j.coreedge.raft.state.CoreStateDownloader; -import org.neo4j.coreedge.raft.state.CoreStateMachines; import org.neo4j.coreedge.raft.state.DurableStateStorage; import org.neo4j.coreedge.raft.state.LongIndexMarshal; import org.neo4j.coreedge.raft.state.StateStorage; -import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState; import org.neo4j.coreedge.server.CoreEdgeClusterSettings; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember.CoreMemberMarshal; import org.neo4j.coreedge.server.ListenSocketAddress; import org.neo4j.coreedge.server.NonBlockingChannels; import org.neo4j.coreedge.server.SenderService; -import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager; -import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState; -import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenStateMachine; import org.neo4j.coreedge.server.logging.BetterMessageLogger; import org.neo4j.coreedge.server.logging.MessageLogger; import org.neo4j.coreedge.server.logging.NullMessageLogger; @@ -108,24 +92,17 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; -import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.index.RemoveOrphanConstraintIndexesOnStartup; -import org.neo4j.kernel.impl.core.RelationshipTypeToken; import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard; import org.neo4j.kernel.impl.enterprise.EnterpriseConstraintSemantics; import org.neo4j.kernel.impl.enterprise.StandardSessionTracker; -import org.neo4j.kernel.impl.enterprise.id.EnterpriseIdTypeConfigurationProvider; import org.neo4j.kernel.impl.enterprise.transaction.log.checkpoint.ConfigurableIOLimiter; -import org.neo4j.kernel.impl.factory.CommunityEditionModule; import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.factory.EditionModule; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.factory.PlatformModule; -import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.proc.Procedures; -import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider; -import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; @@ -139,7 +116,6 @@ import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.LifecycleStatus; import org.neo4j.logging.LogProvider; -import org.neo4j.storageengine.api.Token; import org.neo4j.udc.UsageData; import static java.time.Clock.systemUTC; @@ -159,6 +135,7 @@ public class EnterpriseCoreEditionModule extends EditionModule private final ConsensusModule consensusModule; private final CoreTopologyService discoveryService; private final LogProvider logProvider; + private final CoreStateMachinesModule coreStateMachinesModule; public enum RaftLogImplementation { @@ -199,10 +176,8 @@ public void registerProcedures( Procedures procedures ) final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); CoreMember myself; - StateStorage idAllocationState; StateStorage lastFlushedStorage; StateStorage sessionTrackerStorage; - StateStorage lockTokenState; try { @@ -228,17 +203,6 @@ public void registerProcedures( Procedures procedures ) config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), databaseHealthSupplier, logProvider ) ); - lockTokenState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "lock-token-state" ), - "lock-token", new ReplicatedLockTokenState.Marshal( new CoreMemberMarshal() ), - config.get( CoreEdgeClusterSettings.replicated_lock_token_state_size ), - databaseHealthSupplier, logProvider ) ); - - idAllocationState = life.add( - new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "id-allocation-state" ), - "id-allocation", new IdAllocationState.Marshal(), - config.get( CoreEdgeClusterSettings.id_alloc_state_size ), databaseHealthSupplier, - logProvider ) ); } catch ( IOException e ) { @@ -303,9 +267,6 @@ public void registerProcedures( Procedures procedures ) Outbound loggingOutbound = new LoggingOutbound<>( raftOutbound, myself, messageLogger ); - RaftServer raftServer; - CoreState coreState; - CoreStateApplier coreStateApplier = new CoreStateApplier( logProvider ); CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, logProvider ); @@ -318,110 +279,53 @@ public void registerProcedures( Procedures procedures ) new ConsensusModule( myself, platformModule, raftOutbound, clusterStateDirectory, raftTimeoutService, discoveryService, lastFlushedStorage.getInitialState() ); - coreState = dependencies.satisfyDependency( new CoreState( - consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), - config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), - databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, - sessionTrackerStorage, someoneElse, coreStateApplier, downloader, inFlightMap, platformModule.monitors ) ); - - raftServer = new RaftServer( marshal, raftListenAddress, logProvider ); + RaftServer raftServer = new RaftServer( marshal, raftListenAddress, logProvider ); LoggingInbound loggingRaftInbound = new LoggingInbound<>( raftServer, messageLogger, myself ); int queueSize = config.get( CoreEdgeClusterSettings.raft_in_queue_size ); int maxBatch = config.get( CoreEdgeClusterSettings.raft_in_queue_max_batch ); - BatchingMessageHandler batchingMessageHandler = - new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState ); - - life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), - batchingMessageHandler ) ); - - loggingRaftInbound.registerHandler( batchingMessageHandler ); dependencies.satisfyDependency( consensusModule.raftInstance() ); - life.add( new PruningScheduler( coreState, platformModule.jobScheduler, - config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) ); - RaftReplicator replicator = new RaftReplicator( consensusModule.raftInstance(), myself, loggingOutbound, sessionPool, progressTracker, new ExponentialBackoffStrategy( 10, SECONDS ) ); - ReplicatedIdAllocationStateMachine idAllocationStateMachine = - new ReplicatedIdAllocationStateMachine( idAllocationState ); - - int allocationChunk = 1024; // TODO: AllocationChunk should be configurable and per type. - ReplicatedIdRangeAcquirer idRangeAcquirer = - new ReplicatedIdRangeAcquirer( replicator, idAllocationStateMachine, allocationChunk, myself, - logProvider ); - - long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); - MembershipWaiter membershipWaiter = - new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider ); - - idTypeConfigurationProvider = new EnterpriseIdTypeConfigurationProvider( config ); - ReplicatedIdGeneratorFactory replicatedIdGeneratorFactory = - createIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider, idTypeConfigurationProvider ); - - this.idGeneratorFactory = dependencies.satisfyDependency( replicatedIdGeneratorFactory ); - dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) ); - - Long tokenCreationTimeout = config.get( CoreEdgeClusterSettings.token_creation_timeout ); - - TokenRegistry relationshipTypeTokenRegistry = new TokenRegistry<>( "RelationshipType" ); - ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder = - new ReplicatedRelationshipTypeTokenHolder( relationshipTypeTokenRegistry, replicator, - this.idGeneratorFactory, dependencies, tokenCreationTimeout ); + coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory, + databaseHealthSupplier, config, replicator, consensusModule.raftInstance(), dependencies, localDatabase ); - TokenRegistry propertyKeyTokenRegistry = new TokenRegistry<>( "PropertyKey" ); - ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder = - new ReplicatedPropertyKeyTokenHolder( propertyKeyTokenRegistry, replicator, this.idGeneratorFactory, - dependencies, tokenCreationTimeout ); + this.idGeneratorFactory = coreStateMachinesModule.idGeneratorFactory; + this.idTypeConfigurationProvider = coreStateMachinesModule.idTypeConfigurationProvider; + this.labelTokenHolder = coreStateMachinesModule.labelTokenHolder; + this.propertyKeyTokenHolder = coreStateMachinesModule.propertyKeyTokenHolder; + this.relationshipTypeTokenHolder = coreStateMachinesModule.relationshipTypeTokenHolder; + this.lockManager = coreStateMachinesModule.lockManager; + this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory; - TokenRegistry labelTokenRegistry = new TokenRegistry<>( "Label" ); - ReplicatedLabelTokenHolder labelTokenHolder = - new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory, dependencies, - tokenCreationTimeout ); - - ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine = - new ReplicatedLockTokenStateMachine( lockTokenState ); - - RecoverTransactionLogState txLogState = new RecoverTransactionLogState( dependencies, logProvider ); - - ReplicatedTokenStateMachine labelTokenStateMachine = - new ReplicatedTokenStateMachine<>( labelTokenRegistry, new Token.Factory(), logProvider ); - - ReplicatedTokenStateMachine propertyKeyTokenStateMachine = - new ReplicatedTokenStateMachine<>( propertyKeyTokenRegistry, new Token.Factory(), logProvider ); - - ReplicatedTokenStateMachine relationshipTypeTokenStateMachine = - new ReplicatedTokenStateMachine<>( relationshipTypeTokenRegistry, new RelationshipTypeToken.Factory(), - logProvider ); + CoreState coreState = dependencies.satisfyDependency( new CoreState( coreStateMachinesModule.coreStateMachines, + consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), + config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), + databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, + sessionTrackerStorage, someoneElse, coreStateApplier, downloader, inFlightMap, platformModule.monitors ) ); - ReplicatedTransactionStateMachine replicatedTxStateMachine = - new ReplicatedTransactionStateMachine( replicatedLockTokenStateMachine, - config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), - logging.getInternalLogProvider() ); + life.add( new PruningScheduler( coreState, platformModule.jobScheduler, + config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) ); - dependencies.satisfyDependencies( replicatedTxStateMachine ); + BatchingMessageHandler batchingMessageHandler = + new BatchingMessageHandler( consensusModule.raftInstance(), logProvider, queueSize, maxBatch, localDatabase, coreState ); - CoreStateMachines coreStateMachines = new CoreStateMachines( replicatedTxStateMachine, labelTokenStateMachine, - relationshipTypeTokenStateMachine, propertyKeyTokenStateMachine, replicatedLockTokenStateMachine, - idAllocationStateMachine, coreState, txLogState, consensusModule.raftLog(), localDatabase ); + long electionTimeout = config.get( CoreEdgeClusterSettings.leader_election_timeout ); + MembershipWaiter membershipWaiter = + new MembershipWaiter( myself, platformModule.jobScheduler, electionTimeout * 4, batchingMessageHandler, logProvider ); - commitProcessFactory = ( appender, applier, ignored ) -> { - TransactionRepresentationCommitProcess localCommit = - new TransactionRepresentationCommitProcess( appender, applier ); - coreStateMachines.refresh( localCommit ); // This gets called when a core-to-core download is performed. - return new ReplicatedTransactionCommitProcess( replicator ); - }; + life.add( new ContinuousJob( platformModule.jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), + batchingMessageHandler ) ); - this.relationshipTypeTokenHolder = relationshipTypeTokenHolder; - this.propertyKeyTokenHolder = propertyKeyTokenHolder; - this.labelTokenHolder = labelTokenHolder; + loggingRaftInbound.registerHandler( batchingMessageHandler ); dependencies.satisfyDependency( createKernelData( fileSystem, platformModule.pageCache, storeDir, config, graphDatabaseFacade, life ) ); @@ -443,10 +347,6 @@ public void registerProcedures( Procedures procedures ) publishEditionInfo( dependencies.resolveDependency( UsageData.class ), platformModule.databaseInfo, config ); - long leaderLockTokenTimeout = config.get( CoreEdgeClusterSettings.leader_lock_token_timeout ); - Locks lockManager = createLockManager( config, logging, replicator, myself, consensusModule.raftInstance(), leaderLockTokenTimeout, - replicatedLockTokenStateMachine ); - this.lockManager = dependencies.satisfyDependency( lockManager ); CatchupServer catchupServer = new CatchupServer( logProvider, localDatabase, @@ -458,7 +358,7 @@ public void registerProcedures( Procedures procedures ) long joinCatchupTimeout = config.get( CoreEdgeClusterSettings.join_catch_up_timeout ); life.add( CoreServerStartupProcess.createLifeSupport( - platformModule.dataSourceManager, replicatedIdGeneratorFactory, consensusModule.raftInstance(), coreState, raftServer, + platformModule.dataSourceManager, coreStateMachinesModule.replicatedIdGeneratorFactory, consensusModule.raftInstance(), coreState, raftServer, catchupServer, raftTimeoutService, membershipWaiter, joinCatchupTimeout, logProvider ) ); dependencies.satisfyDependency( createSessionTracker() ); @@ -511,23 +411,6 @@ private KernelData createKernelData( FileSystemAbstraction fileSystem, PageCache return life.add( kernelData ); } - private ReplicatedIdGeneratorFactory createIdGeneratorFactory( FileSystemAbstraction fileSystem, - final ReplicatedIdRangeAcquirer idRangeAcquirer, final LogProvider logProvider, - IdTypeConfigurationProvider idTypeConfigurationProvider ) - { - return new ReplicatedIdGeneratorFactory( fileSystem, idRangeAcquirer, logProvider, idTypeConfigurationProvider ); - } - - private Locks createLockManager( final Config config, final LogService logging, final Replicator replicator, - CoreMember myself, LeaderLocator leaderLocator, long leaderLockTokenTimeout, - ReplicatedLockTokenStateMachine lockTokenStateMachine ) - { - Locks localLocks = CommunityEditionModule.createLockManager( config, logging ); - - return new LeaderOnlyLockManager( myself, replicator, leaderLocator, localLocks, leaderLockTokenTimeout, - lockTokenStateMachine ); - } - private TransactionHeaderInformationFactory createHeaderInformationFactory() { return () -> new TransactionHeaderInformation( -1, -1, new byte[0] ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateMachinesTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateMachinesTest.java index 1deb102e5e40b..67de8025be223 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateMachinesTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateMachinesTest.java @@ -166,8 +166,8 @@ public void shouldReturnLastAppliedOfAllStateMachines() throws Exception private final MonitoredRaftLog txLogState = mock( MonitoredRaftLog.class); private final CoreStateMachines coreStateMachines = new CoreStateMachines( txSM, labelTokenSM, - relationshipTypeTokenSM, propertyKeyTokenSM, lockTokenSM, idAllocationSM, coreState, - recoverTransactionLogState, txLogState, mock( LocalDatabase.class ) ); + relationshipTypeTokenSM, propertyKeyTokenSM, lockTokenSM, idAllocationSM, + recoverTransactionLogState, mock( LocalDatabase.class ) ); private final ReplicatedTransaction replicatedTransaction = mock( ReplicatedTransaction.class ); private final ReplicatedIdAllocationRequest iAllocationRequest = mock( ReplicatedIdAllocationRequest.class ); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index d23f34a3fa4d8..5496095219581 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -80,14 +80,14 @@ public class CoreStateTest private final CoreStateApplier applier = new CoreStateApplier( NullLogProvider.getInstance() ); private InFlightMap inFlightMap = spy( new InFlightMap<>() ); private final Monitors monitors = new Monitors(); - private final CoreState coreState = new CoreState( raftLog, batchSize, flushEvery, () -> dbHealth, + private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); + private final CoreState coreState = new CoreState( coreStateMachines, raftLog, batchSize, flushEvery, () -> dbHealth, NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, sessionStorage, mock( CoreServerSelectionStrategy.class), applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class ); - private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); { when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher ); @@ -113,7 +113,6 @@ public void shouldApplyCommittedCommand() throws Throwable // given RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); monitors.addMonitorListener( listener ); - coreState.setStateMachine( coreStateMachines ); coreState.start(); InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); @@ -139,7 +138,6 @@ public void shouldApplyCommittedCommand() throws Throwable public void shouldNotApplyUncommittedCommands() throws Throwable { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -156,7 +154,6 @@ public void shouldNotApplyUncommittedCommands() throws Throwable public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -177,7 +174,6 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex public void duplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -204,7 +200,6 @@ public void duplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Excep public void outOfOrderDuplicatesShouldBeIgnoredButStillIncreaseCommandIndex() throws Exception { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -240,7 +235,6 @@ public void outOfOrderDuplicatesShouldBeIgnoredButStillIncreaseCommandIndex() th public void shouldPeriodicallyFlushState() throws Throwable { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); int interactions = flushEvery * 5; @@ -264,7 +258,6 @@ public void shouldPanicIfUnableToApply() throws Throwable // given doThrow( IllegalStateException.class ).when( commandDispatcher ) .dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() ); - coreState.setStateMachine( coreStateMachines ); coreState.start(); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -284,7 +277,6 @@ public void shouldApplyToLogFromCache() throws Throwable //given n things to apply in the cache, check that they are actually applied. // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); @@ -302,7 +294,6 @@ public void shouldApplyToLogFromCache() throws Throwable public void cacheEntryShouldBePurgedWhenApplied() throws Throwable { //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" - coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -324,7 +315,6 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable { // if the cache does not contain all things to be applied, make sure we fall back to the log // should only happen in recovery, otherwise this is probably a bug. - coreState.setStateMachine( coreStateMachines ); coreState.start(); //given cache with missing entry @@ -359,8 +349,6 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable public void shouldFailWhenCacheAndLogMiss() throws Throwable { //When an entry is not in the log, we must fail. - - coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );