Skip to content

Commit

Permalink
Moved Neo4jTransactionalContextFactory creation out of DataSourceModule
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Sep 16, 2016
1 parent afb14d8 commit ef25a31
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 110 deletions.
Expand Up @@ -72,7 +72,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
// GIVEN // GIVEN
when(outerTx.failure()).thenThrow(new AssertionError("Shouldn't be called")) when(outerTx.failure()).thenThrow(new AssertionError("Shouldn't be called"))
val tc = new Neo4jTransactionalContext(graph, outerTx, KernelTransaction.Type.`implicit`, AccessMode.Static.FULL, val tc = new Neo4jTransactionalContext(graph, outerTx, KernelTransaction.Type.`implicit`, AccessMode.Static.FULL,
statement, null, locker, null, null, null) statement, null, locker, null, null, null, null)
val transactionalContext = TransactionalContextWrapperv3_1(tc) val transactionalContext = TransactionalContextWrapperv3_1(tc)
val context = new TransactionBoundQueryContext(transactionalContext)(indexSearchMonitor) val context = new TransactionBoundQueryContext(transactionalContext)(indexSearchMonitor)
// WHEN // WHEN
Expand All @@ -88,7 +88,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
// GIVEN // GIVEN
when(outerTx.success()).thenThrow(new AssertionError("Shouldn't be called")) when(outerTx.success()).thenThrow(new AssertionError("Shouldn't be called"))
val tc = new Neo4jTransactionalContext(graph, outerTx, KernelTransaction.Type.`implicit`, AccessMode.Static.FULL, val tc = new Neo4jTransactionalContext(graph, outerTx, KernelTransaction.Type.`implicit`, AccessMode.Static.FULL,
statement, null, locker, null, null, null) statement, null, locker, null, null, null, null)
val transactionalContext = TransactionalContextWrapperv3_1(tc) val transactionalContext = TransactionalContextWrapperv3_1(tc)
val context = new TransactionBoundQueryContext(transactionalContext)(indexSearchMonitor) val context = new TransactionBoundQueryContext(transactionalContext)(indexSearchMonitor)
// WHEN // WHEN
Expand Down
Expand Up @@ -21,7 +21,6 @@


import java.io.File; import java.io.File;
import java.time.Clock; import java.time.Clock;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -67,7 +66,6 @@
import org.neo4j.kernel.impl.proc.ProcedureGDSFactory; import org.neo4j.kernel.impl.proc.ProcedureGDSFactory;
import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.proc.TypeMappers.SimpleConverter; import org.neo4j.kernel.impl.proc.TypeMappers.SimpleConverter;
import org.neo4j.kernel.impl.query.QueryEngineProvider;
import org.neo4j.kernel.impl.query.QueryExecutionEngine; import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
Expand Down Expand Up @@ -118,8 +116,8 @@ public class DataSourceModule


public final AutoIndexing autoIndexing; public final AutoIndexing autoIndexing;


