Skip to content

Commit

Permalink
Small cleanup around bolt tx state machine
Browse files Browse the repository at this point in the history
Simplified lifecycle and removed dependency of `TransactionStateMachineSPI`
on `GraphDatabaseQueryService`. It was unnecessary because component
already had dependency on `GraphDatabaseAPI` which is essentially the
same thing. SPI will also resolve all needed dependencies using the
provided database.
  • Loading branch information
lutovich committed May 28, 2018
1 parent a4b79ae commit d058d96
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 143 deletions.
Expand Up @@ -47,12 +47,12 @@
import org.neo4j.bolt.v1.runtime.BoltFactoryImpl; import org.neo4j.bolt.v1.runtime.BoltFactoryImpl;
import org.neo4j.configuration.Description; import org.neo4j.configuration.Description;
import org.neo4j.configuration.LoadableConfig; import org.neo4j.configuration.LoadableConfig;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.Service; import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.security.AuthManager; import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.api.security.UserManagerSupplier; import org.neo4j.kernel.api.security.UserManagerSupplier;
Expand All @@ -61,7 +61,6 @@
import org.neo4j.kernel.configuration.ConnectorPortRegister; import org.neo4j.kernel.configuration.ConnectorPortRegister;
import org.neo4j.kernel.configuration.ssl.SslPolicyLoader; import org.neo4j.kernel.configuration.ssl.SslPolicyLoader;
import org.neo4j.kernel.extension.KernelExtensionFactory; import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext; import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
Expand Down Expand Up @@ -96,15 +95,15 @@ public interface Dependencies


Config config(); Config config();


GraphDatabaseService db(); GraphDatabaseAPI db();


JobScheduler scheduler(); JobScheduler scheduler();


UsageData usageData(); UsageData usageData();


Monitors monitors(); Monitors monitors();


ThreadToStatementContextBridge txBridge(); AvailabilityGuard availabilityGuard();


BoltConnectionTracker sessionTracker(); BoltConnectionTracker sessionTracker();


Expand All @@ -130,8 +129,6 @@ public BoltKernelExtension()
public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
{ {
Config config = dependencies.config(); Config config = dependencies.config();
GraphDatabaseService gdb = dependencies.db();
GraphDatabaseAPI api = (GraphDatabaseAPI) gdb;
LogService logService = dependencies.logService(); LogService logService = dependencies.logService();
Clock clock = dependencies.clock(); Clock clock = dependencies.clock();
SslPolicyLoader sslPolicyFactory = dependencies.sslPolicyFactory(); SslPolicyLoader sslPolicyFactory = dependencies.sslPolicyFactory();
Expand All @@ -150,8 +147,8 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies )


TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock ); TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock );


BoltFactory boltFactory = life.add( new BoltFactoryImpl( api, dependencies.usageData(), BoltFactory boltFactory = new BoltFactoryImpl( dependencies.db(), dependencies.usageData(), dependencies.availabilityGuard(),
logService, dependencies.txBridge(), authentication, dependencies.sessionTracker(), config ) ); authentication, dependencies.sessionTracker(), config, logService );
BoltSchedulerProvider boltSchedulerProvider = BoltSchedulerProvider boltSchedulerProvider =
life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), scheduler, logService ) ); life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), scheduler, logService ) );
BoltConnectionFactory boltConnectionFactory = BoltConnectionFactory boltConnectionFactory =
Expand Down
Expand Up @@ -24,63 +24,36 @@


import org.neo4j.bolt.BoltChannel; import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.security.auth.Authentication; import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


