Skip to content

Commit

Permalink
Slave now dont see record reads contaminated by record reuse
Browse files Browse the repository at this point in the history
MASTER
Master now buffer ids for a specified time, called the safe zone. Within this safe zone window
records are guaranteed not to be reused.

SLAVE
UpdatePuller makes sure no transactions break the safe zone boundary
by killing transactions that fall outside during pull update.

The transactions store the latest_applied_transaction_commit_time when
they start and `TransactionCommitingResonseUnpacker` kills the transactions
as new transactions (pulled from master) get applied by comparing
the stored commit_time with the new latest_applied_transaction_commit_time
after applying the batch.

If the batch itself spans over a greater time than what fits inside the safe zone,
no more transacions can be started on slave until the entire batch is applied.

Safe zone is configurable through ... a setting

SECONDARY CHANGES
Added knowledge about last tx commit timestamp to MetaDataStore and StoreMigrator
  • Loading branch information
burqen committed Jun 30, 2016
1 parent b4307ad commit 0f2199d
Show file tree
Hide file tree
Showing 70 changed files with 2,122 additions and 499 deletions.
Expand Up @@ -32,7 +32,7 @@ public TransactionTerminatedException()
this( "" );
}

protected TransactionTerminatedException( String info )
public TransactionTerminatedException( String info )
{
super( "The transaction has been terminated. " + requireNonNull( info ) );
}
Expand Down
50 changes: 27 additions & 23 deletions community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
Expand Up @@ -44,32 +44,36 @@ public Long apply( String timeWithOrWithoutUnit )
{
return DEFAULT_TIME_UNIT.toMillis( Integer.parseInt( timeWithOrWithoutUnit ) );
}

int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
TimeUnit timeUnit = null;
int multiplyFactor = 1;
if ( unit.equals( "ms" ) )
{
timeUnit = TimeUnit.MILLISECONDS;
}
else if ( unit.equals( "s" ) )
{
timeUnit = TimeUnit.SECONDS;
}
else if ( unit.equals( "m" ) )
{
// SECONDS is only for having to rely on 1.6
timeUnit = TimeUnit.SECONDS;
multiplyFactor = 60;
}
else if ( unit.equals( "h" ) )
{
// SECONDS is only for having to rely on 1.6
timeUnit = TimeUnit.SECONDS;
multiplyFactor = 60*60;
}
else
{
int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
TimeUnit timeUnit = null;
int multiplyFactor = 1;
if ( unit.equals( "ms" ) )
{
timeUnit = TimeUnit.MILLISECONDS;
}
else if ( unit.equals( "s" ) )
{
timeUnit = TimeUnit.SECONDS;
}
else if ( unit.equals( "m" ) )
{
// This is only for having to rely on 1.6
timeUnit = TimeUnit.SECONDS;
multiplyFactor = 60;
}
else
{
throw new RuntimeException( "Unrecognized unit " + unit );
}
return timeUnit.toMillis( amount * multiplyFactor );
throw new RuntimeException( "Unrecognized unit " + unit );
}
return timeUnit.toMillis( amount * multiplyFactor );
}
};

Expand Down
Expand Up @@ -108,6 +108,7 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.id.BufferingIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.store.record.SchemaRule;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck;
Expand Down Expand Up @@ -183,6 +184,7 @@
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.helpers.collection.Iterables.toList;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE;
import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;
Expand Down Expand Up @@ -377,6 +379,8 @@ boolean applicable( DiagnosticsPhase phase )
private final IndexConfigStore indexConfigStore;
private final ConstraintSemantics constraintSemantics;
private final IdGeneratorFactory idGeneratorFactory;
private BufferingIdGeneratorFactory bufferingIdGeneratorFactory;
private final IdReuseEligibility idReuseEligibility;

private Dependencies dependencies;
private LifeSupport life;
Expand All @@ -390,9 +394,6 @@ boolean applicable( DiagnosticsPhase phase )
private StoreLayerModule storeLayerModule;
private TransactionLogModule transactionLogModule;
private KernelModule kernelModule;
private BufferingIdGeneratorFactory bufferingIdGeneratorFactory;

private final IdReuseEligibility eligibleForReuse;

