Skip to content

Commit

Permalink
NeoStoreDatasource dependencies lifecycle.
Browse files Browse the repository at this point in the history
Simplify possible possible parent-child relationships in dependency
resolver. Now its not possible to setup parent dependency resolver over
supplier.
Use platform dependency resolver as parent of neo store datasource
resolver.
  • Loading branch information
MishaDemianenko committed Jun 21, 2018
1 parent 575525c commit cc73c11
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 202 deletions.
Expand Up @@ -154,6 +154,7 @@
import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.storageengine.api.StoreFileMetadata;
import org.neo4j.time.SystemNanoClock; import org.neo4j.time.SystemNanoClock;
import org.neo4j.util.FeatureToggles; import org.neo4j.util.FeatureToggles;
import org.neo4j.util.VisibleForTesting;


import static org.neo4j.helpers.Exceptions.throwIfUnchecked; import static org.neo4j.helpers.Exceptions.throwIfUnchecked;


Expand Down Expand Up @@ -255,7 +256,7 @@ boolean applicable( DiagnosticsPhase phase )
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final CollectionsFactorySupplier collectionsFactorySupplier; private final CollectionsFactorySupplier collectionsFactorySupplier;


private Dependencies dependencies; private Dependencies dataSourceDependencies;
private LifeSupport life; private LifeSupport life;
private IndexProviderMap indexProviderMap; private IndexProviderMap indexProviderMap;
private File storeDir; private File storeDir;
Expand Down Expand Up @@ -362,7 +363,7 @@ public void init()
@Override @Override
public void start() throws IOException public void start() throws IOException
{ {
dependencies = new Dependencies(); dataSourceDependencies = new Dependencies( dependencyResolver );
life = new LifeSupport(); life = new LifeSupport();


life.add( recoveryCleanupWorkCollector ); life.add( recoveryCleanupWorkCollector );
Expand All @@ -375,11 +376,11 @@ public void start() throws IOException
indexProviderMap = indexProviderMap =
new DefaultIndexProviderMap( defaultIndexProvider, new DefaultIndexProviderMap( defaultIndexProvider,
indexProviderSelection.lowerPrioritizedCandidates() ); indexProviderSelection.lowerPrioritizedCandidates() );
dependencies.satisfyDependency( indexProviderMap ); dataSourceDependencies.satisfyDependency( indexProviderMap );


IndexConfigStore indexConfigStore = new IndexConfigStore( storeDir, fs ); IndexConfigStore indexConfigStore = new IndexConfigStore( storeDir, fs );
dependencies.satisfyDependency( lockService ); dataSourceDependencies.satisfyDependency( lockService );
dependencies.satisfyDependency( indexConfigStore ); dataSourceDependencies.satisfyDependency( indexConfigStore );
life.add( indexConfigStore ); life.add( indexConfigStore );


life.add( Lifecycles.multiple( indexProviders.values() ) ); life.add( Lifecycles.multiple( indexProviders.values() ) );
Expand All @@ -391,7 +392,7 @@ public void start() throws IOException
.withLogEntryReader( logEntryReader ) .withLogEntryReader( logEntryReader )
.withLogFileMonitor( physicalLogMonitor ) .withLogFileMonitor( physicalLogMonitor )
.withConfig( config ) .withConfig( config )
.withDependencies( dependencies ).build(); .withDependencies( dataSourceDependencies ).build();


LogTailScanner tailScanner = new LogTailScanner( logFiles, logEntryReader, monitors, failOnCorruptedLogFiles ); LogTailScanner tailScanner = new LogTailScanner( logFiles, logEntryReader, monitors, failOnCorruptedLogFiles );
monitors.addMonitorListener( monitors.addMonitorListener(
Expand All @@ -410,11 +411,9 @@ public void start() throws IOException
{ {
DatabaseSchemaState databaseSchemaState = new DatabaseSchemaState( logProvider ); DatabaseSchemaState databaseSchemaState = new DatabaseSchemaState( logProvider );


SynchronizedArrayIdOrderingQueue explicitIndexTransactionOrdering = SynchronizedArrayIdOrderingQueue explicitIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 );
new SynchronizedArrayIdOrderingQueue( 20 );


Supplier<KernelTransactionsSnapshot> transactionsSnapshotSupplier = Supplier<KernelTransactionsSnapshot> transactionsSnapshotSupplier = () -> kernelModule.kernelTransactions().get();
() -> kernelModule.kernelTransactions().get();
idController.initialize( transactionsSnapshotSupplier ); idController.initialize( transactionsSnapshotSupplier );


storageEngine = buildStorageEngine( storageEngine = buildStorageEngine(
Expand All @@ -423,14 +422,14 @@ public void start() throws IOException
versionContextSupplier ); versionContextSupplier );
life.add( logFiles ); life.add( logFiles );


TransactionIdStore transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class ); TransactionIdStore transactionIdStore = dataSourceDependencies.resolveDependency( TransactionIdStore.class );


versionContextSupplier.init( transactionIdStore::getLastClosedTransactionId ); versionContextSupplier.init( transactionIdStore::getLastClosedTransactionId );


LogVersionRepository logVersionRepository = dependencies.resolveDependency( LogVersionRepository.class ); LogVersionRepository logVersionRepository = dataSourceDependencies.resolveDependency( LogVersionRepository.class );
NeoStoreTransactionLogModule transactionLogModule = buildTransactionLogs( logFiles, config, logProvider, NeoStoreTransactionLogModule transactionLogModule = buildTransactionLogs( logFiles, config, logProvider,
scheduler, storageEngine, logEntryReader, explicitIndexTransactionOrdering, transactionIdStore ); scheduler, storageEngine, logEntryReader, explicitIndexTransactionOrdering, transactionIdStore );
transactionLogModule.satisfyDependencies( dependencies ); transactionLogModule.satisfyDependencies( dataSourceDependencies );


buildRecovery( fs, buildRecovery( fs,
transactionIdStore, transactionIdStore,
Expand All @@ -442,32 +441,32 @@ public void start() throws IOException
); );


// At the time of writing this comes from the storage engine (IndexStoreView) // At the time of writing this comes from the storage engine (IndexStoreView)
NodePropertyAccessor nodePropertyAccessor = dependencies.resolveDependency( NodePropertyAccessor.class ); NodePropertyAccessor nodePropertyAccessor = dataSourceDependencies.resolveDependency( NodePropertyAccessor.class );


final NeoStoreKernelModule kernelModule = buildKernel( final NeoStoreKernelModule kernelModule = buildKernel(
logFiles, logFiles,
transactionLogModule.transactionAppender(), transactionLogModule.transactionAppender(),
dependencies.resolveDependency( IndexingService.class ), dataSourceDependencies.resolveDependency( IndexingService.class ),
databaseSchemaState, databaseSchemaState,
dependencies.resolveDependency( LabelScanStore.class ), dataSourceDependencies.resolveDependency( LabelScanStore.class ),
storageEngine, storageEngine,
indexConfigStore, indexConfigStore,
transactionIdStore, transactionIdStore,
availabilityGuard, availabilityGuard,
clock, nodePropertyAccessor ); clock, nodePropertyAccessor );


kernelModule.satisfyDependencies( dependencies ); kernelModule.satisfyDependencies( dataSourceDependencies );


// Do these assignments last so that we can ensure no cyclical dependencies exist // Do these assignments last so that we can ensure no cyclical dependencies exist
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
this.transactionLogModule = transactionLogModule; this.transactionLogModule = transactionLogModule;
this.kernelModule = kernelModule; this.kernelModule = kernelModule;


dependencies.satisfyDependency( this ); dataSourceDependencies.satisfyDependency( this );
dependencies.satisfyDependency( databaseSchemaState ); dataSourceDependencies.satisfyDependency( databaseSchemaState );
dependencies.satisfyDependency( logEntryReader ); dataSourceDependencies.satisfyDependency( logEntryReader );
dependencies.satisfyDependency( storageEngine ); dataSourceDependencies.satisfyDependency( storageEngine );
dependencies.satisfyDependency( explicitIndexProviderLookup ); dataSourceDependencies.satisfyDependency( explicitIndexProviderLookup );
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
Expand Down Expand Up @@ -562,7 +561,7 @@ private StorageEngine buildStorageEngine(
// We pretend that the storage engine abstract hides all details within it. Whereas that's mostly // We pretend that the storage engine abstract hides all details within it. Whereas that's mostly
// true it's not entirely true for the time being. As long as we need this call below, which // true it's not entirely true for the time being. As long as we need this call below, which
// makes available one or more internal things to the outside world, there are leaks to plug. // makes available one or more internal things to the outside world, there are leaks to plug.
storageEngine.satisfyDependencies( dependencies ); storageEngine.satisfyDependencies( dataSourceDependencies );


return life.add( storageEngine ); return life.add( storageEngine );
} }
Expand Down Expand Up @@ -649,10 +648,8 @@ private NeoStoreKernelModule buildKernel( LogFiles logFiles, TransactionAppender
ExplicitIndexStore explicitIndexStore = new ExplicitIndexStore( config, ExplicitIndexStore explicitIndexStore = new ExplicitIndexStore( config,
indexConfigStore, kernelProvider, explicitIndexProviderLookup ); indexConfigStore, kernelProvider, explicitIndexProviderLookup );


StatementOperationParts statementOperationParts = dependencies.satisfyDependency( StatementOperationParts statementOperationParts = dataSourceDependencies.satisfyDependency(
buildStatementOperations( buildStatementOperations( cpuClockRef, heapAllocationRef ) );
cpuClockRef,
heapAllocationRef ) );


TransactionHooks hooks = new TransactionHooks(); TransactionHooks hooks = new TransactionHooks();


Expand All @@ -674,7 +671,7 @@ private NeoStoreKernelModule buildKernel( LogFiles logFiles, TransactionAppender


final NeoStoreFileListing fileListing = new NeoStoreFileListing( storeDir, logFiles, labelScanStore, final NeoStoreFileListing fileListing = new NeoStoreFileListing( storeDir, logFiles, labelScanStore,
indexingService, explicitIndexProviderLookup, storageEngine ); indexingService, explicitIndexProviderLookup, storageEngine );
dependencies.satisfyDependency( fileListing ); dataSourceDependencies.satisfyDependency( fileListing );


return new NeoStoreKernelModule( transactionCommitProcess, kernel, kernelTransactions, fileListing ); return new NeoStoreKernelModule( transactionCommitProcess, kernel, kernelTransactions, fileListing );
} }
Expand Down Expand Up @@ -721,7 +718,7 @@ private void buildTransactionMonitor( KernelTransactions kernelTransactions, Clo
{ {
KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor = KernelTransactionTimeoutMonitor kernelTransactionTimeoutMonitor =
new KernelTransactionTimeoutMonitor( kernelTransactions, clock, logService ); new KernelTransactionTimeoutMonitor( kernelTransactions, clock, logService );
dependencies.satisfyDependency( kernelTransactionTimeoutMonitor ); dataSourceDependencies.satisfyDependency( kernelTransactionTimeoutMonitor );
KernelTransactionMonitorScheduler transactionMonitorScheduler = KernelTransactionMonitorScheduler transactionMonitorScheduler =
new KernelTransactionMonitorScheduler( kernelTransactionTimeoutMonitor, scheduler, new KernelTransactionMonitorScheduler( kernelTransactionTimeoutMonitor, scheduler,
config.get( GraphDatabaseSettings.transaction_monitor_check_interval ).toMillis() ); config.get( GraphDatabaseSettings.transaction_monitor_check_interval ).toMillis() );
Expand Down Expand Up @@ -803,14 +800,12 @@ public InwardKernel getKernel()


public ResourceIterator<StoreFileMetadata> listStoreFiles( boolean includeLogs ) throws IOException public ResourceIterator<StoreFileMetadata> listStoreFiles( boolean includeLogs ) throws IOException
{ {
if ( includeLogs ) NeoStoreFileListing.StoreFileListingBuilder fileListingBuilder = getNeoStoreFileListing().builder();
if ( !includeLogs )
{ {
return getNeoStoreFileListing().builder().build(); fileListingBuilder.excludeLogFiles();
}
else
{
return getNeoStoreFileListing().builder().excludeLogFiles().build();
} }
return fileListingBuilder.build();
} }


public NeoStoreFileListing getNeoStoreFileListing() public NeoStoreFileListing getNeoStoreFileListing()
Expand All @@ -826,7 +821,7 @@ public void registerDiagnosticsWith( DiagnosticsManager manager )


public DependencyResolver getDependencyResolver() public DependencyResolver getDependencyResolver()
{ {
return dependencies; return dataSourceDependencies;
} }


private StatementOperationParts buildStatementOperations( AtomicReference<CpuClock> cpuClockRef, private StatementOperationParts buildStatementOperations( AtomicReference<CpuClock> cpuClockRef,
Expand Down Expand Up @@ -888,6 +883,7 @@ public StoreCopyCheckPointMutex getStoreCopyCheckPointMutex()
} }


// For test purposes only, not thread safe // For test purposes only, not thread safe
@VisibleForTesting
public LifeSupport getLife() public LifeSupport getLife()
{ {
return life; return life;
Expand Down
Expand Up @@ -19,41 +19,28 @@
*/ */
package org.neo4j.kernel.impl.api; package org.neo4j.kernel.impl.api;


import java.util.Objects;

import org.neo4j.kernel.impl.api.operations.QueryRegistrationOperations; import org.neo4j.kernel.impl.api.operations.QueryRegistrationOperations;


import static org.apache.commons.lang3.ObjectUtils.firstNonNull;

public class StatementOperationParts public class StatementOperationParts
{ {
private final QueryRegistrationOperations queryRegistrationOperations; private final QueryRegistrationOperations queryRegistrationOperations;


public StatementOperationParts( public StatementOperationParts( QueryRegistrationOperations queryRegistrationOperations )
QueryRegistrationOperations queryRegistrationOperations )
{ {
this.queryRegistrationOperations = queryRegistrationOperations; this.queryRegistrationOperations = queryRegistrationOperations;
} }


public QueryRegistrationOperations queryRegistrationOperations() QueryRegistrationOperations queryRegistrationOperations()
{ {
return checkNotNull( queryRegistrationOperations, QueryRegistrationOperations.class ); return Objects.requireNonNull( queryRegistrationOperations, "No part of type " + QueryRegistrationOperations.class.getSimpleName() + " assigned" );
} }


public StatementOperationParts override( QueryRegistrationOperations queryRegistrationOperations ) public StatementOperationParts override( QueryRegistrationOperations queryRegistrationOperations )
{ {
return new StatementOperationParts( return new StatementOperationParts( firstNonNull( queryRegistrationOperations, this.queryRegistrationOperations ) );
eitherOr( queryRegistrationOperations, this.queryRegistrationOperations, QueryRegistrationOperations.class ) );
}

private <T> T checkNotNull( T object, Class<T> cls )
{
if ( object == null )
{
throw new IllegalStateException( "No part of type " + cls.getSimpleName() + " assigned" );
}
return object;
}

private <T> T eitherOr( T first, T other,
@SuppressWarnings( "UnusedParameters"/*used as type flag*/ ) Class<T> cls )
{
return first != null ? first : other;
} }
} }
Expand Up @@ -21,7 +21,6 @@


import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.helpers.Listeners; import org.neo4j.helpers.Listeners;
import org.neo4j.internal.kernel.api.Kernel; import org.neo4j.internal.kernel.api.Kernel;
import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.NeoStoreDataSource;
Expand Down Expand Up @@ -132,25 +131,4 @@ public Kernel get()
{ {
return dataSource.getKernel(); return dataSource.getKernel();
} }

public static class DependencyResolverSupplier implements Supplier<DependencyResolver>
{
private DataSourceManager dataSourceManager;

public DependencyResolverSupplier( DataSourceManager dataSourceManager )
{
this.dataSourceManager = dataSourceManager;
}

@Override
public DependencyResolver get()
{
NeoStoreDataSource dataSource = dataSourceManager.getDataSource();
if ( dataSource == null )
{
return null;
}
return dataSource.getDependencyResolver();
}
}
} }
Expand Up @@ -19,19 +19,20 @@
*/ */
package org.neo4j.kernel.impl.util; package org.neo4j.kernel.impl.util;


import java.util.ArrayList; import org.eclipse.collections.api.RichIterable;
import java.util.HashMap; import org.eclipse.collections.api.multimap.list.MutableListMultimap;
import java.util.List; import org.eclipse.collections.impl.factory.Multimaps;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;


@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
public class Dependencies extends DependencyResolver.Adapter implements DependencySatisfier public class Dependencies extends DependencyResolver.Adapter implements DependencySatisfier
{ {
private final Supplier<DependencyResolver> parent; private final DependencyResolver parent;
private final Map<Class<?>, List<?>> typeDependencies = new HashMap<>(); private final MutableListMultimap<Class, Object> typeDependencies = Multimaps.mutable.list.empty();


public Dependencies() public Dependencies()
{ {
Expand All @@ -40,33 +41,23 @@ public Dependencies()


public Dependencies( final DependencyResolver parent ) public Dependencies( final DependencyResolver parent )
{ {
this.parent = () -> parent; Objects.requireNonNull( parent );
}

public Dependencies( Supplier<DependencyResolver> parent )
{
this.parent = parent; this.parent = parent;
} }


@Override @Override
public <T> T resolveDependency( Class<T> type, SelectionStrategy selector ) public <T> T resolveDependency( Class<T> type, SelectionStrategy selector )
{ {
List<?> options = typeDependencies.get( type ); RichIterable options = typeDependencies.get( type );

if ( options.notEmpty() )
if ( options != null )
{ {
return selector.select( type, (Iterable<T>) options); return selector.select( type, (Iterable<T>) options );
} }


// Try parent // Try parent
if ( parent != null ) if ( parent != null )
{ {
DependencyResolver dependencyResolver = parent.get(); return parent.resolveDependency( type, selector );

if ( dependencyResolver != null )
{
return dependencyResolver.resolveDependency( type, selector );
}
} }


// Out of options // Out of options
Expand All @@ -92,13 +83,7 @@ public <T> T satisfyDependency( T dependency )
Class<?> type = dependency.getClass(); Class<?> type = dependency.getClass();
do do
{ {
List<Object> deps = (List<Object>) typeDependencies.get( type ); typeDependencies.put( type, dependency );
if ( deps == null )
{
deps = new ArrayList<>( );
typeDependencies.put(type, deps);
}
deps.add( dependency );


// Add as all interfaces // Add as all interfaces
Class<?>[] interfaces = type.getInterfaces(); Class<?>[] interfaces = type.getInterfaces();
Expand All @@ -123,15 +108,7 @@ private <T> void addInterfaces( Class<?>[] interfaces, T dependency )
{ {
for ( Class<?> type : interfaces ) for ( Class<?> type : interfaces )
{ {
List<Object> deps = (List<Object>) typeDependencies.get( type ); typeDependencies.put( type, dependency );
if ( deps == null )
{
deps = new ArrayList<>( );
typeDependencies.put(type, deps);
}
deps.add( dependency );

// Add as all sub-interfaces
addInterfaces(type.getInterfaces(), dependency); addInterfaces(type.getInterfaces(), dependency);
} }
} }
Expand Down

0 comments on commit cc73c11

Please sign in to comment.