Skip to content

Commit

Permalink
Correct NeoStoreDataSource lookup in HA
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Aug 3, 2018
1 parent 9e5aba4 commit d28dd5a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 33 deletions.
Expand Up @@ -25,12 +25,13 @@
import org.neo4j.com.Response;
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TransactionStreamResponse;
import org.neo4j.dbms.database.DatabaseManager;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
Expand Down Expand Up @@ -267,8 +268,9 @@ public KernelTransactions kernelTransactions()

private DependencyResolver getDatabaseResolver()
{
NeoStoreDataSource dataSource = globalResolver.resolveDependency( NeoStoreDataSource.class );
return dataSource.getDependencyResolver();
DatabaseManager databaseManager = globalResolver.resolveDependency( DatabaseManager.class );
GraphDatabaseFacade facade = databaseManager.getDatabaseFacade( DatabaseManager.DEFAULT_DATABASE_NAME ).get();
return facade.getDependencyResolver();
}

@Override
Expand Down
Expand Up @@ -23,13 +23,14 @@
package org.neo4j.kernel.ha;

import org.neo4j.cluster.InstanceId;
import org.neo4j.dbms.database.DatabaseManager;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.ha.cluster.HighAvailabilityMemberStateMachine;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -84,8 +85,13 @@ public SlaveUpdatePuller createSlaveUpdatePuller()

public UpdatePullingTransactionObligationFulfiller createObligationFulfiller( UpdatePuller updatePuller )
{
return new UpdatePullingTransactionObligationFulfiller( updatePuller, memberStateMachine, serverId,
() -> dependencyResolver.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver().resolveDependency( TransactionIdStore.class ) );
return new UpdatePullingTransactionObligationFulfiller( updatePuller, memberStateMachine, serverId, () ->
{
GraphDatabaseFacade databaseFacade =
this.dependencyResolver.resolveDependency( DatabaseManager.class ).getDatabaseFacade( DatabaseManager.DEFAULT_DATABASE_NAME ).get();
DependencyResolver databaseResolver = databaseFacade.getDependencyResolver();
return databaseResolver.resolveDependency( TransactionIdStore.class );
} );
}

public UpdatePullerScheduler createUpdatePullerScheduler( UpdatePuller updatePuller )
Expand Down
Expand Up @@ -22,8 +22,8 @@
*/
package org.neo4j.kernel.ha.cluster.modeswitch;

import org.neo4j.dbms.database.DatabaseManager;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.MasterTransactionCommitProcess;
import org.neo4j.kernel.ha.SlaveTransactionCommitProcess;
Expand All @@ -32,6 +32,7 @@
import org.neo4j.kernel.ha.transaction.TransactionPropagator;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -66,7 +67,9 @@ protected TransactionCommitProcess getSlaveImpl()
@Override
protected TransactionCommitProcess getMasterImpl()
{
DependencyResolver databaseResolver = this.dependencyResolver.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver();
GraphDatabaseFacade databaseFacade = this.dependencyResolver.resolveDependency( DatabaseManager.class )
.getDatabaseFacade( DatabaseManager.DEFAULT_DATABASE_NAME ).get();
DependencyResolver databaseResolver = databaseFacade.getDependencyResolver();
TransactionCommitProcess commitProcess = new TransactionRepresentationCommitProcess(
databaseResolver.resolveDependency( TransactionAppender.class ),
databaseResolver.resolveDependency( StorageEngine.class ) );
Expand Down
Expand Up @@ -239,14 +239,14 @@ public HighlyAvailableEditionModule( final PlatformModule platformModule )

RequestContextFactory requestContextFactory = dependencies.satisfyDependency( new RequestContextFactory(
serverId.toIntegerIndex(),
() -> resolveDatabaseDependency( dependencies, TransactionIdStore.class ) ) );
() -> resolveDatabaseDependency( platformModule, TransactionIdStore.class ) ) );

final long idReuseSafeZone = config.get( HaSettings.id_reuse_safe_zone_time ).toMillis();
TransactionCommittingResponseUnpacker responseUnpacker = dependencies.satisfyDependency(
new TransactionCommittingResponseUnpacker( dependencies,
config.get( HaSettings.pull_apply_batch_size ), idReuseSafeZone ) );

Supplier<Kernel> kernelProvider = () -> resolveDatabaseDependency( dependencies, Kernel.class );
Supplier<Kernel> kernelProvider = () -> resolveDatabaseDependency( platformModule, Kernel.class );

transactionStartTimeout = config.get( HaSettings.state_switch_timeout ).toMillis();

