Skip to content

Commit

Permalink
Fixes issue with lonely check point in log file during recovery
Browse files Browse the repository at this point in the history
LatestCheckPointFinderTest didn't test what it claimed to test and so this issue
slipped through the cracks. This whole test class has been simplified and its
use of mocking heavily reduced. The issue was this:

Given an older log file with some transaction in and a newer log file with only
a check point in, targeting an entry in the older log file ->
LatestCheckPointFinder would report that there were no transactions after
the last check point, while in fact there were. This would result in all those
transactions not being recovered on that startup.

This commit also adds more detailed logging when finding the last check point at startup,
to make it easier to reason about recovery issues by reading debug.log.
  • Loading branch information
tinwelint authored and burqen committed Mar 6, 2017
1 parent 3e8c046 commit 61fd0d6
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 194 deletions.
Expand Up @@ -152,6 +152,7 @@
import org.neo4j.kernel.monitoring.tracing.Tracers;
import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.spi.legacyindex.IndexImplementation;
import org.neo4j.kernel.spi.legacyindex.IndexProviders;
Expand Down Expand Up @@ -468,6 +469,7 @@ public void start() throws IOException
transactionIdStore,
logVersionRepository,
monitors.newMonitor( Recovery.Monitor.class ),
monitors.newMonitor( PositionToRecoverFrom.Monitor.class ),
transactionLogModule.logFiles(), startupStatistics,
storageEngine, logEntryReader, transactionLogModule.logicalTransactionStore() );

Expand Down Expand Up @@ -738,6 +740,7 @@ private void buildRecovery(
TransactionIdStore transactionIdStore,
LogVersionRepository logVersionRepository,
Recovery.Monitor recoveryMonitor,
PositionToRecoverFrom.Monitor positionMonitor,
final PhysicalLogFiles logFiles,
final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine,
Expand All @@ -748,7 +751,7 @@ private void buildRecovery(
new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI(
storageEngine, logFiles, fileSystemAbstraction, logVersionRepository,
checkPointFinder, transactionIdStore, logicalTransactionStore );
checkPointFinder, transactionIdStore, logicalTransactionStore, positionMonitor );
Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor()
{
Expand Down
Expand Up @@ -34,6 +34,8 @@
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;

import static org.neo4j.kernel.recovery.PositionToRecoverFrom.NO_MONITOR;

/**
* An external tool that can determine if a given store will need recovery.
*/
Expand Down Expand Up @@ -66,6 +68,6 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();

LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );
return new PositionToRecoverFrom( finder ).apply( logVersion ) != LogPosition.UNSPECIFIED;
return new PositionToRecoverFrom( finder, NO_MONITOR ).apply( logVersion ) != LogPosition.UNSPECIFIED;
}
}
Expand Up @@ -23,11 +23,16 @@

import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.logging.Log;

import static java.lang.String.format;

public class LoggingLogFileMonitor implements PhysicalLogFile.Monitor, LogRotation.Monitor, Recovery.Monitor
public class LoggingLogFileMonitor implements
PhysicalLogFile.Monitor,
LogRotation.Monitor,
Recovery.Monitor,
PositionToRecoverFrom.Monitor
{
private long firstTransactionRecovered = -1, lastTransactionRecovered;
private final Log log;
Expand Down Expand Up @@ -85,4 +90,25 @@ public void opened( File logFile, long logVersion, long lastTransactionId, boole
log.info( format( "Opened logical log [%s] version=%d, lastTxId=%d (%s)",
logFile, logVersion, lastTransactionId, (clean ? "clean" : "recovered") ) );
}

/**
 * Logs, at info level, that no commits were found after the last check point.
 *
 * @param logPosition position of the last check point; when {@code null} a placeholder
 * text is logged instead of a position.
 */
@Override
public void noCommitsAfterLastCheckPoint( LogPosition logPosition )
{
    String checkPointLocation = logPosition == null ? "<no log position given>" : logPosition.toString();
    String message = format( "No commits found after last check point (which is at %s)", checkPointLocation );
    log.info( message );
}

/**
 * Logs, at info level, that commits were found after the last check point, together with
 * the id of the first such transaction.
 *
 * @param logPosition position reported for the last check point.
 * @param firstTxIdAfterLastCheckPoint id of the first transaction after the last check point.
 */
@Override
public void commitsAfterLastCheckPoint( LogPosition logPosition, long firstTxIdAfterLastCheckPoint )
{
    String message = format(
            "Commits found after last check point (which is at %s). First txId after last checkpoint: %d ",
            logPosition, firstTxIdAfterLastCheckPoint );
    log.info( message );
}

/**
 * Logs, at info level, that the transaction log contained no check point.
 */
@Override
public void noCheckPointFound()
{
    String message = "No check point found in transaction log";
    log.info( message );
}
}
Expand Up @@ -54,6 +54,8 @@ public void opened( File logFile, long logVersion, long lastTransactionId, boole
}
}

