Skip to content

Commit

Permalink
Merge pull request #7612 from lutovich/3.0-safer-tx-termination
Browse files Browse the repository at this point in the history
Safer KTI termination
  • Loading branch information
MishaDemianenko committed Jul 31, 2016
2 parents cd04bbe + c5f6f11 commit ed0e510
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ interface CloseListener

/**
* @return start time of this transaction, i.e. basically {@link System#currentTimeMillis()} when user called
* {@link Kernel#newTransaction()}.
* {@link Kernel#newTransaction(Type, AccessMode)}.
*/
long localStartTime();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.api;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.api.Kernel;

/**
* View of a {@link KernelTransaction} that provides a limited set of actions against the transaction.
*/
public interface KernelTransactionHandle
{
/**
* The id of the last transaction that was committed to the store when the underlying transaction started.
*
* @return the committed transaction id.
*/
long lastTransactionIdWhenStarted();

/**
* The timestamp of the last transaction that was committed to the store when the underlying transaction started.
*
* @return the timestamp value obtained with {@link System#currentTimeMillis()}.
*/
long lastTransactionTimestampWhenStarted();

/**
* The start time of the underlying transaction. I.e. basically {@link System#currentTimeMillis()} when user
* called {@link Kernel#newTransaction(KernelTransaction.Type, AccessMode)}.
*
* @return the transaction start time.
*/
long localStartTime();

/**
* Check if the underlying transaction is open.
*
* @return {@code true} if the underlying transaction ({@link KernelTransaction#close()} was not called),
* {@code false} otherwise.
*/
boolean isOpen();

/**
* Mark the underlying transaction for termination.
*
* @param reason the reason for termination.
*/
void markForTermination( Status reason );
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private final StorageStatement storageStatement;
private CloseListener closeListener;
private AccessMode accessMode;
private Locks.Client locks;
private volatile Locks.Client locks;
private boolean beforeHookInvoked;
private boolean closing, closed;
private volatile boolean closing, closed;
private boolean failure, success;
private volatile Status terminationReason;
private long startTimeMillis;
private long lastTransactionIdWhenStarted;
private long lastTransactionTimestampWhenStarted;
private volatile long lastTransactionTimestampWhenStarted;
private TransactionEvent transactionEvent;
private Type type;
private volatile int reuseCount;
Expand Down Expand Up @@ -254,6 +254,22 @@ public Status getReasonIfTerminated()
return terminationReason;
}

void markForTermination( long expectedReuseCount, Status reason )
{
terminationReleaseLock.lock();
try
{
if ( expectedReuseCount == reuseCount )
{
markForTerminationIfPossible( reason );
}
}
finally
{
terminationReleaseLock.unlock();
}
}

