Skip to content

Commit

Permalink
Remove synchronization abound KTI#initialize()
Browse files Browse the repository at this point in the history
Instead reference to it's Locks.Client is made volatile to be visible in
threads that try to terminate the KTI.
  • Loading branch information
lutovich committed Jul 26, 2016
1 parent 8c33ba6 commit 88e9f52
Showing 1 changed file with 28 additions and 38 deletions.
Expand Up @@ -144,7 +144,7 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private final StorageStatement storageStatement;
private CloseListener closeListener;
private AccessMode accessMode;
private Locks.Client locks;
private volatile Locks.Client locks;
private boolean beforeHookInvoked;
private volatile boolean closing, closed;
private boolean failure, success;
Expand All @@ -157,14 +157,13 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private volatile int reuseCount;

/**
* Lock prevents transaction termination ({@link #markForTermination(Status)} and
* {@link #markForTermination(long, Status)}) from interfering with
* {@link #initialize(long, long, Locks.Client, Type, AccessMode) transaction initialization} and
* {@link #close() transaction commit}. Termination can run concurrently with initialization and commit so we
* need to make sure that it terminates the right lock client and the right transaction
* (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation} instances are pooled.
* Lock prevents transaction {@link #markForTermination(Status)} transaction termination} from interfering with
* {@link #close() transaction commit} and specifically with {@link #release()}.
* Termination can run concurrently with commit and we need to make sure that it terminates the right lock client
* and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation}
* instances are pooled.
*/
private final Lock terminationReuseLock = new ReentrantLock();
private final Lock terminationReleaseLock = new ReentrantLock();

public KernelTransactionImplementation( StatementOperationParts operations,
SchemaWriteGuard schemaWriteGuard,
Expand Down Expand Up @@ -205,28 +204,19 @@ public KernelTransactionImplementation( StatementOperationParts operations,
public KernelTransactionImplementation initialize(
long lastCommittedTx, long lastTimeStamp, Locks.Client locks, Type type, AccessMode accessMode )
{
// guarded by the lock to coordinate with concurrent termination attempts
terminationReuseLock.lock();
try
{
this.type = type;
this.locks = locks;
this.terminationReason = null;
this.closing = closed = failure = success = beforeHookInvoked = false;
this.writeState = TransactionWriteState.NONE;
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = accessMode;
this.currentStatement.initialize( locks );
return this;
}
finally
{
terminationReuseLock.unlock();
}
this.type = type;
this.locks = locks;
this.terminationReason = null;
this.closing = closed = failure = success = beforeHookInvoked = false;
this.writeState = TransactionWriteState.NONE;
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = accessMode;
this.currentStatement.initialize( locks );
return this;
}

int getReuseCount()
Expand Down Expand Up @@ -266,7 +256,7 @@ public Status getReasonIfTerminated()

void markForTermination( long expectedReuseCount, Status reason )
{
terminationReuseLock.lock();
terminationReleaseLock.lock();
try
{
if ( expectedReuseCount == reuseCount )
Expand All @@ -276,27 +266,27 @@ void markForTermination( long expectedReuseCount, Status reason )
}
finally
{
terminationReuseLock.unlock();
terminationReleaseLock.unlock();
}
}

/**
* {@inheritDoc}
* <p>
* This method is guarded by {@link #terminationReuseLock} to coordinate concurrent
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
* {@link #close()} and {@link #release()} calls.
*/
@Override
public void markForTermination( Status reason )
{
terminationReuseLock.lock();
terminationReleaseLock.lock();
try
{
markForTerminationIfPossible( reason );
}
finally
{
terminationReuseLock.unlock();
terminationReleaseLock.unlock();
}
}

Expand Down Expand Up @@ -671,12 +661,12 @@ private void afterRollback()

/**
* Release resources held up by this transaction & return it to the transaction pool.
* This method is guarded by {@link #terminationReuseLock} to coordinate concurrent
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
* {@link #markForTermination(Status)} calls.
*/
private void release()
{
terminationReuseLock.lock();
terminationReleaseLock.lock();
try
{
locks.close();
Expand All @@ -694,7 +684,7 @@ private void release()
}
finally
{
terminationReuseLock.unlock();
terminationReleaseLock.unlock();
}
}

Expand Down

0 comments on commit 88e9f52

Please sign in to comment.