Skip to content

Commit

Permalink
Merge pull request #9618 from ragadeeshu/3.2-revert-revert
Browse files Browse the repository at this point in the history
Revert "Revert "Fixes race between starting transaction and shutdown""
  • Loading branch information
ragadeeshu committed Jul 5, 2017
2 parents b38ab2f + 40f2643 commit 400157e
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

import java.time.Clock;

import org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.time.Clocks;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;

import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement;

/**
Expand Down Expand Up @@ -92,6 +93,5 @@ private void awaitTransactionsClosedWithinTimeout()
public void shutdown()
throws Throwable
{
// TODO: Starting database. Make sure none can access it through lock or CAS
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,
availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config );
config, availabilityGuard );

kernel.registerTransactionHook( transactionEventHandlers );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
package org.neo4j.kernel.impl.api;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.AvailabilityGuard.UnavailableException;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.TransactionHook;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.CallableUserAggregationFunction;
Expand Down Expand Up @@ -64,17 +67,22 @@ public class Kernel extends LifecycleAdapter implements KernelAPI
private final DatabaseHealth health;
private final TransactionMonitor transactionMonitor;
private final Procedures procedures;
private final AvailabilityGuard availabilityGuard;
private final long defaultTransactionTimeout;
private final long transactionStartTimeout;

public Kernel( KernelTransactions transactionFactory, TransactionHooks hooks, DatabaseHealth health,
TransactionMonitor transactionMonitor, Procedures procedures, Config config )
TransactionMonitor transactionMonitor, Procedures procedures, Config config,
AvailabilityGuard availabilityGuard )
{
this.transactions = transactionFactory;
this.hooks = hooks;
this.health = health;
this.transactionMonitor = transactionMonitor;
this.procedures = procedures;
this.availabilityGuard = availabilityGuard;
this.defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ).toMillis();
this.transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout ).toMillis();
}

@Override
Expand All @@ -88,9 +96,21 @@ public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityCo
TransactionFailureException
{
health.assertHealthy( TransactionFailureException.class );
KernelTransaction transaction = transactions.newInstance( type, securityContext, timeout );

// Increment transaction monitor before checking the availability guard (which needs to be checked
// before starting the transaction). This is because the DatabaseAvailability does this in the other
// order, where it closes the availability guard and then awaits started transactions to finish.
transactionMonitor.transactionStarted();
return transaction;
try
{
availabilityGuard.await( transactionStartTimeout );
}
catch ( UnavailableException e )
{
transactionMonitor.transactionFinished( false, false );
throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e, e.getMessage() );
}
return transactions.newInstance( type, securityContext, timeout );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()

private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;
private final Set<KernelTransactionImplementation> transactions;

KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions )
{
Expand All @@ -345,7 +345,7 @@ public KernelTransactionImplementation newInstance()

private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;
private final Set<KernelTransactionImplementation> transactions;

GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions,
Factory<KernelTransactionImplementation> factory )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ public void assertDatabaseAvailable()
}
catch ( AvailabilityGuard.UnavailableException e )
{
if ( guard.isShutdown() )
{
throw new DatabaseShutdownException();
}
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage() );
throw convertUnavailabilityException( e );
}
}

public RuntimeException convertUnavailabilityException( Exception e )
{
if ( guard.isShutdown() )
{
return new DatabaseShutdownException();
}
return new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void shutdown()
{
try
{
msgLog.log( "Shutdown started" );
platform.availabilityGuard.shutdown();
msgLog.log( "Shutdown started" );
platform.life.shutdown();
}
catch ( LifecycleException throwable )
Expand All @@ -170,7 +170,6 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{
try
{
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = dataSource.kernelAPI.get().newTransaction( type, securityContext, timeout );
kernelTx.registerCloseListener(
(txId) -> dataSource.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -179,7 +178,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
}
catch ( TransactionFailureException e )
{
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
throw availability.convertUnavailabilityException( e );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.neo4j.storageengine.api.EntityType;

import static java.lang.String.format;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.map;
import static org.neo4j.helpers.collection.Iterators.emptyIterator;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
// Anyways please fix this.
dependencies.satisfyDependency( dataSourceManager );

availabilityGuard = dependencies.satisfyDependency(
new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) ) );
availabilityGuard = dependencies.satisfyDependency( createAvailabilityGuard() );

transactionMonitor = dependencies.satisfyDependency( createTransactionStats() );

Expand All @@ -200,6 +199,11 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
publishPlatformInfo( dependencies.resolveDependency( UsageData.class ) );
}

protected AvailabilityGuard createAvailabilityGuard()
{
return new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) );
}

protected SystemNanoClock createClock()
{
return Clocks.nanoClock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{
try
{
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout );
kernelTx.registerCloseListener(
( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -195,7 +194,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
}
catch ( TransactionFailureException e )
{
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
throw availability.convertUnavailabilityException( e );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.mockito.InOrder;

import java.io.File;
import java.time.Clock;
import java.util.Map;
import java.util.function.Function;

import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.configuration.Config;
Expand All @@ -36,16 +44,34 @@
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.NullLog;
import org.neo4j.test.ImpermanentGraphDatabase;

import org.neo4j.test.rule.NeoStoreDataSourceRule;
import org.neo4j.test.rule.PageCacheAndDependenciesRule;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

import static org.neo4j.kernel.api.schema.SchemaDescriptorFactory.forLabel;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;

public class KernelTest
{
private final PageCacheAndDependenciesRule pageCacheRule = new PageCacheAndDependenciesRule();
private final NeoStoreDataSourceRule neoStoreRule = new NeoStoreDataSourceRule();

@Rule
public final RuleChain rules = RuleChain.outerRule( pageCacheRule ).around( neoStoreRule );

@Test
public void shouldNotAllowCreationOfConstraintsWhenInHA() throws Exception
{
Expand All @@ -72,6 +98,29 @@ public void shouldNotAllowCreationOfConstraintsWhenInHA() throws Exception
db.shutdown();
}

@Test
public void shouldIncrementTransactionMonitorBeforeCheckingDatabaseAvailability() throws Exception
{
// GIVEN
AvailabilityGuard availabilityGuard = spy( new AvailabilityGuard( Clock.systemUTC(), NullLog.getInstance() ) );
TransactionMonitor transactionMonitor = mock( TransactionMonitor.class );
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependencies( availabilityGuard, transactionMonitor );
NeoStoreDataSource dataSource = neoStoreRule.getDataSource( pageCacheRule.directory().absolutePath(),
pageCacheRule.fileSystem(), pageCacheRule.pageCache(), dependencies );
dataSource.start();
KernelAPI kernel = dataSource.getKernel();

// WHEN
try ( KernelTransaction tx = kernel.newTransaction( explicit, AUTH_DISABLED ) )
{
// THEN
InOrder order = inOrder( transactionMonitor, availabilityGuard );
order.verify( transactionMonitor, times( 1 ) ).transactionStarted();
order.verify( availabilityGuard, times( 1 ) ).await( anyLong() );
}
}

@SuppressWarnings("deprecation")
class FakeHaDatabase extends ImpermanentGraphDatabase
{
Expand All @@ -96,8 +145,8 @@ protected SchemaWriteGuard createSchemaWriteGuard()
new GraphDatabaseFacadeFactory( DatabaseInfo.COMMUNITY, factory )
{
@Override
protected PlatformModule createPlatform( File storeDir, Config config, Dependencies dependencies,
GraphDatabaseFacade graphDatabaseFacade )
protected PlatformModule createPlatform( File storeDir, Config config,
GraphDatabaseFacadeFactory.Dependencies dependencies, GraphDatabaseFacade graphDatabaseFacade )
{
return new ImpermanentPlatformModule( storeDir, config, databaseInfo, dependencies,
graphDatabaseFacade );
Expand Down

0 comments on commit 400157e

Please sign in to comment.