Skip to content

Commit

Permalink
Moves batching of transactions into the commit process
Browse files Browse the repository at this point in the history
since TransactionRepresentationStoreApplier moved into StorageEngine and
so the trick with BatchingTransactionRepresentationStoreApplier cannot be
used anymore. Batching had to become a first-class concept of the commit
prcocess, which benefits TransactionCommittingResponseUnpacker and other
places where batching needs to happen.

This simplifies code around the transaction commit process, for example
the TransactionAppender interface which previously had two ways to append
transactions to the log, one for "normal" commit and one for batching
commits. Now when batching is first-class inside TransactionAppender this
could be reduced to one method. Similar things have happened in nearby
areas.
  • Loading branch information
tinwelint committed Nov 30, 2015
1 parent a99d888 commit 6a6d020
Show file tree
Hide file tree
Showing 120 changed files with 3,669 additions and 5,289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand Down Expand Up @@ -345,7 +345,7 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai

GraphDatabaseBuilder builder = new TestGraphDatabaseFactory().newEmbeddedDatabaseBuilder( directory );
GraphDatabaseAPI database = (GraphDatabaseAPI) builder.newGraphDatabase();
try ( LockGroup locks = new LockGroup() )
try
{
DependencyResolver dependencyResolver = database.getDependencyResolver();

Expand All @@ -357,8 +357,9 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai
TransactionIdStore transactionIdStore = database.getDependencyResolver().resolveDependency(
TransactionIdStore.class );
NodeStore nodes = database.getDependencyResolver().resolveDependency( NeoStores.class ).getNodeStore();
commitProcess.commit( transaction.representation( idGenerator(), masterId(), myId(),
transactionIdStore.getLastCommittedTransactionId(), nodes ), locks, CommitEvent.NULL,
TransactionRepresentation representation = transaction.representation( idGenerator(), masterId(), myId(),
transactionIdStore.getLastCommittedTransactionId(), nodes );
commitProcess.commit( new TransactionToApply( representation ), CommitEvent.NULL,
TransactionApplicationMode.EXTERNAL );
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeLabelsField;
Expand Down Expand Up @@ -388,7 +388,7 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai

GraphDatabaseBuilder builder = new TestGraphDatabaseFactory().newEmbeddedDatabaseBuilder( directory );
GraphDatabaseAPI database = (GraphDatabaseAPI) builder.newGraphDatabase();
try ( LockGroup locks = new LockGroup() )
try
{
DependencyResolver dependencyResolver = database.getDependencyResolver();

Expand All @@ -400,8 +400,9 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai
TransactionIdStore transactionIdStore = database.getDependencyResolver().resolveDependency(
TransactionIdStore.class );
NodeStore nodes = database.getDependencyResolver().resolveDependency( NeoStores.class ).getNodeStore();
commitProcess.commit( transaction.representation( idGenerator(), masterId(), myId(),
transactionIdStore.getLastCommittedTransactionId(), nodes ), locks, CommitEvent.NULL,
TransactionRepresentation representation = transaction.representation( idGenerator(), masterId(), myId(),
transactionIdStore.getLastCommittedTransactionId(), nodes );
commitProcess.commit( new TransactionToApply( representation ), CommitEvent.NULL,
TransactionApplicationMode.EXTERNAL );
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ public static TestDirectory testDirForTest( Class<?> owningTest )
return new TargetDirectory( new DefaultFileSystemAbstraction(), owningTest ).testDirectory();
}

public static TestDirectory testDirForTest( Class<?> owningTest, FileSystemAbstraction fs )
{
return new TargetDirectory( fs, owningTest ).testDirectory();
}

public static TestDirectory testDirForTestWithEphemeralFS( EphemeralFileSystemAbstraction fileSystem,
Class<?> owningTest )
Class<?> owningTest )
{
return new TargetDirectory( fileSystem, owningTest ).testDirectory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.helpers.collection;

import java.io.Closeable;

public interface CloseableVisitor<E, FAILURE extends Exception> extends Visitor<E, FAILURE>, Closeable
public interface CloseableVisitor<E, FAILURE extends Exception> extends Visitor<E, FAILURE>, AutoCloseable
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.neo4j.kernel.impl.api.StatementOperationParts;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
import org.neo4j.kernel.impl.api.UpdateableSchemaState;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.api.index.IndexingService;
Expand Down Expand Up @@ -505,7 +504,6 @@ public void start() throws IOException
dependencies.satisfyDependency( storageEngine.schemaIndexProviderMap() );
dependencies.satisfyDependency( storageEngine.legacyIndexApplierLookup() );
dependencies.satisfyDependency( storageEngine.storeReadLayer() );
dependencies.satisfyDependency( storageEngine.transactionApplier() );
dependencies.satisfyDependency( storageEngine );
satisfyDependencies( transactionLogModule, kernelModule );
}
Expand Down Expand Up @@ -740,20 +738,19 @@ private void buildRecovery(
final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine )
{
@SuppressWarnings( "resource" )
MetaDataStore metaDataStore = neoStores.getMetaDataStore();
final TransactionRepresentationStoreApplier storeRecoverer = storageEngine.transactionApplierForRecovery();

RecoveryVisitor recoveryVisitor =
new RecoveryVisitor( metaDataStore, storeRecoverer, storageEngine.indexUpdatesValidatorForRecovery(),
recoveryVisitorMonitor );
@SuppressWarnings( "resource" )
RecoveryVisitor recoveryVisitor = new RecoveryVisitor( metaDataStore, storageEngine, recoveryVisitorMonitor );

LogEntryReader<ReadableLogChannel> logEntryReader = new VersionAwareLogEntryReader<>();
final Visitor<LogVersionedStoreChannel,IOException> logFileRecoverer =
final Visitor<LogVersionedStoreChannel,Exception> logFileRecoverer =
new LogFileRecoverer( logEntryReader, recoveryVisitor );

final LatestCheckPointFinder checkPointFinder =
new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI( storageEngine.toCloseAfterRecovery(),
Recovery.SPI spi = new DefaultRecoverySPI(
storeFlusher, neoStores, logFileRecoverer, logFiles, fileSystemAbstraction, metaDataStore,
checkPointFinder );
Recovery recovery = new Recovery( spi, recoveryMonitor );
Expand Down Expand Up @@ -955,7 +952,10 @@ public synchronized void stop()

private void awaitAllTransactionsClosed()
{
while ( !storageEngine.neoStores().getMetaDataStore().closedTransactionIdIsOnParWithOpenedTransactionId() )
// Only wait for committed transactions to be applied if the kernel is healthy (i.e. no panic)
// otherwise if there has been a panic transactions will not be applied properly anyway.
while ( kernelHealth.isHealthy() &&
!storageEngine.neoStores().getMetaDataStore().closedTransactionIdIsOnParWithOpenedTransactionId() )
{
LockSupport.parkNanos( 10_000_000 ); // 10 ms
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ public CommandApplierFacade( CommandHandler... handlers )
this.handlers = handlers;
}

@Override
public void begin( TransactionToApply tx ) throws IOException
{
for ( CommandHandler handler : handlers )
{
handler.begin( tx );
}
}

@Override
public void end() throws Exception
{
for ( CommandHandler handler : handlers )
{
handler.end();
}
}

@Override
public void apply()
{
Expand Down

0 comments on commit 6a6d020

Please sign in to comment.