Skip to content

Commit

Permalink
Database local lock managers
Browse files Browse the repository at this point in the history
Update lock managers from to be a database local instead of global.
  • Loading branch information
MishaDemianenko committed Aug 21, 2018
1 parent 0ecea77 commit 2c254b7
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 40 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
Expand Down Expand Up @@ -82,6 +83,8 @@ public interface DatabaseCreationContext

TokenHolders getTokenHolders();

Locks getLocks();

StatementLocksFactory getStatementLocksFactory();

SchemaWriteGuard getSchemaWriteGuard();
Expand Down
Expand Up @@ -75,6 +75,7 @@
import org.neo4j.kernel.impl.index.ExplicitIndexStore;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.logging.LogService;
Expand Down Expand Up @@ -196,19 +197,20 @@ public class NeoStoreDataSource extends LifecycleAdapter
private final ExplicitIndexProvider explicitIndexProvider;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CollectionsFactorySupplier collectionsFactorySupplier;

private Dependencies dataSourceDependencies;
private LifeSupport life;
private IndexProviderMap indexProviderMap;
private final Locks locks;
private final String databaseName;
private DatabaseLayout databaseLayout;
private boolean readOnly;
private final DatabaseLayout databaseLayout;
private final boolean readOnly;
private final IdController idController;
private final DatabaseInfo databaseInfo;
private final RecoveryCleanupWorkCollector recoveryCleanupWorkCollector;
private final VersionContextSupplier versionContextSupplier;
private final AccessCapability accessCapability;

private Dependencies dataSourceDependencies;
private LifeSupport life;
private IndexProviderMap indexProviderMap;

private StorageEngine storageEngine;
private QueryExecutionEngine executionEngine;
private NeoStoreTransactionLogModule transactionLogModule;
Expand All @@ -235,6 +237,7 @@ public NeoStoreDataSource( DatabaseCreationContext context )
this.storeCopyCheckPointMutex = context.getStoreCopyCheckPointMutex();
this.logProvider = context.getLogService().getInternalLogProvider();
this.tokenHolders = context.getTokenHolders();
this.locks = context.getLocks();
this.statementLocksFactory = context.getStatementLocksFactory();
this.schemaWriteGuard = context.getSchemaWriteGuard();
this.transactionEventHandlers = context.getTransactionEventHandlers();
Expand Down Expand Up @@ -287,6 +290,7 @@ public void start() throws IOException
dataSourceDependencies.satisfyDependency( databaseHealth );
dataSourceDependencies.satisfyDependency( storeCopyCheckPointMutex );
dataSourceDependencies.satisfyDependency( transactionMonitor );
dataSourceDependencies.satisfyDependency( locks );

life = new LifeSupport();
dataSourceDependencies.satisfyDependency( explicitIndexProvider );
Expand Down
Expand Up @@ -344,6 +344,11 @@ public TokenHolders getTokenHolders()
}

@Override
public Locks getLocks()
{
return mock( Locks.class );
}

public StatementLocksFactory getStatementLocksFactory()
{
return statementLocksFactory;
Expand Down
Expand Up @@ -97,8 +97,8 @@ public CommunityEditionModule( PlatformModule platformModule )
dependencies.satisfyDependency(
SslPolicyLoader.create( config, logging.getInternalLogProvider() ) ); // for bolt and web server

lockManager = dependencies.satisfyDependency( createLockManager( config, platformModule.clock, logging ) );
statementLocksFactory = createStatementLocksFactory( lockManager, config, logging );
locksSupplier = () -> createLockManager( config, platformModule.clock, logging );
statementLocksFactoryProvider = locks -> createStatementLocksFactory( locks, config, logging );

idTypeConfigurationProvider = createIdTypeConfigurationProvider( config );
eligibleForIdReuse = IdReuseEligibility.ALWAYS;
Expand Down
Expand Up @@ -92,9 +92,9 @@ public abstract class EditionModule

public Supplier<TokenHolders> tokenHoldersSupplier;

public Locks lockManager;
public Supplier<Locks> locksSupplier;

public StatementLocksFactory statementLocksFactory;
public Function<Locks, StatementLocksFactory> statementLocksFactoryProvider;

public CommitProcessFactory commitProcessFactory;

Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class ModularDatabaseCreationContext implements DatabaseCreationContext
private final TokenNameLookup tokenNameLookup;
private final DependencyResolver globalDependencies;
private final TokenHolders tokenHolders;
private final Locks locks;
private final StatementLocksFactory statementLocksFactory;
private final SchemaWriteGuard schemaWriteGuard;
private final TransactionEventHandlers transactionEventHandlers;
Expand Down Expand Up @@ -123,7 +125,8 @@ public class ModularDatabaseCreationContext implements DatabaseCreationContext
this.globalDependencies = platformModule.dependencies;
this.tokenHolders = tokenHolders;
this.tokenNameLookup = new NonTransactionalTokenNameLookup( tokenHolders );
this.statementLocksFactory = editionModule.statementLocksFactory;
this.locks = editionModule.locksSupplier.get();
this.statementLocksFactory = editionModule.statementLocksFactoryProvider.apply( locks );
this.schemaWriteGuard = editionModule.schemaWriteGuard;
this.transactionEventHandlers = new TransactionEventHandlers( facade );
this.monitors = new Monitors( platformModule.monitors );
Expand Down Expand Up @@ -211,6 +214,12 @@ public TokenHolders getTokenHolders()
return tokenHolders;
}

