Skip to content

Commit

Permalink
Revert "Fixes race between starting transaction and shutdown"
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Jun 21, 2017
1 parent fc34d89 commit 3543bc0
Show file tree
Hide file tree
Showing 17 changed files with 91 additions and 551 deletions.

This file was deleted.

Expand Up @@ -21,14 +21,13 @@


import java.time.Clock; import java.time.Clock;


import org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import org.neo4j.kernel.impl.transaction.TransactionStats; import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.time.Clocks; import org.neo4j.time.Clocks;


import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos; import static java.util.concurrent.locks.LockSupport.parkNanos;

import static org.neo4j.kernel.AvailabilityGuard.AvailabilityRequirement;
import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement; import static org.neo4j.kernel.AvailabilityGuard.availabilityRequirement;


/** /**
Expand Down Expand Up @@ -93,5 +92,6 @@ private void awaitTransactionsClosedWithinTimeout()
public void shutdown() public void shutdown()
throws Throwable throws Throwable
{ {
// TODO: Starting database. Make sure none can access it through lock or CAS
} }
} }
Expand Up @@ -741,7 +741,7 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,
availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) ); availabilityGuard, tracers, storageEngine, procedures, transactionIdStore, clock, accessCapability ) );


final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures, final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures,
config, availabilityGuard ); config );


kernel.registerTransactionHook( transactionEventHandlers ); kernel.registerTransactionHook( transactionEventHandlers );


Expand Down
Expand Up @@ -20,13 +20,10 @@
package org.neo4j.kernel.impl.api; package org.neo4j.kernel.impl.api;


import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.AvailabilityGuard.UnavailableException;
import org.neo4j.kernel.api.KernelAPI; import org.neo4j.kernel.api.KernelAPI;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.TransactionHook; import org.neo4j.kernel.api.TransactionHook;
import org.neo4j.kernel.api.exceptions.ProcedureException; import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.proc.CallableProcedure; import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.CallableUserAggregationFunction; import org.neo4j.kernel.api.proc.CallableUserAggregationFunction;
Expand Down Expand Up @@ -67,22 +64,17 @@ public class Kernel extends LifecycleAdapter implements KernelAPI
private final DatabaseHealth health; private final DatabaseHealth health;
private final TransactionMonitor transactionMonitor; private final TransactionMonitor transactionMonitor;
private final Procedures procedures; private final Procedures procedures;
private final AvailabilityGuard availabilityGuard;
private final long defaultTransactionTimeout; private final long defaultTransactionTimeout;
private final long transactionStartTimeout;


public Kernel( KernelTransactions transactionFactory, TransactionHooks hooks, DatabaseHealth health, public Kernel( KernelTransactions transactionFactory, TransactionHooks hooks, DatabaseHealth health,
TransactionMonitor transactionMonitor, Procedures procedures, Config config, TransactionMonitor transactionMonitor, Procedures procedures, Config config )
AvailabilityGuard availabilityGuard )
{ {
this.transactions = transactionFactory; this.transactions = transactionFactory;
this.hooks = hooks; this.hooks = hooks;
this.health = health; this.health = health;
this.transactionMonitor = transactionMonitor; this.transactionMonitor = transactionMonitor;
this.procedures = procedures; this.procedures = procedures;
this.availabilityGuard = availabilityGuard;
this.defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ).toMillis(); this.defaultTransactionTimeout = config.get( GraphDatabaseSettings.transaction_timeout ).toMillis();
this.transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout ).toMillis();
} }


@Override @Override
Expand All @@ -96,21 +88,9 @@ public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityCo
TransactionFailureException TransactionFailureException
{ {
health.assertHealthy( TransactionFailureException.class ); health.assertHealthy( TransactionFailureException.class );

KernelTransaction transaction = transactions.newInstance( type, securityContext, timeout );
// Increment transaction monitor before checking the availability guard (which needs to be checked
// before starting the transaction). This is because the DatabaseAvailability does this in the other
// order, where it closes the availability guard and then awaits started transactions to finish.
transactionMonitor.transactionStarted(); transactionMonitor.transactionStarted();
try return transaction;
{
availabilityGuard.await( transactionStartTimeout );
}
catch ( UnavailableException e )
{
transactionMonitor.transactionFinished( false, false );
throw new TransactionFailureException( Status.Transaction.TransactionStartFailed, e, e.getMessage() );
}
return transactions.newInstance( type, securityContext, timeout );
} }


@Override @Override
Expand Down
Expand Up @@ -322,7 +322,7 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()


private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation> private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation>
{ {
private final Set<KernelTransactionImplementation> transactions; private Set<KernelTransactionImplementation> transactions;


KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions ) KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions )
{ {
Expand All @@ -345,7 +345,7 @@ public KernelTransactionImplementation newInstance()


private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation> private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{ {
private final Set<KernelTransactionImplementation> transactions; private Set<KernelTransactionImplementation> transactions;


GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions, GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions,
Factory<KernelTransactionImplementation> factory ) Factory<KernelTransactionImplementation> factory )
Expand Down
Expand Up @@ -50,16 +50,11 @@ public void assertDatabaseAvailable()
} }
catch ( AvailabilityGuard.UnavailableException e ) catch ( AvailabilityGuard.UnavailableException e )
{ {
throw convertUnavailabilityException( e ); if ( guard.isShutdown() )
{
throw new DatabaseShutdownException();
}
throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage() );
} }
} }

public RuntimeException convertUnavailabilityException( Exception e )
{
if ( guard.isShutdown() )
{
return new DatabaseShutdownException();
}
return new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
}
} }
Expand Up @@ -154,8 +154,8 @@ public void shutdown()
{ {
try try
{ {
platform.availabilityGuard.shutdown();
msgLog.log( "Shutdown started" ); msgLog.log( "Shutdown started" );
platform.availabilityGuard.shutdown();
platform.life.shutdown(); platform.life.shutdown();
} }
catch ( LifecycleException throwable ) catch ( LifecycleException throwable )
Expand All @@ -170,6 +170,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{ {
try try
{ {
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = dataSource.kernelAPI.get().newTransaction( type, securityContext, timeout ); KernelTransaction kernelTx = dataSource.kernelAPI.get().newTransaction( type, securityContext, timeout );
kernelTx.registerCloseListener( kernelTx.registerCloseListener(
(txId) -> dataSource.threadToTransactionBridge.unbindTransactionFromCurrentThread() ); (txId) -> dataSource.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -178,7 +179,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
} }
catch ( TransactionFailureException e ) catch ( TransactionFailureException e )
{ {
throw availability.convertUnavailabilityException( e ); throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
} }
} }


Expand Down
Expand Up @@ -105,7 +105,6 @@
import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.EntityType;


import static java.lang.String.format; import static java.lang.String.format;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.map; import static org.neo4j.collection.primitive.PrimitiveLongCollections.map;
import static org.neo4j.helpers.collection.Iterators.emptyIterator; import static org.neo4j.helpers.collection.Iterators.emptyIterator;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED; import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
Expand Down
Expand Up @@ -183,7 +183,8 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
// Anyways please fix this. // Anyways please fix this.
dependencies.satisfyDependency( dataSourceManager ); dependencies.satisfyDependency( dataSourceManager );


availabilityGuard = dependencies.satisfyDependency( createAvailabilityGuard() ); availabilityGuard = dependencies.satisfyDependency(
new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) ) );


transactionMonitor = dependencies.satisfyDependency( createTransactionStats() ); transactionMonitor = dependencies.satisfyDependency( createTransactionStats() );


Expand All @@ -199,11 +200,6 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
publishPlatformInfo( dependencies.resolveDependency( UsageData.class ) ); publishPlatformInfo( dependencies.resolveDependency( UsageData.class ) );
} }


protected AvailabilityGuard createAvailabilityGuard()
{
return new AvailabilityGuard( clock, logging.getInternalLog( AvailabilityGuard.class ) );
}

protected SystemNanoClock createClock() protected SystemNanoClock createClock()
{ {
return Clocks.nanoClock(); return Clocks.nanoClock();
Expand Down
Expand Up @@ -186,6 +186,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
{ {
try try
{ {
availability.assertDatabaseAvailable();
KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout ); KernelTransaction kernelTx = sourceModule.kernelAPI.get().newTransaction( type, this.securityContext, timeout );
kernelTx.registerCloseListener( kernelTx.registerCloseListener(
( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() ); ( txId ) -> sourceModule.threadToTransactionBridge.unbindTransactionFromCurrentThread() );
Expand All @@ -194,7 +195,7 @@ public KernelTransaction beginTransaction( KernelTransaction.Type type, Security
} }
catch ( TransactionFailureException e ) catch ( TransactionFailureException e )
{ {
throw availability.convertUnavailabilityException( e ); throw new org.neo4j.graphdb.TransactionFailureException( e.getMessage(), e );
} }
} }
} }
Expand Up @@ -38,7 +38,6 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifecycleException; import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.AssertableLogProvider;
Expand All @@ -49,6 +48,7 @@
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.test.rule.fs.EphemeralFileSystemRule;


import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand Down Expand Up @@ -86,11 +86,9 @@ public void databaseHealthShouldBeHealedOnStart() throws Throwable
{ {
DatabaseHealth databaseHealth = new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), DatabaseHealth databaseHealth = new DatabaseHealth( mock( DatabasePanicEventGenerator.class ),
NullLogProvider.getInstance().getLog( DatabaseHealth.class ) ); NullLogProvider.getInstance().getLog( DatabaseHealth.class ) );
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependency( databaseHealth );


theDataSource = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCacheRule.getPageCache( fs.get() ), theDataSource = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCacheRule.getPageCache( fs.get() ),
dependencies ); stringMap(), databaseHealth );


databaseHealth.panic( new Throwable() ); databaseHealth.panic( new Throwable() );


Expand All @@ -112,7 +110,7 @@ public void databaseHealthShouldBeHealedOnStart() throws Throwable
public void flushOfThePageCacheHappensOnlyOnceDuringShutdown() throws IOException public void flushOfThePageCacheHappensOnlyOnceDuringShutdown() throws IOException
{ {
PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) ); PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) );
NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache ); NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache, stringMap() );


ds.init(); ds.init();
ds.start(); ds.start();
Expand All @@ -127,9 +125,12 @@ public void flushOfThePageCacheHappensOnlyOnceDuringShutdown() throws IOExceptio
@Test @Test
public void flushOfThePageCacheOnShutdownHappensIfTheDbIsHealthy() throws IOException public void flushOfThePageCacheOnShutdownHappensIfTheDbIsHealthy() throws IOException
{ {
DatabaseHealth health = mock( DatabaseHealth.class );
when( health.isHealthy() ).thenReturn( true );

PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) ); PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) );


NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache ); NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache, stringMap(), health );


ds.init(); ds.init();
ds.start(); ds.start();
Expand All @@ -145,11 +146,10 @@ public void flushOfThePageCacheOnShutdownDoesNotHappenIfTheDbIsUnhealthy() throw
{ {
DatabaseHealth health = mock( DatabaseHealth.class ); DatabaseHealth health = mock( DatabaseHealth.class );
when( health.isHealthy() ).thenReturn( false ); when( health.isHealthy() ).thenReturn( false );

PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) ); PageCache pageCache = spy( pageCacheRule.getPageCache( fs.get() ) );


Dependencies dependencies = new Dependencies(); NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache, stringMap(), health );
dependencies.satisfyDependency( health );
NeoStoreDataSource ds = dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCache, dependencies );


ds.init(); ds.init();
ds.start(); ds.start();
Expand Down Expand Up @@ -224,11 +224,10 @@ public void logModuleSetUpError() throws Exception
AssertableLogProvider logProvider = new AssertableLogProvider(); AssertableLogProvider logProvider = new AssertableLogProvider();
SimpleLogService logService = new SimpleLogService( logProvider, logProvider ); SimpleLogService logService = new SimpleLogService( logProvider, logProvider );
PageCache pageCache = pageCacheRule.getPageCache( fs.get() ); PageCache pageCache = pageCacheRule.getPageCache( fs.get() );
Dependencies dependencies = new Dependencies();
dependencies.satisfyDependencies( idGeneratorFactory, idTypeConfigurationProvider, config, logService );


NeoStoreDataSource dataSource = dsRule.getDataSource( dir.graphDbDir(), fs.get(), NeoStoreDataSource dataSource = dsRule.getDataSource( dir.graphDbDir(), fs.get(), idGeneratorFactory,
pageCache, dependencies ); idTypeConfigurationProvider,
pageCache, config, mock( DatabaseHealth.class ), logService );


try try
{ {
Expand Down Expand Up @@ -257,9 +256,7 @@ public void shouldAlwaysShutdownLifeEvenWhenCheckPointingFails() throws Exceptio
IOException ex = new IOException( "boom!" ); IOException ex = new IOException( "boom!" );
doThrow( ex ).when( databaseHealth ) doThrow( ex ).when( databaseHealth )
.assertHealthy( IOException.class ); // <- this is a trick to simulate a failure during checkpointing .assertHealthy( IOException.class ); // <- this is a trick to simulate a failure during checkpointing
Dependencies dependencies = new Dependencies(); NeoStoreDataSource dataSource = dsRule.getDataSource( storeDir, fs, pageCache, emptyMap(), databaseHealth );
dependencies.satisfyDependencies( databaseHealth );
NeoStoreDataSource dataSource = dsRule.getDataSource( storeDir, fs, pageCache, dependencies );
dataSource.start(); dataSource.start();


try try
Expand Down

0 comments on commit 3543bc0

Please sign in to comment.