Skip to content

Commit

Permalink
Check pointing uses closed transactions to mark recovery points
Browse files Browse the repository at this point in the history
Using committed transactions didn't guarantee that the transactions
were applied to the store. Hence when flushing the page cache, it
might not contain the correct data due to missing transaction
applications. The solution is to use closed transactions since they
are guaranteed to be both appended to the transaction log and applied
to the store.
  • Loading branch information
davidegrohmann committed Jul 1, 2015
1 parent 4a56aaa commit 6877ff8
Show file tree
Hide file tree
Showing 20 changed files with 254 additions and 128 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.Commitment;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
Expand Down Expand Up @@ -60,9 +61,9 @@ public long commit( TransactionRepresentation transaction, LockGroup locks, Comm
{
try ( ValidatedIndexUpdates indexUpdates = validateIndexUpdates( transaction, mode ) )
{
long transactionId = appendToLog( transaction, commitEvent );
applyToStore( transaction, locks, commitEvent, indexUpdates, transactionId, mode );
return transactionId;
Commitment commitment = appendToLog( transaction, commitEvent );
applyToStore( transaction, locks, commitEvent, indexUpdates, commitment, mode );
return commitment.transactionId();
}
}

Expand All @@ -80,31 +81,31 @@ private ValidatedIndexUpdates validateIndexUpdates( TransactionRepresentation tr
}
}