/**
* Creates a <CODE>NeoStoreXaDataSource</CODE> using configuration from
Expand Down Expand Up @@ -430,7 +431,7 @@ public NeoStoreDataSource( File storeDir, Config config, StoreFactory sf, LogPro
Monitors monitors,
Tracers tracers,
IdGeneratorFactory idGeneratorFactory,
IdReuseEligibility eligibleForReuse )
IdReuseEligibility idReuseEligibility )
{
this.storeDir = storeDir;
this.config = config;
Expand Down Expand Up @@ -459,7 +460,7 @@ public NeoStoreDataSource( File storeDir, Config config, StoreFactory sf, LogPro
this.monitors = monitors;
this.tracers = tracers;
this.idGeneratorFactory = idGeneratorFactory;
this.eligibleForReuse = eligibleForReuse;
this.idReuseEligibility = idReuseEligibility;

readOnly = config.get( Configuration.read_only );
msgLog = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -534,7 +535,7 @@ public void transactionRecovered( long txId )
LegacyIndexApplierLookup legacyIndexApplierLookup =
dependencies.satisfyDependency( new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ) );

boolean safeIdBuffering = FeatureToggles.flag( getClass(), "safeIdBuffering", false );
boolean safeIdBuffering = FeatureToggles.flag( getClass(), "safeIdBuffering", true );
if ( safeIdBuffering )
{
// This buffering id generator factory will have properly buffering id generators injected into
Expand Down Expand Up @@ -583,7 +584,7 @@ public void transactionRecovered( long txId )
{
// Now that we've instantiated the component which can keep track of transaction boundaries
// we let the id generator know about it.
bufferingIdGeneratorFactory.initialize( kernelModule.kernelTransactions(), eligibleForReuse );
bufferingIdGeneratorFactory.initialize( kernelModule.kernelTransactions(), idReuseEligibility );
life.add( freeIdMaintenance( bufferingIdGeneratorFactory ) );
}

Expand Down
Expand Up @@ -30,7 +30,9 @@
import org.neo4j.kernel.api.KernelTransaction.CloseListener;
import org.neo4j.kernel.api.exceptions.ConstraintViolationTransactionFailureException;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.Status.Classification;
import org.neo4j.kernel.api.exceptions.Status.Code;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;

Expand Down Expand Up @@ -84,7 +86,7 @@ public final void finish()
@Override
public final void terminate()
{
this.transaction.markForTermination();
this.transaction.markForTermination( Status.Transaction.MarkedAsFailed );
}

@Override
Expand Down Expand Up @@ -113,10 +115,13 @@ public void close()
String userMessage = successCalled
? "Transaction was marked as successful, but unable to commit transaction so rolled back."
: "Unable to rollback transaction";
if ( e instanceof KernelException &&
((KernelException)e).status().code().classification() == Classification.TransientError )
if ( e instanceof KernelException )
{
throw new TransientTransactionFailureException( userMessage, e );
Code statusCode = ((KernelException) e).status().code();
if ( statusCode.classification() == Classification.TransientError )
{
throw new TransientTransactionFailureException( userMessage + ": " + statusCode.description(), e );
}
}
throw new TransactionFailureException( userMessage, e );
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;

/**
Expand Down Expand Up @@ -112,16 +113,16 @@ interface CloseListener
boolean isOpen();

/**
* @return {@code true} if {@link #markForTermination()} has been invoked, otherwise {@code false}.
* @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}.
*/
boolean shouldBeTerminated();
Status shouldBeTerminated();

/**
* Marks this transaction for termination, such that it cannot commit successfully and will try to be
* terminated by having other methods throw a specific termination exception, as to sooner reach the assumed
* point where {@link #close()} will be invoked.
*/
void markForTermination();
void markForTermination( Status typeOfReason );

/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;

import static java.lang.String.format;

import static org.neo4j.kernel.api.exceptions.Status.Classification.ClientError;
import static org.neo4j.kernel.api.exceptions.Status.Classification.ClientNotification;
import static org.neo4j.kernel.api.exceptions.Status.Classification.DatabaseError;
Expand Down Expand Up @@ -121,6 +122,8 @@ enum Transaction implements Status
MarkedAsFailed( ClientError, "Transaction was marked as both successful and failed. Failure takes precedence" +
" and so this transaction was rolled back although it may have looked like it was going to be " +
"committed" ),
Outdated( TransientError, "Transaction has seen state which has been invalidated by applied updates while " +
"transaction was active. Transaction should succeed if retried" )
;


Expand Down
Expand Up @@ -20,13 +20,13 @@
package org.neo4j.kernel.impl.api;

import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.DataWriteOperations;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.IndexReader;
Expand All @@ -38,6 +38,8 @@
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.Locks;

