Skip to content

Commit

Permalink
Revert "Revert "Revert "Revert "Fixes race between starting transacti…
Browse files Browse the repository at this point in the history
…on and shutdown""""
  • Loading branch information
ragadeeshu committed Jul 6, 2017
1 parent 47e2af3 commit 2c4d750
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 36 deletions.
Expand Up @@ -21,13 +21,14 @@


import java.time.Clock; import java.time.Clock;


import org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import org.neo4j.kernel.impl.transaction.TransactionStats; import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos; import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement; import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement;


/** /**
Expand Down Expand Up @@ -92,6 +93,5 @@ private void awaitTransactionsClosedWithinTimeout()
public void shutdown() public void shutdown()
throws Throwable throws Throwable
{ {
// TODO: Starting database. Make sure none can access it through lock or CAS
} }
} }
Expand Up @@ -735,7 +735,7 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,
availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) ); availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) );


final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures, final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config ); config, availabilityGuard );


kernel.registerTransactionHook( transactionEventHandlers ); kernel.registerTransactionHook( transactionEventHandlers );


Expand Down
Expand Up @@ -20,10 +20,13 @@
package org.neo4j.kernel.impl.api; package org.neo4j.kernel.impl.api;


import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.AvailabilityGuard.UnavailableException;
import org.neo4j.kernel.api.KernelAPI; import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.TransactionHook; import org.neo4j.kernel.api.TransactionHook;
import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.CallableUserAggregationFunction; import org.neo4j.kernel.api.proc.CallableUserAggregationFunction;
Expand Down Expand Up @@ -64,17 +67,22 @@ public class Kernel extends LifecycleAdapter implements KernelAPI
private final DatabaseHealth health; private final DatabaseHealth health;
private final TransactionMonitor transactionMonitor; private final TransactionMonitor transactionMonitor;
private final Procedures procedures; private final Procedures procedures;
private final AvailabilityGuard availabilityGuard;
private final long defaultTransactionTimeout; private final long defaultTransactionTimeout;
private final long transactionStartTimeout;


public Kernel( KernelTransactions transactionFactory, TransactionHooks hooks, DatabaseHealth health, public Kernel( KernelTransactions transactionFactory, TransactionHooks hooks, DatabaseHealth health,
TransactionMonitor transactionMonitor, Procedures procedures, Config config ) TransactionMonitor transactionMonitor, Procedures procedures, Config config,
AvailabilityGuard availabilityGuard )
{ {
this.transactions = transactionFactory; this.transactions = transactionFactory;
this.hooks = hooks; this.hooks = hooks;
this.health = health; this.health = health;
this.transactionMonitor = transactionMonitor; this.transactionMonitor = transactionMonitor;
this.procedures = procedures; this.procedures = procedures;
this.availabilityGuard = availabilityGuard;
this.defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ).toMillis(); this.defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ).toMillis();
this.transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout ).toMillis();
} }


@Override @Override
Expand All @@ -88,9 +96,21 @@ public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityCo
TransactionFailureException TransactionFailureException
{ {
health.assertHealthy( TransactionFailureException.class ); health.assertHealthy( TransactionFailureException.class );
KernelTransaction transaction = transactions.newInstance( type, securityContext, timeout );
// Increment transaction monitor before checking the availability guard (which needs to be checked
// before starting the transaction). This is because the DatabaseAvailability does this in the other
// order, where it closes the availability guard and then awaits started transactions to finish.
transactionMonitor.transactionStarted(); transactionMonitor.transactionStarted();
return transaction; try
{
availabilityGuard.await( transactionStartTimeout );
}
catch ( UnavailableException e )
{
transactionMonitor.transactionFinished( false, false );
throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e, e.getMessage() );
}
return transactions.newInstance( type, securityContext, timeout );
} }


@Override @Override
Expand Down
Expand Up @@ -322,7 +322,7 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()


private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation> private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation>
{ {
private Set<KernelTransactionImplementation> transactions; private final Set<KernelTransactionImplementation> transactions;


KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions ) KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions )
{ {
Expand All @@ -345,7 +345,7 @@ public KernelTransactionImplementation newInstance()


private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation> private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{ {
private Set<KernelTransactionImplementation> transactions; private final Set<KernelTransactionImplementation> transactions;


GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions, GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions,
Factory<KernelTransactionImplementation> factory ) Factory<KernelTransactionImplementation> factory )
Expand Down
Expand Up @@ -50,11 +50,16 @@ public void assertDatabaseAvailable()
} }
catch ( AvailabilityGuard.UnavailableException e ) catch ( AvailabilityGuard.UnavailableException e )
{ {
if ( guard.isShutdown() ) throw convertUnavailabilityException( e );
{
throw new DatabaseShutdownException();
}
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage() );
} }
} }

public RuntimeException convertUnavailabilityException( Exception e )
{
if ( guard.isShutdown() )
{
return new DatabaseShutdownException();
}
return new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
}
} }
Expand Up @@ -154,8 +154,8 @@ public void shutdown()
{ {
try try
{ {
msgLog.log( "Shutdown started" );
platform.availabilityGuard.shutdown(); platform.availabilityGuard.shutdown();
msgLog.log( "Shutdown started" );
platform.life.shutdown(); platform.life.shutdown();
} }
catch ( LifecycleException throwable ) catch ( LifecycleException throwable )
Expand All @@ -170,7 +170,6 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{ {
try try
{ {
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = dataSource.kernelAPI.get().newTransaction( type, securityContext, timeout ); KernelTransaction kernelTx = dataSource.kernelAPI.get().newTransaction( type, securityContext, timeout );
kernelTx.registerCloseListener( kernelTx.registerCloseListener(
(txId) -> dataSource.threadToTransactionBridge.unbindTransactionFromCurrentThread() ); (txId) -> dataSource.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -179,7 +178,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
} }
catch ( TransactionFailureException e ) catch ( TransactionFailureException e )
{ {
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e ); throw availability.convertUnavailabilityException( e );
} }
} }


Expand Down
Expand Up @@ -105,6 +105,7 @@
import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.EntityType;


import static java.lang.String.format; import static java.lang.String.format;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.map; import static org.neo4j.collection.primitive.PrimitiveLongCollections.map;
import static org.neo4j.helpers.collection.Iterators.emptyIterator; import static org.neo4j.helpers.collection.Iterators.emptyIterator;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED; import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
Expand Down
Expand Up @@ -183,8 +183,7 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
// Anyways please fix this. // Anyways please fix this.
dependencies.satisfyDependency( dataSourceManager ); dependencies.satisfyDependency( dataSourceManager );


availabilityGuard = dependencies.satisfyDependency( availabilityGuard = dependencies.satisfyDependency( createAvailabilityGuard() );
new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) ) );


transactionMonitor = dependencies.satisfyDependency( createTransactionStats() ); transactionMonitor = dependencies.satisfyDependency( createTransactionStats() );


Expand All @@ -200,6 +199,11 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
publishPlatformInfo( dependencies.resolveDependency( UsageData.class ) ); publishPlatformInfo( dependencies.resolveDependency( UsageData.class ) );
} }


protected AvailabilityGuard createAvailabilityGuard()
{
return new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) );
}

protected SystemNanoClock createClock() protected SystemNanoClock createClock()
{ {
return Clocks.nanoClock(); return Clocks.nanoClock();
Expand Down
Expand Up @@ -186,7 +186,6 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{ {
try try
{ {
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout ); KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout );
kernelTx.registerCloseListener( kernelTx.registerCloseListener(
( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() ); ( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -195,7 +194,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
} }
catch ( TransactionFailureException e ) catch ( TransactionFailureException e )
{ {
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e ); throw availability.convertUnavailabilityException( e );
} }
} }
} }
Expand Up @@ -19,13 +19,21 @@
*/ */
package org.neo4j.kernel.impl.api; package org.neo4j.kernel.impl.api;


import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.RuleChain;
import org.mockito.InOrder;


import java.io.File; import java.io.File;
import java.time.Clock;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement; import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException; import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
Expand All @@ -36,16 +44,34 @@
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.kernel.impl.factory.PlatformModule; import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.NullLog;
import org.neo4j.test.ImpermanentGraphDatabase; import org.neo4j.test.ImpermanentGraphDatabase;

import org.neo4j.test.rule.NeoStoreDataSourceRule;
import org.neo4j.test.rule.PageCacheAndDependenciesRule;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

import static org.neo4j.kernel.api.schema.SchemaDescriptorFactory.forLabel; import static org.neo4j.kernel.api.schema.SchemaDescriptorFactory.forLabel;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;


public class KernelTest public class KernelTest
{ {
private final PageCacheAndDependenciesRule pageCacheRule = new PageCacheAndDependenciesRule();
private final NeoStoreDataSourceRule neoStoreRule = new NeoStoreDataSourceRule();

@Rule
public final RuleChain rules = RuleChain.outerRule( pageCacheRule ).around( neoStoreRule );

@Test @Test
public void shouldNotAllowCreationOfConstraintsWhenInHA() throws Exception public void shouldNotAllowCreationOfConstraintsWhenInHA() throws Exception
{ {
Expand All @@ -72,6 +98,29 @@ public void shouldNotAllowCreationOfConstraintsWhenInHA() throws Exception
db.shutdown(); db.shutdown();
} }


@Test
public void shouldIncrementTransactionMonitorBeforeCheckingDatabaseAvailability() throws Exception
{
// GIVEN
AvailabilityGuard availabilityGuard = spy( new AvailabilityGuard( Clock.systemUTC(), NullLog.getInstance() ) );
TransactionMonitor transactionMonitor = mock( TransactionMonitor.class );
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependencies( availabilityGuard, transactionMonitor );
NeoStoreDataSource dataSource = neoStoreRule.getDataSource( pageCacheRule.directory().absolutePath(),
pageCacheRule.fileSystem(), pageCacheRule.pageCache(), dependencies );
dataSource.start();
KernelAPI kernel = dataSource.getKernel();

// WHEN
try ( KernelTransaction tx = kernel.newTransaction( explicit, AUTH_DISABLED ) )
{
// THEN
InOrder order = inOrder( transactionMonitor, availabilityGuard );
order.verify( transactionMonitor, times( 1 ) ).transactionStarted();
order.verify( availabilityGuard, times( 1 ) ).await( anyLong() );
}
}

@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
class FakeHaDatabase extends ImpermanentGraphDatabase class FakeHaDatabase extends ImpermanentGraphDatabase
{ {
Expand All @@ -96,8 +145,8 @@ protected SchemaWriteGuard createSchemaWriteGuard()
new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, factory ) new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, factory )
{ {
@Override @Override
protected PlatformModule createPlatform( File storeDir, Config config, Dependencies dependencies, protected PlatformModule createPlatform( File storeDir, Config config,
GraphDatabaseFacade graphDatabaseFacade ) GraphDatabaseFacadeFactory.Dependencies dependencies, GraphDatabaseFacade graphDatabaseFacade )
{ {
return new ImpermanentPlatformModule( storeDir, config, databaseInfo, dependencies, return new ImpermanentPlatformModule( storeDir, config, databaseInfo, dependencies,
graphDatabaseFacade ); graphDatabaseFacade );
Expand Down

0 comments on commit 2c4d750

Please sign in to comment.