public class BoltFactoryImpl extends LifecycleAdapter implements BoltFactory public class BoltFactoryImpl implements BoltFactory
{ {
private final GraphDatabaseAPI gds; private final GraphDatabaseAPI db;
private final UsageData usageData; private final UsageData usageData;
private final AvailabilityGuard availabilityGuard;
private final LogService logging; private final LogService logging;
private final Authentication authentication; private final Authentication authentication;
private final BoltConnectionTracker connectionTracker; private final BoltConnectionTracker connectionTracker;
private final ThreadToStatementContextBridge txBridge;
private final Config config; private final Config config;


private QueryExecutionEngine queryExecutionEngine; public BoltFactoryImpl( GraphDatabaseAPI db, UsageData usageData, AvailabilityGuard availabilityGuard,
private GraphDatabaseQueryService queryService; Authentication authentication, BoltConnectionTracker connectionTracker, Config config, LogService logging )
private AvailabilityGuard availabilityGuard;

public BoltFactoryImpl( GraphDatabaseAPI gds, UsageData usageData, LogService logging,
ThreadToStatementContextBridge txBridge, Authentication authentication,
BoltConnectionTracker connectionTracker, Config config )
{ {
this.gds = gds; this.db = db;
this.usageData = usageData; this.usageData = usageData;
this.availabilityGuard = availabilityGuard;
this.logging = logging; this.logging = logging;
this.txBridge = txBridge;
this.authentication = authentication; this.authentication = authentication;
this.connectionTracker = connectionTracker; this.connectionTracker = connectionTracker;
this.config = config; this.config = config;
} }


@Override
public void start()
{
DependencyResolver dependencyResolver = gds.getDependencyResolver();
queryExecutionEngine = dependencyResolver.resolveDependency( QueryExecutionEngine.class );
queryService = dependencyResolver.resolveDependency( GraphDatabaseQueryService.class );
availabilityGuard = dependencyResolver.resolveDependency( AvailabilityGuard.class );
}

@Override
public void stop()
{
queryExecutionEngine = null;
queryService = null;
availabilityGuard = null;
}

@Override @Override
public BoltStateMachine newMachine( BoltChannel boltChannel, Clock clock ) public BoltStateMachine newMachine( BoltChannel boltChannel, Clock clock )
{ {
Expand All @@ -95,7 +68,6 @@ private TransactionStateMachine.SPI createTxSpi( Clock clock )
long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis(); long bookmarkReadyTimeout = config.get( GraphDatabaseSettings.bookmark_ready_timeout ).toMillis();
Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout ); Duration txAwaitDuration = Duration.ofMillis( bookmarkReadyTimeout );


return new TransactionStateMachineSPI( gds, txBridge, queryExecutionEngine, return new TransactionStateMachineSPI( db, availabilityGuard, txAwaitDuration, clock );
availabilityGuard, queryService, txAwaitDuration, clock );
} }
} }
Expand Up @@ -28,11 +28,11 @@
import org.neo4j.cypher.internal.javacompat.QueryResultProvider; import org.neo4j.cypher.internal.javacompat.QueryResultProvider;
import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.internal.kernel.api.security.LoginContext; import org.neo4j.internal.kernel.api.security.LoginContext;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.txtracking.TransactionIdTracker; import org.neo4j.kernel.api.txtracking.TransactionIdTracker;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction; import org.neo4j.kernel.impl.coreapi.InternalTransaction;
Expand All @@ -53,33 +53,23 @@


class TransactionStateMachineSPI implements TransactionStateMachine.SPI class TransactionStateMachineSPI implements TransactionStateMachine.SPI
{ {
private static final PropertyContainerLocker locker = new PropertyContainerLocker();

private final GraphDatabaseAPI db; private final GraphDatabaseAPI db;
private final ThreadToStatementContextBridge txBridge; private final ThreadToStatementContextBridge txBridge;
private final QueryExecutionEngine queryExecutionEngine; private final QueryExecutionEngine queryExecutionEngine;
private final TransactionIdTracker transactionIdTracker; private final TransactionIdTracker transactionIdTracker;
private static final PropertyContainerLocker locker = new PropertyContainerLocker();
private final TransactionalContextFactory contextFactory; private final TransactionalContextFactory contextFactory;
private final GraphDatabaseQueryService queryService;
private final Duration txAwaitDuration; private final Duration txAwaitDuration;
private final Clock clock; private final Clock clock;


TransactionStateMachineSPI( GraphDatabaseAPI db, TransactionStateMachineSPI( GraphDatabaseAPI db, AvailabilityGuard availabilityGuard, Duration txAwaitDuration, Clock clock )
ThreadToStatementContextBridge txBridge,
QueryExecutionEngine queryExecutionEngine,
AvailabilityGuard availabilityGuard,
GraphDatabaseQueryService queryService,
Duration txAwaitDuration,
Clock clock )
{ {
this.db = db; this.db = db;
this.txBridge = txBridge; this.txBridge = db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class );
this.queryExecutionEngine = queryExecutionEngine; this.queryExecutionEngine = db.getDependencyResolver().resolveDependency( QueryExecutionEngine.class );

this.transactionIdTracker = newTransactionIdTracker( db, availabilityGuard );
Supplier<TransactionIdStore> transactionIdStoreSupplier = db.getDependencyResolver().provideDependency( TransactionIdStore.class ); this.contextFactory = newTransactionalContextFactory( db );
this.transactionIdTracker = new TransactionIdTracker( transactionIdStoreSupplier, availabilityGuard );

this.contextFactory = Neo4jTransactionalContextFactory.create( queryService, locker );
this.queryService = queryService;
this.txAwaitDuration = txAwaitDuration; this.txAwaitDuration = txAwaitDuration;
this.clock = clock; this.clock = clock;
} }
Expand Down Expand Up @@ -125,7 +115,7 @@ public boolean isPeriodicCommit( String query )
public BoltResultHandle executeQuery( BoltQuerySource querySource, public BoltResultHandle executeQuery( BoltQuerySource querySource,
LoginContext loginContext, String statement, MapValue params ) LoginContext loginContext, String statement, MapValue params )
{ {
InternalTransaction internalTransaction = queryService.beginTransaction( implicit, loginContext ); InternalTransaction internalTransaction = db.beginTransaction( implicit, loginContext );
ClientConnectionInfo sourceDetails = new BoltConnectionInfo( querySource.principalName, ClientConnectionInfo sourceDetails = new BoltConnectionInfo( querySource.principalName,
querySource.clientName, querySource.clientName,
querySource.connectionDescriptor.clientAddress(), querySource.connectionDescriptor.clientAddress(),
Expand Down Expand Up @@ -178,4 +168,16 @@ public void terminate()


}; };
} }

private static TransactionIdTracker newTransactionIdTracker( GraphDatabaseAPI db, AvailabilityGuard guard )
{
Supplier<TransactionIdStore> transactionIdStoreSupplier = db.getDependencyResolver().provideDependency( TransactionIdStore.class );
return new TransactionIdTracker( transactionIdStoreSupplier, guard );
}

private static TransactionalContextFactory newTransactionalContextFactory( GraphDatabaseAPI db )
{
GraphDatabaseQueryService queryService = db.getDependencyResolver().resolveDependency( GraphDatabaseQueryService.class );
return Neo4jTransactionalContextFactory.create( queryService, locker );
}
} }
Expand Up @@ -30,88 +30,29 @@
import org.neo4j.kernel.GraphDatabaseQueryService; import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.NullLog;
import org.neo4j.test.OnDemandJobScheduler; import org.neo4j.test.OnDemandJobScheduler;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;