Expand Down Expand Up @@ -275,7 +275,7 @@ public HighlyAvailableEditionModule( final PlatformModule platformModule )
// TODO There's a cyclical dependency here that should be fixed
final AtomicReference<HighAvailabilityMemberStateMachine> electionProviderRef = new AtomicReference<>();
OnDiskLastTxIdGetter lastTxIdGetter = new OnDiskLastTxIdGetter(
() -> resolveDatabaseDependency( dependencies, TransactionIdStore.class ).getLastCommittedTransactionId() );
() -> resolveDatabaseDependency( platformModule, TransactionIdStore.class ).getLastCommittedTransactionId() );
ElectionCredentialsProvider electionCredentialsProvider = config.get( HaSettings.slave_only ) ?
new NotElectableElectionCredentialsProvider() :
new DefaultElectionCredentialsProvider(
Expand Down Expand Up @@ -391,7 +391,7 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
// but merely provide a way to get access to it. That's why this is a Supplier and will be asked
// later, after the data source module and all that have started.
@SuppressWarnings( {"deprecation", "unchecked"} )
Supplier<LogEntryReader<ReadableClosablePositionAwareChannel>> logEntryReader = () -> resolveDatabaseDependency( dependencies, LogEntryReader.class );
Supplier<LogEntryReader<ReadableClosablePositionAwareChannel>> logEntryReader = () -> resolveDatabaseDependency( platformModule, LogEntryReader.class );

MasterClientResolver masterClientResolver = new MasterClientResolver( logging.getInternalLogProvider(),
responseUnpacker,
Expand Down Expand Up @@ -427,14 +427,14 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
masterClientResolver, updatePullerProxy, pullerFactory, slaveServerFactory, editionIdGeneratorFactory, databaseDirectory );

final Factory<MasterImpl.SPI> masterSPIFactory =
() -> new DefaultMasterImplSPI( resolveDatabaseDependency( dependencies, GraphDatabaseFacade.class ), platformModule.fileSystem,
() -> new DefaultMasterImplSPI( resolveDatabaseDependency( platformModule, GraphDatabaseFacade.class ), platformModule.fileSystem,
platformModule.monitors,
tokenHolders, this.idGeneratorFactory,
resolveDatabaseDependency( dependencies, TransactionCommitProcess.class ),
resolveDatabaseDependency( dependencies, CheckPointer.class ),
resolveDatabaseDependency( dependencies, TransactionIdStore.class ),
resolveDatabaseDependency( dependencies, LogicalTransactionStore.class ),
dependencies.resolveDependency( NeoStoreDataSource.class ),
resolveDatabaseDependency( platformModule, TransactionCommitProcess.class ),
resolveDatabaseDependency( platformModule, CheckPointer.class ),
resolveDatabaseDependency( platformModule, TransactionIdStore.class ),
resolveDatabaseDependency( platformModule, LogicalTransactionStore.class ),
platformModule.dataSourceManager.getDataSource(),
logging.getInternalLogProvider() );

final Factory<ConversationSPI> conversationSPIFactory =
Expand All @@ -450,8 +450,8 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
( master1, conversationManager ) ->
{
TransactionChecksumLookup txChecksumLookup = new TransactionChecksumLookup(
resolveDatabaseDependency( dependencies, TransactionIdStore.class ),
resolveDatabaseDependency( dependencies, LogicalTransactionStore.class ) );
resolveDatabaseDependency( platformModule, TransactionIdStore.class ),
resolveDatabaseDependency( platformModule, LogicalTransactionStore.class ) );

return new MasterServer( master1, logging.getInternalLogProvider(),
masterServerConfig( config ),
Expand All @@ -467,10 +467,10 @@ public void elected( String role, InstanceId instanceId, URI electedMember )
masterFactory,
masterServerFactory,
masterDelegateInvocationHandler, clusterMemberAvailability,
platformModule.dependencies.provideDependency( NeoStoreDataSource.class ) );
platformModule.dataSourceManager::getDataSource );

ComponentSwitcherContainer componentSwitcherContainer = new ComponentSwitcherContainer();
Supplier<StoreId> storeIdSupplier = () -> dependencies.resolveDependency( NeoStoreDataSource.class ).getStoreId();
Supplier<StoreId> storeIdSupplier = () -> platformModule.dataSourceManager.getDataSource().getStoreId();

HighAvailabilityModeSwitcher highAvailabilityModeSwitcher = new HighAvailabilityModeSwitcher(
switchToSlaveInstance, switchToMasterInstance, clusterClient, clusterMemberAvailability, clusterClient,
Expand Down Expand Up @@ -545,7 +545,7 @@ public void elected( String role, InstanceId instanceId, URI electedMember )

connectionTracker = dependencies.satisfyDependency( createConnectionTracker() );

registerRecovery( platformModule.databaseInfo, dependencies, logging );
registerRecovery( platformModule.databaseInfo, dependencies, logging, platformModule );

UsageData usageData = dependencies.resolveDependency( UsageData.class );
publishEditionInfo( usageData, platformModule.databaseInfo, config );
Expand Down Expand Up @@ -607,8 +607,8 @@ private static SwitchToSlave chooseSwitchToSlaveStrategy( PlatformModule platfor
platformModule.kernelExtensionFactories, masterClientResolver,
monitors.newMonitor( SwitchToSlave.Monitor.class ),
monitors.newMonitor( StoreCopyClientMonitor.class ),
dependencies.provideDependency( NeoStoreDataSource.class ),
() -> resolveDatabaseDependency( dependencies, TransactionIdStore.class ),
platformModule.dataSourceManager::getDataSource,
() -> resolveDatabaseDependency( platformModule, TransactionIdStore.class ),
slaveServerFactory, updatePullerProxy, platformModule.pageCache,
monitors, platformModule.transactionMonitor );
case copy_then_branch:
Expand All @@ -619,8 +619,8 @@ private static SwitchToSlave chooseSwitchToSlaveStrategy( PlatformModule platfor
platformModule.kernelExtensionFactories, masterClientResolver,
monitors.newMonitor( SwitchToSlave.Monitor.class ),
monitors.newMonitor( StoreCopyClientMonitor.class ),
dependencies.provideDependency( NeoStoreDataSource.class ),
() -> resolveDatabaseDependency( dependencies, TransactionIdStore.class ),
platformModule.dataSourceManager::getDataSource,
() -> resolveDatabaseDependency( platformModule, TransactionIdStore.class ),
slaveServerFactory, updatePullerProxy, platformModule.pageCache,
monitors, platformModule.transactionMonitor );
default:
Expand Down Expand Up @@ -779,8 +779,8 @@ private static KernelData createKernelData( Config config, DataSourceManager dat
return life.add( new HighlyAvailableKernelData( dataSourceManager, members, databaseInfo, fs, pageCache, storeDir, config ) );
}

private void registerRecovery( final DatabaseInfo databaseInfo, final DependencyResolver dependencyResolver,
final LogService logging )
private void registerRecovery( final DatabaseInfo databaseInfo, final DependencyResolver dependencyResolver, final LogService logging,
PlatformModule platformModule )
{
memberStateMachine.addHighAvailabilityMemberListener( new HighAvailabilityMemberListener.Adapter()
{
Expand Down Expand Up @@ -809,7 +809,8 @@ private void doAfterRecoveryAndStartup()
try
{
DiagnosticsManager diagnosticsManager = dependencyResolver.resolveDependency( DiagnosticsManager.class );
NeoStoreDataSource neoStoreDataSource = dependencyResolver.resolveDependency( NeoStoreDataSource.class );

NeoStoreDataSource neoStoreDataSource = platformModule.dataSourceManager.getDataSource();

diagnosticsManager.prependProvider( new KernelDiagnostics.Versions( databaseInfo, neoStoreDataSource.getStoreId() ) );
neoStoreDataSource.registerDiagnosticsWith( diagnosticsManager );
Expand Down Expand Up @@ -843,7 +844,9 @@ private void doAfterRecoveryAndStartup()

private static void assureLastCommitTimestampInitialized( DependencyResolver globalResolver )
{
DependencyResolver databaseResolver = globalResolver.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver();
GraphDatabaseFacade databaseFacade =
globalResolver.resolveDependency( DatabaseManager.class ).getDatabaseFacade( DatabaseManager.DEFAULT_DATABASE_NAME ).get();
DependencyResolver databaseResolver = databaseFacade.getDependencyResolver();
MetaDataStore metaDataStore = databaseResolver.resolveDependency( MetaDataStore.class );
LogicalTransactionStore txStore = databaseResolver.resolveDependency( LogicalTransactionStore.class );

Expand Down Expand Up @@ -927,8 +930,8 @@ public void setupSecurityModule( PlatformModule platformModule, Procedures proce
EnterpriseEditionModule.setupEnterpriseSecurityModule( this, platformModule, procedures );
}

private static <T> T resolveDatabaseDependency( Dependencies dependencies, Class<T> clazz )
private static <T> T resolveDatabaseDependency( PlatformModule platfrom, Class<T> clazz )
{
return dependencies.resolveDependency( NeoStoreDataSource.class ).getDependencyResolver().resolveDependency( clazz );
return platfrom.dataSourceManager.getDataSource().getDependencyResolver().resolveDependency( clazz );
}
}

0 comments on commit d28dd5a

Please sign in to comment.