Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe freeing of ids for HA slaves #7511

Merged
merged 8 commits into from
Jul 11, 2016
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ public void interrupt()
KernelTransaction tx = this.currentTransaction;
if(tx != null)
{
tx.markForTermination();
tx.markForTermination( Status.Transaction.Terminated );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@
import org.neo4j.bolt.v1.runtime.internal.concurrent.ThreadedSessions;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -265,17 +266,23 @@ public AccessMode mode()
}

@Override
public boolean shouldBeTerminated()
public Status getReasonIfTerminated()
{
return false;
return null;
}

@Override
public void markForTermination()
public void markForTermination( Status reason )
{

}

@Override
public long lastTransactionTimestampWhenStarted()
{
return 0;
}

@Override
public void registerCloseListener( CloseListener listener )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,15 @@ enum Transaction implements Status
"assumptions the instance has made about how to execute the transaction " +
"to be violated - meaning the transaction must be rolled " +
"back. If you see this error, you should retry your operation in a new transaction."),
ConstraintsChanged( TransientError, "Database constraints changed since the start of this transaction" );
ConstraintsChanged( TransientError,
"Database constraints changed since the start of this transaction" ),
Outdated( TransientError,
"Transaction has seen state which has been invalidated by applied updates while " +
"transaction was active. Transaction may succeed if retried." ),
LockClientStopped( TransientError,
"Transaction terminated, no more locks can be acquired." ),
Terminated( TransientError,
"Explicitly terminated by the user." );

private final Code code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,31 @@
*/
package org.neo4j.graphdb;

import static java.util.Objects.requireNonNull;
import org.neo4j.kernel.api.exceptions.Status;

/**
* Signals that the transaction within which the failed operations ran
* has been terminated with {@link Transaction#terminate()}.
*/
public class TransactionTerminatedException extends TransactionFailureException
public class TransactionTerminatedException extends TransactionFailureException implements Status.HasStatus
{
public TransactionTerminatedException()
private final Status status;

public TransactionTerminatedException( Status status )
{
this( status, "" );
}

protected TransactionTerminatedException( Status status, String additionalInfo )
{
this( "" );
super( "The transaction has been terminated. Retry your operation in a new transaction, " +
"and you should see a successful result. " + status.code().description() + " " + additionalInfo );
this.status = status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing a space in the message, before Simply. Also, I don't think the word "simply" adds very much.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also no space between the message and the status code description.

}

protected TransactionTerminatedException( String info )
@Override
public Status status()
{
super( requireNonNull( info ) + "\n" +
"The transaction has been terminated, no new operations in it " +
"are allowed. This normally happens because a client explicitly asks to terminate the transaction, " +
"for instance to stop a long-running operation. It may also happen because an operator has asked the " +
"database to be shut down, or because the current instance is about to perform a cluster role" +
"change. Simply retry your operation in a new transaction, and you should see a successful result." );
return status;
}
}
13 changes: 7 additions & 6 deletions community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public final class TimeUtil
{
int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
TimeUnit timeUnit = null;
int multiplyFactor = 1;
TimeUnit timeUnit;
if ( unit.equals( "ms" ) )
{
timeUnit = TimeUnit.MILLISECONDS;
Expand All @@ -57,15 +56,17 @@ else if ( unit.equals( "s" ) )
}
else if ( unit.equals( "m" ) )
{
// This is only for having to rely on 1.6
timeUnit = TimeUnit.SECONDS;
multiplyFactor = 60;
timeUnit = TimeUnit.MINUTES;
}
else if ( unit.equals( "h" ) )
{
timeUnit = TimeUnit.HOURS;
}
else
{
throw new RuntimeException( "Unrecognized unit " + unit );
}
return timeUnit.toMillis( amount * multiplyFactor );
return timeUnit.toMillis( amount );
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.storemigration.DatabaseMigrator;
import org.neo4j.kernel.impl.storemigration.monitoring.VisibleMigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
Expand Down Expand Up @@ -791,7 +792,7 @@ private KernelModule buildKernel( TransactionAppender appender,
KernelTransactions kernelTransactions = life.add( new KernelTransactions( locks, constraintIndexCreator,
statementOperations, schemaWriteGuard, transactionHeaderInformationFactory, transactionCommitProcess,
indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life, tracers, storageEngine,
procedures, transactionIdStore, config ) );
procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;

Expand Down Expand Up @@ -123,16 +124,21 @@ interface CloseListener
AccessMode mode();

/**
* @return {@code true} if {@link #markForTermination()} has been invoked, otherwise {@code false}.
* @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}.
*/
boolean shouldBeTerminated();
Status getReasonIfTerminated();

/**
* Marks this transaction for termination, such that it cannot commit successfully and will try to be
* terminated by having other methods throw a specific termination exception, as to sooner reach the assumed
* point where {@link #close()} will be invoked.
*/
void markForTermination();
void markForTermination( Status reason );

/**
* @return The timestamp of the last transaction that was committed to the store when this transaction started.
*/
long lastTransactionTimestampWhenStarted();

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api.cursor;

import org.neo4j.collection.primitive.PrimitiveIntCollection;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveIntStack;
import org.neo4j.cursor.Cursor;
Expand Down Expand Up @@ -57,7 +58,7 @@ public Object getProperty( int propertyKeyId )
}

@Override
public PrimitiveIntIterator getPropertyKeys()
public PrimitiveIntCollection getPropertyKeys()
{
PrimitiveIntStack keys = new PrimitiveIntStack();
try ( Cursor<PropertyItem> properties = properties() )
Expand All @@ -68,6 +69,6 @@ public PrimitiveIntIterator getPropertyKeys()
}
}

