diff --git a/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java b/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java
index c42a2f339e9d6..e3f030517e28e 100644
--- a/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java
+++ b/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java
@@ -32,7 +32,7 @@ public TransactionTerminatedException()
this( "" );
}
- protected TransactionTerminatedException( String info )
+ public TransactionTerminatedException( String info )
{
super( "The transaction has been terminated. " + requireNonNull( info ) );
}
diff --git a/community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java b/community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
index b0d17827c1dfe..d8cbc0be9f610 100644
--- a/community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
+++ b/community/kernel/src/main/java/org/neo4j/helpers/TimeUtil.java
@@ -44,32 +44,36 @@ public Long apply( String timeWithOrWithoutUnit )
{
return DEFAULT_TIME_UNIT.toMillis( Integer.parseInt( timeWithOrWithoutUnit ) );
}
+
+ int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
+ String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
+ TimeUnit timeUnit = null;
+ int multiplyFactor = 1;
+ if ( unit.equals( "ms" ) )
+ {
+ timeUnit = TimeUnit.MILLISECONDS;
+ }
+ else if ( unit.equals( "s" ) )
+ {
+ timeUnit = TimeUnit.SECONDS;
+ }
+ else if ( unit.equals( "m" ) )
+ {
+ // SECONDS is only for having to rely on 1.6
+ timeUnit = TimeUnit.SECONDS;
+ multiplyFactor = 60;
+ }
+ else if ( unit.equals( "h" ) )
+ {
+ // SECONDS is only for having to rely on 1.6
+ timeUnit = TimeUnit.SECONDS;
+ multiplyFactor = 60*60;
+ }
else
{
- int amount = Integer.parseInt( timeWithOrWithoutUnit.substring( 0, unitIndex ) );
- String unit = timeWithOrWithoutUnit.substring( unitIndex ).toLowerCase();
- TimeUnit timeUnit = null;
- int multiplyFactor = 1;
- if ( unit.equals( "ms" ) )
- {
- timeUnit = TimeUnit.MILLISECONDS;
- }
- else if ( unit.equals( "s" ) )
- {
- timeUnit = TimeUnit.SECONDS;
- }
- else if ( unit.equals( "m" ) )
- {
- // This is only for having to rely on 1.6
- timeUnit = TimeUnit.SECONDS;
- multiplyFactor = 60;
- }
- else
- {
- throw new RuntimeException( "Unrecognized unit " + unit );
- }
- return timeUnit.toMillis( amount * multiplyFactor );
+ throw new RuntimeException( "Unrecognized unit " + unit );
}
+ return timeUnit.toMillis( amount * multiplyFactor );
}
};
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
index d7322ff6de5a0..beae306fd5f5c 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
@@ -108,6 +108,7 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.id.BufferingIdGeneratorFactory;
+import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.store.record.SchemaRule;
import org.neo4j.kernel.impl.storemigration.StoreUpgrader;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck;
@@ -183,6 +184,7 @@
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;
import static java.util.concurrent.TimeUnit.SECONDS;
+
import static org.neo4j.helpers.collection.Iterables.toList;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE;
import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;
@@ -377,6 +379,8 @@ boolean applicable( DiagnosticsPhase phase )
private final IndexConfigStore indexConfigStore;
private final ConstraintSemantics constraintSemantics;
private final IdGeneratorFactory idGeneratorFactory;
+ private BufferingIdGeneratorFactory bufferingIdGeneratorFactory;
+ private final IdReuseEligibility idReuseEligibility;
private Dependencies dependencies;
private LifeSupport life;
@@ -390,9 +394,6 @@ boolean applicable( DiagnosticsPhase phase )
private StoreLayerModule storeLayerModule;
private TransactionLogModule transactionLogModule;
private KernelModule kernelModule;
- private BufferingIdGeneratorFactory bufferingIdGeneratorFactory;
-
- private final IdReuseEligibility eligibleForReuse;
/**
* Creates a NeoStoreXaDataSource using configuration from
@@ -430,7 +431,7 @@ public NeoStoreDataSource( File storeDir, Config config, StoreFactory sf, LogPro
Monitors monitors,
Tracers tracers,
IdGeneratorFactory idGeneratorFactory,
- IdReuseEligibility eligibleForReuse )
+ IdReuseEligibility idReuseEligibility )
{
this.storeDir = storeDir;
this.config = config;
@@ -459,7 +460,7 @@ public NeoStoreDataSource( File storeDir, Config config, StoreFactory sf, LogPro
this.monitors = monitors;
this.tracers = tracers;
this.idGeneratorFactory = idGeneratorFactory;
- this.eligibleForReuse = eligibleForReuse;
+ this.idReuseEligibility = idReuseEligibility;
readOnly = config.get( Configuration.read_only );
msgLog = logProvider.getLog( getClass() );
@@ -534,7 +535,7 @@ public void transactionRecovered( long txId )
LegacyIndexApplierLookup legacyIndexApplierLookup =
dependencies.satisfyDependency( new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ) );
- boolean safeIdBuffering = FeatureToggles.flag( getClass(), "safeIdBuffering", false );
+ boolean safeIdBuffering = FeatureToggles.flag( getClass(), "safeIdBuffering", true );
if ( safeIdBuffering )
{
// This buffering id generator factory will have properly buffering id generators injected into
@@ -583,7 +584,7 @@ public void transactionRecovered( long txId )
{
// Now that we've instantiated the component which can keep track of transaction boundaries
// we let the id generator know about it.
- bufferingIdGeneratorFactory.initialize( kernelModule.kernelTransactions(), eligibleForReuse );
+ bufferingIdGeneratorFactory.initialize( kernelModule.kernelTransactions(), idReuseEligibility );
life.add( freeIdMaintenance( bufferingIdGeneratorFactory ) );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/TopLevelTransaction.java b/community/kernel/src/main/java/org/neo4j/kernel/TopLevelTransaction.java
index 2f0ceca5a276b..c93be052f2bfe 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/TopLevelTransaction.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/TopLevelTransaction.java
@@ -30,7 +30,9 @@
import org.neo4j.kernel.api.KernelTransaction.CloseListener;
import org.neo4j.kernel.api.exceptions.ConstraintViolationTransactionFailureException;
import org.neo4j.kernel.api.exceptions.KernelException;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.Status.Classification;
+import org.neo4j.kernel.api.exceptions.Status.Code;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
@@ -84,7 +86,7 @@ public final void finish()
@Override
public final void terminate()
{
- this.transaction.markForTermination();
+ this.transaction.markForTermination( Status.Transaction.MarkedAsFailed );
}
@Override
@@ -113,10 +115,13 @@ public void close()
String userMessage = successCalled
? "Transaction was marked as successful, but unable to commit transaction so rolled back."
: "Unable to rollback transaction";
- if ( e instanceof KernelException &&
- ((KernelException)e).status().code().classification() == Classification.TransientError )
+ if ( e instanceof KernelException )
{
- throw new TransientTransactionFailureException( userMessage, e );
+ Code statusCode = ((KernelException) e).status().code();
+ if ( statusCode.classification() == Classification.TransientError )
+ {
+ throw new TransientTransactionFailureException( userMessage + ": " + statusCode.description(), e );
+ }
}
throw new TransactionFailureException( userMessage, e );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java
index ddad94464499b..fdf762f17c0bb 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.api;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
/**
@@ -112,16 +113,16 @@ interface CloseListener
boolean isOpen();
/**
- * @return {@code true} if {@link #markForTermination()} has been invoked, otherwise {@code false}.
+ * @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}.
*/
- boolean shouldBeTerminated();
+ Status shouldBeTerminated();
/**
* Marks this transaction for termination, such that it cannot commit successfully and will try to be
* terminated by having other methods throw a specific termination exception, as to sooner reach the assumed
* point where {@link #close()} will be invoked.
*/
- void markForTermination();
+ void markForTermination( Status typeOfReason );
/**
* Register a {@link CloseListener} to be invoked after commit, but before transaction events "after" hooks
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/Status.java
index 673c07a50117d..e5901d472925d 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/Status.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/api/exceptions/Status.java
@@ -24,6 +24,7 @@
import java.util.Collections;
import static java.lang.String.format;
+
import static org.neo4j.kernel.api.exceptions.Status.Classification.ClientError;
import static org.neo4j.kernel.api.exceptions.Status.Classification.ClientNotification;
import static org.neo4j.kernel.api.exceptions.Status.Classification.DatabaseError;
@@ -121,6 +122,8 @@ enum Transaction implements Status
MarkedAsFailed( ClientError, "Transaction was marked as both successful and failed. Failure takes precedence" +
" and so this transaction was rolled back although it may have looked like it was going to be " +
"committed" ),
+ Outdated( TransientError, "Transaction has seen state which has been invalidated by applied updates while " +
+ "transaction was active. Transaction should succeed if retried" )
;
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java
index 23e74386e220a..85daa48d942a0 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelStatement.java
@@ -20,13 +20,13 @@
package org.neo4j.kernel.impl.api;
import org.neo4j.graphdb.NotInTransactionException;
-import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.DataWriteOperations;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TokenWriteOperations;
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.api.index.IndexReader;
@@ -38,6 +38,8 @@
import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.Locks;
+import static org.neo4j.kernel.impl.api.TransactionTermination.throwCorrectExceptionBasedOnTerminationReason;
+
public class KernelStatement implements TxStateHolder, Statement
{
protected final Locks.Client locks;
@@ -126,9 +128,11 @@ void assertOpen()
{
throw new NotInTransactionException( "The statement has been closed." );
}
- if ( transaction.shouldBeTerminated() )
+
+ Status terminationReason = transaction.shouldBeTerminated();
+ if ( terminationReason != null )
{
- throw new TransactionTerminatedException();
+ throwCorrectExceptionBasedOnTerminationReason( terminationReason );
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java
index f7a98d6f4f4ec..5fc7b3157b297 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java
@@ -173,9 +173,12 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private boolean closing, closed;
private boolean failure, success;
private volatile boolean terminated;
+ private Status terminationReason;
// Some header information
private long startTimeMillis;
private long lastTransactionIdWhenStarted;
+ private long lastTransactionTimestampWhenStarted;
+
/**
* Implements reusing the same underlying {@link KernelStatement} for overlapping statements.
*/
@@ -188,7 +191,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private volatile int reuseCount;
/**
- * Lock prevents transaction {@link #markForTermination() transction termination} from interfering with {@link
+ * Lock prevents transaction {@link #markForTermination(Status)} transction termination} from interfering with {@link
* #close() transaction commit} and specifically with {@link #release()}.
* Termination can run concurrently with commit and we need to make sure that it terminates the right lock client
* and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation}
@@ -248,15 +251,17 @@ public KernelTransactionImplementation( StatementOperationParts operations,
/**
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
- public KernelTransactionImplementation initialize( long lastCommittedTx )
+ public KernelTransactionImplementation initialize( long lastCommittedTx, long lastTimeStamp )
{
this.locks = locksManager.newClient();
+ this.terminationReason = null;
this.closing = closed = failure = success = terminated = false;
this.transactionType = TransactionType.ANY;
this.beforeHookInvoked = false;
this.recordState.initialize( lastCommittedTx );
this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx;
+ this.lastTransactionTimestampWhenStarted = lastTimeStamp;
this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.storeStatement = storeLayer.acquireStatement();
@@ -282,9 +287,9 @@ public void failure()
}
@Override
- public boolean shouldBeTerminated()
+ public Status shouldBeTerminated()
{
- return terminated;
+ return terminated ? terminationReason : null;
}
/**
@@ -294,7 +299,7 @@ public boolean shouldBeTerminated()
* {@link #close()} and {@link #release()} calls.
*/
@Override
- public void markForTermination()
+ public void markForTermination( Status reason )
{
if ( !canBeTerminated() )
{
@@ -313,6 +318,7 @@ public void markForTermination()
{
failure = true;
terminated = true;
+ terminationReason = reason;
if ( txTerminationAwareLocks && locks != null )
{
locks.stop();
@@ -469,11 +475,6 @@ private boolean hasChanges()
counts.hasChanges();
}
- private boolean hasDataChanges()
- {
- return hasTxStateWithChanges() && txState.hasDataChanges();
- }
-
// Only for test-access
public TransactionRecordState getTransactionRecordState()
{
@@ -492,7 +493,7 @@ public void close() throws TransactionFailureException
if ( failure || !success || terminated )
{
rollback();
- failOnNonExplicitRollbackIfNeeded();
+ failOnNonExplicitRollbackIfNeeded( );
}
else
{
@@ -543,7 +544,8 @@ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureExcept
{
if ( success && terminated )
{
- throw new TransactionTerminatedException();
+ String terminationMessage = terminationReason == null ? "" : terminationReason.code().description();
+ throw new TransactionTerminatedException( terminationMessage );
}
if ( success )
{
@@ -744,7 +746,7 @@ private void afterRollback()
/**
* Release resources held up by this transaction & return it to the transaction pool.
* This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent
- * {@link #markForTermination()} calls.
+ * {@link #markForTermination(Status)} calls.
*/
private void release()
{
@@ -754,6 +756,7 @@ private void release()
locks.close();
locks = null;
terminated = false;
+ terminationReason = null;
pool.release( this );
if ( storeStatement != null )
{
@@ -777,6 +780,11 @@ private boolean canBeTerminated()
return !closed && !terminated;
}
+ public long lastTransactionTimestampWhenStarted()
+ {
+ return lastTransactionTimestampWhenStarted;
+ }
+
private class TransactionToRecordStateVisitor extends TxStateVisitor.Adapter
{
private final RelationshipDataExtractor edge = new RelationshipDataExtractor();
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java
index 4bde7740ad7f1..e81588aa1551c 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.neo4j.collection.pool.LinkedQueuePool;
import org.neo4j.collection.pool.MarshlandPool;
@@ -32,6 +34,7 @@
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.KernelTransaction;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.configuration.Config;
@@ -46,6 +49,7 @@
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.store.NeoStores;
+import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.state.IntegrityValidator;
@@ -99,6 +103,7 @@ public class KernelTransactions extends LifecycleAdapter
private final LifeSupport dataSourceLife;
private final ProcedureCache procedureCache;
private final Tracers tracers;
+ private final Lock activeTransactionsLock = new ReentrantLock();
// End Tx Dependencies
@@ -115,6 +120,7 @@ public class KernelTransactions extends LifecycleAdapter
*/
private final Set allTransactions = newSetFromMap(
new ConcurrentHashMap() );
+ private boolean freezeActiveTransactions;
public KernelTransactions( NeoStoreTransactionContextFactory neoStoreTransactionContextFactory,
NeoStores neoStores, Locks locks, IntegrityValidator integrityValidator,
@@ -185,11 +191,22 @@ public KernelTransactionImplementation newInstance()
}
};
+ // TODO: Optimize with read / write locks and or only do it on slaves
@Override
public KernelTransaction newInstance()
{
- assertDatabaseIsRunning();
- return localTxPool.acquire().initialize( neoStores.getMetaDataStore().getLastCommittedTransactionId() );
+ activeTransactionsLock.lock();
+ try
+ {
+ assertDatabaseIsRunning();
+ TransactionId lastCommittedTransaction = neoStores.getMetaDataStore().getLastCommittedTransaction();
+ return localTxPool.acquire().initialize( lastCommittedTransaction.transactionId(),
+ lastCommittedTransaction.commitTimestamp() );
+ }
+ finally
+ {
+ activeTransactionsLock.unlock();
+ }
}
/**
@@ -239,7 +256,7 @@ public void disposeAll()
for ( KernelTransactionImplementation tx : allTransactions )
{
// We mark all transactions for termination since we want to be on the safe side here.
- tx.markForTermination();
+ tx.markForTermination( Status.General.DatabaseUnavailable );
}
localTxPool.disposeAll();
globalTxPool.disposeAll();
@@ -266,6 +283,18 @@ private void assertDatabaseIsRunning()
@Override
public KernelTransactionsSnapshot get()
{
- return new KernelTransactionsSnapshot( allTransactions );
+ return new KernelTransactionsSnapshot( allTransactions, Clock.SYSTEM_CLOCK.currentTimeMillis() );
+ }
+
+ public Lock freezeActiveTx()
+ {
+ activeTransactionsLock.lock();
+ return activeTransactionsLock;
+ }
+
+ public Lock unfreezeActiveTx()
+ {
+ activeTransactionsLock.unlock();
+ return activeTransactionsLock;
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionsSnapshot.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionsSnapshot.java
index 8ccf6c103e51f..47cdd172a49d0 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionsSnapshot.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionsSnapshot.java
@@ -31,8 +31,9 @@
public class KernelTransactionsSnapshot
{
private Tx relevantTransactions;
+ private final long snapshotTime;
- public KernelTransactionsSnapshot( Set allTransactions )
+ public KernelTransactionsSnapshot( Set allTransactions, long snapshotTime )
{
Tx head = null;
for ( KernelTransactionImplementation tx : allTransactions )
@@ -52,6 +53,7 @@ public KernelTransactionsSnapshot( Set allTrans
}
}
relevantTransactions = head;
+ this.snapshotTime = snapshotTime;
}
public boolean allClosed()
@@ -72,6 +74,11 @@ public boolean allClosed()
return true;
}
+ public long snapshotTime()
+ {
+ return snapshotTime;
+ }
+
private static class Tx
{
private final KernelTransactionImplementation transaction;
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java
index 733899ecb3582..8b9e1bc2410c4 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java
@@ -307,6 +307,10 @@ public Object nodeGetProperty( long nodeId, int propertyKeyId ) throws EntityNot
{
return node.get().getProperty( propertyKeyId );
}
+ finally
+ {
+ statement.assertOpen();
+ }
}
@Override
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/TransactionTermination.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/TransactionTermination.java
new file mode 100644
index 0000000000000..572dcd4822c87
--- /dev/null
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/TransactionTermination.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.kernel.impl.api;
+
+import org.neo4j.graphdb.TransactionTerminatedException;
+import org.neo4j.graphdb.TransientTransactionFailureException;
+import org.neo4j.kernel.api.exceptions.Status;
+
+public class TransactionTermination
+{
+ /**
+ * Throws different user facing exception depending on {@code reason} of transaction termination.
+ *
+ * @param reason the {@link Status} given as reason for terminating a transaction.
+ */
+ public static void throwCorrectExceptionBasedOnTerminationReason( Status reason )
+ {
+ if ( reason != null )
+ {
+ if ( reason.code().classification() == Status.Classification.TransientError )
+ {
+ throw new TransientTransactionFailureException( reason.code().description() );
+ }
+ throw new TransactionTerminatedException();
+ }
+ }
+}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorePropertyPayloadCursor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorePropertyPayloadCursor.java
index f29df18303657..6f0930854a7c2 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorePropertyPayloadCursor.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorePropertyPayloadCursor.java
@@ -291,8 +291,13 @@ private void readFromStore( AbstractDynamicStore store, AbstractDynamicStore.Dyn
buffer = newBuffer;
}
buffer.put( data, 0, data.length );
+ Thread.sleep( 1 ); // TODO: Remove
}
}
+ catch ( InterruptedException e )
+ {
+ throw new RuntimeException( e );
+ }
}
private ByteBuffer newBiggerBuffer( int requiredCapacity )
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java
index a252858473915..7fd81602bc2ed 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java
@@ -23,11 +23,13 @@
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.kernel.TopLevelTransaction;
-import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
+import static org.neo4j.kernel.impl.api.TransactionTermination.throwCorrectExceptionBasedOnTerminationReason;
+
/**
* This is meant to serve as the bridge that makes the Beans API tie transactions to threads. The Beans API
* will use this to get the appropriate {@link Statement} when it performs operations.
@@ -70,9 +72,10 @@ private void assertInUnterminatedTransaction( TopLevelTransaction transaction )
{
throw new NotInTransactionException();
}
- if ( transaction.getTransaction().shouldBeTerminated() )
+ Status terminationReason = transaction.getTransaction().shouldBeTerminated();
+ if ( terminationReason != null )
{
- throw new TransactionTerminatedException();
+ throwCorrectExceptionBasedOnTerminationReason( terminationReason );
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java
index 0bbfc09f6fac0..86af06cc34f0a 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java
@@ -30,7 +30,6 @@
import org.neo4j.kernel.DefaultIdGeneratorFactory;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.IdGeneratorFactory;
-import org.neo4j.kernel.IdReuseEligibility;
import org.neo4j.kernel.KernelData;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.Version;
@@ -52,6 +51,7 @@
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.kernel.impl.locking.community.CommunityLockManger;
import org.neo4j.kernel.impl.logging.LogService;
+import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.storemigration.ConfigMapUpgradeConfiguration;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
@@ -108,6 +108,8 @@ public CommunityEditionModule( PlatformModule platformModule )
constraintSemantics = createSchemaRuleVerifier();
+ eligibleForIdReuse = IdReuseEligibility.ALWAYS;
+
registerRecovery( config.get( GraphDatabaseFacadeFactory.Configuration.editionName), life, dependencies );
publishEditionInfo( dependencies.resolveDependency( UsageData.class ) );
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java
index fef7c63d3fae2..2df7e7e43d9c7 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java
@@ -21,16 +21,16 @@
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.kernel.IdGeneratorFactory;
-import org.neo4j.kernel.IdReuseEligibility;
import org.neo4j.kernel.KernelDiagnostics;
import org.neo4j.kernel.NeoStoreDataSource;
-import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.api.CommitProcessFactory;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
+import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.core.LabelTokenHolder;
import org.neo4j.kernel.impl.core.PropertyKeyTokenHolder;
import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder;
import org.neo4j.kernel.impl.locking.Locks;
+import org.neo4j.kernel.impl.store.id.IdReuseEligibility;
import org.neo4j.kernel.impl.storemigration.UpgradeConfiguration;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.info.DiagnosticsManager;
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/HighestTransactionId.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/HighestTransactionId.java
index d089978e016f5..fd6f04e9b44cd 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/HighestTransactionId.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/HighestTransactionId.java
@@ -29,9 +29,9 @@ public class HighestTransactionId
{
private final AtomicReference highest = new AtomicReference<>();
- public HighestTransactionId( long initialTransactionId, long initialChecksum )
+ public HighestTransactionId( long initialTransactionId, long initialChecksum, long commitTimestamp )
{
- set( initialTransactionId, initialChecksum );
+ set( initialTransactionId, initialChecksum, commitTimestamp );
}
/**
@@ -40,10 +40,11 @@ public HighestTransactionId( long initialTransactionId, long initialChecksum )
*
* @param transactionId transaction id to compare for highest.
* @param checksum checksum of the transaction.
+ * @param commitTimestamp commit time for transaction with {@code transactionId}.
* @return {@code true} if the given transaction id was higher than the current highest,
* {@code false}.
*/
- public boolean offer( long transactionId, long checksum )
+ public boolean offer( long transactionId, long checksum, long commitTimestamp )
{
TransactionId high = highest.get();
if ( transactionId < high.transactionId() )
@@ -51,7 +52,7 @@ public boolean offer( long transactionId, long checksum )
return false;
}
- TransactionId update = new TransactionId( transactionId, checksum );
+ TransactionId update = new TransactionId( transactionId, checksum, commitTimestamp );
while ( !highest.compareAndSet( high, update ) )
{
high = highest.get();
@@ -69,10 +70,11 @@ public boolean offer( long transactionId, long checksum )
*
* @param transactionId id of the transaction.
* @param checksum checksum of the transaction.
+ * @param commitTimestamp commit time for transaction with {@code transactionId}.
*/
- public void set( long transactionId, long checksum )
+ public void set( long transactionId, long checksum, long commitTimestamp )
{
- highest.set( new TransactionId( transactionId, checksum ) );
+ highest.set( new TransactionId( transactionId, checksum, commitTimestamp ) );
}
/**
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java
index 395452f2a12d9..21084d0a1ebe1 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/MetaDataStore.java
@@ -77,7 +77,8 @@ public enum Position
UPGRADE_TRANSACTION_CHECKSUM( 10, "Checksum of transaction id the most recent upgrade was performed at" ),
LAST_CLOSED_TRANSACTION_LOG_VERSION( 11, "Log version where the last transaction commit entry has been written into" ),
LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET( 12, "Byte offset in the log file where the last transaction commit entry " +
- "has been written into" );
+ "has been written into" ),
+ LAST_TRANSACTION_COMMIT_TIMESTAMP( 13, "Commit time timestamp for last committed transaction" );
private final int id;
private final String description;
@@ -114,8 +115,10 @@ public String description()
// This is not a field in the store, but something keeping track of which is the currently highest
// committed transaction id, together with its checksum.
+ // TODO: Set commit timestamp to FIELD_NOT_INITIALIZED is risky as we do not know what that means.
+ // TODO: But do we know what state we are in? BASE_LAST_COMMIT_TIMESTAMP or UNKNOWN_LAST_COMMIT_TIMESTAMP?
private final HighestTransactionId highestCommittedTransaction =
- new HighestTransactionId( FIELD_NOT_INITIALIZED, FIELD_NOT_INITIALIZED );
+ new HighestTransactionId( FIELD_NOT_INITIALIZED, FIELD_NOT_INITIALIZED, FIELD_NOT_INITIALIZED );
// This is not a field in the store, but something keeping track of which of the committed
// transactions have been closed. Useful in rotation and shutdown.
@@ -148,7 +151,7 @@ protected void initialiseNewStoreFile( PagedFile file ) throws IOException
setUpgradeTransaction( BASE_TX_ID, BASE_TX_CHECKSUM );
setCurrentLogVersion( 0 );
setLastCommittedAndClosedTransactionId(
- BASE_TX_ID, BASE_TX_CHECKSUM, BASE_TX_LOG_VERSION, BASE_TX_LOG_BYTE_OFFSET );
+ BASE_TX_ID, BASE_TX_CHECKSUM, BASE_TX_COMMIT_TIMESTAMP, BASE_TX_LOG_BYTE_OFFSET, BASE_TX_LOG_VERSION );
setStoreVersion( MetaDataStore.versionStringToLong( CommonAbstractStore.ALL_STORES_VERSION ) );
setGraphNextProp( -1 );
setLatestConstraintIntroducingTx( 0 );
@@ -464,13 +467,19 @@ private void readAllFields( PageCursor cursor ) throws IOException
lastClosedTx.set( lastCommittedTxId,
new long[]{lastClosedTransactionLogVersion, lastClosedTransactionLogByteOffset} );
highestCommittedTransaction.set( lastCommittedTxId,
- getRecordValue( cursor, Position.LAST_TRANSACTION_CHECKSUM ) );
+ getRecordValue( cursor, Position.LAST_TRANSACTION_CHECKSUM ),
+ getRecordValue( cursor, Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, BASE_TX_COMMIT_TIMESTAMP ) );
upgradeTxChecksumField = getRecordValue( cursor, Position.UPGRADE_TRANSACTION_CHECKSUM );
}
while ( cursor.shouldRetry() );
}
private long getRecordValue( PageCursor cursor, Position position )
+ {
+ return getRecordValue( cursor, position, FIELD_NOT_PRESENT );
+ }
+
+ private long getRecordValue( PageCursor cursor, Position position, long defaultValue )
{
int offset = position.id * getRecordSize();
cursor.setOffset( offset );
@@ -478,9 +487,10 @@ private long getRecordValue( PageCursor cursor, Position position )
{
return cursor.getLong();
}
- return FIELD_NOT_PRESENT;
+ return defaultValue;
}
+
private void incrementVersion( PageCursor cursor ) throws IOException
{
int offset = Position.LOG_VERSION.id * getRecordSize();
@@ -630,11 +640,11 @@ public long nextCommittingTransactionId()
}
@Override
- public void transactionCommitted( long transactionId, long checksum )
+ public void transactionCommitted( long transactionId, long checksum, long commitTimestamp )
{
assertNotClosed();
checkInitialized( lastCommittingTxField.get() );
- if ( highestCommittedTransaction.offer( transactionId, checksum ) )
+ if ( highestCommittedTransaction.offer( transactionId, checksum, commitTimestamp ) )
{
// We need to synchronize here in order to guarantee that the two field are written consistently
// together. Note that having the exclusive lock on tha page is not enough for 2 reasons:
@@ -649,6 +659,8 @@ public void transactionCommitted( long transactionId, long checksum )
{
setRecord( Position.LAST_TRANSACTION_ID, transactionId );
setRecord( Position.LAST_TRANSACTION_CHECKSUM, checksum );
+ setRecord( Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, commitTimestamp );
+ // TODO: setRecords(...) to avoid multiple new pageCursor
}
}
}
@@ -675,7 +687,7 @@ public TransactionId getUpgradeTransaction()
{
assertNotClosed();
checkInitialized( upgradeTxChecksumField );
- return new TransactionId( upgradeTxIdField, upgradeTxChecksumField );
+ return new TransactionId( upgradeTxIdField, upgradeTxChecksumField, BASE_TX_COMMIT_TIMESTAMP );
}
@Override
@@ -709,19 +721,21 @@ private void checkInitialized( long field )
// only for initialization
@Override
- public void setLastCommittedAndClosedTransactionId( long transactionId, long checksum, long logVersion, long byteOffset )
+ public void setLastCommittedAndClosedTransactionId( long transactionId, long checksum,
+ long commitTimestamp, long byteOffset, long logVersion )
{
assertNotClosed();
setRecord( Position.LAST_TRANSACTION_ID, transactionId );
setRecord( Position.LAST_TRANSACTION_CHECKSUM, checksum );
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, logVersion );
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, byteOffset );
+ setRecord( Position.LAST_TRANSACTION_COMMIT_TIMESTAMP, commitTimestamp );
checkInitialized( lastCommittingTxField.get() );
lastCommittingTxField.set( transactionId );
lastClosedTx.set( transactionId, new long[]{logVersion, byteOffset} );
lastClosedTransactionLogVersion = logVersion;
lastClosedTransactionLogByteOffset = byteOffset;
- highestCommittedTransaction.set( transactionId, checksum );
+ highestCommittedTransaction.set( transactionId, checksum, commitTimestamp );
}
@Override
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/TransactionId.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/TransactionId.java
index 5040ab510a746..a84d613470414 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/TransactionId.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/TransactionId.java
@@ -28,11 +28,13 @@ public class TransactionId
{
private long transactionId;
private long checksum;
+ private long commitTimestamp;
- public TransactionId( long transactionId, long checksum )
+ public TransactionId( long transactionId, long checksum, long commitTimestamp )
{
this.transactionId = transactionId;
this.checksum = checksum;
+ this.commitTimestamp = commitTimestamp;
}
/**
@@ -44,6 +46,14 @@ public long transactionId()
return transactionId;
}
+ /**
+ * Commit timestamp. Timestamp when transaction with transactionId was committed.
+ */
+ public long commitTimestamp()
+ {
+ return commitTimestamp;
+ }
+
/**
* Checksum of a transaction, generated from transaction meta data or its contents.
*/
@@ -65,7 +75,9 @@ public boolean equals( Object o )
}
TransactionId that = (TransactionId) o;
- return transactionId == that.transactionId && checksum == that.checksum;
+ return transactionId == that.transactionId &&
+ checksum == that.checksum &&
+ commitTimestamp == that.commitTimestamp;
}
@Override
@@ -73,6 +85,7 @@ public int hashCode()
{
int result = (int) (transactionId ^ (transactionId >>> 32));
result = 31 * result + (int) (checksum ^ (checksum >>> 32));
+ result = 31 * result + (int) (commitTimestamp ^ (commitTimestamp >>> 32));
return result;
}
@@ -82,6 +95,7 @@ public String toString()
return getClass().getSimpleName() + "{" +
"transactionId=" + transactionId +
", checksum=" + checksum +
+ ", commitTimestamp=" + commitTimestamp +
'}';
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactory.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactory.java
index b857b99a38124..790ad671f3249 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactory.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactory.java
@@ -24,7 +24,6 @@
import org.neo4j.function.Predicate;
import org.neo4j.function.Supplier;
import org.neo4j.kernel.IdGeneratorFactory;
-import org.neo4j.kernel.IdReuseEligibility;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
@@ -53,7 +52,7 @@ public void initialize( Supplier boundaries, final I
@Override
public boolean test( KernelTransactionsSnapshot snapshot )
{
- return snapshot.allClosed() && eligibleForReuse.isEligible();
+ return snapshot.allClosed() && eligibleForReuse.isEligible( snapshot );
}
};
for ( BufferingIdGenerator generator : overriddenIdGenerators )
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/IdReuseEligibility.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/IdReuseEligibility.java
similarity index 81%
rename from community/kernel/src/main/java/org/neo4j/kernel/IdReuseEligibility.java
rename to community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/IdReuseEligibility.java
index 32911e8520a2d..a664b327f69b8 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/IdReuseEligibility.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/IdReuseEligibility.java
@@ -17,7 +17,9 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
-package org.neo4j.kernel;
+package org.neo4j.kernel.impl.store.id;
+
+import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
/**
* Deciding whether or not ids are eligible for being released from buffering since being deleted.
@@ -27,11 +29,11 @@ public interface IdReuseEligibility
IdReuseEligibility ALWAYS = new IdReuseEligibility()
{
@Override
- public boolean isEligible()
+ public boolean isEligible( KernelTransactionsSnapshot snapshot )
{
return true;
}
};
- boolean isEligible();
+ boolean isEligible( KernelTransactionsSnapshot snapshot );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrator.java
index 198f63a19cb9a..f221c056747f1 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrator.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/StoreMigrator.java
@@ -57,6 +57,7 @@
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.StoreFactory;
+import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.store.counts.CountsTracker;
import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
@@ -105,11 +106,15 @@
import static org.neo4j.helpers.collection.IteratorUtil.loop;
import static org.neo4j.kernel.impl.store.CommonAbstractStore.ALL_STORES_VERSION;
import static org.neo4j.kernel.impl.store.MetaDataStore.DEFAULT_NAME;
+import static org.neo4j.kernel.impl.store.MetaDataStore.FIELD_NOT_PRESENT;
import static org.neo4j.kernel.impl.storemigration.FileOperation.COPY;
import static org.neo4j.kernel.impl.storemigration.FileOperation.DELETE;
import static org.neo4j.kernel.impl.storemigration.FileOperation.MOVE;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_CHECKSUM;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_BYTE_OFFSET;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_VERSION;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.UNKNOWN_TX_COMMIT_TIMESTAMP;
import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.withDynamicProcessorAssignment;
/**
@@ -141,13 +146,19 @@ public class StoreMigrator implements StoreMigrationParticipant
public StoreMigrator( MigrationProgressMonitor progressMonitor, FileSystemAbstraction fileSystem,
PageCache pageCache, Config config, LogService logService )
+ {
+ this( progressMonitor, fileSystem, pageCache, config, logService, new LegacyLogs( fileSystem ) );
+ }
+
+ StoreMigrator( MigrationProgressMonitor progressMonitor, FileSystemAbstraction fileSystem,
+ PageCache pageCache, Config config, LogService logService, LegacyLogs legacyLogs )
{
this.progressMonitor = progressMonitor;
this.fileSystem = fileSystem;
this.pageCache = pageCache;
this.config = config;
this.logService = logService;
- this.legacyLogs = new LegacyLogs( fileSystem );
+ this.legacyLogs = legacyLogs;
}
@Override
@@ -159,10 +170,12 @@ public void migrate( File storeDir, File migrationDir, SchemaIndexProvider schem
// Extract information about the last transaction from legacy neostore
File neoStore = new File( storeDir, MetaDataStore.DEFAULT_NAME );
long lastTxId = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_ID );
- long lastTxChecksum = extractTransactionChecksum( neoStore, storeDir, lastTxId );
+ // Checksum and timestamp
+ // TODO: Do we ever use this information during migration?
+ TransactionId lastTxInfo = extractTransactionIdInformation( neoStore, storeDir, lastTxId );
LogPosition lastTxLogPosition = extractTransactionLogPosition( neoStore, storeDir, lastTxId );
- // Write the tx checksum to file in migrationDir, because we need it later when moving files into storeDir
- writeLastTxChecksum( migrationDir, lastTxChecksum );
+ // Write the tx information to file in migrationDir, because we need it later when moving files into storeDir
+ writeLastTxInformation( migrationDir, lastTxInfo );
writeLastTxLogPosition( migrationDir, lastTxLogPosition );
@@ -178,8 +191,8 @@ public void migrate( File storeDir, File migrationDir, SchemaIndexProvider schem
case Legacy20Store.LEGACY_VERSION:
case Legacy19Store.LEGACY_VERSION:
// migrate stores
- migrateWithBatchImporter( storeDir, migrationDir,
- lastTxId, lastTxChecksum, lastTxLogPosition.getLogVersion(), lastTxLogPosition.getByteOffset(),
+ migrateWithBatchImporter( storeDir, migrationDir, lastTxId, lastTxInfo.checksum(),
+ lastTxLogPosition.getLogVersion(), lastTxLogPosition.getByteOffset(),
pageCache, versionToMigrateFrom );
// don't create counters from scratch, since the batch importer just did
break;
@@ -196,11 +209,12 @@ public void migrate( File storeDir, File migrationDir, SchemaIndexProvider schem
progressMonitor.finished();
}
- private void writeLastTxChecksum( File migrationDir, long lastTxChecksum ) throws IOException
+ private void writeLastTxInformation( File migrationDir, TransactionId txInfo ) throws IOException
{
- try ( Writer writer = fileSystem.openAsWriter( lastTxChecksumFile( migrationDir ), UTF8, false ) )
+ try ( Writer writer = fileSystem.openAsWriter( lastTxInformationFile( migrationDir), UTF8, false ) )
{
- writer.write( String.valueOf( lastTxChecksum ) );
+ // TODO: Use same splitter as for writeLastTxPosition?
+ writer.write( txInfo.transactionId() + "A" + txInfo.checksum() + "A" + txInfo.commitTimestamp() );
}
}
@@ -212,14 +226,18 @@ private void writeLastTxLogPosition( File migrationDir, LogPosition lastTxLogPos
}
}
+ // TODO: WRITE TESTS
// accessible for tests
- static long readLastTxChecksum( FileSystemAbstraction fileSystem, File migrationDir ) throws IOException
+ static TransactionId readLastTxInformation( FileSystemAbstraction fileSystem, File migrationDir ) throws IOException
{
- try ( Reader reader = fileSystem.openAsReader( lastTxChecksumFile( migrationDir ), UTF8 ) )
+ try ( Reader reader = fileSystem.openAsReader( lastTxInformationFile( migrationDir ), UTF8 ) )
{
- char[] buffer = new char[100];
+ char[] buffer = new char[4096]; // TODO: Why so large buffer?
int chars = reader.read( buffer );
- return Long.parseLong( String.valueOf( buffer, 0, chars ) );
+ String s = String.valueOf( buffer, 0, chars );
+ String[] split = s.split( "A" );
+ return new TransactionId( Long.parseLong( split[0] ), Long.parseLong( split[1] ),
+ Long.parseLong( split[2] ) );
}
}
@@ -228,7 +246,7 @@ static LogPosition readLastTxLogPosition( FileSystemAbstraction fileSystem, File
{
try ( Reader reader = fileSystem.openAsReader( lastTxLogPositionFile( migrationDir ), UTF8 ) )
{
- char[] buffer = new char[4096];
+ char[] buffer = new char[4096]; // TODO: Why so large buffer?
int chars = reader.read( buffer );
String s = String.valueOf( buffer, 0, chars );
String[] split = s.split( "A" );
@@ -236,9 +254,9 @@ static LogPosition readLastTxLogPosition( FileSystemAbstraction fileSystem, File
}
}
- private static File lastTxChecksumFile( File migrationDir )
+ private static File lastTxInformationFile( File migrationDir )
{
- return new File( migrationDir, "lastxchecksum" );
+ return new File( migrationDir, "lastxinformation" );
}
private static File lastTxLogPositionFile( File migrationDir )
@@ -246,29 +264,30 @@ private static File lastTxLogPositionFile( File migrationDir )
return new File( migrationDir, "lastxlogposition" );
}
- private long extractTransactionChecksum( File neoStore, File storeDir, long txId ) throws IOException
+ // accessible for tests
+ protected TransactionId extractTransactionIdInformation( File neoStore, File storeDir, long txId ) throws IOException
{
+ long checksum = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_CHECKSUM );
+ long commitTimestamp = MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_COMMIT_TIMESTAMP );
+ if ( checksum != FIELD_NOT_PRESENT && commitTimestamp != FIELD_NOT_PRESENT )
+ {
+ return new TransactionId( txId, checksum, commitTimestamp );
+ }
+ // The legacy store we're migrating doesn't have this record in neostore so try to extract it from tx log
try
{
- return MetaDataStore.getRecord( pageCache, neoStore, Position.LAST_TRANSACTION_CHECKSUM );
+ return legacyLogs.getTransactionInformation( storeDir, txId );
}
- catch ( IllegalStateException e )
+ catch ( IOException ioe )
{
- // The legacy store we're migrating doesn't have this record in neostore so try to extract it from tx log
- try
- {
- return legacyLogs.getTransactionChecksum( storeDir, txId );
- }
- catch ( IOException ioe )
- {
- // OK, so the legacy store didn't even have this transaction checksum in its transaction logs,
- // so just generate a random new one. I don't think it matters since we know that in a
- // multi-database scenario there can only be one of them upgrading, the other ones will have to
- // copy that database.
- return txId == TransactionIdStore.BASE_TX_ID
- ? TransactionIdStore.BASE_TX_CHECKSUM
- : Math.abs( new Random().nextLong() );
- }
+ // OK, so we could not get the transaction information from the legacy store logs,
+ // so just generate a random new one. I don't think it matters since we know that in a
+ // multi-database scenario there can only be one of them upgrading, the other ones will have to
+ // copy that database.
+ return txId == TransactionIdStore.BASE_TX_ID
+ ? new TransactionId( txId, BASE_TX_CHECKSUM, BASE_TX_COMMIT_TIMESTAMP )
+ : new TransactionId( txId, Math.abs( new Random().nextLong() ),
+ UNKNOWN_TX_COMMIT_TIMESTAMP );
}
}
@@ -372,7 +391,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
Configuration importConfig = new Configuration.Overridden( config );
AdditionalInitialIds additionalInitialIds =
- readAdditionalIds( storeDir, lastTxId, lastTxChecksum, lastTxLogVersion, lastTxLogByteOffset );
+ readAdditionalIds( storeDir, lastTxId, lastTxChecksum, lastTxLogVersion, lastTxLogByteOffset );
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem,
importConfig, logService, withDynamicProcessorAssignment( migrationBatchImporterMonitor(
legacyStore ), importConfig ),
@@ -857,9 +876,16 @@ private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File
// to look up checksums for transactions succeeding T by looking at its transaction logs,
// but T needs to be stored in neostore to be accessible. Obvioously this scenario is only
// problematic as long as we don't migrate and translate old logs.
- long lastTxChecksum = readLastTxChecksum( fileSystem, migrationDir );
- MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM, lastTxChecksum );
- MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TRANSACTION_CHECKSUM, lastTxChecksum );
+
+ // TODO: Is this what we want to do with txInfo and do we not need UPGRADE_TRANSACTION_COMMIT_TIMESTAMP?
+ TransactionId lastTxInfo = readLastTxInformation( fileSystem, migrationDir );
+ // Checksum
+ MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM,
+ lastTxInfo.checksum() );
+ MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.UPGRADE_TRANSACTION_CHECKSUM,
+ lastTxInfo.checksum() );
+ MetaDataStore.setRecord( pageCache, storeDirNeoStore, Position.LAST_TRANSACTION_COMMIT_TIMESTAMP,
+ lastTxInfo.commitTimestamp() );
// add LAST_CLOSED_TRANSACTION_LOG_VERSION and LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET to the migrated
// NeoStore
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/legacylogs/LegacyLogs.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/legacylogs/LegacyLogs.java
index d9e2e2803b355..33433d409cd24 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/legacylogs/LegacyLogs.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/legacylogs/LegacyLogs.java
@@ -28,6 +28,7 @@
import org.neo4j.helpers.Pair;
import org.neo4j.io.fs.FileSystemAbstraction;
+import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.storemigration.FileOperation;
import org.neo4j.kernel.impl.transaction.log.IOCursor;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
@@ -116,7 +117,8 @@ public void migrateLogs( File storeDir, File migrationDir ) throws IOException
}
}
- public long getTransactionChecksum( File storeDir, long transactionId ) throws IOException
+ // TODO: TEST THIS
+ public TransactionId getTransactionInformation( File storeDir, long transactionId ) throws IOException
{
List logFiles = Arrays.asList( fs.listFiles( storeDir, versionedLegacyLogFilesFilter ) );
Collections.sort( logFiles, NEWEST_FIRST );
@@ -141,7 +143,8 @@ else if ( logEntry instanceof LogEntryCommit )
LogEntryCommit commitEntry = logEntry.as();
if ( commitEntry.getTxId() == transactionId )
{
- return startEntry.checksum();
+ return new TransactionId( transactionId, startEntry.checksum(),
+ commitEntry.getTimeWritten() );
}
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppender.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppender.java
index e1d53e1a0d147..78876e37a04c7 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppender.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/BatchingTransactionAppender.java
@@ -203,16 +203,18 @@ private static class TransactionCommitment implements Commitment
private final boolean hasLegacyIndexChanges;
private final long transactionId;
private final long transactionChecksum;
+ private final long transactionCommitTimestamp;
private final LogPosition logPosition;
private final TransactionIdStore transactionIdStore;
private boolean markedAsCommitted;
TransactionCommitment( boolean hasLegacyIndexChanges, long transactionId, long transactionChecksum,
- LogPosition logPosition, TransactionIdStore transactionIdStore )
+ long transactionCommitTimestamp, LogPosition logPosition, TransactionIdStore transactionIdStore )
{
this.hasLegacyIndexChanges = hasLegacyIndexChanges;
this.transactionId = transactionId;
this.transactionChecksum = transactionChecksum;
+ this.transactionCommitTimestamp = transactionCommitTimestamp;
this.logPosition = logPosition;
this.transactionIdStore = transactionIdStore;
}
@@ -221,7 +223,7 @@ private static class TransactionCommitment implements Commitment
public void publishAsCommitted()
{
markedAsCommitted = true;
- transactionIdStore.transactionCommitted( transactionId, transactionChecksum );
+ transactionIdStore.transactionCommitted( transactionId, transactionChecksum, transactionCommitTimestamp );
}
@Override
@@ -280,8 +282,8 @@ private TransactionCommitment appendToLog( TransactionRepresentation transaction
legacyIndexTransactionOrdering.offer( transactionId );
}
return new TransactionCommitment(
- hasLegacyIndexChanges, transactionId, transactionChecksum, logPositionAfterCommit,
- transactionIdStore );
+ hasLegacyIndexChanges, transactionId, transactionChecksum, transaction.getTimeCommitted(),
+ logPositionAfterCommit, transactionIdStore );
}
catch ( final Throwable panic )
{
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadOnlyTransactionIdStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadOnlyTransactionIdStore.java
index 658b9e6bb386e..080471543115c 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadOnlyTransactionIdStore.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadOnlyTransactionIdStore.java
@@ -62,7 +62,7 @@ public long nextCommittingTransactionId()
}
@Override
- public void transactionCommitted( long transactionId, long checksum )
+ public void transactionCommitted( long transactionId, long checksum, long commitTimestamp )
{
throw new UnsupportedOperationException( "Read-only transaction ID store" );
}
@@ -76,7 +76,7 @@ public long getLastCommittedTransactionId()
@Override
public TransactionId getLastCommittedTransaction()
{
- return new TransactionId( transactionId, transactionChecksum );
+ return new TransactionId( transactionId, transactionChecksum, BASE_TX_COMMIT_TIMESTAMP );
}
@Override
@@ -98,7 +98,8 @@ public long[] getLastClosedTransaction()
}
@Override
- public void setLastCommittedAndClosedTransactionId( long transactionId, long checksum, long logVersion, long logByteOffset )
+ public void setLastCommittedAndClosedTransactionId( long transactionId, long checksum, long commitTimestamp,
+ long logByteOffset, long logVersion )
{
throw new UnsupportedOperationException( "Read-only transaction ID store" );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionAppender.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionAppender.java
index b34914806e362..1b858aaca569e 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionAppender.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionAppender.java
@@ -37,7 +37,7 @@ public interface TransactionAppender
*
* Any failure happening inside this method will automatically
* {@link TransactionIdStore#transactionClosed(long, long, long) close} the transaction if the execution got past
- * {@link TransactionIdStore#transactionCommitted(long, long)}, so callers should not close transactions
+ * {@link TransactionIdStore#transactionCommitted(long, long, long)}, so callers should not close transactions
* on exception thrown from this method. Although callers must make sure that successfully appended
* transactions exiting this method are {@link Commitment#publishAsApplied()}}.
*
@@ -59,7 +59,7 @@ public interface TransactionAppender
*
* Any failure happening inside this method will automatically
* {@link TransactionIdStore#transactionClosed(long, long, long) close} the transaction if the execution got past
- * {@link TransactionIdStore#transactionCommitted(long, long)}, so callers should not close transactions
+ * {@link TransactionIdStore#transactionCommitted(long, long, long)}, so callers should not close transactions
* on exception thrown from this method. Although callers must make sure that successfully appended
* transactions exiting this method are {@link Commitment#publishAsApplied()}.
*
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionIdStore.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionIdStore.java
index 038e9cb195cca..e06844aae0e98 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionIdStore.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/TransactionIdStore.java
@@ -33,7 +33,7 @@
*
*
{@link #nextCommittingTransactionId()} is called and an id is returned to a committer.
* At this point that id isn't visible from any getter.
- *
{@link #transactionCommitted(long, long)} is called with this id after the fact that the transaction
+ *
{@link #transactionCommitted(long, long, long)} is called with this id after the fact that the transaction
* has been committed, i.e. written forcefully to a log. After this call the id may be visible from
* {@link #getLastCommittedTransactionId()} if all ids before it have also been committed.
*
{@link #transactionClosed(long, long, long)} is called with this id again, this time after all changes the
@@ -43,16 +43,47 @@
*/
public interface TransactionIdStore
{
+ // TODO: Document this somewhere.
+ /**
+ * Empty store
+ * TIMESTAMP BASE_TX_COMMIT_TIMESTAMP (0)
+ * ==> FINE. NO KILL because no previous state can have been observed anyway
+ *
+ * Upgraded store w/ tx logs
+ * TIMESTAMP CARRIED OVER
+ * ==> FINE
+ *
+ * Upgraded store w/o tx logs
+ * TIMESTAMP UNKNOWN_TX_COMMIT_TIMESTAMP (1)
+ * ==> READS WILL TERMINATE WHEN FIRST PULL UPDATES HAPPENS
+ *
+ * TODO
+ * Store on 2.3.prev, w/ tx logs (no upgrade)
+ * TIMESTAMP CARRIED OVER
+ * ==> FINE
+ *
+ * TODO
+ * Store on 2.3.prev w/o tx logs (no upgrade)
+ * TIMESTAMP UNKNOWN_TX_COMMIT_TIMESTAMP (1)
+ * ==> READS WILL TERMINATE WHEN FIRST PULL UPDATES HAPPENS
+ *
+ * Store already on 2.3.next, w/ or w/o tx logs
+ * TIMESTAMP CORRECT
+ * ==> FINE
+ */
+
// Tx id counting starting from this value (this value means no transaction ever committed)
long BASE_TX_ID = 1;
long BASE_TX_CHECKSUM = 0;
+ long BASE_TX_COMMIT_TIMESTAMP = 0; // TODO: JAVADOCSA THIS
+ long UNKNOWN_TX_COMMIT_TIMESTAMP = 1; // TODO: JAVADOCSA THIS
long BASE_TX_LOG_VERSION = 0;
long BASE_TX_LOG_BYTE_OFFSET = LOG_HEADER_SIZE;
/**
* @return the next transaction id for a committing transaction. The transaction id is incremented
* with each call. Ids returned from this method will not be visible from {@link #getLastCommittedTransactionId()}
- * until handed to {@link #transactionCommitted(long, long)}.
+ * until handed to {@link #transactionCommitted(long, long, long)}.
*/
long nextCommittingTransactionId();
@@ -62,11 +93,12 @@ public interface TransactionIdStore
* seen given to this method will be visible in {@link #getLastCommittedTransactionId()}.
* @param transactionId the applied transaction id.
* @param checksum checksum of the transaction.
+ * @param commitTimestamp
*/
- void transactionCommitted( long transactionId, long checksum );
+ void transactionCommitted( long transactionId, long checksum, long commitTimestamp );
/**
- * @return highest seen {@link #transactionCommitted(long, long) committed transaction id}.
+ * @return highest seen {@link #transactionCommitted(long, long, long) committed transaction id}.
*/
long getLastCommittedTransactionId();
@@ -96,13 +128,14 @@ public interface TransactionIdStore
/**
* Used by recovery, where last committed/closed transaction ids are set.
* Perhaps this shouldn't be exposed like this?
- *
- * @param transactionId transaction id that will be the last closed/committed id.
+ * @param transactionId transaction id that will be the last closed/committed id.
* @param checksum checksum of the transaction.
- * @param logVersion version of log the committed entry has been written into.
+ * @param commitTimestamp
* @param byteOffset offset in the log file where the committed entry has been written.
+ * @param logVersion version of log the committed entry has been written into.
*/
- void setLastCommittedAndClosedTransactionId( long transactionId, long checksum, long logVersion, long byteOffset );
+ void setLastCommittedAndClosedTransactionId( long transactionId, long checksum, long commitTimestamp,
+ long byteOffset, long logVersion );
/**
* Signals that a transaction with the given transaction id has been fully applied. Calls to this method
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/RecoveryVisitor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/RecoveryVisitor.java
index baeb8b74b95cc..15ba26a000759 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/RecoveryVisitor.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/RecoveryVisitor.java
@@ -46,6 +46,7 @@ public interface Monitor
private final IndexUpdatesValidator indexUpdatesValidator;
private final Monitor monitor;
private long lastTransactionIdApplied = -1;
+ private long lastTransactionCommitTimestamp;
private long lastTransactionChecksum;
private LogPosition lastTransactionLogPosition;
@@ -74,6 +75,7 @@ public boolean visit( RecoverableTransaction transaction ) throws IOException
}
lastTransactionIdApplied = txId;
+ lastTransactionCommitTimestamp = transaction.representation().getCommitEntry().getTimeWritten();
lastTransactionChecksum = LogEntryStart.checksum( representation.getStartEntry() );
lastTransactionLogPosition = transaction.positionAfterTx();
monitor.transactionRecovered( txId );
@@ -86,7 +88,8 @@ public void close() throws IOException
if ( lastTransactionIdApplied != -1 )
{
store.setLastCommittedAndClosedTransactionId( lastTransactionIdApplied, lastTransactionChecksum,
- lastTransactionLogPosition.getLogVersion(), lastTransactionLogPosition.getByteOffset() );
+ lastTransactionCommitTimestamp, lastTransactionLogPosition.getByteOffset(),
+ lastTransactionLogPosition.getLogVersion() );
}
}
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AdditionalInitialIds.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AdditionalInitialIds.java
index 543db267b5648..003c41d9e4e08 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AdditionalInitialIds.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AdditionalInitialIds.java
@@ -98,4 +98,5 @@ public long lastCommittedTransactionLogByteOffset()
return TransactionIdStore.BASE_TX_LOG_BYTE_OFFSET;
}
};
+
}
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java
index 10e49c34297e2..a54040ace5d31 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingNeoStores.java
@@ -59,13 +59,13 @@
import org.neo4j.unsafe.impl.batchimport.store.io.IoTracer;
import static java.lang.String.valueOf;
-
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.mapped_memory_page_size;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.pagecache_memory;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.io.ByteUnit.kibiBytes;
import static org.neo4j.io.ByteUnit.mebiBytes;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
/**
* Creator and accessor of {@link NeoStores} with some logic to provide very batch friendly services to the
@@ -129,7 +129,8 @@ public long countBytesWritten()
}
neoStores.getMetaDataStore().setLastCommittedAndClosedTransactionId(
initialIds.lastCommittedTransactionId(), initialIds.lastCommittedTransactionChecksum(),
- initialIds.lastCommittedTransactionLogVersion(), initialIds.lastCommittedTransactionLogByteOffset() );
+ BASE_TX_COMMIT_TIMESTAMP, initialIds.lastCommittedTransactionLogByteOffset(),
+ initialIds.lastCommittedTransactionLogVersion() );
this.propertyKeyRepository = new BatchingPropertyKeyTokenRepository(
neoStores.getPropertyKeyTokenStore(), initialIds.highPropertyKeyTokenId() );
this.labelRepository = new BatchingLabelTokenRepository(
diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java
index 2bb965144e667..a43e4219d3d08 100644
--- a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java
+++ b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java
@@ -32,7 +32,6 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -134,8 +133,7 @@ public Void call() throws Exception
}
catch ( Exception e )
{
- assertThat( rootCause( e ), anyOf( instanceOf( TransactionFailureException.class ),
- instanceOf( TransactionTerminatedException.class ) ) );
+ assertThat( rootCause( e ), instanceOf( TransientTransactionFailureException.class ) );
}
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java b/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
index 74608832bd5b1..a9c86bd8d0d4b 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
@@ -79,6 +79,7 @@
import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderWriter.writeLogHeader;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.CURRENT_LOG_VERSION;
@@ -89,7 +90,8 @@ public class RecoveryTest
@Rule
public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
private final LogVersionRepository logVersionRepository = new DeadSimpleLogVersionRepository( 1L );
- private final TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore( 5L, 0, 0, 0 );
+ private final TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore( 5L, 0,
+ BASE_TX_COMMIT_TIMESTAMP, 0, 0 );
private final int logVersion = 1;
private LogEntry lastCommittedTxStartEntry;
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java
index 4ebcb3af21a25..9bb6a1437440d 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java
@@ -22,6 +22,7 @@
import org.junit.Test;
import org.neo4j.graphdb.TransactionTerminatedException;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.labelscan.LabelScanReader;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
@@ -69,7 +70,7 @@ public void shouldCloseOpenedLabelScanReader() throws Exception
public void shouldThrowTerminateExceptionWhenTransactionTerminated() throws Exception
{
KernelTransactionImplementation transaction = mock( KernelTransactionImplementation.class );
- when( transaction.shouldBeTerminated() ).thenReturn( true );
+ when( transaction.shouldBeTerminated() ).thenReturn( Status.General.UnknownFailure );
KernelStatement statement = new KernelStatement(
transaction, mock( IndexReaderFactory.class ),
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java
index d3ac360036367..5fdef934ef986 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java
@@ -33,6 +33,7 @@
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.api.KernelTransaction;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.impl.api.store.ProcedureCache;
@@ -72,6 +73,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
public class KernelTransactionImplementationTest
{
@@ -148,7 +150,7 @@ public void shouldRollbackOnClosingTerminatedTransaction() throws Exception
// GIVEN
KernelTransaction transaction = newInitializedTransaction();
transaction.success();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
try
{
@@ -173,8 +175,8 @@ public void shouldRollbackOnClosingSuccessfulButTerminatedTransaction() throws E
try ( KernelTransaction transaction = newInitializedTransaction() )
{
// WHEN
- transaction.markForTermination();
- assertTrue( transaction.shouldBeTerminated() );
+ transaction.markForTermination( Status.General.UnknownFailure );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
}
// THEN
@@ -188,10 +190,10 @@ public void shouldRollbackOnClosingTerminatedButSuccessfulTransaction() throws E
{
// GIVEN
KernelTransaction transaction = newInitializedTransaction();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
transaction.success();
- assertTrue( transaction.shouldBeTerminated() );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
try
{
@@ -215,9 +217,9 @@ public void shouldNotDowngradeFailureState() throws Exception
try ( KernelTransaction transaction = newInitializedTransaction() )
{
// WHEN
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
transaction.failure();
- assertTrue( transaction.shouldBeTerminated() );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
}
// THEN
@@ -236,7 +238,7 @@ public void shouldIgnoreTerminateAfterCommit() throws Exception
KernelTransaction transaction = newInitializedTransaction( true, locks );
transaction.success();
transaction.close();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
// THEN
verify( transactionMonitor, times( 1 ) ).transactionFinished( true );
@@ -254,7 +256,7 @@ public void shouldIgnoreTerminateAfterRollback() throws Exception
KernelTransaction transaction = newInitializedTransaction( true, locks );
transaction.close();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
// THEN
verify( transactionMonitor, times( 1 ) ).transactionFinished( false );
@@ -268,7 +270,7 @@ public void shouldThrowOnTerminationInCommit() throws Exception
{
KernelTransaction transaction = newInitializedTransaction();
transaction.success();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
transaction.close();
}
@@ -277,7 +279,7 @@ public void shouldThrowOnTerminationInCommit() throws Exception
public void shouldIgnoreTerminationDuringRollback() throws Exception
{
KernelTransaction transaction = newInitializedTransaction();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
transaction.close();
// THEN
@@ -299,7 +301,7 @@ public void shouldAllowTerminatingFromADifferentThread() throws Exception
public void run()
{
latch.awaitStart();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
latch.finish();
}
} );
@@ -344,7 +346,7 @@ public Void answer( InvocationOnMock invocationOnMock ) throws Throwable
} ).when( recordState ).extractCommands( anyListOf( Command.class ) );
try ( KernelTransactionImplementation transaction = newInitializedTransaction() )
{
- transaction.initialize( 5L );
+ transaction.initialize( 5L, BASE_TX_COMMIT_TIMESTAMP );
// WHEN committing it at a later point
clock.forward( 5, MILLISECONDS );
@@ -366,7 +368,7 @@ public void shouldStillReturnTransactionInstanceWithTerminationMarkToPool() thro
KernelTransactionImplementation transaction = newInitializedTransaction();
// WHEN
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
transaction.close();
// THEN
@@ -379,10 +381,10 @@ public void shouldBeAbleToReuseTerminatedTransaction() throws Exception
// GIVEN
KernelTransactionImplementation transaction = newInitializedTransaction();
transaction.close();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
// WHEN
- transaction.initialize( 10L );
+ transaction.initialize( 10L, BASE_TX_COMMIT_TIMESTAMP );
transaction.txState().nodeDoCreate( 11L );
transaction.success();
transaction.close();
@@ -402,7 +404,7 @@ public void shouldAcquireNewLocksClientEveryTimeTransactionIsReused() throws Exc
reset( locks );
// WHEN
- transaction.initialize( 10L );
+ transaction.initialize( 10L, BASE_TX_COMMIT_TIMESTAMP );
transaction.close();
// THEN
@@ -418,7 +420,7 @@ public void shouldIncrementReuseCounterOnReuse() throws Exception
// WHEN
transaction.close();
- transaction.initialize( 1 );
+ transaction.initialize( 1, BASE_TX_COMMIT_TIMESTAMP );
// THEN
assertEquals( reuseCount + 1, transaction.getReuseCount() );
@@ -429,9 +431,9 @@ public void markForTerminationNotInitializedTransaction()
{
KernelTransactionImplementation transaction = newTransaction( true, new NoOpLocks() );
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
- assertTrue( transaction.shouldBeTerminated() );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
}
@Test
@@ -443,9 +445,9 @@ public void markForTerminationInitializedTransaction()
KernelTransactionImplementation transaction = newInitializedTransaction( true, locks );
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
- assertTrue( transaction.shouldBeTerminated() );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
verify( client ).stop();
}
@@ -458,11 +460,11 @@ public void markForTerminationTerminatedTransaction()
KernelTransactionImplementation transaction = newInitializedTransaction( true, locks );
- transaction.markForTermination();
- transaction.markForTermination();
- transaction.markForTermination();
+ transaction.markForTermination( Status.General.UnknownFailure );
+ transaction.markForTermination( Status.General.UnknownFailure );
+ transaction.markForTermination( Status.General.UnknownFailure );
- assertTrue( transaction.shouldBeTerminated() );
+ assertEquals( Status.General.UnknownFailure, transaction.shouldBeTerminated() );
verify( client ).stop();
verify( transactionMonitor ).transactionTerminated();
}
@@ -475,7 +477,7 @@ public void terminatedTxMarkedNeitherSuccessNorFailureClosesWithoutThrowing() th
when( locks.newClient() ).thenReturn( client );
KernelTransactionImplementation tx = newInitializedTransaction( true, locks );
- tx.markForTermination();
+ tx.markForTermination( Status.General.UnknownFailure );
tx.close();
@@ -492,7 +494,7 @@ public void terminatedTxMarkedForSuccessThrowsOnClose()
KernelTransactionImplementation tx = newInitializedTransaction( true, locks );
tx.success();
- tx.markForTermination();
+ tx.markForTermination( Status.General.UnknownFailure );
try
{
@@ -514,7 +516,7 @@ public void terminatedTxMarkedForFailureClosesWithoutThrowing() throws Transacti
KernelTransactionImplementation tx = newInitializedTransaction( true, locks );
tx.failure();
- tx.markForTermination();
+ tx.markForTermination( Status.General.UnknownFailure );
tx.close();
@@ -532,7 +534,7 @@ public void terminatedTxMarkedForBothSuccessAndFailureThrowsOnClose()
KernelTransactionImplementation tx = newInitializedTransaction( true, locks );
tx.success();
tx.failure();
- tx.markForTermination();
+ tx.markForTermination( Status.General.UnknownFailure );
try
{
@@ -595,7 +597,7 @@ private KernelTransactionImplementation newInitializedTransaction()
private KernelTransactionImplementation newInitializedTransaction( boolean txTerminationAware, Locks locks )
{
KernelTransactionImplementation transaction = newTransaction( txTerminationAware, locks );
- transaction.initialize( 0 );
+ transaction.initialize( 0, BASE_TX_COMMIT_TIMESTAMP );
return transaction;
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java
index 4bb02c7ed918f..87fb0dac5fd14 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java
@@ -35,6 +35,7 @@
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.api.KernelTransaction;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
@@ -53,6 +54,7 @@
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -73,7 +75,7 @@ public void transactionCantBeTerminatedAfterItIsClosed() throws Exception
@Override
public void accept( TestKernelTransaction tx )
{
- tx.markForTermination();
+ tx.markForTermination( Status.Transaction.MarkedAsFailed );
}
},
new Consumer()
@@ -82,7 +84,7 @@ public void accept( TestKernelTransaction tx )
public void accept( TestKernelTransaction tx )
{
close( tx );
- assertFalse( tx.shouldBeTerminated() );
+ assertNull( tx.shouldBeTerminated() );
tx.initialize();
}
}
@@ -241,7 +243,7 @@ void executeOn( KernelTransaction tx )
@Override
void executeOn( KernelTransaction tx )
{
- tx.markForTermination();
+ tx.markForTermination( Status.Transaction.MarkedAsFailed );
}
};
@@ -403,7 +405,7 @@ static TestKernelTransaction create()
TestKernelTransaction initialize()
{
- initialize( 42 );
+ initialize( 42, 42 );
monitor.reset();
return this;
}
@@ -420,13 +422,13 @@ void assertRolledBack()
void assertTerminated()
{
- assertTrue( shouldBeTerminated() );
+ assertEquals( Status.Transaction.MarkedAsFailed, shouldBeTerminated() );
assertTrue( monitor.terminated );
}
void assertNotTerminated()
{
- assertFalse( shouldBeTerminated() );
+ assertNull( shouldBeTerminated() );
assertFalse( monitor.terminated );
}
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java
index b5f0a73f528b2..5f063fb9c5995 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java
@@ -39,6 +39,7 @@
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
+import org.neo4j.kernel.impl.store.TransactionId;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor;
@@ -111,7 +112,7 @@ public void shouldDisposeTransactionsWhenAsked() throws Exception
assertThat( postDispose, not( equalTo( first ) ) );
assertThat( postDispose, not( equalTo( second ) ) );
- assertTrue( leftOpen.shouldBeTerminated() );
+ assertTrue( leftOpen.shouldBeTerminated() != null );
}
@Test
@@ -262,7 +263,9 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces
when( readLayer.acquireStatement() ).thenReturn( mock( StoreStatement.class ) );
NeoStores neoStores = mock( NeoStores.class );
- when( neoStores.getMetaDataStore() ).thenReturn( mock( MetaDataStore.class ) );
+ MetaDataStore metaDataStore = mock( MetaDataStore.class );
+ when( metaDataStore.getLastCommittedTransaction() ).thenReturn( new TransactionId( 2, 3, 4 ) );
+ when( neoStores.getMetaDataStore() ).thenReturn( metaDataStore );
return new KernelTransactions( contextSupplier, neoStores, locks,
mock( IntegrityValidator.class ), null, null, null, null, null, null, null,
TransactionHeaderInformationFactory.DEFAULT, readLayer, commitProcess, null,
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/LockingStatementOperationsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/LockingStatementOperationsTest.java
index 545e4f255323b..adc6ce05fd18f 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/LockingStatementOperationsTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/LockingStatementOperationsTest.java
@@ -79,7 +79,7 @@ public LockingStatementOperationsTest()
entityReadOps, entityWriteOps, schemaReadOps, schemaWriteOps, schemaStateOps
);
- when( transaction.shouldBeTerminated() ).thenReturn( false );
+ when( transaction.shouldBeTerminated() ).thenReturn( null );
}
@Test
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TransactionRepresentationCommitProcessTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TransactionRepresentationCommitProcessTest.java
index 2e39269768f6b..658f8d13f1c0e 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TransactionRepresentationCommitProcessTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/TransactionRepresentationCommitProcessTest.java
@@ -54,6 +54,7 @@
import static org.neo4j.helpers.Exceptions.contains;
import static org.neo4j.kernel.impl.api.TransactionApplicationMode.INTERNAL;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.*;
public class TransactionRepresentationCommitProcessTest
{
@@ -85,7 +86,7 @@ public void shouldNotIncrementLastCommittedTxIdIfAppendFails() throws Exception
assertTrue( contains( e, rootCause.getMessage(), rootCause.getClass() ) );
}
- verify( transactionIdStore, times( 0 ) ).transactionCommitted( txId, 0 );
+ verify( transactionIdStore, times( 0 ) ).transactionCommitted( txId, 0, BASE_TX_COMMIT_TIMESTAMP );
}
@Test
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java
index 2278b529f3757..1f6620f6e2b93 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java
@@ -29,6 +29,7 @@
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.Statement;
import org.neo4j.kernel.api.TransactionHook;
+import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintVerificationFailedKernelException;
@@ -198,13 +199,13 @@ public boolean isOpen()
}
@Override
- public boolean shouldBeTerminated()
+ public Status shouldBeTerminated()
{
- return false;
+ return null;
}
@Override
- public void markForTermination()
+ public void markForTermination( Status reason )
{
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/KernelIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/KernelIT.java
index 4807f7230259e..ec0e71e70abfc 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/KernelIT.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/KernelIT.java
@@ -32,7 +32,7 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
-import org.neo4j.graphdb.TransactionTerminatedException;
+import org.neo4j.graphdb.TransientFailureException;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.SchemaWriteOperations;
import org.neo4j.kernel.api.Statement;
@@ -545,7 +545,7 @@ public void shouldKillTransactionsOnShutdown() throws Throwable
tx.acquireStatement().readOperations().nodeExists( 0l );
fail("Should have been terminated.");
}
- catch(TransactionTerminatedException e)
+ catch( TransientFailureException e )
{
// Success
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/HighestTransactionIdTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/HighestTransactionIdTest.java
index 08a821819d8f7..5d2806c789030 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/HighestTransactionIdTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/HighestTransactionIdTest.java
@@ -25,47 +25,46 @@
import org.neo4j.test.Race;
+import static java.lang.Math.max;
+import static java.lang.Runtime.getRuntime;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static java.lang.Math.max;
-import static java.lang.Runtime.getRuntime;
-
public class HighestTransactionIdTest
{
@Test
public void shouldHardSetHighest() throws Exception
{
// GIVEN
- HighestTransactionId highest = new HighestTransactionId( 10, 10 );
+ HighestTransactionId highest = new HighestTransactionId( 10, 10, 10 );
// WHEN
- highest.set( 8, 1299128 );
+ highest.set( 8, 1299128, 42 );
// THEN
- assertEquals( new TransactionId( 8, 1299128 ), highest.get() );
+ assertEquals( new TransactionId( 8, 1299128, 42 ), highest.get() );
}
@Test
public void shouldOnlyKeepTheHighestOffered() throws Exception
{
// GIVEN
- HighestTransactionId highest = new HighestTransactionId( -1, -1 );
+ HighestTransactionId highest = new HighestTransactionId( -1, -1, -1 );
// WHEN/THEN
- assertAccepted( highest, 2, 123 );
- assertAccepted( highest, 5, 1231 );
- assertRejected( highest, 3, 334343 );
- assertRejected( highest, 4, 3343 );
- assertAccepted( highest, 10, 3343857 );
+ assertAccepted( highest, 2 );
+ assertAccepted( highest, 5 );
+ assertRejected( highest, 3 );
+ assertRejected( highest, 4 );
+ assertAccepted( highest, 10 );
}
@Test
public void shouldKeepHighestDuringConcurrentOfferings() throws Throwable
{
// GIVEN
- final HighestTransactionId highest = new HighestTransactionId( -1, -1 );
+ final HighestTransactionId highest = new HighestTransactionId( -1, -1, -1 );
Race race = new Race();
int updaters = max( 2, getRuntime().availableProcessors() );
final AtomicInteger accepted = new AtomicInteger();
@@ -77,7 +76,7 @@ public void shouldKeepHighestDuringConcurrentOfferings() throws Throwable
@Override
public void run()
{
- if ( highest.offer( id, id ) )
+ if ( highest.offer( id, id, id ) )
{
accepted.incrementAndGet();
}
@@ -93,17 +92,17 @@ public void run()
assertEquals( updaters, highest.get().transactionId() );
}
- private void assertAccepted( HighestTransactionId highest, long txId, long checksum )
+ private void assertAccepted( HighestTransactionId highest, long txId )
{
TransactionId current = highest.get();
- assertTrue( highest.offer( txId, checksum ) );
+ assertTrue( highest.offer( txId, -1, -1 ) );
assertTrue( txId > current.transactionId() );
}
- private void assertRejected( HighestTransactionId highest, long txId, long checksum )
+ private void assertRejected( HighestTransactionId highest, long txId )
{
TransactionId current = highest.get();
- assertFalse( highest.offer( txId, checksum ) );
+ assertFalse( highest.offer( txId, -1, -1 ) );
assertEquals( current, highest.get() );
}
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java
index 2f2402575fbc5..0c1fa41d546ab 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java
@@ -34,6 +34,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
public class MetaDataStoreTest
{
@@ -107,6 +108,22 @@ public void getLastClosedTransactionIdShouldFailWhenStoreIsClosed() throws IOExc
}
}
+ @Test
+ public void getLastClosedTransactionShouldFailWhenStoreIsClosed() throws Exception
+ {
+ MetaDataStore metaDataStore = newMetaDataStore();
+ metaDataStore.close();
+ try
+ {
+ metaDataStore.getLastClosedTransaction();
+ fail( "Expected exception reading from MetaDataStore after being closed." );
+ }
+ catch ( Exception e )
+ {
+ assertThat( e, instanceOf( IllegalStateException.class ) );
+ }
+ }
+
@Test
public void getLastCommittedTransactionShouldFailWhenStoreIsClosed() throws IOException
{
@@ -242,7 +259,7 @@ public void setLastCommittedAndClosedTransactionIdShouldFailWhenStoreIsClosed()
metaDataStore.close();
try
{
- metaDataStore.setLastCommittedAndClosedTransactionId( 1, 1, 1, 1 );
+ metaDataStore.setLastCommittedAndClosedTransactionId( 1, 1, BASE_TX_COMMIT_TIMESTAMP, 1, 1 );
fail( "Expected exception reading from MetaDataStore after being closed." );
}
catch ( Exception e )
@@ -258,7 +275,7 @@ public void transactionCommittedShouldFailWhenStoreIsClosed() throws IOException
metaDataStore.close();
try
{
- metaDataStore.transactionCommitted( 1, 1 );
+ metaDataStore.transactionCommitted( 1, 1, BASE_TX_COMMIT_TIMESTAMP );
fail( "Expected exception reading from MetaDataStore after being closed." );
}
catch ( Exception e )
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java
index 853b98e02619e..2236d39a901ba 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java
@@ -85,8 +85,8 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import static org.neo4j.helpers.collection.MapUtil.stringMap;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
public class NeoStoresTest
@@ -466,7 +466,7 @@ public void shouldNotReadNonRecordDataAsRecord() throws Exception
metaDataStore.setCreationTime( 3 );
metaDataStore.setRandomNumber( 4 );
metaDataStore.setCurrentLogVersion( 5 );
- metaDataStore.setLastCommittedAndClosedTransactionId( 6, 0, 0, 0 );
+ metaDataStore.setLastCommittedAndClosedTransactionId( 6, 0, 0, 0, 0 );
metaDataStore.setStoreVersion( 7 );
metaDataStore.setGraphNextProp( 8 );
metaDataStore.setLatestConstraintIntroducingTx( 9 );
@@ -575,7 +575,7 @@ public void shouldAddUpgradeFieldsToTheNeoStoreIfNotPresent() throws IOException
metaDataStore.setCreationTime( 3 );
metaDataStore.setRandomNumber( 4 );
metaDataStore.setCurrentLogVersion( 5 );
- metaDataStore.setLastCommittedAndClosedTransactionId( 6, 0, 0, 0 );
+ metaDataStore.setLastCommittedAndClosedTransactionId( 6, 0, BASE_TX_COMMIT_TIMESTAMP, 0, 0 );
metaDataStore.setStoreVersion( 7 );
metaDataStore.setGraphNextProp( 8 );
metaDataStore.setLatestConstraintIntroducingTx( 9 );
@@ -600,7 +600,8 @@ public void shouldAddUpgradeFieldsToTheNeoStoreIfNotPresent() throws IOException
assertEquals( 7, metaDataStore.getStoreVersion() );
assertEquals( 8, metaDataStore.getGraphNextProp() );
assertEquals( 9, metaDataStore.getLatestConstraintIntroducingTx() );
- assertEquals( new TransactionId( 10, 11 ), metaDataStore.getUpgradeTransaction() );
+ assertEquals( new TransactionId( 10, 11, BASE_TX_COMMIT_TIMESTAMP ),
+ metaDataStore.getUpgradeTransaction() );
assertEquals( 12, metaDataStore.getUpgradeTime() );
}
}
@@ -616,13 +617,15 @@ public void shouldSetHighestTransactionIdWhenNeeded() throws Throwable
try ( NeoStores neoStore = factory.openAllNeoStores( true ) )
{
MetaDataStore store = neoStore.getMetaDataStore();
- store.setLastCommittedAndClosedTransactionId( 40, 4444, 0, LogHeader.LOG_HEADER_SIZE );
+ store.setLastCommittedAndClosedTransactionId( 40, 4444, BASE_TX_COMMIT_TIMESTAMP,
+ LogHeader.LOG_HEADER_SIZE, 0 );
// WHEN
- store.transactionCommitted( 42, 6666 );
+ store.transactionCommitted( 42, 6666, BASE_TX_COMMIT_TIMESTAMP );
// THEN
- assertEquals( new TransactionId( 42, 6666 ), store.getLastCommittedTransaction() );
+ assertEquals( new TransactionId( 42, 6666, BASE_TX_COMMIT_TIMESTAMP ),
+ store.getLastCommittedTransaction() );
}
}
@@ -637,13 +640,15 @@ public void shouldNotSetHighestTransactionIdWhenNeeded() throws Throwable
try ( NeoStores neoStore = factory.openAllNeoStores( true ) )
{
MetaDataStore store = neoStore.getMetaDataStore();
- store.setLastCommittedAndClosedTransactionId( 40, 4444, 0, LogHeader.LOG_HEADER_SIZE );
+ store.setLastCommittedAndClosedTransactionId( 40, 4444, BASE_TX_COMMIT_TIMESTAMP,
+ LogHeader.LOG_HEADER_SIZE, 0 );
// WHEN
- store.transactionCommitted( 39, 3333 );
+ store.transactionCommitted( 39, 3333, BASE_TX_COMMIT_TIMESTAMP );
// THEN
- assertEquals( new TransactionId( 40, 4444 ), store.getLastCommittedTransaction() );
+ assertEquals( new TransactionId( 40, 4444, BASE_TX_COMMIT_TIMESTAMP ),
+ store.getLastCommittedTransaction() );
}
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactoryTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactoryTest.java
index 488e3161add93..f7eb894c6279e 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactoryTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/id/BufferingIdGeneratorFactoryTest.java
@@ -25,12 +25,14 @@
import java.io.File;
import org.neo4j.function.Supplier;
+import org.neo4j.helpers.FakeClock;
import org.neo4j.kernel.IdGeneratorFactory;
-import org.neo4j.kernel.IdReuseEligibility;
-import org.neo4j.kernel.IdType;
+import org.neo4j.kernel .IdType;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
import org.neo4j.test.EphemeralFileSystemRule;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -68,6 +70,48 @@ public void shouldDelayFreeingOfAggressivelyReusedIds() throws Exception
verify( actual.get( IdType.STRING_BLOCK ) ).freeId( 7 );
}
+ @Test
+ public void shouldDelayFreeingOfAggressivelyReusedIdsConsideringTimeAsWell() throws Exception
+ {
+ // GIVEN
+ MockedIdGeneratorFactory actual = new MockedIdGeneratorFactory();
+ final FakeClock clock = new FakeClock();
+ final long safeZone = MINUTES.toMillis( 1 );
+ BufferingIdGeneratorFactory bufferingIdGeneratorFactory = new BufferingIdGeneratorFactory( actual );
+ ControllableSnapshotSupplier boundaries = new ControllableSnapshotSupplier();
+ IdGenerator idGenerator = bufferingIdGeneratorFactory.open(
+ new File( "doesnt-matter" ), 10, IdType.STRING_BLOCK, 0 );
+ bufferingIdGeneratorFactory.initialize( boundaries, new IdReuseEligibility()
+ {
+ @Override
+ public boolean isEligible( KernelTransactionsSnapshot t )
+ {
+ return clock.currentTimeMillis() - t.snapshotTime() >= safeZone;
+ }
+ } );
+
+ // WHEN
+ idGenerator.freeId( 7 );
+ verifyNoMoreInteractions( actual.get( IdType.STRING_BLOCK ) );
+
+ // after some maintenance and transaction still not closed
+ bufferingIdGeneratorFactory.maintenance();
+ verifyNoMoreInteractions( actual.get( IdType.STRING_BLOCK ) );
+
+ // although after transactions have all closed
+ boundaries.setMostRecentlyReturnedSnapshotToAllClosed();
+ bufferingIdGeneratorFactory.maintenance();
+ // ... the clock would still say "nope" so no interaction
+ verifyNoMoreInteractions( actual.get( IdType.STRING_BLOCK ) );
+
+ // then finally after time has passed as well
+ clock.forward( 70, SECONDS );
+ bufferingIdGeneratorFactory.maintenance();
+
+ // THEN
+ verify( actual.get( IdType.STRING_BLOCK ) ).freeId( 7 );
+ }
+
private static class ControllableSnapshotSupplier implements Supplier
{
KernelTransactionsSnapshot mostRecentlyReturned;
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/StoreMigratorIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/StoreMigratorIT.java
new file mode 100644
index 0000000000000..5c2f879dd95c0
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storemigration/StoreMigratorIT.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.kernel.impl.storemigration;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.neo4j.function.Function;
+import org.neo4j.io.fs.DefaultFileSystemAbstraction;
+import org.neo4j.io.fs.FileSystemAbstraction;
+import org.neo4j.io.pagecache.PageCache;
+import org.neo4j.kernel.api.index.SchemaIndexProvider;
+import org.neo4j.kernel.configuration.Config;
+import org.neo4j.kernel.impl.api.index.inmemory.InMemoryIndexProvider;
+import org.neo4j.kernel.impl.logging.LogService;
+import org.neo4j.kernel.impl.logging.NullLogService;
+import org.neo4j.kernel.impl.store.StoreFactory;
+import org.neo4j.kernel.impl.store.TransactionId;
+import org.neo4j.kernel.impl.storemigration.legacystore.LegacyStoreVersionCheck;
+import org.neo4j.kernel.impl.storemigration.legacystore.v19.Legacy19Store;
+import org.neo4j.kernel.impl.storemigration.legacystore.v20.Legacy20Store;
+import org.neo4j.kernel.impl.storemigration.legacystore.v21.Legacy21Store;
+import org.neo4j.kernel.impl.storemigration.legacystore.v22.Legacy22Store;
+import org.neo4j.kernel.impl.storemigration.monitoring.SilentMigrationProgressMonitor;
+import org.neo4j.kernel.impl.transaction.log.LogPosition;
+import org.neo4j.test.PageCacheRule;
+import org.neo4j.test.TargetDirectory;
+import org.neo4j.test.TargetDirectory.TestDirectory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.neo4j.kernel.impl.storemigration.StoreMigrator.readLastTxInformation;
+import static org.neo4j.kernel.impl.storemigration.StoreMigrator.readLastTxLogPosition;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_BYTE_OFFSET;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_LOG_VERSION;
+import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.UNKNOWN_TX_COMMIT_TIMESTAMP;
+
+@RunWith( Parameterized.class )
+public class StoreMigratorIT
+{
+ @Rule
+ public final TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
+ @Rule
+ public final PageCacheRule pageCacheRule = new PageCacheRule();
+ public final FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
+ private final SchemaIndexProvider schemaIndexProvider = new InMemoryIndexProvider();
+
+ @Parameterized.Parameter( 0 )
+ public String version;
+
+ @Parameterized.Parameter( 1 )
+ public LogPosition expectedLogPosition;
+
+ @Parameterized.Parameter( 2 )
+ public Function txIdComparator;
+
+ @Parameterized.Parameters( name = "{0}" )
+ public static Collection