private long appendToLog(
private Commitment appendToLog(
TransactionRepresentation transaction, CommitEvent commitEvent ) throws TransactionFailureException
{
long transactionId;
Commitment commitment;
try ( LogAppendEvent logAppendEvent = commitEvent.beginLogAppend() )
{
transactionId = appender.append( transaction, logAppendEvent );
commitment = appender.append( transaction, logAppendEvent );
}
catch ( Throwable cause )
{
throw new TransactionFailureException( CouldNotWriteToLog, cause,
"Could not append transaction representation to log" );
}
commitEvent.setTransactionId( transactionId );
return transactionId;
commitEvent.setTransactionId( commitment.transactionId() );
return commitment;
}

private void applyToStore(
TransactionRepresentation transaction, LockGroup locks, CommitEvent commitEvent,
ValidatedIndexUpdates indexUpdates, long transactionId, TransactionApplicationMode mode )
ValidatedIndexUpdates indexUpdates, Commitment commitment, TransactionApplicationMode mode )
throws TransactionFailureException
{
try ( StoreApplyEvent storeApplyEvent = commitEvent.beginStoreApply() )
{
storeApplier.apply( transaction, indexUpdates, locks, transactionId, mode );
storeApplier.apply( transaction, indexUpdates, locks, commitment.transactionId(), mode );
}
// TODO catch different types of exceptions here, some which are OK
catch ( Throwable cause )
Expand All @@ -114,8 +115,7 @@ private void applyToStore(
}
finally
{
transactionIdStore.transactionClosed( transactionId );
commitment.publishAsApplied();
}
}

}
Expand Up @@ -66,8 +66,6 @@
*/
public class NeoStore extends AbstractStore implements TransactionIdStore, LogVersionRepository
{
private static final long[] CLOSED_TX_META = new long[]{1};

public abstract static class Configuration
extends AbstractStore.Configuration
{
Expand Down Expand Up @@ -99,8 +97,8 @@ public enum Position
UPGRADE_TIME( 8, "Time of last upgrade" ),
LAST_TRANSACTION_CHECKSUM( 9, "Checksum of last committed transaction" ),
UPGRADE_TRANSACTION_CHECKSUM( 10, "Checksum of transaction id the most recent upgrade was performed at" ),
LAST_TRANSACTION_LOG_VERSION( 11, "Log version where the last transaction commit entry has been written into" ),
LAST_TRANSACTION_LOG_BYTE_OFFSET( 12, "Byte offset in the log file where the last transaction commit entry " +
LAST_CLOSED_TRANSACTION_LOG_VERSION( 11, "Log version where the last transaction commit entry has been written into" ),
LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET( 12, "Byte offset in the log file where the last transaction commit entry " +
"has been written into" );

private final int id;
Expand Down Expand Up @@ -148,14 +146,14 @@ public static boolean isStorePresent( FileSystemAbstraction fs, File storeDir )
private volatile long upgradeTxIdField = FIELD_NOT_INITIALIZED;
private volatile long upgradeTimeField = FIELD_NOT_INITIALIZED;
private volatile long lastTransactionChecksum = FIELD_NOT_INITIALIZED;
private volatile long lastTransactionLogVersion = FIELD_NOT_INITIALIZED;
private volatile long lastTransactionLogByteOffset = FIELD_NOT_INITIALIZED;
private volatile long lastClosedTransactionLogVersion = FIELD_NOT_INITIALIZED;
private volatile long lastClosedTransactionLogByteOffset = FIELD_NOT_INITIALIZED;
private volatile long upgradeTxChecksumField = FIELD_NOT_INITIALIZED;

// This is not a field in the store, but something keeping track of which of the committed
// transactions have been closed. Useful in rotation and shutdown.
private final OutOfOrderSequence lastCommittedTx = new ArrayQueueOutOfOrderSequence( -1, 200, new long[3] );
private final OutOfOrderSequence lastClosedTx = new ArrayQueueOutOfOrderSequence( -1, 200, CLOSED_TX_META );
private final OutOfOrderSequence lastCommittedTx = new ArrayQueueOutOfOrderSequence( -1, 200, new long[1] );
private final OutOfOrderSequence lastClosedTx = new ArrayQueueOutOfOrderSequence( -1, 200, new long[2] );

private final int relGrabSize;
private final CappedOperation<Void> transactionCloseWaitLogger;
Expand Down Expand Up @@ -592,11 +590,11 @@ private void readAllFields( PageCursor cursor ) throws IOException
graphNextPropField = getRecordValue( cursor, Position.FIRST_GRAPH_PROPERTY );
latestConstraintIntroducingTxField = getRecordValue( cursor, Position.LAST_CONSTRAINT_TRANSACTION );
lastTransactionChecksum = getRecordValue( cursor, Position.LAST_TRANSACTION_CHECKSUM );
lastTransactionLogVersion = getRecordValue( cursor, Position.LAST_TRANSACTION_LOG_VERSION );
lastTransactionLogByteOffset = getRecordValue( cursor, Position.LAST_TRANSACTION_LOG_BYTE_OFFSET );
lastClosedTx.set( lastCommittedTxId, CLOSED_TX_META );
lastCommittedTx.set( lastCommittedTxId, new long[]{lastTransactionChecksum, lastTransactionLogVersion,
lastTransactionLogByteOffset} );
lastClosedTransactionLogVersion = getRecordValue( cursor, Position.LAST_CLOSED_TRANSACTION_LOG_VERSION );
lastClosedTransactionLogByteOffset = getRecordValue( cursor, Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET );
lastClosedTx.set( lastCommittedTxId,
new long[]{lastClosedTransactionLogVersion, lastClosedTransactionLogByteOffset} );
lastCommittedTx.set( lastCommittedTxId, new long[]{lastTransactionChecksum} );
upgradeTxChecksumField = getRecordValue( cursor, Position.UPGRADE_TRANSACTION_CHECKSUM );
}
while ( cursor.shouldRetry() );
Expand Down Expand Up @@ -907,18 +905,14 @@ public long nextCommittingTransactionId()
}

@Override
public void transactionCommitted( long transactionId, long checksum, long logVersion, long byteOffset )
public void transactionCommitted( long transactionId, long checksum )
{
if ( lastCommittedTx.offer( transactionId, new long[]{checksum, logVersion, byteOffset} ) )
if ( lastCommittedTx.offer( transactionId, new long[]{checksum} ) )
{
long[] transactionData = lastCommittedTx.get();
setRecord( Position.LAST_TRANSACTION_ID, transactionData[0] );
setRecord( Position.LAST_TRANSACTION_CHECKSUM, transactionData[1] );
setRecord( Position.LAST_TRANSACTION_LOG_VERSION, transactionData[2] );
setRecord( Position.LAST_TRANSACTION_LOG_BYTE_OFFSET, transactionData[3] );
lastTransactionChecksum = checksum;
lastTransactionLogVersion = logVersion;
lastTransactionLogByteOffset = byteOffset;
}
}

Expand Down Expand Up @@ -950,6 +944,13 @@ public long getLastClosedTransactionId()
return lastClosedTx.getHighestGapFreeNumber();
}

