Skip to content

Commit

Permalink
Acquiring locks are deferred to commit time
Browse files Browse the repository at this point in the history
This is a reimplementation of PR #7545 for 3.0 branch.

Deferred locks is a kernel extension and jar is not part of the distribution
but is published to maven.

It is also guarded by `unsupported.dbms.deferred_locks.enabled` setting which
is off by default.

We separate between optimistic and pessimistic locking:

**Optimistic locking**
Acquire locks triggered by some write or read event through the `Statement` cake.
Instead of grabbing those locks we only remind what resources we want to lock
and then acquire the them right before commit.

**Pessimistic locking**
Locks grabbed explicitly by the user or grabbed on master by a slave. Those
locks are taken immediately.

**Note**
Grabbing locks just before commit guards for store inconsistencies but not
logical inconsistencies.

**Warning**
This feature should not be used. Most of all code in Neo4j is assumed to be
executed under some sort of lock, exclusive or shared. This commit makes that
assumption false and there has not been enough investigation to foresee what
consequences that will have for transaction execution. These problems are
still in the unknown but there are also known, purposely undocumented,
problems.
  • Loading branch information
lutovich committed Aug 7, 2016
1 parent d6f19b8 commit c42d5a5
Show file tree
Hide file tree
Showing 51 changed files with 3,850 additions and 395 deletions.
Expand Up @@ -78,8 +78,8 @@
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.index.LegacyIndexStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
Expand Down Expand Up @@ -268,7 +268,7 @@ boolean applicable( DiagnosticsPhase phase )
private final PropertyKeyTokenHolder propertyKeyTokenHolder;
private final LabelTokenHolder labelTokens;
private final RelationshipTypeTokenHolder relationshipTypeTokens;
private final Locks locks;
private final StatementLocksFactory statementLocksFactory;
private final SchemaWriteGuard schemaWriteGuard;
private final TransactionEventHandlers transactionEventHandlers;
private final IdGeneratorFactory idGeneratorFactory;
Expand Down Expand Up @@ -323,7 +323,7 @@ public NeoStoreDataSource(
PropertyKeyTokenHolder propertyKeyTokens,
LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens,
Locks lockManager,
StatementLocksFactory statementLocksFactory,
SchemaWriteGuard schemaWriteGuard,
TransactionEventHandlers transactionEventHandlers,
IndexingService.Monitor indexingServiceMonitor,
Expand Down Expand Up @@ -357,7 +357,7 @@ public NeoStoreDataSource(
this.propertyKeyTokenHolder = propertyKeyTokens;
this.labelTokens = labelTokens;
this.relationshipTypeTokens = relationshipTypeTokens;
this.locks = lockManager;
this.statementLocksFactory = statementLocksFactory;
this.schemaWriteGuard = schemaWriteGuard;
this.transactionEventHandlers = transactionEventHandlers;
this.indexingServiceMonitor = indexingServiceMonitor;
Expand Down Expand Up @@ -793,10 +793,10 @@ private KernelModule buildKernel( TransactionAppender appender,
legacyIndexStore ) );

TransactionHooks hooks = new TransactionHooks();
KernelTransactions kernelTransactions = life.add( new KernelTransactions( locks, constraintIndexCreator,
statementOperations, schemaWriteGuard, transactionHeaderInformationFactory, transactionCommitProcess,
indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life, tracers, storageEngine,
procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );
KernelTransactions kernelTransactions = life.add( new KernelTransactions( statementLocksFactory,
constraintIndexCreator, statementOperations, schemaWriteGuard, transactionHeaderInformationFactory,
transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor,
life, tracers, storageEngine, procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );

Expand Down
Expand Up @@ -154,7 +154,7 @@ private void validateNoExistingNodeWithLabelAndProperty( KernelStatement state,
{
IndexDescriptor indexDescriptor = new IndexDescriptor( labelId, propertyKeyId );
assertIndexOnline( state, indexDescriptor );
state.locks().acquireExclusive( INDEX_ENTRY,
state.locks().optimistic().acquireExclusive( INDEX_ENTRY,
indexEntryResourceId( labelId, propertyKeyId, Strings.prettyPrint( value ) ) );

long existing = entityReadOperations.nodeGetFromUniqueIndexSeek( state, indexDescriptor, value );
Expand Down Expand Up @@ -329,7 +329,7 @@ public long nodeGetFromUniqueIndexSeek(
}

// If we find the node - hold a shared lock. If we don't find a node - hold an exclusive lock.
Locks.Client locks = state.locks();
Locks.Client locks = state.locks().optimistic();
long indexEntryId = indexEntryResourceId( labelId, propertyKeyId, stringVal );

locks.acquireShared( INDEX_ENTRY, indexEntryId );
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;

Expand All @@ -44,14 +44,14 @@
* <ol>
* <li>Construct {@link KernelStatement} when {@link KernelTransactionImplementation} is constructed</li>
* <li>For every transaction...</li>
* <li>Call {@link #initialize(org.neo4j.kernel.impl.locking.Locks.Client)} which makes this instance
* <li>Call {@link #initialize(StatementLocks)} which makes this instance
* full available and ready to use. Call when the {@link KernelTransactionImplementation} is initialized.</li>
* <li>Alternate {@link #acquire()} / {@link #close()} when acquiring / closing a statement for the transaction...
* Temporarily asymmetric number of calls to {@link #acquire()} / {@link #close()} is supported, although in
* the end an equal number of calls must have been issued.</li>
* <li>To be safe call {@link #forceClose()} at the end of a transaction to force a close of the statement,
* even if there are more than one current call to {@link #acquire()}. This instance is now again ready
* to be {@link #initialize(org.neo4j.kernel.impl.locking.Locks.Client) initialized} and used for the transaction
* to be {@link #initialize(StatementLocks) initialized} and used for the transaction
* instance again, when it's initialized.</li>
* </ol>
*/
Expand All @@ -61,7 +61,7 @@ public class KernelStatement implements TxStateHolder, Statement
private final StorageStatement storeStatement;
private final KernelTransactionImplementation transaction;
private final OperationsFacade facade;
private Locks.Client locks;
private StatementLocks statementLocks;
private int referenceCount;

public KernelStatement( KernelTransactionImplementation transaction,
Expand Down Expand Up @@ -160,14 +160,14 @@ void assertOpen()
}
}

void initialize( Locks.Client locks )
void initialize( StatementLocks statementLocks )
{
this.locks = locks;
this.statementLocks = statementLocks;
}

public Locks.Client locks()
public StatementLocks locks()
{
return locks;
return statementLocks;
}

final void acquire()
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.TxState;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
Expand Down Expand Up @@ -144,7 +145,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private final StorageStatement storageStatement;
private CloseListener closeListener;
private AccessMode accessMode;
private volatile Locks.Client locks;
private volatile StatementLocks statementLocks;
private boolean beforeHookInvoked;
private volatile boolean closing, closed;
private boolean failure, success;
Expand Down Expand Up @@ -202,10 +203,10 @@ public KernelTransactionImplementation( StatementOperationParts operations,
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize(
long lastCommittedTx, long lastTimeStamp, Locks.Client locks, Type type, AccessMode accessMode )
long lastCommittedTx, long lastTimeStamp, StatementLocks statementLocks, Type type, AccessMode accessMode )
{
this.type = type;
this.locks = locks;
this.statementLocks = statementLocks;
this.terminationReason = null;
this.closing = closed = failure = success = beforeHookInvoked = false;
this.writeState = TransactionWriteState.NONE;
Expand All @@ -215,7 +216,7 @@ public KernelTransactionImplementation initialize(
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = accessMode;
this.currentStatement.initialize( locks );
this.currentStatement.initialize( statementLocks );
return this;
}

Expand Down Expand Up @@ -296,9 +297,9 @@ private void markForTerminationIfPossible( Status reason )
{
failure = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
if ( txTerminationAwareLocks && statementLocks != null )
{
locks.stop();
statementLocks.stop();
}
transactionMonitor.transactionTerminated( hasTxStateWithChanges() );
}
Expand Down Expand Up @@ -522,13 +523,18 @@ private void commit() throws TransactionFailureException
// Convert changes into commands and commit
if ( hasChanges() )
{
// grab all optimistic locks now, locks can't be deferred any further
statementLocks.prepareForCommit();
// use pessimistic locks for the rest of the commit process, locks can't be deferred any further
Locks.Client commitLocks = statementLocks.pessimistic();

// Gather up commands from the various sources
Collection<StorageCommand> extractedCommands = new ArrayList<>();
storageEngine.createCommands(
extractedCommands,
txState,
storageStatement,
locks,
commitLocks,
lastTransactionIdWhenStarted );
if ( hasLegacyIndexChanges() )
{
Expand All @@ -554,7 +560,7 @@ private void commit() throws TransactionFailureException
headerInformation.getMasterId(),
headerInformation.getAuthorId(),
startTimeMillis, lastTransactionIdWhenStarted, clock.currentTimeMillis(),
locks.getLockSessionId() );
commitLocks.getLockSessionId() );

// Commit the transaction
commitProcess.commit( new TransactionToApply( transactionRepresentation ), commitEvent, INTERNAL );
Expand Down Expand Up @@ -669,8 +675,8 @@ private void release()
terminationReleaseLock.lock();
try
{
locks.close();
locks = null;
statementLocks.close();
statementLocks = null;
terminationReason = null;
type = null;
accessMode = null;
Expand Down Expand Up @@ -732,7 +738,10 @@ public Revertable restrict( AccessMode mode )
@Override
public String toString()
{
String lockSessionId = locks == null ? "locks == null" : String.valueOf( locks.getLockSessionId() );
String lockSessionId = statementLocks == null
? "statementLocks == null"
: String.valueOf( statementLocks.pessimistic().getLockSessionId() );

return "KernelTransaction[" + lockSessionId + "]";
}

Expand Down
Expand Up @@ -40,7 +40,8 @@
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.LegacyIndexTransactionStateImpl;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.locking.StatementLocksFactory;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class KernelTransactions extends LifecycleAdapter

// Transaction dependencies

private final Locks locks;
private final StatementLocksFactory statementLocksFactory;
private final boolean txTerminationAwareLocks;
private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts statementOperations;
Expand Down Expand Up @@ -104,7 +105,7 @@ public class KernelTransactions extends LifecycleAdapter
*/
private final Set<KernelTransactionImplementation> allTransactions = newSetFromMap( new ConcurrentHashMap<>() );

public KernelTransactions( Locks locks,
public KernelTransactions( StatementLocksFactory statementLocksFactory,
ConstraintIndexCreator constraintIndexCreator,
StatementOperationParts statementOperations,
SchemaWriteGuard schemaWriteGuard,
Expand All @@ -122,7 +123,7 @@ public KernelTransactions( Locks locks,
Config config,
Clock clock )
{
this.locks = locks;
this.statementLocksFactory = statementLocksFactory;
this.txTerminationAwareLocks = config.get( tx_termination_aware_locks );
this.constraintIndexCreator = constraintIndexCreator;
this.statementOperations = statementOperations;
Expand Down Expand Up @@ -169,8 +170,9 @@ public KernelTransaction newInstance( KernelTransaction.Type type, AccessMode ac
assertDatabaseIsRunning();
TransactionId lastCommittedTransaction = transactionIdStore.getLastCommittedTransaction();
KernelTransactionImplementation tx = localTxPool.acquire();
StatementLocks statementLocks = statementLocksFactory.newInstance();
tx.initialize( lastCommittedTransaction.transactionId(),
lastCommittedTransaction.commitTimestamp(), locks.newClient(), type, accessMode );
lastCommittedTransaction.commitTimestamp(), statementLocks, type, accessMode );
return tx;
}
finally
Expand Down

0 comments on commit c42d5a5

Please sign in to comment.