public DataSourceModule( final GraphDatabaseFacadeFactory.Dependencies dependencies, public DataSourceModule( final PlatformModule platformModule, EditionModule editionModule,
final PlatformModule platformModule, EditionModule editionModule ) Supplier<QueryExecutionEngine> queryExecutionEngineSupplier )
{ {
final Dependencies deps = platformModule.dependencies; final Dependencies deps = platformModule.dependencies;
Config config = platformModule.config; Config config = platformModule.config;
Expand All @@ -131,6 +129,7 @@ public DataSourceModule( final GraphDatabaseFacadeFactory.Dependencies dependenc
RelationshipTypeTokenHolder relationshipTypeTokenHolder = editionModule.relationshipTypeTokenHolder; RelationshipTypeTokenHolder relationshipTypeTokenHolder = editionModule.relationshipTypeTokenHolder;
File storeDir = platformModule.storeDir; File storeDir = platformModule.storeDir;
DiagnosticsManager diagnosticsManager = platformModule.diagnosticsManager; DiagnosticsManager diagnosticsManager = platformModule.diagnosticsManager;
this.queryExecutor = queryExecutionEngineSupplier;


threadToTransactionBridge = deps.satisfyDependency( life.add( new ThreadToStatementContextBridge() ) ); threadToTransactionBridge = deps.satisfyDependency( life.add( new ThreadToStatementContextBridge() ) );


Expand Down Expand Up @@ -169,8 +168,6 @@ public DataSourceModule( final GraphDatabaseFacadeFactory.Dependencies dependenc


autoIndexing = new InternalAutoIndexing( platformModule.config, editionModule.propertyKeyTokenHolder ); autoIndexing = new InternalAutoIndexing( platformModule.config, editionModule.propertyKeyTokenHolder );


AtomicReference<QueryExecutionEngine> queryExecutor = new AtomicReference<>( QueryEngineProvider.noEngine() );
this.queryExecutor = queryExecutor::get;
Procedures procedures = setupProcedures( platformModule, editionModule ); Procedures procedures = setupProcedures( platformModule, editionModule );


deps.satisfyDependency( new NonTransactionalDbmsOperations.Factory( procedures ) ); deps.satisfyDependency( new NonTransactionalDbmsOperations.Factory( procedures ) );
Expand Down Expand Up @@ -228,29 +225,6 @@ public DataSourceModule( final GraphDatabaseFacadeFactory.Dependencies dependenc
// Kernel event handlers should be the very last, i.e. very first to receive shutdown events // Kernel event handlers should be the very last, i.e. very first to receive shutdown events
life.add( kernelEventHandlers ); life.add( kernelEventHandlers );


dataSourceManager.addListener( new DataSourceManager.Listener()
{
private QueryExecutionEngine engine;

@Override
public void registered( NeoStoreDataSource dataSource )
{
if ( engine == null )
{
engine = QueryEngineProvider.initialize( deps, platformModule.graphDatabaseFacade,
dependencies.executionEngines() );
}

queryExecutor.set( engine );
}

@Override
public void unregistered( NeoStoreDataSource dataSource )
{
queryExecutor.set( QueryEngineProvider.noEngine() );
}
} );

this.storeId = neoStoreDataSource::getStoreId; this.storeId = neoStoreDataSource::getStoreId;
this.kernelAPI = neoStoreDataSource::getKernel; this.kernelAPI = neoStoreDataSource::getKernel;
} }
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.net.URL; import java.net.URL;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.collection.primitive.PrimitiveLongCollections;
Expand Down Expand Up @@ -99,7 +98,6 @@


import static java.lang.String.format; import static java.lang.String.format;
import static org.neo4j.collection.primitive.PrimitiveLongCollections.map; import static org.neo4j.collection.primitive.PrimitiveLongCollections.map;
import static org.neo4j.function.Suppliers.lazySingleton;
import static org.neo4j.helpers.collection.Iterators.emptyIterator; import static org.neo4j.helpers.collection.Iterators.emptyIterator;
import static org.neo4j.kernel.impl.api.operations.KeyReadOperations.NO_SUCH_LABEL; import static org.neo4j.kernel.impl.api.operations.KeyReadOperations.NO_SUCH_LABEL;
import static org.neo4j.kernel.impl.api.operations.KeyReadOperations.NO_SUCH_PROPERTY_KEY; import static org.neo4j.kernel.impl.api.operations.KeyReadOperations.NO_SUCH_PROPERTY_KEY;
Expand All @@ -118,7 +116,7 @@ public class GraphDatabaseFacade implements GraphDatabaseAPI
private NodeProxy.NodeActions nodeActions; private NodeProxy.NodeActions nodeActions;
private RelationshipProxy.RelationshipActions relActions; private RelationshipProxy.RelationshipActions relActions;
private SPI spi; private SPI spi;
private Supplier<TransactionalContextFactory> contextFactorySupplier; private TransactionalContextFactory contextFactory;
private long defaultTransactionTimeout; private long defaultTransactionTimeout;


/** /**
Expand Down Expand Up @@ -197,7 +195,7 @@ public GraphDatabaseFacade()
/** /**
* Create a new Core API facade, backed by the given SPI. * Create a new Core API facade, backed by the given SPI.
*/ */
public void init( SPI spi, Config config ) public void initSPI( SPI spi, Config config )
{ {
defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ); defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout );
IndexProviderImpl idxProvider = new IndexProviderImpl( this, spi::currentStatement ); IndexProviderImpl idxProvider = new IndexProviderImpl( this, spi::currentStatement );
Expand All @@ -219,7 +217,17 @@ public void init( SPI spi, Config config )
.getOrCreateRelationshipIndex( InternalAutoIndexing.RELATIONSHIP_AUTO_INDEX, null ) ), .getOrCreateRelationshipIndex( InternalAutoIndexing.RELATIONSHIP_AUTO_INDEX, null ) ),
spi.autoIndexing().relationships() ); spi.autoIndexing().relationships() );
this.indexManager = new IndexManagerImpl( spi::currentStatement, idxProvider, nodeAutoIndexer, relAutoIndexer ); this.indexManager = new IndexManagerImpl( spi::currentStatement, idxProvider, nodeAutoIndexer, relAutoIndexer );
this.contextFactorySupplier = lazySingleton( () -> new Neo4jTransactionalContextFactory( spi, locker ) ); }

/**
* This needs to be called after data source creation, and the execution engine dependency is satisfied by the
* cypher module GraphData
*
* @see GraphDatabaseFacadeFactory#initFacade(File, Map, GraphDatabaseFacadeFactory.Dependencies, GraphDatabaseFacade)
*/
public void initTransactionalContextFactoryFromSPI()
{
contextFactory = new Neo4jTransactionalContextFactory( spi, locker );
} }


