Skip to content

Commit

Permalink
Merge pull request #7511 from burqen/3.0-free-ids-in-ha
Browse files Browse the repository at this point in the history
Safe freeing of ids for HA slaves
  • Loading branch information
chrisvest committed Jul 11, 2016
2 parents 29781f7 + 2a2a7a6 commit 17d1379
Show file tree
Hide file tree
Showing 92 changed files with 2,802 additions and 634 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ public void interrupt()
KernelTransaction tx = this.currentTransaction;
if(tx != null)
{
tx.markForTermination();
tx.markForTermination( Status.Transaction.Terminated );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@
import org.neo4j.bolt.v1.runtime.internal.concurrent.ThreadedSessions;
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.lifecycle.LifeSupport;
Expand Down Expand Up @@ -265,17 +266,23 @@ public AccessMode mode()
}

@Override
public boolean shouldBeTerminated()
public Status getReasonIfTerminated()
{
return false;
return null;
}

@Override
public void markForTermination()
public void markForTermination( Status reason )
{

}

@Override
public long lastTransactionTimestampWhenStarted()
{
return 0;
}

@Override
public void registerCloseListener( CloseListener listener )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,15 @@ enum Transaction implements Status
"assumptions the instance has made about how to execute the transaction " +
"to be violated - meaning the transaction must be rolled " +
"back. If you see this error, you should retry your operation in a new transaction."),
ConstraintsChanged( TransientError, "Database constraints changed since the start of this transaction" );
ConstraintsChanged( TransientError,
"Database constraints changed since the start of this transaction" ),
Outdated( TransientError,
"Transaction has seen state which has been invalidated by applied updates while " +
"transaction was active. Transaction may succeed if retried." ),
LockClientStopped( TransientError,
"Transaction terminated, no more locks can be acquired." ),
Terminated( TransientError,
"Explicitly terminated by the user." );

private final Code code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,31 @@
*/
package org.neo4j.graphdb;

import static java.util.Objects.requireNonNull;
import org.neo4j.kernel.api.exceptions.Status;

/**
* Signals that the transaction within which the failed operations ran
* has been terminated with {@link Transaction#terminate()}.
*/
public class TransactionTerminatedException extends TransactionFailureException
public class TransactionTerminatedException extends TransactionFailureException implements Status.HasStatus
{
public TransactionTerminatedException()
private final Status status;

public TransactionTerminatedException( Status status )
{
this( status, "" );
}

protected TransactionTerminatedException( Status status, String additionalInfo )
{
this( "" );
super( "The transaction has been terminated. Retry your operation in a new transaction, " +
"and you should see a successful result. " + status.code().description() + " " + additionalInfo );
this.status = status;
}

protected TransactionTerminatedException( String info )
@Override
public Status status()
{
super( requireNonNull( info ) + "\n" +
"The transaction has been terminated, no new operations in it " +
"are allowed. This normally happens because a client explicitly asks to terminate the transaction, " +
"for instance to stop a long-running operation. It may also happen because an operator has asked the " +
"database to be shut down, or because the current instance is about to perform a cluster role" +
"change. Simply retry your operation in a new transaction, and you should see a successful result." );
return status;
}
}
13 changes: 7 additions & 6 deletions community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public final class TimeUtil
{
int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
TimeUnit timeUnit = null;
int multiplyFactor = 1;
TimeUnit timeUnit;
if ( unit.equals( "ms" ) )
{
timeUnit = TimeUnit.MILLISECONDS;
Expand All @@ -57,15 +56,17 @@ else if ( unit.equals( "s" ) )
}
else if ( unit.equals( "m" ) )
{
// This is only for having to rely on 1.6
timeUnit = TimeUnit.SECONDS;
multiplyFactor = 60;
timeUnit = TimeUnit.MINUTES;
}
else if ( unit.equals( "h" ) )
{
timeUnit = TimeUnit.HOURS;
}
else
{
throw new RuntimeException( "Unrecognized unit " + unit );
}
return timeUnit.toMillis( amount * multiplyFactor );
return timeUnit.toMillis( amount );
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.storemigration.DatabaseMigrator;
import org.neo4j.kernel.impl.storemigration.monitoring.VisibleMigrationProgressMonitor;
import org.neo4j.kernel.impl.storemigration.participant.StoreMigrator;
Expand Down Expand Up @@ -791,7 +792,7 @@ private KernelModule buildKernel( TransactionAppender appender,
KernelTransactions kernelTransactions = life.add( new KernelTransactions( locks, constraintIndexCreator,
statementOperations, schemaWriteGuard, transactionHeaderInformationFactory, transactionCommitProcess,
indexConfigStore, legacyIndexProviderLookup, hooks, transactionMonitor, life, tracers, storageEngine,
procedures, transactionIdStore, config ) );
procedures, transactionIdStore, config, Clock.SYSTEM_CLOCK ) );

final Kernel kernel = new Kernel( kernelTransactions, hooks, databaseHealth, transactionMonitor, procedures );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;

Expand Down Expand Up @@ -123,16 +124,21 @@ interface CloseListener
AccessMode mode();

