Skip to content

Commit

Permalink
Move bridge to edition module since now it has a dependency from global
Browse files Browse the repository at this point in the history
guard.
Use global guard for now in a bridge since changing that atm changing
observable exceptions that user will get in erroneous cases.
  • Loading branch information
MishaDemianenko committed Aug 24, 2018
1 parent 9d3941a commit 84a8af5
Show file tree
Hide file tree
Showing 23 changed files with 58 additions and 71 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.availability.DatabaseAvailabilityGuard;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
Expand Down Expand Up @@ -123,16 +124,15 @@ private static TransactionStateMachineV1SPI createTxSpi( Supplier<TransactionIdS
}

private static TransactionStateMachineV1SPI createTxSpi( Supplier<TransactionIdStore> txIdStore, Duration txAwaitDuration,
DatabaseAvailabilityGuard databaseAvailabilityGuard, Clock clock )
AvailabilityGuard availabilityGuard, Clock clock )
{
QueryExecutionEngine queryExecutionEngine = mock( QueryExecutionEngine.class );

DependencyResolver dependencyResolver = mock( DependencyResolver.class );
ThreadToStatementContextBridge bridge = new ThreadToStatementContextBridge();
ThreadToStatementContextBridge bridge = new ThreadToStatementContextBridge( availabilityGuard );
when( dependencyResolver.resolveDependency( ThreadToStatementContextBridge.class ) ).thenReturn( bridge );
when( dependencyResolver.resolveDependency( QueryExecutionEngine.class ) ).thenReturn( queryExecutionEngine );
when( dependencyResolver.provideDependency( TransactionIdStore.class ) ).thenReturn( txIdStore );
when( dependencyResolver.resolveDependency( DatabaseAvailabilityGuard.class ) ).thenReturn( databaseAvailabilityGuard );

GraphDatabaseAPI db = mock( GraphDatabaseAPI.class );
when( db.getDependencyResolver() ).thenReturn( dependencyResolver );
Expand Down
Expand Up @@ -65,7 +65,6 @@
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.query.ExecutingQuery;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.impl.api.ClockContext;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
Expand Down Expand Up @@ -796,12 +795,6 @@ public void setMetaData( Map<String,Object> metaData )
internal.setMetaData( metaData );
}

@Override
public AvailabilityGuard getAvailabilityGuard()
{
return internal.getAvailabilityGuard();
}

