Skip to content

Commit

Permalink
Let transactions fail gracefully on shutdown
Browse files Browse the repository at this point in the history
We do this by checking if the database was shutdown while we were waiting for the transaction lock.
  • Loading branch information
ragadeeshu committed Jun 15, 2017
1 parent 8d2efa7 commit 727743e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 20 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.collection.pool.LinkedQueuePool; import org.neo4j.collection.pool.LinkedQueuePool;
import org.neo4j.collection.pool.MarshlandPool; import org.neo4j.collection.pool.MarshlandPool;
import org.neo4j.function.Factory; import org.neo4j.function.Factory;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.TransactionFailureException; import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
Expand Down Expand Up @@ -300,6 +301,10 @@ Set<KernelTransactionImplementation> getAllTransactions()


private void assertRunning() private void assertRunning()
{ {
if ( availabilityGuard.isShutdown() )
{
throw new DatabaseShutdownException();
}
if ( stopped ) if ( stopped )
{ {
throw new IllegalStateException( "Can't start new transaction with stopped " + getClass() ); throw new IllegalStateException( "Can't start new transaction with stopped " + getClass() );
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;


import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.security.AuthorizationExpiredException; import org.neo4j.graphdb.security.AuthorizationExpiredException;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
Expand Down Expand Up @@ -72,6 +73,8 @@
import org.neo4j.test.Race; import org.neo4j.test.Race;
import org.neo4j.test.rule.concurrent.OtherThreadRule; import org.neo4j.test.rule.concurrent.OtherThreadRule;


import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
Expand All @@ -91,10 +94,6 @@
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos;

import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.kernel.api.KernelTransaction.Type.explicit; import static org.neo4j.kernel.api.KernelTransaction.Type.explicit;
import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED; import static org.neo4j.kernel.api.security.SecurityContext.AUTH_DISABLED;
Expand Down Expand Up @@ -411,22 +410,15 @@ public void shouldNotLeakTransactionOnSecurityContextFreezeFailure() throws Thro
} }


@Test @Test
public void shouldStartNewTransactionAfterGuardMarkedAsShutdown() throws Throwable public void exceptionWhenStartingNewTransactionOnShutdownInstance() throws Throwable
{ {
// This is because the first thing thast happens when shutting down is that AvailabilityGuard gets
// marked as shut down. After that, transactions that just made it passed some layers in db.beginTx()
// or similar will have time to complete (other parts of shutdown will take care of that).

KernelTransactions kernelTransactions = newKernelTransactions(); KernelTransactions kernelTransactions = newKernelTransactions();
SecurityContext securityContext = mock( SecurityContext.class ); SecurityContext securityContext = mock( SecurityContext.class );


availabilityGuard.shutdown(); availabilityGuard.shutdown();


try ( KernelTransaction transaction = expectedException.expect( DatabaseShutdownException.class );
kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L ) ) kernelTransactions.newInstance( KernelTransaction.Type.explicit, securityContext, 0L );
{
assertNotNull( transaction );
}
} }


@Test @Test
Expand Down
Expand Up @@ -21,11 +21,13 @@


import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;


import java.io.File; import java.io.File;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
Expand All @@ -43,8 +45,8 @@
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.concurrent.OtherThreadRule; import org.neo4j.test.rule.concurrent.OtherThreadRule;


import static org.hamcrest.core.Is.isA;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;

import static org.neo4j.helpers.collection.Iterables.single; import static org.neo4j.helpers.collection.Iterables.single;


/** /**
Expand All @@ -65,6 +67,8 @@ public class TransactionCompletionAndShutdownRaceIT
public final OtherThreadRule<Void> transactor = new OtherThreadRule<>( "Transactor" ); public final OtherThreadRule<Void> transactor = new OtherThreadRule<>( "Transactor" );
@Rule @Rule
public final OtherThreadRule<Void> shutter = new OtherThreadRule<>( "Shutter" ); public final OtherThreadRule<Void> shutter = new OtherThreadRule<>( "Shutter" );
@Rule
public ExpectedException expectedException = ExpectedException.none();


@Test @Test
public void shouldAlwaysAwaitTransactionCompletionBeforeShuttingDown() throws Exception public void shouldAlwaysAwaitTransactionCompletionBeforeShuttingDown() throws Exception
Expand All @@ -81,7 +85,7 @@ public void shouldAlwaysAwaitTransactionCompletionBeforeShuttingDown() throws Ex
barrierInstalled.set( true ); barrierInstalled.set( true );


// WHEN // WHEN
Future<Object> transactionFuture = transactor.execute( state -> doTransaction( db ) ); Future<Object> transactionFuture = transactor.execute( state -> doTransaction( db, barrier ) );
Future<Object> shutterFuture = shutter.execute( state -> doShutdown( db, barrier ) ); Future<Object> shutterFuture = shutter.execute( state -> doShutdown( db, barrier ) );


// THEN // THEN
Expand Down Expand Up @@ -141,10 +145,6 @@ public void require( AvailabilityRequirement requirement )
public void await( long millis ) throws UnavailableException public void await( long millis ) throws UnavailableException
{ {
super.await( millis ); super.await( millis );
if ( barrierInstaller.get() )
{
barrier.reached();
}
} }
}; };
} }
Expand All @@ -162,6 +162,94 @@ private Object doShutdown( GraphDatabaseService db, Barrier.Control barrier )
return null; return null;
} }


private Object doTransaction( GraphDatabaseService db, Barrier.Control barrier )
{
try ( Transaction tx = db.beginTx() )
{
barrier.reached();
db.createNode();
tx.success();
}
return null;
}

@Test
public void shouldNotStartTransactoinOnDatabaseThatIsKnownToBeShuttingDown() throws Exception
{
// GIVEN
Barrier.Control barrier = new Barrier.Control();
AtomicBoolean barrierInstalled = new AtomicBoolean();
File storeDir = directory.absolutePath();
TestGraphDatabaseFactory dbFactory = dbFactoryWithBarrierControlledTransactionStats2( barrier,
barrierInstalled );

{
GraphDatabaseService db = dbFactory.newImpermanentDatabase( storeDir );
barrierInstalled.set( true );

// WHEN
Future<Object> transactionFuture = transactor.execute( state -> doTransaction( db ) );
Future<Object> shutterFuture = shutter.execute( state -> doShutdown( db, barrier ) );

// THEN
shutterFuture.get();
expectedException.expectCause( isA( DatabaseShutdownException.class ) );
transactionFuture.get();
barrierInstalled.set( false );
}
}

private TestGraphDatabaseFactory dbFactoryWithBarrierControlledTransactionStats2( Barrier.Control barrier,
AtomicBoolean barrierInstaller )
{
return new TestGraphDatabaseFactory()
{
@Override
protected GraphDatabaseFacadeFactory newTestGraphDatabaseFacadeFactory( File storeDir, Config config,
TestGraphDatabaseFactoryState state )
{
return new TestGraphDatabaseFacadeFactory( state, true )
{
@Override
protected PlatformModule createPlatform( File storeDir, Config config, Dependencies dependencies,
GraphDatabaseFacade graphDatabaseFacade )
{
return new TestGraphDatabaseFacadeFactory.TestDatabasePlatformModule( storeDir, config,
databaseInfo, dependencies, graphDatabaseFacade )
{
@Override
protected AvailabilityGuard createAvailabilityGuard()
{
return new AvailabilityGuard( clock, NullLog.getInstance() )
{
@Override
public void require( AvailabilityRequirement requirement )
{
super.require( requirement );
if ( barrierInstaller.get() )
{
barrier.release();
}
}

@Override
public void await( long millis ) throws UnavailableException
{
super.await( millis );
if ( barrierInstaller.get() )
{
barrier.reached();
}
}
};
}
};
}
};
}
};
}

private Object doTransaction( GraphDatabaseService db ) private Object doTransaction( GraphDatabaseService db )
{ {
try ( Transaction tx = db.beginTx() ) try ( Transaction tx = db.beginTx() )
Expand Down

0 comments on commit 727743e

Please sign in to comment.