Skip to content

Commit

Permalink
Update how we track transactions that we need to wait during shutdown.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Dec 13, 2016
1 parent 29e379c commit 1d1b88d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 67 deletions.
Expand Up @@ -730,42 +730,19 @@ public synchronized void stop()
return;
}

//:TODO comments are obsolete need to be cleaned up

// First kindly await all committing transactions to close. Do this without interfering with the
// log file monitor. Keep in mind that at this point the availability guard is raised and some time spent
// awaiting active transaction to close, on a more coarse-grained level, so no new transactions
// should get started. With that said there's actually a race between checking the availability guard
// in beginTx, incrementing number of open transactions and raising the guard in shutdown, there might
// be some in flight that will get to commit at some point
// in the future. Such transactions will fail if they come to commit after our synchronized block below.
// Here we're zooming in and focusing on getting committed transactions to close.

// In order to prevent various issues with life components that can perform operations with logFile on their
// stop phase before performing further shutdown/cleanup work and taking a lock on a logfile
// we stop all other life components to make sure that we are the last and only one (from current life)
life.stop();
// Under the guard of the logFile monitor do a second pass of waiting committing transactions
// to close. This is because there might have been transactions that were in flight and just now
// want to commit. We will allow committed transactions be properly closed, but no new transactions
// will be able to start committing at this point.
awaitAllTransactionsClosed();

awaitAllClosingTransactions();
// Checkpointing is now triggered as part of life.shutdown see lifecycleToTriggerCheckPointOnShutdown()
// Shut down all services in here, effectively making the database unusable for anyone who tries.
life.shutdown();
// After we've released the logFile monitor there might be transactions that wants to commit, but had
// to wait for the logFile monitor until now. When they finally get it and try to commit they will
// fail since the logFile no longer works.
}

private void awaitAllTransactionsClosed()
private void awaitAllClosingTransactions()
{
// Only wait for committed transactions to be applied if the kernel is healthy (i.e. no panic)
// otherwise if there has been a panic transactions will not be applied properly anyway.
TransactionIdStore txIdStore = getDependencyResolver().resolveDependency( TransactionIdStore.class );
while ( databaseHealth.isHealthy() &&
!txIdStore.closedTransactionIdIsOnParWithOpenedTransactionId() )
KernelTransactions kernelTransactions = kernelModule.kernelTransactions();
kernelTransactions.terminateAllTransactions();

while ( kernelTransactions.haveClosingTransaction() )
{
LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) );
}
Expand Down Expand Up @@ -905,6 +882,7 @@ public boolean unregisterIndexProvider( String name )
*/
public void beforeModeSwitch()
{
kernelModule.kernelTransactions().blockNewTransactions();
clearTransactions();
}

Expand All @@ -925,7 +903,7 @@ private void clearTransactions()
public void afterModeSwitch()
{
storageEngine.loadSchemaCache();
clearTransactions();
kernelModule.kernelTransactions().unblockNewTransactions();
}

@SuppressWarnings( "deprecation" )
Expand Down
Expand Up @@ -66,17 +66,12 @@ public PhysicalLogFiles logFiles()
return logFiles;
}

public LogFile logFile()
{
return logFile;
}

public CheckPointer checkPointing()
CheckPointer checkPointing()
{
return checkPointer;
}

