Skip to content

Commit

Permalink
Send committed but not applied transaction logs when backing up.
Browse files Browse the repository at this point in the history
Previously the last committed transaction(s) might not have been sent, and
since the store copy could be newer i.e. containing partial updates, those
transaction logs are required for the idempotent command (re-)application
to repair the store to a consistent state.

The symptom was inconsistencies as reported by the consistency checker.
  • Loading branch information
martinfurmanski committed Jan 26, 2015
1 parent 2371afd commit fdfebb8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
Expand Up @@ -21,6 +21,11 @@

import java.io.IOException;

import org.neo4j.com.RequestContext;
import org.neo4j.com.ResourceReleaser;
import org.neo4j.com.Response;
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TransactionStreamResponse;
import org.neo4j.com.storecopy.ResponsePacker;
import org.neo4j.helpers.Provider;
import org.neo4j.helpers.collection.Visitor;
Expand All @@ -31,6 +36,8 @@
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;

import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;

/**
* In full backup we're more tolerant about missing transactions. Just as long as we fulfill this criterion:
* We must be able to find and stream transactions that has happened since the start of the full backup.
Expand All @@ -53,12 +60,33 @@ public StoreCopyResponsePacker( LogicalTransactionStore transactionStore,
this.logFileInformation = logFileInformation;
}

@Override
public <T> Response<T> packTransactionStreamResponse( RequestContext context, T response )
{
final long toStartFrom = mandatoryStartTransactionId;
final long toEndAt = transactionIdStore.getLastCommittedTransactionId();
TransactionStream transactions = new TransactionStream()
{
@Override
public void accept( Visitor<CommittedTransactionRepresentation,IOException> visitor ) throws IOException
{
// Check so that it's even worth thinking about extracting any transactions at all
if ( toStartFrom > BASE_TX_ID && toStartFrom <= toEndAt )
{
extractTransactions( toStartFrom, filterVisitor( visitor, toEndAt ) );
}
}
};
return new TransactionStreamResponse<>( response, storeId.instance(), transactions, ResourceReleaser.NO_OP );
}

@Override
protected void extractTransactions( long startingAtTransactionId,
Visitor<CommittedTransactionRepresentation, IOException> accumulator ) throws IOException
{
try
{
startingAtTransactionId = Math.min( mandatoryStartTransactionId, startingAtTransactionId );
super.extractTransactions( startingAtTransactionId, accumulator );
}
catch ( NoSuchTransactionException e )
Expand Down
Expand Up @@ -70,7 +70,7 @@ public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer )
{
try
{
long transactionIdWhenStartingCopy = transactionIdStore.getLastCommittedTransactionId();
long lastAppliedTransaction = transactionIdStore.getLastClosedTransactionId();
logRotationControl.forceEverything();
ByteBuffer temporaryBuffer = ByteBuffer.allocateDirect( 1024 * 1024 );

Expand All @@ -88,7 +88,7 @@ public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer )
}
}

return anonymous( transactionIdWhenStartingCopy );
return anonymous( lastAppliedTransaction );
}
catch ( IOException e )
{
Expand Down

0 comments on commit fdfebb8

Please sign in to comment.