Skip to content

Commit

Permalink
Locks made transaction termination aware
Browse files Browse the repository at this point in the history
KernelTransactionImplementation can be marked for termination from any thread
by calling #markForTermination(). This commit makes lock clients know that the
owning transaction was terminated so they can abort waiting.
  • Loading branch information
lutovich committed Jun 8, 2016
1 parent 027e2e5 commit 3a64a6f
Show file tree
Hide file tree
Showing 18 changed files with 961 additions and 150 deletions.
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.graphdb; package org.neo4j.graphdb;


import static java.util.Objects.requireNonNull;

/** /**
* Signals that the transaction within which the failed operations ran * Signals that the transaction within which the failed operations ran
* has been terminated with {@link Transaction#terminate()}. * has been terminated with {@link Transaction#terminate()}.
Expand All @@ -27,6 +29,11 @@ public class TransactionTerminatedException extends TransactionFailureException
{ {
public TransactionTerminatedException() public TransactionTerminatedException()
{ {
super( "The transaction has been terminated." ); this( "" );
}

protected TransactionTerminatedException( String info )
{
super( "The transaction has been terminated. " + requireNonNull( info ) );
} }
} }
Expand Up @@ -1112,7 +1112,7 @@ public KernelAPI get()
statementOperations, updateableSchemaState, schemaWriteGuard, schemaIndexProviderMap, statementOperations, updateableSchemaState, schemaWriteGuard, schemaIndexProviderMap,
transactionHeaderInformationFactory, storeLayer, transactionCommitProcess, transactionHeaderInformationFactory, storeLayer, transactionCommitProcess,
indexConfigStore, legacyIndexProviderLookup, hooks, constraintSemantics, indexConfigStore, legacyIndexProviderLookup, hooks, constraintSemantics,
transactionMonitor, life, procedureCache, tracers ) ); transactionMonitor, life, procedureCache, config, tracers ) );


final Kernel kernel = new Kernel( kernelTransactions, hooks, kernelHealth, transactionMonitor ); final Kernel kernel = new Kernel( kernelTransactions, hooks, kernelHealth, transactionMonitor );


Expand Down
Expand Up @@ -159,6 +159,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private final TransactionToRecordStateVisitor txStateToRecordStateVisitor = new TransactionToRecordStateVisitor(); private final TransactionToRecordStateVisitor txStateToRecordStateVisitor = new TransactionToRecordStateVisitor();
private final Collection<Command> extractedCommands = new ArrayCollection<>( 32 ); private final Collection<Command> extractedCommands = new ArrayCollection<>( 32 );
private final Locks locksManager; private final Locks locksManager;
private final boolean txTerminationAwareLocks;
private TransactionState txState; private TransactionState txState;
private LegacyIndexTransactionState legacyIndexTransactionState; private LegacyIndexTransactionState legacyIndexTransactionState;
private TransactionType transactionType = TransactionType.ANY; private TransactionType transactionType = TransactionType.ANY;
Expand Down Expand Up @@ -204,7 +205,8 @@ public KernelTransactionImplementation( StatementOperationParts operations,
Clock clock, Clock clock,
TransactionTracer tracer, TransactionTracer tracer,
ProcedureCache procedureCache, ProcedureCache procedureCache,
NeoStoreTransactionContext context ) NeoStoreTransactionContext context,
boolean txTerminationAwareLocks )
{ {
this.operations = operations; this.operations = operations;
this.schemaWriteGuard = schemaWriteGuard; this.schemaWriteGuard = schemaWriteGuard;
Expand All @@ -214,6 +216,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
this.providerMap = providerMap; this.providerMap = providerMap;
this.schemaState = schemaState; this.schemaState = schemaState;
this.locksManager = locks; this.locksManager = locks;
this.txTerminationAwareLocks = txTerminationAwareLocks;
this.hooks = hooks; this.hooks = hooks;
this.constraintIndexCreator = constraintIndexCreator; this.constraintIndexCreator = constraintIndexCreator;
this.headerInformationFactory = headerInformationFactory; this.headerInformationFactory = headerInformationFactory;
Expand Down Expand Up @@ -279,6 +282,10 @@ public void markForTermination()
{ {
failure = true; failure = true;
terminated = true; terminated = true;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
}
transactionMonitor.transactionTerminated(); transactionMonitor.transactionTerminated();
} }
} }
Expand Down
Expand Up @@ -29,10 +29,13 @@
import org.neo4j.function.Factory; import org.neo4j.function.Factory;
import org.neo4j.function.Supplier; import org.neo4j.function.Supplier;
import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Clock; import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap; import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
Expand All @@ -54,6 +57,7 @@
import org.neo4j.kernel.monitoring.tracing.Tracers; import org.neo4j.kernel.monitoring.tracing.Tracers;


import static java.util.Collections.newSetFromMap; import static java.util.Collections.newSetFromMap;
import static org.neo4j.kernel.configuration.Settings.setting;


/** /**
* Central source of transactions in the database. * Central source of transactions in the database.
Expand All @@ -67,11 +71,15 @@ public class KernelTransactions extends LifecycleAdapter
implements Factory<KernelTransaction>, // For providing KernelTransaction instances implements Factory<KernelTransaction>, // For providing KernelTransaction instances
Supplier<KernelTransactionsSnapshot> // For providing KernelTransactionSnapshots Supplier<KernelTransactionsSnapshot> // For providing KernelTransactionSnapshots
{ {
public static final Setting<Boolean> tx_termination_aware_locks = setting(
"experimental.tx_termination_aware_locks", Settings.BOOLEAN, Settings.FALSE );

// Transaction dependencies // Transaction dependencies


private final NeoStoreTransactionContextFactory neoStoreTransactionContextFactory; private final NeoStoreTransactionContextFactory neoStoreTransactionContextFactory;
private final NeoStores neoStores; private final NeoStores neoStores;
private final Locks locks; private final Locks locks;
private final boolean txTerminationAwareLocks;
private final IntegrityValidator integrityValidator; private final IntegrityValidator integrityValidator;
private final ConstraintIndexCreator constraintIndexCreator; private final ConstraintIndexCreator constraintIndexCreator;
private final IndexingService indexingService; private final IndexingService indexingService;
Expand Down Expand Up @@ -123,11 +131,13 @@ public KernelTransactions( NeoStoreTransactionContextFactory neoStoreTransaction
ConstraintSemantics constraintSemantics, ConstraintSemantics constraintSemantics,
TransactionMonitor transactionMonitor, TransactionMonitor transactionMonitor,
LifeSupport dataSourceLife, ProcedureCache procedureCache, LifeSupport dataSourceLife, ProcedureCache procedureCache,
Config config,
Tracers tracers ) Tracers tracers )
{ {
this.neoStoreTransactionContextFactory = neoStoreTransactionContextFactory; this.neoStoreTransactionContextFactory = neoStoreTransactionContextFactory;
this.neoStores = neoStores; this.neoStores = neoStores;
this.locks = locks; this.locks = locks;
this.txTerminationAwareLocks = config.get( tx_termination_aware_locks );
this.integrityValidator = integrityValidator; this.integrityValidator = integrityValidator;
this.constraintIndexCreator = constraintIndexCreator; this.constraintIndexCreator = constraintIndexCreator;
this.indexingService = indexingService; this.indexingService = indexingService;
Expand Down Expand Up @@ -168,7 +178,7 @@ public KernelTransactionImplementation newInstance()
neoStores, locks, hooks, constraintIndexCreator, transactionHeaderInformationFactory, neoStores, locks, hooks, constraintIndexCreator, transactionHeaderInformationFactory,
transactionCommitProcess, transactionMonitor, storeLayer, legacyIndexTransactionState, transactionCommitProcess, transactionMonitor, storeLayer, legacyIndexTransactionState,
localTxPool, constraintSemantics, Clock.SYSTEM_CLOCK, tracers.transactionTracer, procedureCache, localTxPool, constraintSemantics, Clock.SYSTEM_CLOCK, tracers.transactionTracer, procedureCache,
context ); context, txTerminationAwareLocks );
allTransactions.add( tx ); allTransactions.add( tx );


return tx; return tx;
Expand Down
Expand Up @@ -66,22 +66,23 @@ public void stopClient()


/** /**
* Increment active number of clients that use current state instance. * Increment active number of clients that use current state instance.
* @return false if already stopped and not possible to increment active clients counter, true in case if counter *
* was successfully incremented. * @param client the locks client associated with this state; used only to create pretty exception
* with {@link LockClientStoppedException#LockClientStoppedException(Locks.Client)}.
* @throws LockClientStoppedException when stopped.
*/ */
public boolean incrementActiveClients() public void incrementActiveClients( Locks.Client client )
{ {
int currentState; int currentState;
do do
{ {
currentState = clientState.get(); currentState = clientState.get();
if ( isStopped( currentState ) ) if ( isStopped( currentState ) )
{ {
return false; throw new LockClientStoppedException( client );
} }
} }
while ( !clientState.compareAndSet( currentState, statusWithUpdatedClients( currentState, 1 ) ) ); while ( !clientState.compareAndSet( currentState, statusWithUpdatedClients( currentState, 1 ) ) );
return true;
} }