public static final Monitor NO_MONITOR = new Monitor.Adapter();

public static final String DEFAULT_NAME = "neostore.transaction.db";
public static final String REGEX_DEFAULT_NAME = "neostore\\.transaction\\.db";
public static final String DEFAULT_VERSION_SUFFIX = ".";
Expand Down
Expand Up @@ -56,15 +56,16 @@ public DefaultRecoverySPI(
StorageEngine storageEngine,
PhysicalLogFiles logFiles, FileSystemAbstraction fs,
LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore )
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
PositionToRecoverFrom.Monitor monitor )
{
this.storageEngine = storageEngine;
this.logFiles = logFiles;
this.fs = fs;
this.logVersionRepository = logVersionRepository;
this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore;
this.positionToRecoverFrom = new PositionToRecoverFrom( checkPointFinder );
this.positionToRecoverFrom = new PositionToRecoverFrom( checkPointFinder, monitor );
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
Expand All @@ -31,6 +32,7 @@
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;

Expand Down Expand Up @@ -91,9 +93,28 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException

if ( latestCheckPoint != null )
{
// Is the latest start entry in this log file version later than what the latest check point targets?
LogPosition target = latestCheckPoint.getLogPosition();
boolean commitsAfterCheckPoint = latestStartEntry != null &&
latestStartEntry.getStartPosition().compareTo( latestCheckPoint.getLogPosition() ) >= 0;
return new LatestCheckPoint( latestCheckPoint, commitsAfterCheckPoint , oldestVersionFound );
latestStartEntry.getStartPosition().compareTo( target ) >= 0;
if ( !commitsAfterCheckPoint )
{
if ( target.getLogVersion() < version )
{
// This check point entry targets a previous log file.
// Go there and see if there's a transaction. Reader is capped to that log version.
commitsAfterCheckPoint = extractFirstTxIdAfterPosition( target, target.getLogVersion() ) !=
LatestCheckPoint.NO_TRANSACTION_ID;
}
}

// Extract first transaction id after check point target position.
// Reader may continue into log files after the initial version.
long firstTxIdAfterCheckPoint = commitsAfterCheckPoint
? extractFirstTxIdAfterPosition( target, fromVersionBackwards )
: LatestCheckPoint.NO_TRANSACTION_ID;
return new LatestCheckPoint( latestCheckPoint, commitsAfterCheckPoint,
firstTxIdAfterCheckPoint, oldestVersionFound );
}

version--;
Expand All @@ -105,19 +126,77 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException
}
}

return new LatestCheckPoint( null, latestStartEntry != null, oldestVersionFound );
boolean commitsAfterCheckPoint = latestStartEntry != null;
long firstTxAfterPosition = commitsAfterCheckPoint
? extractFirstTxIdAfterPosition( latestStartEntry.getStartPosition(),
latestStartEntry.getStartPosition().getLogVersion() )
: LatestCheckPoint.NO_TRANSACTION_ID;

return new LatestCheckPoint( null, commitsAfterCheckPoint, firstTxAfterPosition, oldestVersionFound );
}