/**
* @return {@code true} if {@link #markForTermination()} has been invoked, otherwise {@code false}.
* @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}.
*/
boolean shouldBeTerminated();
Status getReasonIfTerminated();

/**
* Marks this transaction for termination, such that it cannot commit successfully and will try to be
* terminated by having other methods throw a specific termination exception, as to sooner reach the assumed
* point where {@link #close()} will be invoked.
*/
void markForTermination();
void markForTermination( Status reason );

/**
* @return The timestamp of the last transaction that was committed to the store when this transaction started.
*/
long lastTransactionTimestampWhenStarted();

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api.cursor;

import org.neo4j.collection.primitive.PrimitiveIntCollection;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveIntStack;
import org.neo4j.cursor.Cursor;
Expand Down Expand Up @@ -57,7 +58,7 @@ public Object getProperty( int propertyKeyId )
}

@Override
public PrimitiveIntIterator getPropertyKeys()
public PrimitiveIntCollection getPropertyKeys()
{
PrimitiveIntStack keys = new PrimitiveIntStack();
try ( Cursor<PropertyItem> properties = properties() )
Expand All @@ -68,6 +69,6 @@ public PrimitiveIntIterator getPropertyKeys()
}
}

return keys.iterator();
return keys;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;

/**
Expand Down Expand Up @@ -151,9 +152,11 @@ void assertOpen()
{
throw new NotInTransactionException( "The statement has been closed." );
}
if ( transaction.shouldBeTerminated() )

Status terminationReason = transaction.getReasonIfTerminated();
if ( terminationReason != null )
{
throw new TransactionTerminatedException();
throw new TransactionTerminatedException( terminationReason );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,17 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
private boolean beforeHookInvoked;
private boolean closing, closed;
private boolean failure, success;
private volatile boolean terminated;
private volatile Status terminationReason;
private long startTimeMillis;
private long lastTransactionIdWhenStarted;
private long lastTransactionTimestampWhenStarted;
private TransactionEvent transactionEvent;
private Type type;
private volatile int reuseCount;

/**
* Lock prevents transaction {@link #markForTermination() transction termination} from interfering with {@link
* #close() transaction commit} and specifically with {@link #release()}.
* Lock prevents transaction {@link #markForTermination(Status)} transaction termination} from interfering with
* {@link #close() transaction commit} and specifically with {@link #release()}.
* Termination can run concurrently with commit and we need to make sure that it terminates the right lock client
* and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation}
* instances are pooled.
Expand Down Expand Up @@ -201,14 +202,16 @@ public KernelTransactionImplementation( StatementOperationParts operations,
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize(
long lastCommittedTx, Locks.Client locks, Type type, AccessMode accessMode )
long lastCommittedTx, long lastTimeStamp, Locks.Client locks, Type type, AccessMode accessMode )
{
this.type = type;
this.locks = locks;
this.closing = closed = failure = success = terminated = beforeHookInvoked = false;
this.terminationReason = null;
this.closing = closed = failure = success = beforeHookInvoked = false;
this.writeState = TransactionWriteState.NONE;
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = accessMode;
Expand All @@ -234,9 +237,9 @@ public void failure()
}

@Override
public boolean shouldBeTerminated()
public Status getReasonIfTerminated()
{
return terminated;
return terminationReason;
}

/**
Expand All @@ -246,7 +249,7 @@ public boolean shouldBeTerminated()
* {@link #close()} and {@link #release()} calls.
*/
@Override
public void markForTermination()
public void markForTermination( Status reason )
{
if ( !canBeTerminated() )
{
Expand All @@ -264,7 +267,7 @@ public void markForTermination()
if ( stillSameTransaction && canBeTerminated() )
{
failure = true;
terminated = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
Expand Down Expand Up @@ -409,7 +412,7 @@ public void close() throws TransactionFailureException
closing = true;
try
{
if ( failure || !success || terminated )
if ( failure || !success || isTerminated() )
{
rollback();
failOnNonExplicitRollbackIfNeeded();
Expand Down Expand Up @@ -454,9 +457,9 @@ public void close() throws TransactionFailureException
*/
private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureException
{
if ( success && terminated )
if ( success && isTerminated() )
{
throw new TransactionTerminatedException();
throw new TransactionTerminatedException( terminationReason );
}
if ( success )
{
Expand Down Expand Up @@ -636,7 +639,7 @@ private void afterRollback()
/**
* Release resources held up by this transaction & return it to the transaction pool.
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
* {@link #markForTermination()} calls.
* {@link #markForTermination(Status)} calls.
*/
private void release()
{
Expand All @@ -645,7 +648,7 @@ private void release()
{
locks.close();
locks = null;
terminated = false;
terminationReason = null;
type = null;
accessMode = null;
transactionEvent = null;
Expand All @@ -668,7 +671,18 @@ private void release()
*/
private boolean canBeTerminated()
{
return !closed && !terminated;
return !closed && !isTerminated();
}

private boolean isTerminated()
{
return terminationReason != null;
}

@Override
public long lastTransactionTimestampWhenStarted()
{
return lastTransactionTimestampWhenStarted;
}

@Override
Expand Down

0 comments on commit 17d1379

Please sign in to comment.