public TransactionAppender transactionAppender()
TransactionAppender transactionAppender()
{
return appender;
}
Expand All @@ -85,7 +80,6 @@ public void satisfyDependencies( Dependencies dependencies )
{
dependencies.satisfyDependencies( checkPointer,
logFile,
logFiles,
logFileInformation,
legacyIndexTransactionOrdering,
logicalTransactionStore,
Expand Down
Expand Up @@ -155,8 +155,10 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private SecurityContext securityContext;
private volatile StatementLocks statementLocks;
private boolean beforeHookInvoked;
private volatile boolean closing, closed;
private boolean failure, success;
private volatile boolean closing;
private volatile boolean closed;
private boolean failure;
private boolean success;
private volatile Status terminationReason;
private long startTimeMillis;
private long timeoutMillis;
Expand Down Expand Up @@ -464,6 +466,11 @@ private boolean hasDataChanges()
return hasTxStateWithChanges() && txState.hasDataChanges();
}

public boolean isClosing()
{
return closing;
}

@Override
public long closeTransaction() throws TransactionFailureException
{
Expand Down
Expand Up @@ -179,6 +179,13 @@ public Set<KernelTransactionHandle> activeTransactions()
* require transactions to be re-created.
*/
public void disposeAll()
{
terminateAllTransactions();
localTxPool.disposeAll();
globalTxPool.disposeAll();
}

public void terminateAllTransactions()
{
for ( KernelTransactionImplementation tx : allTransactions )
{
Expand All @@ -187,8 +194,11 @@ public void disposeAll()
// certainly want to keep that from being reused from this point.
tx.markForTermination( Status.General.DatabaseUnavailable );
}
localTxPool.disposeAll();
globalTxPool.disposeAll();
}

public boolean haveClosingTransaction()
{
return allTransactions.stream().anyMatch( KernelTransactionImplementation::isClosing );
}

@Override
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -98,14 +97,13 @@
import static org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory.DEFAULT;
import static org.neo4j.test.assertion.Assert.assertException;

@Ignore
public class KernelTransactionsTest
{
@Rule
public final OtherThreadRule<Void> t2 = new OtherThreadRule<>( "T2-" + getClass().getName() );

@Test
public void shouldListActiveTransactions() throws Exception
public void shouldListActiveTransactions() throws Throwable
{
// Given
KernelTransactions transactions = newTestKernelTransactions();
Expand All @@ -122,7 +120,7 @@ public void shouldListActiveTransactions() throws Exception
}

@Test
public void shouldDisposeTransactionsWhenAsked() throws Exception
public void shouldDisposeTransactionsWhenAsked() throws Throwable
{
// Given
KernelTransactions transactions = newKernelTransactions();
Expand All @@ -147,7 +145,7 @@ public void shouldDisposeTransactionsWhenAsked() throws Exception
}

@Test
public void shouldIncludeRandomBytesInAdditionalHeader() throws Exception
public void shouldIncludeRandomBytesInAdditionalHeader() throws Throwable
{
// Given
TransactionRepresentation[] transactionRepresentation = new TransactionRepresentation[1];
Expand All @@ -169,7 +167,7 @@ public void shouldIncludeRandomBytesInAdditionalHeader() throws Exception
}

@Test
public void shouldReuseClosedTransactionObjects() throws Exception
public void shouldReuseClosedTransactionObjects() throws Throwable
{
// GIVEN
KernelTransactions transactions = newKernelTransactions();
Expand All @@ -184,7 +182,7 @@ public void shouldReuseClosedTransactionObjects() throws Exception
}

@Test
public void shouldTellWhenTransactionsFromSnapshotHaveBeenClosed() throws Exception
public void shouldTellWhenTransactionsFromSnapshotHaveBeenClosed() throws Throwable
{
// GIVEN
KernelTransactions transactions = newKernelTransactions();
Expand Down Expand Up @@ -269,7 +267,7 @@ public void shouldBeAbleToSnapshotDuringHeavyLoad() throws Throwable
}

@Test
public void transactionCloseRemovesTxFromActiveTransactions() throws Exception
public void transactionCloseRemovesTxFromActiveTransactions() throws Throwable
{
KernelTransactions kernelTransactions = newTestKernelTransactions();

Expand All @@ -284,7 +282,7 @@ public void transactionCloseRemovesTxFromActiveTransactions() throws Exception
}

@Test
public void disposeAllMarksAllTransactionsForTermination() throws Exception
public void disposeAllMarksAllTransactionsForTermination() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();

Expand All @@ -300,7 +298,7 @@ public void disposeAllMarksAllTransactionsForTermination() throws Exception
}

@Test
public void transactionClosesUnderlyingStoreStatementWhenDisposed() throws Exception
public void transactionClosesUnderlyingStoreStatementWhenDisposed() throws Throwable
{
StorageStatement storeStatement1 = mock( StorageStatement.class );
StorageStatement storeStatement2 = mock( StorageStatement.class );
Expand All @@ -322,7 +320,7 @@ public void transactionClosesUnderlyingStoreStatementWhenDisposed() throws Excep
}

@Test
public void threadThatBlocksNewTxsCantStartNewTxs() throws Exception
public void threadThatBlocksNewTxsCantStartNewTxs() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();
Expand All @@ -338,7 +336,7 @@ public void threadThatBlocksNewTxsCantStartNewTxs() throws Exception
}

@Test
public void blockNewTransactions() throws Exception
public void blockNewTransactions() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();
Expand All @@ -354,7 +352,7 @@ public void blockNewTransactions() throws Exception
}

@Test
public void unblockNewTransactionsFromWrongThreadThrows() throws Exception
public void unblockNewTransactionsFromWrongThreadThrows() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
kernelTransactions.blockNewTransactions();
Expand Down Expand Up @@ -383,7 +381,8 @@ public void unblockNewTransactionsFromWrongThreadThrows() throws Exception
}