/**
 * Finds the transaction id of the first commit entry read when scanning forward from the given position.
 * <p>
 * Scanning begins in the log version of {@code initialPosition}, at its byte offset. If that version
 * cannot be opened, or no commit entry is read from it, scanning moves on to
 * {@code LogPosition.start( version + 1 )} and continues for as long as the version does not exceed
 * {@code maxLogVersion}.
 *
 * @param initialPosition {@link LogPosition} to start scanning from.
 * @param maxLogVersion highest log version (inclusive) to scan.
 * @return txId of the commit entry closest to {@code initialPosition}, or
 * {@link LatestCheckPoint#NO_TRANSACTION_ID} if no commit entry was found.
 * @throws IOException on I/O error.
 */
private long extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion ) throws IOException
{
    for ( LogPosition scanFrom = initialPosition;
            scanFrom.getLogVersion() <= maxLogVersion;
            scanFrom = LogPosition.start( scanFrom.getLogVersion() + 1 ) )
    {
        LogVersionedStoreChannel versionChannel = PhysicalLogFile.tryOpenForVersion(
                logFiles, fileSystem, scanFrom.getLogVersion(), false );
        if ( versionChannel == null )
        {
            // Nothing to read for this version, move on to the next one.
            continue;
        }

        try
        {
            versionChannel.position( scanFrom.getByteOffset() );
            try ( ReadAheadLogChannel readAhead = new ReadAheadLogChannel( versionChannel, NO_MORE_CHANNELS );
                    LogEntryCursor entries = new LogEntryCursor( logEntryReader, readAhead ) )
            {
                while ( entries.next() )
                {
                    LogEntry candidate = entries.get();
                    if ( candidate instanceof LogEntryCommit )
                    {
                        LogEntryCommit commit = (LogEntryCommit) candidate;
                        return commit.getTxId();
                    }
                }
            }
        }
        finally
        {
            // Always release the channel, whether a commit was found, the scan ran dry or reading failed.
            versionChannel.close();
        }
    }
    return LatestCheckPoint.NO_TRANSACTION_ID;
}

public static class LatestCheckPoint
{
/**
 * Sentinel used in place of a transaction id when no transaction was found.
 * Declared {@code final} so that it cannot be reassigned; code compares against this value.
 */
public static final long NO_TRANSACTION_ID = -1;

public final CheckPoint checkPoint;
public final boolean commitsAfterCheckPoint;
public final long firstTxIdAfterLastCheckPoint;
public final long oldestLogVersionFound;

public LatestCheckPoint( CheckPoint checkPoint, boolean commitsAfterCheckPoint, long oldestLogVersionFound )
public LatestCheckPoint( CheckPoint checkPoint, boolean commitsAfterCheckPoint,
long firstTxIdAfterLastCheckPoint, long oldestLogVersionFound )
{
this.checkPoint = checkPoint;
this.commitsAfterCheckPoint = commitsAfterCheckPoint;
this.firstTxIdAfterLastCheckPoint = firstTxIdAfterLastCheckPoint;
this.oldestLogVersionFound = oldestLogVersionFound;
}

Expand All @@ -136,6 +215,7 @@ public boolean equals( Object o )
LatestCheckPoint that = (LatestCheckPoint) o;

return commitsAfterCheckPoint == that.commitsAfterCheckPoint &&
firstTxIdAfterLastCheckPoint == that.firstTxIdAfterLastCheckPoint &&
oldestLogVersionFound == that.oldestLogVersionFound &&
(checkPoint == null ? that.checkPoint == null : checkPoint.equals( that.checkPoint ));
}
Expand All @@ -145,7 +225,11 @@ public int hashCode()
{
int result = checkPoint != null ? checkPoint.hashCode() : 0;
result = 31 * result + (commitsAfterCheckPoint ? 1 : 0);
result = 31 * result + (int) (oldestLogVersionFound ^ (oldestLogVersionFound >>> 32));
if ( commitsAfterCheckPoint )
{
result = 31 * result + Long.hashCode( firstTxIdAfterLastCheckPoint );
}
result = 31 * result + Long.hashCode( oldestLogVersionFound );
return result;
}