/** /**
Expand Down Expand Up @@ -138,4 +139,4 @@ private int statusWithUpdatedClients( int clientState, int delta )
{ {
return getStatus( clientState ) | (getActiveClients( clientState ) + delta); return getStatus( clientState ) | (getActiveClients( clientState ) + delta);
} }
} }
Expand Up @@ -19,14 +19,17 @@
*/ */
package org.neo4j.kernel.impl.locking; package org.neo4j.kernel.impl.locking;


import org.neo4j.graphdb.TransactionTerminatedException;

import static java.util.Objects.requireNonNull;

/** /**
* Exception that will be thrown in case when closed {@link org.neo4j.kernel.impl.locking.Locks.Client} * Exception thrown when stopped {@link Locks.Client} used to acquire locks.
* will be used to acquire shared/exclusive lock
*/ */
public class LockClientAlreadyClosedException extends RuntimeException public class LockClientStoppedException extends TransactionTerminatedException
{ {
public LockClientAlreadyClosedException( String message ) public LockClientStoppedException( Locks.Client client )
{ {
super( message ); super( requireNonNull( client ) + " is stopped" );
} }
} }
Expand Up @@ -28,8 +28,8 @@
import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor; import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.locking.LockClientAlreadyClosedException;
import org.neo4j.kernel.impl.locking.LockClientStateHolder; import org.neo4j.kernel.impl.locking.LockClientStateHolder;
import org.neo4j.kernel.impl.locking.LockClientStoppedException;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;