@Override
public Locks getLocks()
{
return locks;
}

@Override
public StatementLocksFactory getStatementLocksFactory()
{
Expand Down
Expand Up @@ -310,7 +310,7 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

// TODO: this is broken, coreStateMachinesModule.tokenHolders should be supplier, somehow...
this.tokenHoldersSupplier = () -> coreStateMachinesModule.tokenHolders;
this.lockManager = coreStateMachinesModule.lockManager;
this.locksSupplier = coreStateMachinesModule.locksSupplier;
this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory;
this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() );

Expand All @@ -334,8 +334,6 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,

editionInvariants( platformModule, dependencies, config, logging, life );

dependencies.satisfyDependency( lockManager );

life.add( coreServerModule.membershipWaiterLifecycle );
}

Expand Down Expand Up @@ -417,7 +415,7 @@ private static MessageLogger<MemberId> createMessageLogger( Config config, LifeS
private void editionInvariants( PlatformModule platformModule, Dependencies dependencies, Config config,
LogService logging, LifeSupport life )
{
statementLocksFactory = new StatementLocksFactorySelector( lockManager, config, logging ).select();
statementLocksFactoryProvider = locks -> new StatementLocksFactorySelector( locks, config, logging ).select();

dependencies.satisfyDependency(
createKernelData( platformModule.fileSystem, platformModule.pageCache, platformModule.storeLayout.storeDirectory(),
Expand Down
Expand Up @@ -103,7 +103,7 @@ public class CoreStateMachinesModule
public final IdGeneratorFactory idGeneratorFactory;
public final IdTypeConfigurationProvider idTypeConfigurationProvider;
public final TokenHolders tokenHolders;
public final Locks lockManager;
public final Supplier<Locks> locksSupplier;
public final CommitProcessFactory commitProcessFactory;

public final CoreStateMachines coreStateMachines;
Expand Down Expand Up @@ -184,7 +184,7 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule,

dependencies.satisfyDependencies( replicatedTxStateMachine );

lockManager = createLockManager( config, platformModule.clock, logging, replicator, myself, raftMachine,
locksSupplier = () -> createLockManager( config, platformModule.clock, logging, replicator, myself, raftMachine,
replicatedLockTokenStateMachine );

RecoverConsensusLogIndex consensusLogIndexRecovery = new RecoverConsensusLogIndex( localDatabase, logProvider );
Expand Down
Expand Up @@ -176,9 +176,9 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule,
platformModule.jobScheduler, config, fileWatcherFileNameFilter() );
dependencies.satisfyDependencies( watcherServiceFactory );

lockManager = dependencies.satisfyDependency( new ReadReplicaLockManager() );

statementLocksFactory = new StatementLocksFactorySelector( lockManager, config, logging ).select();
ReadReplicaLockManager emptyLockManager = new ReadReplicaLockManager();
locksSupplier = () -> emptyLockManager;
statementLocksFactoryProvider = locks -> new StatementLocksFactorySelector( locks, config, logging ).select();

idTypeConfigurationProvider = new EnterpriseIdTypeConfigurationProvider( config );
idGeneratorFactory = dependencies.satisfyDependency( new DefaultIdGeneratorFactory( fileSystem, idTypeConfigurationProvider ) );
Expand Down
Expand Up @@ -26,13 +26,13 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
import org.neo4j.cluster.member.ClusterMemberAvailability;
import org.neo4j.com.ServerUtil;
import org.neo4j.function.Factory;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
Expand All @@ -42,6 +42,7 @@
import org.neo4j.kernel.ha.com.master.MasterServer;
import org.neo4j.kernel.ha.com.master.SlaveFactory;
import org.neo4j.kernel.ha.id.HaIdGeneratorFactory;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.Log;
Expand All @@ -50,7 +51,7 @@

public class SwitchToMaster implements AutoCloseable
{
Factory<ConversationManager> conversationManagerFactory;
Function<Locks, ConversationManager> conversationManagerFactory;
BiFunction<ConversationManager, LifeSupport, Master> masterFactory;
BiFunction<Master, ConversationManager, MasterServer> masterServerFactory;
private Log userLog;
Expand All @@ -63,7 +64,7 @@ public class SwitchToMaster implements AutoCloseable

public SwitchToMaster( LogService logService,
HaIdGeneratorFactory idGeneratorFactory, Config config, Supplier<SlaveFactory> slaveFactorySupplier,
Factory<ConversationManager> conversationManagerFactory,
Function<Locks, ConversationManager> conversationManagerFactory,
BiFunction<ConversationManager, LifeSupport, Master> masterFactory,
BiFunction<Master, ConversationManager, MasterServer> masterServerFactory,
DelegateInvocationHandler<Master> masterDelegateHandler, ClusterMemberAvailability clusterMemberAvailability,
Expand Down Expand Up @@ -104,10 +105,10 @@ public URI switchToMaster( LifeSupport haCommunicationLife, URI me )
// the switch until then.

idGeneratorFactory.switchToMaster();
NeoStoreDataSource neoStoreXaDataSource = dataSourceSupplier.get();
neoStoreXaDataSource.afterModeSwitch();
NeoStoreDataSource dataSource = dataSourceSupplier.get();
dataSource.afterModeSwitch();

ConversationManager conversationManager = conversationManagerFactory.newInstance();
ConversationManager conversationManager = conversationManagerFactory.apply( dataSource.getDependencyResolver().resolveDependency( Locks.class ) );
Master master = masterFactory.apply( conversationManager, haCommunicationLife );

MasterServer masterServer = masterServerFactory.apply( master, conversationManager );
Expand All @@ -118,10 +119,10 @@ public URI switchToMaster( LifeSupport haCommunicationLife, URI me )
haCommunicationLife.start();

URI masterHaURI = getMasterUri( me, masterServer, config );
clusterMemberAvailability.memberIsAvailable( MASTER, masterHaURI, neoStoreXaDataSource.getStoreId() );
clusterMemberAvailability.memberIsAvailable( MASTER, masterHaURI, dataSource.getStoreId() );
userLog.info( "I am %s, successfully moved to master", myId( config ) );

slaveFactorySupplier.get().setStoreId( neoStoreXaDataSource.getStoreId() );
slaveFactorySupplier.get().setStoreId( dataSource.getStoreId() );

return masterHaURI;
}
Expand Down
Expand Up @@ -437,10 +437,8 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
platformModule.dataSourceManager.getDataSource(),
logging.getInternalLogProvider() );

final Factory<ConversationSPI> conversationSPIFactory =
() -> new DefaultConversationSPI( lockManager, platformModule.jobScheduler );
Factory<ConversationManager> conversationManagerFactory =
() -> new ConversationManager( conversationSPIFactory.newInstance(), config );
Function<Locks,ConversationSPI> conversationSPIFactory = locks -> new DefaultConversationSPI( locks, platformModule.jobScheduler );
Function<Locks,ConversationManager> conversationManagerFactory = locks -> new ConversationManager( conversationSPIFactory.apply( locks ), config );

BiFunction<ConversationManager, LifeSupport, Master> masterFactory = ( conversationManager, life1 ) ->
life1.add( new MasterImpl( masterSPIFactory.newInstance(),
Expand Down Expand Up @@ -496,11 +494,9 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
SslPolicyLoader.create( config, logging.getInternalLogProvider() ) ); // for bolt and web server

// Create HA services
lockManager = dependencies.satisfyDependency(
createLockManager( componentSwitcherContainer, config, masterDelegateInvocationHandler,
requestContextFactory, platformModule.availabilityGuard, platformModule.clock, logging ) );

statementLocksFactory = createStatementLocksFactory( componentSwitcherContainer, config, logging );
locksSupplier = () -> createLockManager( componentSwitcherContainer, config, masterDelegateInvocationHandler,
requestContextFactory, platformModule.availabilityGuard, platformModule.clock, logging );
statementLocksFactoryProvider = locks -> createStatementLocksFactory( locks, componentSwitcherContainer, config, logging );

DelegatingTokenHolder propertyKeyTokenHolder = new DelegatingTokenHolder(
createPropertyKeyCreator( config, componentSwitcherContainer, masterDelegateInvocationHandler, requestContextFactory, kernelProvider ),
Expand Down Expand Up @@ -563,10 +559,10 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
procedures.registerProcedure( EnterpriseBuiltInProcedures.class, true );
}

private StatementLocksFactory createStatementLocksFactory( ComponentSwitcherContainer componentSwitcherContainer,
Config config, LogService logging )
private static StatementLocksFactory createStatementLocksFactory( Locks locks, ComponentSwitcherContainer componentSwitcherContainer, Config config,
LogService logging )
{
StatementLocksFactory configuredStatementLocks = new StatementLocksFactorySelector( lockManager, config, logging ).select();
StatementLocksFactory configuredStatementLocks = new StatementLocksFactorySelector( locks, config, logging ).select();

DelegateInvocationHandler<StatementLocksFactory> locksFactoryDelegate =
new DelegateInvocationHandler<>( StatementLocksFactory.class );
Expand Down

0 comments on commit 2c254b7

Please sign in to comment.