Skip to content

Commit

Permalink
Use transaction counters to wait for all transactions completion on s…
Browse files Browse the repository at this point in the history
…hutdown

Use transaction statistics counters to wait for all transaction to
complete before NeoStoreDataSource will try to acquire transactional log
file lock to prevent deadlock on shutdown
  • Loading branch information
MishaDemianenko committed Jun 1, 2017
1 parent 479e58d commit 29f8023
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 12 deletions.
Expand Up @@ -98,7 +98,7 @@
import org.neo4j.kernel.impl.storemigration.monitoring.VisibleMigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
Expand Down Expand Up @@ -280,7 +280,7 @@ boolean applicable( DiagnosticsPhase phase )
private final LockService lockService;
private final IndexingService.Monitor indexingServiceMonitor;
private final FileSystemAbstraction fs;
private final TransactionMonitor transactionMonitor;
private final TransactionStats transactionMonitor;
private final DatabaseHealth databaseHealth;
private final PhysicalLogFile.Monitor physicalLogMonitor;
private final TransactionHeaderInformationFactory transactionHeaderInformationFactory;
Expand Down Expand Up @@ -330,7 +330,7 @@ public NeoStoreDataSource(
TransactionEventHandlers transactionEventHandlers,
IndexingService.Monitor indexingServiceMonitor,
FileSystemAbstraction fs,
TransactionMonitor transactionMonitor,
TransactionStats transactionMonitor,
DatabaseHealth databaseHealth,
PhysicalLogFile.Monitor physicalLogMonitor,
TransactionHeaderInformationFactory transactionHeaderInformationFactory,
Expand Down Expand Up @@ -905,13 +905,21 @@ private void awaitAllTransactionsClosed()
// Only wait for committed transactions to be applied if the kernel is healthy (i.e. no panic)
// otherwise if there has been a panic transactions will not be applied properly anyway.
TransactionIdStore txIdStore = getDependencyResolver().resolveDependency( TransactionIdStore.class );
while ( databaseHealth.isHealthy() &&
while ( databaseHealth.isHealthy() && getNumberOfOngoingTransactions() > 0 &&
!txIdStore.closedTransactionIdIsOnParWithOpenedTransactionId() )
{
LockSupport.parkNanos( 10_000_000 ); // 10 ms
}
}

private long getNumberOfOngoingTransactions()
{
return transactionMonitor.getNumberOfStartedTransactions() -
transactionMonitor.getNumberOfCommittedTransactions() -
transactionMonitor.getNumberOfRolledBackTransactions() -
transactionMonitor.getNumberOfTerminatedTransactions();
}

private Lifecycle lifecycleToTriggerCheckPointOnShutdown()
{
// Write new checkpoint in the log only if the kernel is healthy.
Expand Down
Expand Up @@ -93,13 +93,20 @@ public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityCo
}

@Override
public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityContext securityContext, long timeout ) throws
TransactionFailureException
public KernelTransaction newTransaction( KernelTransaction.Type type, SecurityContext securityContext, long timeout )
throws TransactionFailureException
{
health.assertHealthy( TransactionFailureException.class );
KernelTransaction transaction = transactions.newInstance( type, securityContext, timeout );
transactionMonitor.transactionStarted();
return transaction;
try
{
return transactions.newInstance( type, securityContext, timeout );
}
catch ( Throwable t )
{
transactionMonitor.transactionFinished( false, false );
throw t;
}
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
Expand All @@ -36,6 +37,7 @@
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.configuration.CommunityIdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
Expand All @@ -60,6 +62,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
Expand Down Expand Up @@ -273,6 +276,23 @@ public void shouldAlwaysShutdownLifeEvenWhenCheckPointingFails() throws Exceptio
}
}

@Test
public void checkTransactionStatsWhenStopDataSource() throws IOException
{
NeoStoreDataSource dataSource =
dsRule.getDataSource( dir.graphDbDir(), fs.get(), pageCacheRule.getPageCache( fs ), emptyMap() );
TransactionStats transactionMonitor = dsRule.getTransactionMonitor();
dataSource.start();

Mockito.verifyZeroInteractions(transactionMonitor);

dataSource.stop();
verify( transactionMonitor, times( 2 ) ).getNumberOfTerminatedTransactions();
verify( transactionMonitor, times( 2 ) ).getNumberOfRolledBackTransactions();
verify( transactionMonitor, times( 2 ) ).getNumberOfStartedTransactions();
verify( transactionMonitor, times( 2 ) ).getNumberOfCommittedTransactions();
}

private NeoStoreDataSource neoStoreDataSourceWithLogFilesContainingLowestTxId( PhysicalLogFiles files )
{
DependencyResolver resolver = mock( DependencyResolver.class );
Expand Down
Expand Up @@ -66,8 +66,6 @@

public class KernelIT extends KernelIntegrationTest
{
// TODO: Split this into area-specific tests, see PropertyIT.

@Test
public void mixingBeansApiWithKernelAPI() throws Exception
{
Expand Down
Expand Up @@ -57,7 +57,7 @@
import org.neo4j.kernel.impl.store.id.configuration.CommunityIdTypeConfigurationProvider;
import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionStats;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.internal.DatabaseHealth;
Expand All @@ -75,6 +75,7 @@
public class NeoStoreDataSourceRule extends ExternalResource
{
private NeoStoreDataSource dataSource;
private TransactionStats transactionMonitor;

public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs,
PageCache pageCache, Map<String,String> additionalConfig, DatabaseHealth databaseHealth )
Expand Down Expand Up @@ -108,13 +109,14 @@ public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs

JobScheduler jobScheduler = mock( JobScheduler.class, RETURNS_MOCKS );
Monitors monitors = new Monitors();
transactionMonitor = mock( TransactionStats.class );
dataSource = new NeoStoreDataSource( storeDir, config, idGeneratorFactory, IdReuseEligibility.ALWAYS,
idConfigurationProvider,
logService, mock( JobScheduler.class, RETURNS_MOCKS ), mock( TokenNameLookup.class ),
dependencyResolverForNoIndexProvider(), mock( PropertyKeyTokenHolder.class ),
mock( LabelTokenHolder.class ), mock( RelationshipTypeTokenHolder.class ), locksFactory,
mock( SchemaWriteGuard.class ), mock( TransactionEventHandlers.class ), IndexingService.NO_MONITOR,
fs, mock( TransactionMonitor.class ), databaseHealth,
fs, transactionMonitor, databaseHealth,
mock( PhysicalLogFile.Monitor.class ), TransactionHeaderInformationFactory.DEFAULT,
new StartupStatisticsProvider(), null,
new CommunityCommitProcessFactory(), mock( InternalAutoIndexing.class ), pageCache,
Expand All @@ -126,6 +128,11 @@ fs, mock( TransactionMonitor.class ), databaseHealth,
return dataSource;
}

public TransactionStats getTransactionMonitor()
{
return transactionMonitor;
}

public NeoStoreDataSource getDataSource( File storeDir, FileSystemAbstraction fs,
PageCache pageCache, Map<String,String> additionalConfig )
{
Expand Down

0 comments on commit 29f8023

Please sign in to comment.