@Test
public void shouldNotLeakTransactionOnSecurityContextFreezeFailure() throws Exception {
public void shouldNotLeakTransactionOnSecurityContextFreezeFailure() throws Throwable
{
KernelTransactions kernelTransactions = newKernelTransactions();
SecurityContext securityContext = mock(SecurityContext.class);
when(securityContext.freeze()).thenThrow(new AuthorizationExpiredException("Freeze failed."));
Expand All @@ -406,30 +405,30 @@ private static void startAndCloseTransaction(KernelTransactions kernelTransactio
}
}

private static KernelTransactions newKernelTransactions() throws Exception
private static KernelTransactions newKernelTransactions() throws Throwable
{
return newKernelTransactions( mock( TransactionCommitProcess.class ) );
}

private static KernelTransactions newTestKernelTransactions() throws Exception
private static KernelTransactions newTestKernelTransactions() throws Throwable
{
return newKernelTransactions( true, mock( TransactionCommitProcess.class ), mock( StorageStatement.class ) );
}

private static KernelTransactions newKernelTransactions( TransactionCommitProcess commitProcess ) throws Exception
private static KernelTransactions newKernelTransactions( TransactionCommitProcess commitProcess ) throws Throwable
{
return newKernelTransactions( false, commitProcess, mock( StorageStatement.class ) );
}

private static KernelTransactions newKernelTransactions( TransactionCommitProcess commitProcess,
StorageStatement firstStoreStatements, StorageStatement... otherStorageStatements ) throws Exception
StorageStatement firstStoreStatements, StorageStatement... otherStorageStatements ) throws Throwable
{
return newKernelTransactions( false, commitProcess, firstStoreStatements, otherStorageStatements );
}

private static KernelTransactions newKernelTransactions( boolean testKernelTransactions,
TransactionCommitProcess commitProcess, StorageStatement firstStoreStatements,
StorageStatement... otherStorageStatements ) throws Exception
StorageStatement... otherStorageStatements ) throws Throwable
{
Locks locks = mock( Locks.class );
when( locks.newClient() ).thenReturn( mock( Locks.Client.class ) );
Expand All @@ -454,7 +453,7 @@ private static KernelTransactions newKernelTransactions( boolean testKernelTrans
}

private static KernelTransactions newKernelTransactions( Locks locks, StorageEngine storageEngine,
TransactionCommitProcess commitProcess, boolean testKernelTransactions )
TransactionCommitProcess commitProcess, boolean testKernelTransactions ) throws Throwable
{
LifeSupport life = new LifeSupport();
life.start();
Expand All @@ -468,21 +467,45 @@ private static KernelTransactions newKernelTransactions( Locks locks, StorageEng
StatementOperationContainer statementOperationsContianer = new StatementOperationContainer( null, null );
Clock clock = Clocks.systemClock();
AvailabilityGuard availabilityGuard = new AvailabilityGuard( clock, NullLog.getInstance() );
KernelTransactions transactions;
if ( testKernelTransactions )
{
return new TestKernelTransactions( statementLocksFactory, null, statementOperationsContianer,
null, DEFAULT,
commitProcess, null, null, new TransactionHooks(), mock( TransactionMonitor.class ),
availabilityGuard, tracers, storageEngine, new Procedures(), transactionIdStore, clock,
new CanWrite() );
transactions = createTestTransactions( storageEngine, commitProcess, transactionIdStore, tracers,
statementLocksFactory, statementOperationsContianer, clock, availabilityGuard );
}
else
{
transactions = createTransactions( storageEngine, commitProcess, transactionIdStore, tracers,
statementLocksFactory, statementOperationsContianer, clock, availabilityGuard );
}
transactions.start();
return transactions;
}

private static KernelTransactions createTransactions( StorageEngine storageEngine,
TransactionCommitProcess commitProcess, TransactionIdStore transactionIdStore, Tracers tracers,
StatementLocksFactory statementLocksFactory, StatementOperationContainer statementOperationsContianer,
Clock clock, AvailabilityGuard availabilityGuard )
{
return new KernelTransactions( statementLocksFactory,
null, statementOperationsContianer, null, DEFAULT,
commitProcess, null, null, new TransactionHooks(), mock( TransactionMonitor.class ),
availabilityGuard,
tracers, storageEngine, new Procedures(), transactionIdStore, clock, new CanWrite() );
}

private static TestKernelTransactions createTestTransactions( StorageEngine storageEngine,
TransactionCommitProcess commitProcess, TransactionIdStore transactionIdStore, Tracers tracers,
StatementLocksFactory statementLocksFactory, StatementOperationContainer statementOperationsContianer,
Clock clock, AvailabilityGuard availabilityGuard )
{
return new TestKernelTransactions( statementLocksFactory, null, statementOperationsContianer,
null, DEFAULT,
commitProcess, null, null, new TransactionHooks(), mock( TransactionMonitor.class ),
availabilityGuard, tracers, storageEngine, new Procedures(), transactionIdStore, clock,
new CanWrite() );
}

private static TransactionCommitProcess newRememberingCommitProcess( final TransactionRepresentation[] slot )
throws TransactionFailureException
{
Expand Down

0 comments on commit 1d1b88d

Please sign in to comment.