Expand All @@ -155,6 +239,7 @@ public String toString()
return "LatestCheckPoint{" +
"checkPoint=" + checkPoint +
", commitsAfterCheckPoint=" + commitsAfterCheckPoint +
(commitsAfterCheckPoint ? ", firstTxIdAfterLastCheckPoint=" + firstTxIdAfterLastCheckPoint : "") +
", oldestLogVersionFound=" + oldestLogVersionFound +
'}';
}
Expand Down
Expand Up @@ -32,11 +32,47 @@
*/
public class PositionToRecoverFrom implements ThrowingLongFunction<LogPosition,IOException>
{
/**
* Receives notifications about what was found while determining the position to recover from.
* Every method has an empty default implementation, so implementors override only what they need.
*/
public interface Monitor
{
/**
* No commits were found after the last check point.
*
* @param logPosition {@link LogPosition} of the last check point. The caller in this class passes
* {@code null} when there is no check point to take a position from, so implementations
* must handle {@code null}.
*/
default void noCommitsAfterLastCheckPoint( LogPosition logPosition )
{ // no-op by default
}

/**
* There's a check point log entry, but there are other log entries after it.
*
* @param logPosition {@link LogPosition} pointing to the first log entry after the last
* check pointed transaction.
* @param firstTxIdAfterLastCheckPoint transaction id of the first transaction after the last check point.
*/
default void commitsAfterLastCheckPoint( LogPosition logPosition, long firstTxIdAfterLastCheckPoint )
{ // no-op by default
}

/**
* No check point log entry found in the transaction log.
*/
default void noCheckPointFound()
{ // no-op by default
}
}

/**
* A {@link Monitor} that overrides nothing and therefore uses the empty default implementation
* of every callback.
*/
public static final Monitor NO_MONITOR = new Monitor()
{
};

// Used to locate the latest check point when computing the position to recover from.
private final LatestCheckPointFinder checkPointFinder;
// Notified about the outcome of the check point search.
private final Monitor monitor;

public PositionToRecoverFrom( LatestCheckPointFinder checkPointFinder )
public PositionToRecoverFrom( LatestCheckPointFinder checkPointFinder, Monitor monitor )
{
this.checkPointFinder = checkPointFinder;
this.monitor = monitor;
}

/**
Expand All @@ -53,11 +89,15 @@ public LogPosition apply( long currentLogVersion ) throws IOException
LatestCheckPointFinder.LatestCheckPoint latestCheckPoint = checkPointFinder.find( currentLogVersion );
if ( !latestCheckPoint.commitsAfterCheckPoint )
{
monitor.noCommitsAfterLastCheckPoint(
latestCheckPoint.checkPoint != null ? latestCheckPoint.checkPoint.getLogPosition() : null );
return LogPosition.UNSPECIFIED;
}

if ( latestCheckPoint.checkPoint != null )
{
monitor.commitsAfterLastCheckPoint( latestCheckPoint.checkPoint.getLogPosition(),
latestCheckPoint.firstTxIdAfterLastCheckPoint );
return latestCheckPoint.checkPoint.getLogPosition();
}
else
Expand All @@ -68,6 +108,7 @@ public LogPosition apply( long currentLogVersion ) throws IOException
throw new UnderlyingStorageException( "No check point found in any log file from version " +
fromLogVersion + " to " + currentLogVersion );
}
monitor.noCheckPointFound();
return LogPosition.start( 0 );
}
}
Expand Down
Expand Up @@ -75,6 +75,7 @@
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderWriter.writeLogHeader;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.CURRENT_LOG_VERSION;
import static org.neo4j.kernel.recovery.PositionToRecoverFrom.NO_MONITOR;

public class RecoveryTest
{
Expand Down Expand Up @@ -149,7 +150,7 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore, NO_MONITOR )
{
private int nr = 0;

Expand Down Expand Up @@ -239,7 +240,7 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore, NO_MONITOR )
{
@Override
public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
Expand Down Expand Up @@ -390,7 +391,7 @@ private boolean recover( PhysicalLogFiles logFiles )
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore, NO_MONITOR )
{
@Override
public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
Expand Down

0 comments on commit 61fd0d6

Please sign in to comment.