/**
* {@inheritDoc}
* <p>
Expand All @@ -263,36 +279,31 @@ public Status getReasonIfTerminated()
@Override
public void markForTermination( Status reason )
{
if ( !canBeTerminated() )
{
return;
}

int initialReuseCount = reuseCount;
terminationReleaseLock.lock();
try
{
// this instance could have been reused, make sure we are trying to terminate the right transaction
// without this check there exists a possibility to terminate lock client that has just been returned to
// the pool or a transaction that was reused and represents a completely different logical transaction
boolean stillSameTransaction = initialReuseCount == reuseCount;
if ( stillSameTransaction && canBeTerminated() )
{
failure = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
}
transactionMonitor.transactionTerminated( hasTxStateWithChanges() );
}
markForTerminationIfPossible( reason );
}
finally
{
terminationReleaseLock.unlock();
}
}

private void markForTerminationIfPossible( Status reason )
{
if ( canBeTerminated() )
{
failure = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
}
transactionMonitor.transactionTerminated( hasTxStateWithChanges() );
}
}

@Override
public boolean isOpen()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api;

import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;

/**
* A {@link KernelTransactionHandle} that wraps the given {@link KernelTransactionImplementation}.
* This handle knows that {@link KernelTransactionImplementation}s can be reused and represents a single logical
* transaction. This means that methods like {@link #markForTermination(Status)} can only terminate running
* transaction this handle was created for.
*/
class KernelTransactionImplementationHandle implements KernelTransactionHandle
{
private final long txReuseCount;
private final long lastTransactionIdWhenStarted;
private final long lastTransactionTimestampWhenStarted;
private final long localStartTime;
private final KernelTransactionImplementation tx;

KernelTransactionImplementationHandle( KernelTransactionImplementation tx )
{
this.txReuseCount = tx.getReuseCount();
this.lastTransactionIdWhenStarted = tx.lastTransactionIdWhenStarted();
this.lastTransactionTimestampWhenStarted = tx.lastTransactionTimestampWhenStarted();
this.localStartTime = tx.localStartTime();
this.tx = tx;
}

@Override
public long lastTransactionIdWhenStarted()
{
return lastTransactionIdWhenStarted;
}

@Override
public long lastTransactionTimestampWhenStarted()
{
return lastTransactionTimestampWhenStarted;
}

@Override
public long localStartTime()
{
return localStartTime;
}

@Override
public boolean isOpen()
{
return tx.isOpen() && txReuseCount == tx.getReuseCount();
}

@Override
public void markForTermination( Status reason )
{
tx.markForTermination( txReuseCount, reason );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
KernelTransactionImplementationHandle that = (KernelTransactionImplementationHandle) o;
return txReuseCount == that.txReuseCount && tx.equals( that.tx );
}

@Override
public int hashCode()
{
return 31 * (int) (txReuseCount ^ (txReuseCount >>> 32)) + tx.hashCode();
}

@Override
public String toString()
{
return "KernelTransactionImplementationHandle{txReuseCount=" + txReuseCount + ", tx=" + tx + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -32,6 +31,7 @@
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.KernelTransactionHandle;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
Expand All @@ -52,6 +52,7 @@
import org.neo4j.storageengine.api.StorageEngine;

import static java.util.Collections.newSetFromMap;
import static java.util.stream.Collectors.toSet;
import static org.neo4j.kernel.configuration.Settings.setting;

/**
Expand Down Expand Up @@ -199,18 +200,25 @@ protected void dispose( KernelTransactionImplementation tx )
*
* @return the set of open transactions.
*/
public Set<KernelTransaction> activeTransactions()
public Set<KernelTransactionHandle> activeTransactions()
{
Set<KernelTransaction> output = new HashSet<>();
for ( KernelTransactionImplementation tx : allTransactions )
{
if ( tx.isOpen() )
{
output.add( tx );
}
}
return allTransactions.stream()
.map( this::createHandle )
.filter( KernelTransactionHandle::isOpen )
.collect( toSet() );
}

return output;
/**
* Create new handle for the given transaction.
* <p>
* <b>Note:</b> this method is package-private for testing <b>only</b>.
*
* @param tx transaction to wrap.
* @return transaction handle.
*/
KernelTransactionHandle createHandle( KernelTransactionImplementation tx )
{
return new KernelTransactionImplementationHandle( tx );
}

/**
Expand Down Expand Up @@ -256,7 +264,7 @@ private void assertDatabaseIsRunning()
@Override
public KernelTransactionsSnapshot get()
{
return new KernelTransactionsSnapshot( allTransactions, clock.currentTimeMillis() );
return new KernelTransactionsSnapshot( activeTransactions(), clock.currentTimeMillis() );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import java.util.Set;

import org.neo4j.kernel.api.KernelTransactionHandle;

/**
* An instance of this class can get a snapshot of all currently running transactions and be able to tell
* later if all transactions which were running when it was constructed have closed.
*
* <p>
* Creating a snapshot creates a list and one additional book keeping object per open transaction.
* No thread doing normal transaction work should create snapshots, only threads that monitor transactions.
*/
Expand All @@ -33,14 +35,14 @@ public class KernelTransactionsSnapshot
private Tx relevantTransactions;
private final long snapshotTime;

public KernelTransactionsSnapshot( Set<KernelTransactionImplementation> allTransactions, long snapshotTime )
public KernelTransactionsSnapshot( Set<KernelTransactionHandle> allTransactions, long snapshotTime )
{
Tx head = null;
for ( KernelTransactionImplementation tx : allTransactions )
for ( KernelTransactionHandle tx : allTransactions )
{
if ( tx.isOpen() )
{
Tx current = new Tx( tx, tx.getReuseCount() );
Tx current = new Tx( tx );
if ( head != null )
{
current.next = head;
Expand Down Expand Up @@ -81,19 +83,17 @@ public long snapshotTime()

private static class Tx
{
private final KernelTransactionImplementation transaction;
private final int reuseCount;
private final KernelTransactionHandle transaction;
private Tx next;

Tx( KernelTransactionImplementation tx, int reuseCount )
Tx( KernelTransactionHandle tx )
{
this.transaction = tx;
this.reuseCount = reuseCount;
}

boolean haveClosed()
{
return transaction.getReuseCount() != reuseCount;
return !transaction.isOpen();
}
}
}

0 comments on commit ed0e510

Please sign in to comment.