Skip to content

Commit

Permalink
Make locks tx termination aware by default
Browse files Browse the repository at this point in the history
Commit removes `unsupported.dbms.tx_termination_aware_locks` setting and makes
locks abort waiting if the owning transaction was terminated. This behaviour
is now default and not configurable.
  • Loading branch information
lutovich committed Jul 21, 2016
1 parent 99edf23 commit 187c1a2
Show file tree
Hide file tree
Showing 16 changed files with 53 additions and 132 deletions.
Expand Up @@ -795,7 +795,7 @@ private KernelModule buildKernel( TransactionAppender appender,
KernelTransactions kernelTransactions = life.add( new KernelTransactions( locks, constraintIndexCreator,
statementOperations, schemaWriteGuard, transactionHeaderInformationFactory, transactionCommitProcess,
indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life, tracers, storageEngine,
procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );
procedures, transactionIdStore, Clock.SYSTEM_CLOCK ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );

Expand Down
Expand Up @@ -127,7 +127,6 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private final TransactionTracer tracer;
private final Pool<KernelTransactionImplementation> pool;
private final Supplier<LegacyIndexTransactionState> legacyIndexTxStateSupplier;
private final boolean txTerminationAwareLocks;

// For committing
private final TransactionHeaderInformationFactory headerInformationFactory;
Expand Down Expand Up @@ -179,8 +178,7 @@ public KernelTransactionImplementation( StatementOperationParts operations,
Pool<KernelTransactionImplementation> pool,
Clock clock,
TransactionTracer tracer,
StorageEngine storageEngine,
boolean txTerminationAwareLocks )
StorageEngine storageEngine )
{
this.operations = operations;
this.schemaWriteGuard = schemaWriteGuard;
Expand All @@ -197,7 +195,6 @@ public KernelTransactionImplementation( StatementOperationParts operations,
this.tracer = tracer;
this.storageStatement = storeLayer.newStatement();
this.currentStatement = new KernelStatement( this, this, operations, storageStatement, procedures );
this.txTerminationAwareLocks = txTerminationAwareLocks;
}

/**
Expand Down Expand Up @@ -270,7 +267,7 @@ public void markForTermination( Status reason )
{
failure = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
if ( locks != null )
{
locks.stop();
}
Expand Down
Expand Up @@ -29,14 +29,11 @@
import org.neo4j.collection.pool.MarshlandPool;
import org.neo4j.function.Factory;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.LegacyIndexTransactionStateImpl;
import org.neo4j.kernel.impl.index.IndexConfigStore;
Expand All @@ -52,7 +49,6 @@
import org.neo4j.storageengine.api.StorageEngine;

import static java.util.Collections.newSetFromMap;
import static org.neo4j.kernel.configuration.Settings.setting;

/**
* Central source of transactions in the database.
Expand All @@ -65,13 +61,9 @@
public class KernelTransactions extends LifecycleAdapter
implements Supplier<KernelTransactionsSnapshot> // For providing KernelTransactionSnapshots
{
public static final Setting<Boolean> tx_termination_aware_locks = setting(
"unsupported.dbms.tx_termination_aware_locks", Settings.BOOLEAN, Settings.FALSE );

// Transaction dependencies

private final Locks locks;
private final boolean txTerminationAwareLocks;
private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts statementOperations;
private final SchemaWriteGuard schemaWriteGuard;
Expand Down Expand Up @@ -118,11 +110,9 @@ public KernelTransactions( Locks locks,
StorageEngine storageEngine,
Procedures procedures,
TransactionIdStore transactionIdStore,
Config config,
Clock clock )
{
this.locks = locks;
this.txTerminationAwareLocks = config.get( tx_termination_aware_locks );
this.constraintIndexCreator = constraintIndexCreator;
this.statementOperations = statementOperations;
this.schemaWriteGuard = schemaWriteGuard;
Expand Down Expand Up @@ -152,7 +142,7 @@ public KernelTransactionImplementation newInstance()
statementOperations, schemaWriteGuard, hooks, constraintIndexCreator, procedures,
transactionHeaderInformationFactory, transactionCommitProcess, transactionMonitor,
legacyIndexTxStateSupplier, localTxPool, clock, tracers.transactionTracer,
storageEngine, txTerminationAwareLocks );
storageEngine );

allTransactions.add( tx );
return tx;
Expand Down
Expand Up @@ -84,7 +84,7 @@ static Instances kernelTransactionWithInternals( AccessMode accessMode )
mock( Pool.class ),
Clock.SYSTEM_CLOCK,
TransactionTracer.NULL,
storageEngine, false );
storageEngine );

transaction.initialize( 0, 0, new NoOpClient(), KernelTransaction.Type.implicit, accessMode );

Expand Down
Expand Up @@ -442,7 +442,7 @@ public void shouldIncrementReuseCounterOnReuse() throws Exception
@Test
public void markForTerminationNotInitializedTransaction()
{
KernelTransactionImplementation tx = newNotInitializedTransaction( true );
KernelTransactionImplementation tx = newNotInitializedTransaction();

tx.markForTermination( Status.General.UnknownError );

Expand All @@ -453,7 +453,7 @@ public void markForTerminationNotInitializedTransaction()
public void markForTerminationInitializedTransaction()
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );

tx.markForTermination( Status.General.UnknownError );

Expand All @@ -465,7 +465,7 @@ public void markForTerminationInitializedTransaction()
public void markForTerminationTerminatedTransaction()
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
transactionInitializer.accept( tx );

tx.markForTermination( Status.Transaction.Terminated );
Expand All @@ -481,7 +481,7 @@ public void markForTerminationTerminatedTransaction()
public void terminatedTxMarkedNeitherSuccessNorFailureClosesWithoutThrowing() throws TransactionFailureException
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
transactionInitializer.accept( tx );
tx.markForTermination( Status.General.UnknownError );

Expand All @@ -495,7 +495,7 @@ public void terminatedTxMarkedNeitherSuccessNorFailureClosesWithoutThrowing() th
public void terminatedTxMarkedForSuccessThrowsOnClose()
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
transactionInitializer.accept( tx );
tx.success();
tx.markForTermination( Status.General.UnknownError );
Expand All @@ -515,7 +515,7 @@ public void terminatedTxMarkedForSuccessThrowsOnClose()
public void terminatedTxMarkedForFailureClosesWithoutThrowing() throws TransactionFailureException
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
transactionInitializer.accept( tx );
tx.failure();
tx.markForTermination( Status.General.UnknownError );
Expand All @@ -530,7 +530,7 @@ public void terminatedTxMarkedForFailureClosesWithoutThrowing() throws Transacti
public void terminatedTxMarkedForBothSuccessAndFailureThrowsOnClose()
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
transactionInitializer.accept( tx );
tx.success();
tx.failure();
Expand All @@ -550,7 +550,7 @@ public void terminatedTxMarkedForBothSuccessAndFailureThrowsOnClose()
public void txMarkedForBothSuccessAndFailureThrowsOnClose()
{
Locks.Client locksClient = mock( Locks.Client.class );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient, true );
KernelTransactionImplementation tx = newTransaction( accessMode(), locksClient );
tx.success();
tx.failure();

Expand Down
Expand Up @@ -360,7 +360,7 @@ private static class TestKernelTransaction extends KernelTransactionImplementati
mock( ConstraintIndexCreator.class ), new Procedures(), TransactionHeaderInformationFactory.DEFAULT,
mock( TransactionCommitProcess.class ), monitor, () -> mock( LegacyIndexTransactionState.class ),
mock( Pool.class ), new FakeClock(), TransactionTracer.NULL,
mock( StorageEngine.class, RETURNS_MOCKS ), true );
mock( StorageEngine.class, RETURNS_MOCKS ) );

this.monitor = monitor;
}
Expand Down
Expand Up @@ -97,30 +97,29 @@ public KernelTransactionImplementation newTransaction( AccessMode accessMode )
return newTransaction( 0, accessMode );
}

public KernelTransactionImplementation newTransaction( AccessMode accessMode, Locks.Client locks,
boolean txTerminationAwareLocks )
public KernelTransactionImplementation newTransaction( AccessMode accessMode, Locks.Client locks )
{
return newTransaction( 0, accessMode, locks, txTerminationAwareLocks );
return newTransaction( 0, accessMode, locks );
}

public KernelTransactionImplementation newTransaction( long lastTransactionIdWhenStarted, AccessMode accessMode )
{
return newTransaction( lastTransactionIdWhenStarted, accessMode, new NoOpClient(), false );
return newTransaction( lastTransactionIdWhenStarted, accessMode, new NoOpClient() );
}

public KernelTransactionImplementation newTransaction( long lastTransactionIdWhenStarted, AccessMode accessMode,
Locks.Client locks, boolean txTerminationAwareLocks )
Locks.Client locks )
{
KernelTransactionImplementation tx = newNotInitializedTransaction( txTerminationAwareLocks );
KernelTransactionImplementation tx = newNotInitializedTransaction();
tx.initialize( lastTransactionIdWhenStarted, BASE_TX_COMMIT_TIMESTAMP,locks, Type.implicit, accessMode );
return tx;
}

public KernelTransactionImplementation newNotInitializedTransaction( boolean txTerminationAwareLocks )
public KernelTransactionImplementation newNotInitializedTransaction()
{
return new KernelTransactionImplementation( null, schemaWriteGuard, hooks, null, null, headerInformationFactory,
commitProcess, transactionMonitor, legacyIndexStateSupplier, txPool, clock, TransactionTracer.NULL,
storageEngine, txTerminationAwareLocks );
storageEngine );
}

public class CapturingCommitProcess implements TransactionCommitProcess
Expand Down
Expand Up @@ -39,13 +39,9 @@
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
Expand Down Expand Up @@ -436,8 +432,7 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces
null, null, null, TransactionHeaderInformationFactory.DEFAULT,
commitProcess, null,
null, new TransactionHooks(), mock( TransactionMonitor.class ), life,
tracers, storageEngine, new Procedures(), transactionIdStore, Config.empty(),
Clock.SYSTEM_CLOCK );
tracers, storageEngine, new Procedures(), transactionIdStore, Clock.SYSTEM_CLOCK );
}

private static TransactionCommitProcess newRememberingCommitProcess( final TransactionRepresentation[] slot )
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.logging.LogProvider;

Expand All @@ -34,7 +33,6 @@ public class SlaveLockManager implements Locks
private final Master master;
private final AvailabilityGuard availabilityGuard;
private final LogProvider logProvider;
private final boolean txTerminationAwareLocks;

public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextFactory, Master master,
AvailabilityGuard availabilityGuard, LogProvider logProvider, Config config )
Expand All @@ -44,15 +42,13 @@ public SlaveLockManager( Locks localLocks, RequestContextFactory requestContextF
this.local = localLocks;
this.master = master;
this.logProvider = logProvider;
this.txTerminationAwareLocks = config.get( KernelTransactions.tx_termination_aware_locks );
}

@Override
public Client newClient()
{
Client client = local.newClient();
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard, logProvider,
txTerminationAwareLocks );
return new SlaveLocksClient( master, client, local, requestContextFactory, availabilityGuard, logProvider );
}

@Override
Expand Down
Expand Up @@ -35,10 +35,10 @@
import org.neo4j.kernel.impl.locking.LockClientStoppedException;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType;

import static org.neo4j.kernel.impl.locking.LockType.READ;
import static org.neo4j.kernel.impl.locking.LockType.WRITE;
Expand All @@ -63,7 +63,6 @@ class SlaveLocksClient implements Locks.Client
private final Map<ResourceType, Map<Long, AtomicInteger>> sharedLocks;
private final Map<ResourceType, Map<Long, AtomicInteger>> exclusiveLocks;
private final Log log;
private final boolean txTerminationAwareLocks;
private boolean initialized;
private volatile boolean stopped;

Expand All @@ -73,16 +72,14 @@ public SlaveLocksClient(
Locks localLockManager,
RequestContextFactory requestContextFactory,
AvailabilityGuard availabilityGuard,
LogProvider logProvider,
boolean txTerminationAwareLocks )
LogProvider logProvider )
{
this.master = master;
this.client = local;
this.localLockManager = localLockManager;
this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard;
this.log = logProvider.getLog( getClass() );
this.txTerminationAwareLocks = txTerminationAwareLocks;
sharedLocks = new HashMap<>();
exclusiveLocks = new HashMap<>();
}
Expand Down Expand Up @@ -206,12 +203,9 @@ public void releaseExclusive( ResourceType resourceType, long resourceId )
@Override
public void stop()
{
if ( txTerminationAwareLocks )
{
client.stop();
stopLockSessionOnMaster();
stopped = true;
}
client.stop();
stopLockSessionOnMaster();
stopped = true;
}

@Override
Expand Down
7 changes: 2 additions & 5 deletions enterprise/ha/src/test/java/org/neo4j/ha/TestPullUpdates.java
Expand Up @@ -45,10 +45,8 @@
import org.neo4j.graphdb.TransientTransactionFailureException;
import org.neo4j.graphdb.factory.TestHighlyAvailableGraphDatabaseFactory;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.HighlyAvailableGraphDatabase;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.ha.ClusterManager;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
Expand Down Expand Up @@ -133,15 +131,14 @@ public void makeSureUpdatePullerGetsGoingAfterMasterSwitch() throws Throwable
}

@Test
public void terminatedTransactionDoesNotForceUpdatePullingWithTxTerminationAwareLocks() throws Throwable
public void terminatedTransactionDoesNotForceUpdatePulling() throws Throwable
{
int testTxsOnMaster = 42;
File root = testDirectory.directory( testName.getMethodName() );
ClusterManager clusterManager = new ClusterManager.Builder( root )
.withSharedConfig( MapUtil.stringMap(
HaSettings.pull_interval.name(), "0s",
HaSettings.tx_push_factor.name(), "0",
KernelTransactions.tx_termination_aware_locks.name(), Settings.TRUE ) ).build();
HaSettings.tx_push_factor.name(), "0" ) ).build();
clusterManager.start();
cluster = clusterManager.getCluster();

Expand Down

0 comments on commit 187c1a2

Please sign in to comment.