@Override
public void assertOpen()
{
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.internal.kernel.api.security.AuthSubject;
import org.neo4j.internal.kernel.api.security.LoginContext;
import org.neo4j.internal.kernel.api.security.SecurityContext;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.impl.api.ClockContext;
import org.neo4j.storageengine.api.schema.IndexDescriptor;

Expand Down Expand Up @@ -178,11 +177,6 @@ interface CloseListener
*/
void setMetaData( Map<String, Object> metaData );

/**
* @return database availability guard of database this transaction was started against.
*/
AvailabilityGuard getAvailabilityGuard();

@FunctionalInterface
interface Revertable extends AutoCloseable
{
Expand Down
Expand Up @@ -68,7 +68,6 @@
import org.neo4j.kernel.api.txstate.ExplicitIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.impl.api.index.IndexingProvidersService;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.TxState;
Expand Down Expand Up @@ -143,7 +142,6 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
private final TransactionMonitor transactionMonitor;
private final PageCursorTracerSupplier cursorTracerSupplier;
private final VersionContextSupplier versionContextSupplier;
private final AvailabilityGuard databaseAvailabilityGuard;
private final StorageReader storageReader;
private final ClockContext clocks;
private final AccessCapability accessCapability;
Expand Down Expand Up @@ -195,8 +193,7 @@ public KernelTransactionImplementation( StatementOperationParts statementOperati
TransactionTracer transactionTracer, LockTracer lockTracer, PageCursorTracerSupplier cursorTracerSupplier, StorageEngine storageEngine,
AccessCapability accessCapability, AutoIndexing autoIndexing, ExplicitIndexStore explicitIndexStore, VersionContextSupplier versionContextSupplier,
CollectionsFactorySupplier collectionsFactorySupplier, ConstraintSemantics constraintSemantics, SchemaState schemaState,
IndexingProvidersService indexProviders, TokenHolders tokenHolders, Dependencies dataSourceDependencies,
AvailabilityGuard databaseAvailabilityGuard )
IndexingProvidersService indexProviders, TokenHolders tokenHolders, Dependencies dataSourceDependencies )
{
this.schemaWriteGuard = schemaWriteGuard;
this.hooks = hooks;
Expand All @@ -212,7 +209,6 @@ public KernelTransactionImplementation( StatementOperationParts statementOperati
this.transactionTracer = transactionTracer;
this.cursorTracerSupplier = cursorTracerSupplier;
this.versionContextSupplier = versionContextSupplier;
this.databaseAvailabilityGuard = databaseAvailabilityGuard;
this.currentStatement = new KernelStatement( this, this, storageReader,
lockTracer, statementOperations, this.clocks,
versionContextSupplier );
Expand Down Expand Up @@ -394,12 +390,6 @@ public void setMetaData( Map<String, Object> data )
this.userMetaData = data;
}

@Override
public AvailabilityGuard getAvailabilityGuard()
{
return databaseAvailabilityGuard;
}

public Map<String, Object> getMetaData()
{
return userMetaData;
Expand Down
Expand Up @@ -369,7 +369,7 @@ public KernelTransactionImplementation newInstance()
tracers.pageCursorTracerSupplier, storageEngine, accessCapability,
autoIndexing,
explicitIndexStore, versionContextSupplier, collectionsFactorySupplier, constraintSemantics,
schemaState, indexProviders, tokenHolders, dataSourceDependencies, databaseAvailabilityGuard );
schemaState, indexProviders, tokenHolders, dataSourceDependencies );
this.transactions.add( tx );
return tx;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.availability.AvailabilityGuard;

/**
* This is meant to serve as the bridge that tie transactions to threads.
Expand All @@ -35,6 +36,12 @@
public class ThreadToStatementContextBridge implements Supplier<Statement>
{
private final ThreadLocal<KernelTransaction> threadToTransactionMap = new ThreadLocal<>();
private final AvailabilityGuard availabilityGuard;

public ThreadToStatementContextBridge( AvailabilityGuard availabilityGuard )
{
this.availabilityGuard = availabilityGuard;
}

public boolean hasTransaction()
{
Expand Down Expand Up @@ -82,15 +89,15 @@ public KernelTransaction getKernelTransactionBoundToThisThread( boolean strict )
return transaction;
}

private static void assertInUnterminatedTransaction( KernelTransaction transaction )
private void assertInUnterminatedTransaction( KernelTransaction transaction )
{
if ( transaction == null )
if ( availabilityGuard.isShutdown() )
{
throw new BridgeNotInTransactionException();
throw new DatabaseShutdownException();
}
if ( transaction.getAvailabilityGuard().isShutdown() )
if ( transaction == null )
{
throw new DatabaseShutdownException();
throw new BridgeNotInTransactionException();
}
if ( transaction.isTerminated() )
{
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.api.explicitindex.AutoIndexing;
import org.neo4j.kernel.availability.DatabaseAvailabilityGuard;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
Expand Down Expand Up @@ -100,7 +99,7 @@ static Instances kernelTransactionWithInternals( LoginContext loginContext )
PageCursorTracerSupplier.NULL,
storageEngine, new CanWrite(), AutoIndexing.UNSUPPORTED,
mock( ExplicitIndexStore.class ), EmptyVersionContextSupplier.EMPTY, ON_HEAP, new StandardConstraintSemantics(),
mock( SchemaState.class), mock( IndexingService.class ), mockedTokenHolders(), new Dependencies(), mock( DatabaseAvailabilityGuard.class ) );
mock( SchemaState.class), mock( IndexingService.class ), mockedTokenHolders(), new Dependencies() );

StatementLocks statementLocks = new SimpleStatementLocks( new NoOpClient() );

Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.availability.AvailabilityGuard;
import org.neo4j.kernel.impl.api.ClockContext;
import org.neo4j.storageengine.api.schema.IndexDescriptor;

Expand Down Expand Up @@ -283,12 +282,6 @@ public void setMetaData( Map<String,Object> metaData )
throw new UnsupportedOperationException( "not implemented" );
}

@Override
public AvailabilityGuard getAvailabilityGuard()
{
throw new UnsupportedOperationException( "not implemented" );
}

@Override
public void assertOpen()
{
Expand Down
Expand Up @@ -38,7 +38,6 @@
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.explicitindex.AutoIndexing;
import org.neo4j.kernel.api.txstate.ExplicitIndexTransactionState;
import org.neo4j.kernel.availability.DatabaseAvailabilityGuard;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics;
Expand Down Expand Up @@ -353,7 +352,7 @@ private static class TestKernelTransaction extends KernelTransactionImplementat
mock( StorageEngine.class, RETURNS_MOCKS ), new CanWrite(),
AutoIndexing.UNSUPPORTED, mock( ExplicitIndexStore.class ),
EmptyVersionContextSupplier.EMPTY, ON_HEAP, new StandardConstraintSemantics(), mock( SchemaState.class),
mock( IndexingService.class ), mockedTokenHolders(), new Dependencies(), mock( DatabaseAvailabilityGuard.class ) );
mock( IndexingService.class ), mockedTokenHolders(), new Dependencies() );

this.monitor = monitor;
}
Expand Down
Expand Up @@ -173,7 +173,7 @@ public KernelTransactionImplementation newNotInitializedTransaction()
new CanWrite(), AutoIndexing.UNSUPPORTED,
mock( ExplicitIndexStore.class ), EmptyVersionContextSupplier.EMPTY, () -> collectionsFactory,
new StandardConstraintSemantics(), mock( SchemaState.class),
mock( IndexingService.class ), mockedTokenHolders(), new Dependencies(), availabilityGuard );
mock( IndexingService.class ), mockedTokenHolders(), new Dependencies() );
}

public class CapturingCommitProcess implements TransactionCommitProcess
Expand Down
Expand Up @@ -64,8 +64,8 @@ public GraphDatabaseFacade createDatabase( String name )
checkState( database == null, "Database is already created, fail to create another one." );

DataSourceModule dataSource = new DataSourceModule( name, platform, edition, procedures, graphDatabaseFacade );
ClassicCoreSPI spi = new ClassicCoreSPI( platform, dataSource, msgLog, dataSource.getCoreAPIAvailabilityGuard() );
graphDatabaseFacade.init( spi, platform.threadToTransactionBridge, platform.config, dataSource.neoStoreDataSource.getTokenHolders() );
ClassicCoreSPI spi = new ClassicCoreSPI( platform, dataSource, msgLog, dataSource.getCoreAPIAvailabilityGuard(), edition.threadToTransactionBridge );
graphDatabaseFacade.init( spi, edition.threadToTransactionBridge, platform.config, dataSource.neoStoreDataSource.getTokenHolders() );
platform.dataSourceManager.register( dataSource.neoStoreDataSource );
database = graphDatabaseFacade;
return database;
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.explicitindex.AutoIndexing;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
Expand All @@ -56,14 +57,16 @@ public class ClassicCoreSPI implements GraphDatabaseFacade.SPI
private final DataSourceModule dataSource;
private final Logger msgLog;
private final CoreAPIAvailabilityGuard availability;
private final ThreadToStatementContextBridge threadToTransactionBridge;

public ClassicCoreSPI( PlatformModule platform, DataSourceModule dataSource, Logger msgLog,
CoreAPIAvailabilityGuard availability )
public ClassicCoreSPI( PlatformModule platform, DataSourceModule dataSource, Logger msgLog, CoreAPIAvailabilityGuard availability,
ThreadToStatementContextBridge threadToTransactionBridge )
{
this.platform = platform;
this.dataSource = dataSource;
this.msgLog = msgLog;
this.availability = availability;
this.threadToTransactionBridge = threadToTransactionBridge;
}

@Override
Expand Down Expand Up @@ -181,9 +184,8 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, LoginCon
{
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = dataSource.kernelAPI.get().beginTransaction( type, loginContext, timeout );
kernelTx.registerCloseListener(
txId -> platform.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
platform.threadToTransactionBridge.bindTransactionToCurrentThread( kernelTx );
kernelTx.registerCloseListener( txId -> threadToTransactionBridge.unbindTransactionFromCurrentThread() );
threadToTransactionBridge.bindTransactionToCurrentThread( kernelTx );
return kernelTx;
}
catch ( TransactionFailureException e )
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.neo4j.kernel.impl.core.DefaultRelationshipTypeCreator;
import org.neo4j.kernel.impl.core.DelegatingTokenHolder;
import org.neo4j.kernel.impl.core.ReadOnlyTokenCreator;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.core.TokenCreator;
import org.neo4j.kernel.impl.core.TokenHolder;
import org.neo4j.kernel.impl.core.TokenHolders;
Expand Down Expand Up @@ -102,6 +103,8 @@ public CommunityEditionModule( PlatformModule platformModule )

idTypeConfigurationProvider = createIdTypeConfigurationProvider( config );
eligibleForIdReuse = IdReuseEligibility.ALWAYS;
threadToTransactionBridge =
dependencies.satisfyDependency( new ThreadToStatementContextBridge( getGlobalAvailabilityGuard( platformModule.clock, logging ) ) );

createIdComponents( platformModule, dependencies, createIdGeneratorFactory( fileSystem, idTypeConfigurationProvider ) );
dependencies.satisfyDependency( idGeneratorFactory );
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class DataSourceModule

public final CoreAPIAvailabilityGuard coreAPIAvailabilityGuard;

public DataSourceModule( String databaseName, final PlatformModule platformModule, EditionModule editionModule, Procedures procedures,
public DataSourceModule( String databaseName, PlatformModule platformModule, EditionModule editionModule, Procedures procedures,
GraphDatabaseFacade graphDatabaseFacade )
{

Expand All @@ -53,7 +53,8 @@ public DataSourceModule( String databaseName, final PlatformModule platformModul
this.storeId = neoStoreDataSource::getStoreId;
this.kernelAPI = neoStoreDataSource::getKernel;

ProcedureGDSFactory gdsFactory = new ProcedureGDSFactory( platformModule, this, coreAPIAvailabilityGuard, context.getTokenHolders() );
ProcedureGDSFactory gdsFactory =
new ProcedureGDSFactory( platformModule, this, coreAPIAvailabilityGuard, context.getTokenHolders(), editionModule.threadToTransactionBridge );
procedures.registerComponent( GraphDatabaseService.class, gdsFactory::apply, true );
}

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.core.TokenHolders;
import org.neo4j.kernel.impl.factory.AccessCapability;
import org.neo4j.kernel.impl.factory.DatabaseInfo;
Expand Down Expand Up @@ -123,6 +124,8 @@ public abstract class EditionModule

public NetworkConnectionTracker connectionTracker;

public ThreadToStatementContextBridge threadToTransactionBridge;

private final DatabaseTransactionStats databaseStatistics = new DatabaseTransactionStats();

protected AvailabilityGuard globalAvailabilityGuard;
Expand Down
Expand Up @@ -43,7 +43,6 @@
import org.neo4j.kernel.impl.api.LogRotationMonitor;
import org.neo4j.kernel.impl.context.TransactionVersionContextSupplier;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.StoreLogService;
Expand Down Expand Up @@ -124,8 +123,6 @@ public class PlatformModule

public final JobScheduler jobScheduler;

public final ThreadToStatementContextBridge threadToTransactionBridge;

public final SystemNanoClock clock;

public final VersionContextSupplier versionContextSupplier;
Expand Down Expand Up @@ -207,8 +204,6 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa

dependencies.satisfyDependency( dataSourceManager );

threadToTransactionBridge = dependencies.satisfyDependency( new ThreadToStatementContextBridge() );

kernelExtensionFactories = externalDependencies.kernelExtensions();
engineProviders = externalDependencies.executionEngines();
globalKernelExtensions = dependencies.satisfyDependency(
Expand Down

0 comments on commit 84a8af5

Please sign in to comment.