import static java.lang.String.format; import static java.lang.String.format;
Expand Down Expand Up @@ -62,10 +62,7 @@ public CommunityLockClient( LockManagerImpl manager )
@Override @Override
public void acquireShared( Locks.ResourceType resourceType, long resourceId ) public void acquireShared( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) );
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType );
Expand All @@ -83,7 +80,7 @@ public void acquireShared( Locks.ResourceType resourceType, long resourceId )
} }
else else
{ {
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); throw new LockClientStoppedException( this );
} }
} }
} }
Expand All @@ -98,10 +95,7 @@ public void acquireShared( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) public void acquireExclusive( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) );
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType );
Expand All @@ -119,7 +113,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId )
} }
else else
{ {
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); throw new LockClientStoppedException( this );
} }
} }
} }
Expand All @@ -132,10 +126,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceId ) public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
return false;
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType );
Expand Down Expand Up @@ -168,10 +159,7 @@ public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceI
@Override @Override
public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId ) public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
return false;
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType );
Expand Down Expand Up @@ -204,10 +192,7 @@ public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public void releaseShared( Locks.ResourceType resourceType, long resourceId ) public void releaseShared( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) );
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localShared( resourceType );
Expand All @@ -230,10 +215,7 @@ public void releaseShared( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) );
}
try try
{ {
PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType ); PrimitiveLongObjectMap<LockResource> localLocks = localExclusive( resourceType );
Expand All @@ -255,10 +237,7 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public void releaseAll() public void releaseAll()
{ {
if ( !stateHolder.incrementActiveClients() ) stateHolder.incrementActiveClients( this );
{
throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) );
}
try try
{ {
releaseLocks(); releaseLocks();
Expand Down
Expand Up @@ -19,25 +19,25 @@
*/ */
package org.neo4j.graphdb; package org.neo4j.graphdb;


import org.junit.Test;

import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;


import org.junit.Test;

import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.impl.locking.LockCountVisitor; import org.neo4j.kernel.impl.locking.LockCountVisitor;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.TestGraphDatabaseFactory;


import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;

import static org.junit.Assert.fail;
import static org.neo4j.graphdb.DynamicLabel.label; import static org.neo4j.graphdb.DynamicLabel.label;
import static org.neo4j.helpers.Exceptions.rootCause; import static org.neo4j.helpers.Exceptions.rootCause;


Expand Down Expand Up @@ -130,10 +130,12 @@ public Void call() throws Exception
try try
{ {
secondTxResult.get( 60, SECONDS ); secondTxResult.get( 60, SECONDS );
fail( "Exception expected" );
} }
catch ( Exception e ) catch ( Exception e )
{ {
assertThat( rootCause( e ), instanceOf( TransactionTerminatedException.class ) ); assertThat( rootCause( e ), anyOf( instanceOf( TransactionFailureException.class ),
instanceOf( TransactionTerminatedException.class ) ) );
} }
} }


Expand Down
Expand Up @@ -64,6 +64,7 @@ null, mock( NeoStores.class ), new NoOpLocks(), new TransactionHooks(),
Clock.SYSTEM_CLOCK, Clock.SYSTEM_CLOCK,
TransactionTracer.NULL, TransactionTracer.NULL,
new ProcedureCache(), new ProcedureCache(),
mock( NeoStoreTransactionContext.class )); mock( NeoStoreTransactionContext.class ),
false );
} }
} }
Expand Up @@ -31,11 +31,6 @@
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.kernel.impl.api.TransactionApplicationMode;
import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
import org.neo4j.kernel.impl.api.TransactionHooks;
import org.neo4j.kernel.impl.api.store.ProcedureCache; import org.neo4j.kernel.impl.api.store.ProcedureCache;
import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreReadLayer;
import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.api.store.StoreStatement;
Expand Down Expand Up @@ -459,7 +454,7 @@ private KernelTransactionImplementation newTransaction()
null, null, null, null, null, recordState, null, neoStores, locks, null, null, null, null, null, recordState, null, neoStores, locks,
hooks, null, headerInformationFactory, commitProcess, transactionMonitor, storeReadLayer, legacyIndexState, hooks, null, headerInformationFactory, commitProcess, transactionMonitor, storeReadLayer, legacyIndexState,
pool, new StandardConstraintSemantics(), clock, TransactionTracer.NULL, new ProcedureCache(), mock( NeoStoreTransactionContext pool, new StandardConstraintSemantics(), clock, TransactionTracer.NULL, new ProcedureCache(), mock( NeoStoreTransactionContext
.class ) ); .class ), false );
transaction.initialize( 0 ); transaction.initialize( 0 );
return transaction; return transaction;
} }
Expand Down

0 comments on commit 3a64a6f

Please sign in to comment.