/**
 * Returns whatever {@code lastClosedTx.get()} currently holds, after making sure the store
 * fields have been read.
 *
 * NOTE(review): judging by how {@code lastCommittedTx.get()} is consumed in
 * {@code transactionCommitted}, the array presumably starts with the transaction id followed by
 * the meta data (log version, byte offset) -- confirm against OutOfOrderSequence.
 */
@Override
public long[] getLastClosedTransaction()
{
    final long committingTxId = lastCommittingTxField.get();
    checkInitialized( committingTxId );
    return lastClosedTx.get();
}

// Ensures that all fields are read from the store, by checking the initial value of the field in question
private void checkInitialized( long field )
{
Expand All @@ -964,21 +965,28 @@ public void setLastCommittedAndClosedTransactionId( long transactionId, long che
{
setRecord( Position.LAST_TRANSACTION_ID, transactionId );
setRecord( Position.LAST_TRANSACTION_CHECKSUM, checksum );
setRecord( Position.LAST_TRANSACTION_LOG_VERSION, logVersion );
setRecord( Position.LAST_TRANSACTION_LOG_BYTE_OFFSET, byteOffset );
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, logVersion );
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, byteOffset );
checkInitialized( lastCommittingTxField.get() );
lastCommittingTxField.set( transactionId );
lastClosedTx.set( transactionId, CLOSED_TX_META );
lastCommittedTx.set( transactionId, new long[]{checksum, logVersion, byteOffset} );
lastCommittedTx.set( transactionId, new long[]{checksum} );
lastTransactionChecksum = checksum;
lastTransactionLogVersion = logVersion;
lastTransactionLogByteOffset = byteOffset;
lastClosedTx.set( transactionId, new long[]{logVersion, byteOffset} );
lastClosedTransactionLogVersion = logVersion;
lastClosedTransactionLogByteOffset = byteOffset;
}

@Override
public void transactionClosed( long transactionId )
public void transactionClosed( long transactionId, long logVersion, long byteOffset )
{
lastClosedTx.offer( transactionId, CLOSED_TX_META );
if ( lastClosedTx.offer( transactionId, new long[]{logVersion, byteOffset} ) )
{
long[] transactionData = lastClosedTx.get();
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, transactionData[0] );
setRecord( Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, transactionData[1] );
lastClosedTransactionLogVersion = logVersion;
lastClosedTransactionLogByteOffset = byteOffset;
}
}

@Override
Expand Down
Expand Up @@ -160,12 +160,12 @@ public long getLastCommittedTxChecksum()

public long getLastCommittedTxLogVersion()
{
return getValue( Position.LAST_TRANSACTION_LOG_VERSION );
return getValue( Position.LAST_CLOSED_TRANSACTION_LOG_VERSION );
}

public long getLastCommittedTxLogByteOffset()
{
return getValue( Position.LAST_TRANSACTION_LOG_BYTE_OFFSET );
return getValue( Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET );
}

@Override
Expand Down
Expand Up @@ -857,10 +857,10 @@ private void updateOrAddNeoStoreFieldsAsPartOfMigration( File migrationDir, File
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.LAST_TRANSACTION_CHECKSUM, lastTxChecksum );
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.UPGRADE_TRANSACTION_CHECKSUM, lastTxChecksum );

// add LAST_TRANSACTION_LOG_VERSION and LAST_TRANSACTION_LOG_BYTE_OFFSET to the migrated NeoStore
// add LAST_CLOSED_TRANSACTION_LOG_VERSION and LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET to the migrated NeoStore
LogPosition logPosition = readLastTxLogPosition( migrationDir );
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.LAST_TRANSACTION_LOG_VERSION, logPosition.getLogVersion() );
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.LAST_TRANSACTION_LOG_BYTE_OFFSET, logPosition.getByteOffset() );
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_VERSION, logPosition.getLogVersion() );
NeoStore.setRecord( fileSystem, storeDirNeoStore, Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, logPosition.getByteOffset() );
}

@Override
Expand Down
Expand Up @@ -112,7 +112,7 @@ public void start() throws Throwable
}