@Override @Override
Expand Down Expand Up @@ -380,7 +388,6 @@ public Result execute( String query, Map<String,Object> parameters ) throws Quer
// ensure we have a tx and create a context (the tx is gonna get closed by the Cypher result) // ensure we have a tx and create a context (the tx is gonna get closed by the Cypher result)
InternalTransaction transaction = beginTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL ); InternalTransaction transaction = beginTransaction( KernelTransaction.Type.implicit, AccessMode.Static.FULL );


TransactionalContextFactory contextFactory = contextFactorySupplier.get();
TransactionalContext context = TransactionalContext context =
contextFactory.newContext( QueryEngineProvider.describe(), transaction, query, parameters ); contextFactory.newContext( QueryEngineProvider.describe(), transaction, query, parameters );
return spi.executeQuery( query, parameters, context ); return spi.executeQuery( query, parameters, context );
Expand All @@ -399,7 +406,6 @@ public Result execute( String query, Map<String,Object> parameters, long timeout
public Result execute( InternalTransaction transaction, String query, Map<String,Object> parameters ) public Result execute( InternalTransaction transaction, String query, Map<String,Object> parameters )
throws QueryExecutionException throws QueryExecutionException
{ {
TransactionalContextFactory contextFactory = contextFactorySupplier.get();
TransactionalContext context = TransactionalContext context =
contextFactory.newContext( QueryEngineProvider.describe(), transaction, query, parameters ); contextFactory.newContext( QueryEngineProvider.describe(), transaction, query, parameters );
return spi.executeQuery( query, parameters, context ); return spi.executeQuery( query, parameters, context );
Expand Down
Expand Up @@ -21,16 +21,21 @@


import java.io.File; import java.io.File;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;


import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.security.URLAccessRule; import org.neo4j.graphdb.security.URLAccessRule;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.extension.KernelExtensionFactory; import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard; import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard;
import org.neo4j.kernel.impl.query.QueryEngineProvider; import org.neo4j.kernel.impl.query.QueryEngineProvider;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.logging.Logger; import org.neo4j.logging.Logger;
Expand All @@ -40,6 +45,7 @@
import static org.neo4j.kernel.configuration.Settings.illegalValueMessage; import static org.neo4j.kernel.configuration.Settings.illegalValueMessage;
import static org.neo4j.kernel.configuration.Settings.matches; import static org.neo4j.kernel.configuration.Settings.matches;
import static org.neo4j.kernel.configuration.Settings.setting; import static org.neo4j.kernel.configuration.Settings.setting;
import static org.neo4j.kernel.impl.query.QueryEngineProvider.noEngine;


/** /**
* This is the main factory for creating database instances. It delegates creation to three different modules * This is the main factory for creating database instances. It delegates creation to three different modules
Expand Down Expand Up @@ -133,14 +139,40 @@ public GraphDatabaseFacade initFacade( File storeDir, Map<String,String> params,
{ {
PlatformModule platform = createPlatform( storeDir, params, dependencies, graphDatabaseFacade ); PlatformModule platform = createPlatform( storeDir, params, dependencies, graphDatabaseFacade );
EditionModule edition = editionFactory.apply( platform ); EditionModule edition = editionFactory.apply( platform );
final DataSourceModule dataSource = createDataSource( dependencies, platform, edition );
AtomicReference<QueryExecutionEngine> queryEngine = new AtomicReference<>( noEngine() );
final DataSourceModule dataSource = createDataSource( platform, edition, queryEngine::get );
Logger msgLog = platform.logging.getInternalLog( getClass() ).infoLogger(); Logger msgLog = platform.logging.getInternalLog( getClass() ).infoLogger();
CoreAPIAvailabilityGuard coreAPIAvailabilityGuard = edition.coreAPIAvailabilityGuard; CoreAPIAvailabilityGuard coreAPIAvailabilityGuard = edition.coreAPIAvailabilityGuard;


// Start it // Start it
graphDatabaseFacade.init( new ClassicCoreSPI( platform, dataSource, msgLog, coreAPIAvailabilityGuard ), graphDatabaseFacade.initSPI( new ClassicCoreSPI( platform, dataSource, msgLog, coreAPIAvailabilityGuard ),
platform.config ); platform.config );


platform.dataSourceManager.addListener( new DataSourceManager.Listener()
{
private QueryExecutionEngine engine;

@Override
public void registered( NeoStoreDataSource dataSource )
{
if ( engine == null )
{
engine = QueryEngineProvider.initialize(
platform.dependencies, platform.graphDatabaseFacade, dependencies.executionEngines() );
graphDatabaseFacade.initTransactionalContextFactoryFromSPI();
}

queryEngine.set( engine );
}

@Override
public void unregistered( NeoStoreDataSource dataSource )
{
queryEngine.set( noEngine() );
}
} );

Throwable error = null; Throwable error = null;
try try
{ {
Expand Down Expand Up @@ -191,10 +223,10 @@ protected PlatformModule createPlatform( File storeDir, Map<String,String> param
/** /**
* Create the datasource module. Override to replace with custom module. * Create the datasource module. Override to replace with custom module.
*/ */
protected DataSourceModule createDataSource( final Dependencies dependencies, final PlatformModule platformModule, protected DataSourceModule createDataSource( final PlatformModule platformModule, EditionModule editionModule,
EditionModule editionModule ) Supplier<QueryExecutionEngine> queryEngine )
{ {
return new DataSourceModule( dependencies, platformModule, editionModule ); return new DataSourceModule( platformModule, editionModule, queryEngine );
} }


private void enableAvailabilityLogging( AvailabilityGuard availabilityGuard, final Logger msgLog ) private void enableAvailabilityLogging( AvailabilityGuard availabilityGuard, final Logger msgLog )
Expand Down
Expand Up @@ -71,8 +71,9 @@ public GraphDatabaseService apply( Context context ) throws ProcedureException
KernelTransaction transaction = context.get( Context.KERNEL_TRANSACTION ); KernelTransaction transaction = context.get( Context.KERNEL_TRANSACTION );
Thread owningThread = context.get( Context.THREAD ); Thread owningThread = context.get( Context.THREAD );
GraphDatabaseFacade facade = new GraphDatabaseFacade(); GraphDatabaseFacade facade = new GraphDatabaseFacade();
facade.init( new ProcedureGDBFacadeSPI( owningThread, transaction, queryExecutor, storeDir, resolver, facade.initSPI( new ProcedureGDBFacadeSPI( owningThread, transaction, queryExecutor, storeDir, resolver,
AutoIndexing.UNSUPPORTED, storeId, availability, urlValidator ), config ); AutoIndexing.UNSUPPORTED, storeId, availability, urlValidator ), config );
facade.initTransactionalContextFactoryFromSPI();
return facade; return facade;
} }


Expand Down
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.kernel.impl.query; package org.neo4j.kernel.impl.query;


import java.util.function.Function;

import org.neo4j.graphdb.Lock; import org.neo4j.graphdb.Lock;
import org.neo4j.graphdb.PropertyContainer; import org.neo4j.graphdb.PropertyContainer;
import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.GraphDatabaseQueryService;
Expand All @@ -46,6 +44,7 @@ public class Neo4jTransactionalContext implements TransactionalContext
private final AccessMode mode; private final AccessMode mode;
private final DbmsOperations.Factory dbmsOperationsFactory; private final DbmsOperations.Factory dbmsOperationsFactory;
private final Guard guard; private final Guard guard;
private final TransactionalContextFactory factory;


private InternalTransaction transaction; private InternalTransaction transaction;
private Statement statement; private Statement statement;
Expand All @@ -64,8 +63,9 @@ public Neo4jTransactionalContext(
PropertyContainerLocker locker, PropertyContainerLocker locker,
ThreadToStatementContextBridge txBridge, ThreadToStatementContextBridge txBridge,
DbmsOperations.Factory dbmsOperationsFactory, DbmsOperations.Factory dbmsOperationsFactory,
Guard guard ) Guard guard,
{ TransactionalContextFactory factory
) {
this.graph = graph; this.graph = graph;
this.transaction = initialTransaction; this.transaction = initialTransaction;
this.transactionType = transactionType; this.transactionType = transactionType;
Expand All @@ -76,6 +76,7 @@ public Neo4jTransactionalContext(
this.txBridge = txBridge; this.txBridge = txBridge;
this.dbmsOperationsFactory = dbmsOperationsFactory; this.dbmsOperationsFactory = dbmsOperationsFactory;
this.guard = guard; this.guard = guard;
this.factory = factory;
} }


@Override @Override
Expand Down Expand Up @@ -203,20 +204,7 @@ public TransactionalContext getOrBeginNewIfClosed()
else else
{ {
InternalTransaction transaction = graph.beginTransaction( transactionType, mode ); InternalTransaction transaction = graph.beginTransaction( transactionType, mode );
Statement statement = txBridge.get(); return factory.newContext( this.executingQuery, transaction );
statement.queryRegistration().registerExecutingQuery( executingQuery );
return new Neo4jTransactionalContext(
graph,
transaction,
transaction.transactionType(),
transaction.mode(),
statement,
executingQuery,
locker,
graph.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ),
graph.getDependencyResolver().resolveDependency( DbmsOperations.Factory.class ),
guard
);
} }
} }


Expand Down

0 comments on commit ef25a31

Please sign in to comment.