return keys.iterator();
return keys;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;

/**
Expand Down Expand Up @@ -151,9 +152,11 @@ void assertOpen()
{
throw new NotInTransactionException( "The statement has been closed." );
}
if ( transaction.shouldBeTerminated() )

Status terminationReason = transaction.getReasonIfTerminated();
if ( terminationReason != null )
{
throw new TransactionTerminatedException();
throw new TransactionTerminatedException( terminationReason );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,17 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private boolean beforeHookInvoked;
private boolean closing, closed;
private boolean failure, success;
private volatile boolean terminated;
private volatile Status terminationReason;
private long startTimeMillis;
private long lastTransactionIdWhenStarted;
private long lastTransactionTimestampWhenStarted;
private TransactionEvent transactionEvent;
private Type type;
private volatile int reuseCount;

/**
* Lock prevents transaction {@link #markForTermination() transction termination} from interfering with {@link
* #close() transaction commit} and specifically with {@link #release()}.
* Lock prevents transaction {@link #markForTermination(Status)} transaction termination} from interfering with
* {@link #close() transaction commit} and specifically with {@link #release()}.
* Termination can run concurrently with commit and we need to make sure that it terminates the right lock client
* and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation}
* instances are pooled.
Expand Down Expand Up @@ -201,14 +202,16 @@ public KernelTransactionImplementation( StatementOperationParts operations,
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize(
long lastCommittedTx, Locks.Client locks, Type type, AccessMode accessMode )
long lastCommittedTx, long lastTimeStamp, Locks.Client locks, Type type, AccessMode accessMode )
{
this.type = type;
this.locks = locks;
this.closing = closed = failure = success = terminated = beforeHookInvoked = false;
this.terminationReason = null;
this.closing = closed = failure = success = beforeHookInvoked = false;
this.writeState = TransactionWriteState.NONE;
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = accessMode;
Expand All @@ -234,9 +237,9 @@ public void failure()
}

@Override
public boolean shouldBeTerminated()
public Status getReasonIfTerminated()
{
return terminated;
return terminationReason;
}

/**
Expand All @@ -246,7 +249,7 @@ public boolean shouldBeTerminated()
* {@link #close()} and {@link #release()} calls.
*/
@Override
public void markForTermination()
public void markForTermination( Status reason )
{
if ( !canBeTerminated() )
{
Expand All @@ -264,7 +267,7 @@ public void markForTermination()
if ( stillSameTransaction && canBeTerminated() )
{
failure = true;
terminated = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
Expand Down Expand Up @@ -409,7 +412,7 @@ public void close() throws TransactionFailureException
closing = true;
try
{
if ( failure || !success || terminated )
if ( failure || !success || isTerminated() )
{
rollback();
failOnNonExplicitRollbackIfNeeded();
Expand Down Expand Up @@ -454,9 +457,9 @@ public void close() throws TransactionFailureException
*/
private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureException
{
if ( success && terminated )
if ( success && isTerminated() )
{
throw new TransactionTerminatedException();
throw new TransactionTerminatedException( terminationReason );
}
if ( success )
{
Expand Down Expand Up @@ -636,7 +639,7 @@ private void afterRollback()
/**
* Release resources held up by this transaction & return it to the transaction pool.
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
* {@link #markForTermination()} calls.
* {@link #markForTermination(Status)} calls.
*/
private void release()
{
Expand All @@ -645,7 +648,7 @@ private void release()
{
locks.close();
locks = null;
terminated = false;
terminationReason = null;
type = null;
accessMode = null;
transactionEvent = null;
Expand All @@ -668,7 +671,18 @@ private void release()
*/
private boolean canBeTerminated()
{
return !closed && !terminated;
return !closed && !isTerminated();
}

private boolean isTerminated()
{
return terminationReason != null;
}

@Override
public long lastTransactionTimestampWhenStarted()
{
return lastTransactionTimestampWhenStarted;
}

@Override
Expand Down