@Override
public long append( TransactionRepresentation transaction, LogAppendEvent logAppendEvent ) throws IOException
public Commitment append( TransactionRepresentation transaction, LogAppendEvent logAppendEvent ) throws IOException
{
long transactionId = -1;
int phase = 0;
Expand All @@ -122,7 +122,7 @@ public long append( TransactionRepresentation transaction, LogAppendEvent logApp
boolean logRotated = logRotation.rotateLogIfNeeded( logAppendEvent );
logAppendEvent.setLogRotated( logRotated );

TransactionCommitment commit;
TransactionCommitment commitment;
try
{
// Synchronized with logFile to get absolute control over concurrent rotations happening
Expand All @@ -132,23 +132,23 @@ public long append( TransactionRepresentation transaction, LogAppendEvent logApp
{
transactionId = transactionIdStore.nextCommittingTransactionId();
phase = 1;
commit = appendToLog( transaction, transactionId );
commitment = appendToLog( transaction, transactionId );
}
}

forceAfterAppend( logAppendEvent );
commit.publishAsCommitted();
orderLegacyIndexChanges( commit );
commitment.publishAsCommitted();
orderLegacyIndexChanges( commitment );
phase = 2;
return transactionId;
return commitment;
}
finally
{
if ( phase == 1 )
{
// So we end up here if we enter phase 1, but fails to reach phase 2, which means that
// we told TransactionIdStore that we committed transaction, but something failed right after
transactionIdStore.transactionClosed( transactionId );
transactionIdStore.transactionClosed( transactionId, 0l, 0l );
}
}
}
Expand Down Expand Up @@ -200,24 +200,36 @@ private static class TransactionCommitment implements Commitment
private final boolean hasLegacyIndexChanges;
private final long transactionId;
private final long transactionChecksum;
private final LogPosition logPositionAfterTransaction;
private final LogPosition logPosition;
private final TransactionIdStore transactionIdStore;

TransactionCommitment( boolean hasLegacyIndexChanges, long transactionId, long transactionChecksum,
LogPosition logPositionAfterTransaction, TransactionIdStore transactionIdStore )
LogPosition logPosition, TransactionIdStore transactionIdStore )
{
this.hasLegacyIndexChanges = hasLegacyIndexChanges;
this.transactionId = transactionId;
this.transactionChecksum = transactionChecksum;
this.logPositionAfterTransaction = logPositionAfterTransaction;
this.logPosition = logPosition;
this.transactionIdStore = transactionIdStore;
}

@Override
public void publishAsCommitted()
{
transactionIdStore.transactionCommitted( transactionId, transactionChecksum,
logPositionAfterTransaction.getLogVersion(), logPositionAfterTransaction.getByteOffset() );
transactionIdStore.transactionCommitted( transactionId, transactionChecksum );
}

/**
 * Reports this transaction to the {@code TransactionIdStore} as closed, passing along the log
 * version and byte offset of the log position held by this commitment.
 */
@Override
public void publishAsApplied()
{
    final TransactionIdStore idStore = transactionIdStore;
    final long closedTxId = transactionId;
    idStore.transactionClosed( closedTxId, logPosition.getLogVersion(), logPosition.getByteOffset() );
}

/**
 * @return the id of the transaction this commitment was created for
 */
@Override
public long transactionId()
{
    return this.transactionId;
}
}

Expand Down
Expand Up @@ -19,21 +19,33 @@
*/
package org.neo4j.kernel.impl.transaction.log;


/**
* A way to mark a transaction as committed after
* {@link TransactionAppender#append(org.neo4j.kernel.impl.transaction.TransactionRepresentation, long) appended}
* and manually {@link TransactionAppender#force() forced}.
* and manually {@link TransactionAppender#force() forced} and later closed after
* {@link org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier#apply(
* org.neo4j.kernel.impl.transaction.TransactionRepresentation, org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates,
* org.neo4j.kernel.impl.locking.LockGroup, long, org.neo4j.kernel.impl.api.TransactionApplicationMode)} .
*/
public interface Commitment
{
/**
* <p>
* Marks the transaction as committed and makes this fact public.
* </p>
*/
void publishAsCommitted();

/**
* <p>
* After this call the caller must see to that the transaction gets properly closed as well, i.e
* {@link TransactionIdStore#transactionClosed(long)}.
* Marks the transaction as closed and makes this fact public.
* </p>
*/
void publishAsCommitted();
void publishAsApplied();

/**
* @return the commitment transaction id
*/
long transactionId();
}

0 comments on commit 6877ff8

Please sign in to comment.