public class BoltFactoryImplTest public class BoltFactoryImplTest
{ {
private static final Clock CLOCK = Clock.systemUTC(); private static final Clock CLOCK = Clock.systemUTC();
private static final BoltChannel boltChannel = mock( BoltChannel.class ); private static final BoltChannel CHANNEL = mock( BoltChannel.class );


@Test @Test
public void newMachineThrowsWhenNotStarted() public void shouldCreateBoltStateMachines()
{ {
BoltFactoryImpl boltFactory = newBoltFactory(); BoltFactoryImpl factory = newBoltFactory();


try BoltStateMachine boltStateMachine = factory.newMachine( CHANNEL, CLOCK );
{
boltFactory.newMachine( boltChannel, CLOCK );
fail( "Exception expected" );
}
catch ( Exception e )
{
// expected
}
}

@Test
public void newMachineThrowsWhenStopped()
{
BoltFactoryImpl boltFactory = newBoltFactory();

boltFactory.start();

BoltStateMachine stateMachine = boltFactory.newMachine( boltChannel, CLOCK );

assertNotNull( stateMachine );

boltFactory.stop();

try
{
boltFactory.newMachine( boltChannel, CLOCK );
fail( "Exception expected" );
}
catch ( Exception e )
{
// expected
}
}

@Test
public void txIdStoreRefreshedAfterRestart()
{
GraphDatabaseAPI db = newDbMock();
DependencyResolver dependencyResolver = db.getDependencyResolver();
TransactionIdStore txIdStoreBeforeRestart = mock( TransactionIdStore.class );
when( txIdStoreBeforeRestart.getLastClosedTransactionId() ).thenReturn( 42L );
TransactionIdStore txIdStoreAfterRestart = mock( TransactionIdStore.class );
when( txIdStoreAfterRestart.getLastClosedTransactionId() ).thenReturn( 4242L );
when( dependencyResolver.provideDependency( TransactionIdStore.class ) )
.thenReturn( () -> txIdStoreBeforeRestart ).thenReturn( () -> txIdStoreAfterRestart );

BoltFactoryImpl boltFactory = newBoltFactory( db );

boltFactory.start();

BoltStateMachine stateMachine1 = boltFactory.newMachine( boltChannel, CLOCK );
assertEquals( 42, stateMachine1.spi.transactionSpi().newestEncounteredTxId() );

boltFactory.stop();
boltFactory.start();


BoltStateMachine stateMachine2 = boltFactory.newMachine( boltChannel, CLOCK ); assertNotNull( boltStateMachine );
assertEquals( 4242, stateMachine2.spi.transactionSpi().newestEncounteredTxId() );
} }


private static BoltFactoryImpl newBoltFactory() private static BoltFactoryImpl newBoltFactory()
Expand All @@ -121,9 +62,8 @@ private static BoltFactoryImpl newBoltFactory()


private static BoltFactoryImpl newBoltFactory( GraphDatabaseAPI db ) private static BoltFactoryImpl newBoltFactory( GraphDatabaseAPI db )
{ {
return new BoltFactoryImpl( db, new UsageData( new OnDemandJobScheduler() ), NullLogService.getInstance(), return new BoltFactoryImpl( db, new UsageData( new OnDemandJobScheduler() ), new AvailabilityGuard( CLOCK, NullLog.getInstance() ),
new ThreadToStatementContextBridge( mock( AvailabilityGuard.class ) ), mock( Authentication.class ), BoltConnectionTracker.NOOP, mock( Authentication.class ), BoltConnectionTracker.NOOP, Config.defaults(), NullLogService.getInstance() );
Config.defaults() );
} }


private static GraphDatabaseAPI newDbMock() private static GraphDatabaseAPI newDbMock()
Expand Down
Expand Up @@ -124,17 +124,21 @@ private static TransactionStateMachineSPI createTxSpi( Supplier<TransactionIdSto
private static TransactionStateMachineSPI createTxSpi( Supplier<TransactionIdStore> txIdStore, Duration txAwaitDuration, private static TransactionStateMachineSPI createTxSpi( Supplier<TransactionIdStore> txIdStore, Duration txAwaitDuration,
AvailabilityGuard availabilityGuard, Clock clock ) AvailabilityGuard availabilityGuard, Clock clock )
{ {
GraphDatabaseQueryService queryService = mock( GraphDatabaseQueryService.class ); QueryExecutionEngine queryExecutionEngine = mock( QueryExecutionEngine.class );

DependencyResolver dependencyResolver = mock( DependencyResolver.class ); DependencyResolver dependencyResolver = mock( DependencyResolver.class );
GraphDatabaseAPI db = mock( GraphDatabaseAPI.class ); ThreadToStatementContextBridge bridge = new ThreadToStatementContextBridge( availabilityGuard );
when( dependencyResolver.resolveDependency( ThreadToStatementContextBridge.class ) ).thenReturn( bridge );
when( dependencyResolver.resolveDependency( QueryExecutionEngine.class ) ).thenReturn( queryExecutionEngine );
when( dependencyResolver.provideDependency( TransactionIdStore.class ) ).thenReturn( txIdStore );


when( queryService.getDependencyResolver() ).thenReturn( dependencyResolver ); GraphDatabaseAPI db = mock( GraphDatabaseAPI.class );
when( db.getDependencyResolver() ).thenReturn( dependencyResolver ); when( db.getDependencyResolver() ).thenReturn( dependencyResolver );


when(dependencyResolver.provideDependency( TransactionIdStore.class )).thenReturn( txIdStore ); GraphDatabaseQueryService queryService = mock( GraphDatabaseQueryService.class );
when( queryService.getDependencyResolver() ).thenReturn( dependencyResolver );
when( dependencyResolver.resolveDependency( GraphDatabaseQueryService.class ) ).thenReturn( queryService );


return new TransactionStateMachineSPI( db, new ThreadToStatementContextBridge( mock( AvailabilityGuard.class ) ), return new TransactionStateMachineSPI( db, availabilityGuard, txAwaitDuration, clock );
mock( QueryExecutionEngine.class ), availabilityGuard, queryService, txAwaitDuration,
clock );
} }
} }
Expand Up @@ -40,11 +40,11 @@
import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.security.AuthManager; import org.neo4j.kernel.api.security.AuthManager;
import org.neo4j.kernel.api.security.UserManagerSupplier; import org.neo4j.kernel.api.security.UserManagerSupplier;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
Expand Down Expand Up @@ -75,13 +75,12 @@ public void evaluate() throws Throwable
boltFactory = new BoltFactoryImpl( boltFactory = new BoltFactoryImpl(
gdb, gdb,
new UsageData( null ), new UsageData( null ),
NullLogService.getInstance(), resolver.resolveDependency( AvailabilityGuard.class ),
resolver.resolveDependency( ThreadToStatementContextBridge.class ),
authentication, authentication,
BoltConnectionTracker.NOOP, BoltConnectionTracker.NOOP,
Config.defaults() Config.defaults(),
NullLogService.getInstance()
); );
boltFactory.start();
try try
{ {
base.evaluate(); base.evaluate();
Expand Down

0 comments on commit d058d96

Please sign in to comment.