import static org.neo4j.kernel.impl.api.TransactionTermination.throwCorrectExceptionBasedOnTerminationReason;

public class KernelStatement implements TxStateHolder, Statement
{
protected final Locks.Client locks;
Expand Down Expand Up @@ -126,9 +128,11 @@ void assertOpen()
{
throw new NotInTransactionException( "The statement has been closed." );
}
if ( transaction.shouldBeTerminated() )

Status terminationReason = transaction.shouldBeTerminated();
if ( terminationReason != null )
{
throw new TransactionTerminatedException();
throwCorrectExceptionBasedOnTerminationReason( terminationReason );
}
}

Expand Down
Expand Up @@ -173,9 +173,12 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private boolean closing, closed;
private boolean failure, success;
private volatile boolean terminated;
private Status terminationReason;
// Some header information
private long startTimeMillis;
private long lastTransactionIdWhenStarted;
private long lastTransactionTimestampWhenStarted;

/**
* Implements reusing the same underlying {@link KernelStatement} for overlapping statements.
*/
Expand All @@ -188,7 +191,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private volatile int reuseCount;

/**
* Lock prevents transaction {@link #markForTermination() transction termination} from interfering with {@link
* Lock prevents transaction {@link #markForTermination(Status)} transction termination} from interfering with {@link
* #close() transaction commit} and specifically with {@link #release()}.
* Termination can run concurrently with commit and we need to make sure that it terminates the right lock client
* and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation}
Expand Down Expand Up @@ -248,15 +251,17 @@ public KernelTransactionImplementation( StatementOperationParts operations,
/**
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize( long lastCommittedTx )
public KernelTransactionImplementation initialize( long lastCommittedTx, long lastTimeStamp )
{
this.locks = locksManager.newClient();
this.terminationReason = null;
this.closing = closed = failure = success = terminated = false;
this.transactionType = TransactionType.ANY;
this.beforeHookInvoked = false;
this.recordState.initialize( lastCommittedTx );
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.storeStatement = storeLayer.acquireStatement();
Expand All @@ -282,9 +287,9 @@ public void failure()
}

@Override
public boolean shouldBeTerminated()
public Status shouldBeTerminated()
{
return terminated;
return terminated ? terminationReason : null;
}

/**
Expand All @@ -294,7 +299,7 @@ public boolean shouldBeTerminated()
* {@link #close()} and {@link #release()} calls.
*/
@Override
public void markForTermination()
public void markForTermination( Status reason )
{
if ( !canBeTerminated() )
{
Expand All @@ -313,6 +318,7 @@ public void markForTermination()
{
failure = true;
terminated = true;
terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
Expand Down Expand Up @@ -469,11 +475,6 @@ private boolean hasChanges()
counts.hasChanges();
}

private boolean hasDataChanges()
{
return hasTxStateWithChanges() && txState.hasDataChanges();
}

// Only for test-access
public TransactionRecordState getTransactionRecordState()
{
Expand All @@ -492,7 +493,7 @@ public void close() throws TransactionFailureException
if ( failure || !success || terminated )
{
rollback();
failOnNonExplicitRollbackIfNeeded();
failOnNonExplicitRollbackIfNeeded( );
}
else
{
Expand Down Expand Up @@ -543,7 +544,8 @@ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureExcept
{
if ( success && terminated )
{
throw new TransactionTerminatedException();
String terminationMessage = terminationReason == null ? "" : terminationReason.code().description();
throw new TransactionTerminatedException( terminationMessage );
}
if ( success )
{
Expand Down Expand Up @@ -744,7 +746,7 @@ private void afterRollback()
/**
* Release resources held up by this transaction & return it to the transaction pool.
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
* {@link #markForTermination()} calls.
* {@link #markForTermination(Status)} calls.
*/
private void release()
{
Expand All @@ -754,6 +756,7 @@ private void release()
locks.close();
locks = null;
terminated = false;
terminationReason = null;
pool.release( this );
if ( storeStatement != null )
{
Expand All @@ -777,6 +780,11 @@ private boolean canBeTerminated()
return !closed && !terminated;
}

public long lastTransactionTimestampWhenStarted()
{
return lastTransactionTimestampWhenStarted;
}

private class TransactionToRecordStateVisitor extends TxStateVisitor.Adapter
{
private final RelationshipDataExtractor edge = new RelationshipDataExtractor();
Expand Down

0 comments on commit 0f2199d

Please sign in to comment.