Skip to content

Commit

Permalink
Update transaction logs processing during recovery to avoid
Browse files Browse the repository at this point in the history
double checking logs in case if checkpoint was never found.
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent df8bdc7 commit e51a186
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 131 deletions.
Expand Up @@ -135,7 +135,7 @@ private Result checkCleanShutDownByCheckPoint()
// check version
try
{
if ( !tailScanner.getTailInformation().commitsAfterLastCheckPoint )
if ( !tailScanner.getTailInformation().commitsAfterLastCheckpoint() )
{
return new Result( Result.Outcome.ok, null, null );
}
Expand Down
Expand Up @@ -48,6 +48,7 @@
*/
public class LogTailScanner
{
public static long NO_TRANSACTION_ID = -1;
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fileSystem;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
Expand All @@ -65,14 +66,15 @@ public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSyst

private LogTailInformation findLogTail() throws IOException
{
final long fromVersionBackwards = logFiles.getHighestLogVersion();
long version = fromVersionBackwards;
long versionToSearchForCommits = fromVersionBackwards;
final long highestLogVersion = logFiles.getHighestLogVersion();
long version = highestLogVersion;
long versionToSearchForCommits = highestLogVersion;
LogEntryStart latestStartEntry = null;
LogEntryStart oldestStartEntry = null;
long oldestStartEntryTransaction = -1;
long oldestVersionFound = -1;
LogEntryVersion latestLogEntryVersion = null;
boolean commitsAfterCheckPoint = false;
boolean startRecordAfterCheckpoint = false;
boolean corruptedTransactionLogs = false;

while ( version >= INITIAL_LOG_VERSION )
{
Expand All @@ -84,11 +86,8 @@ private LogTailInformation findLogTail() throws IOException
}

oldestVersionFound = version;

CheckPoint latestCheckPoint = null;
ReadableLogChannel recoveredDataChannel = new ReadAheadLogChannel( channel );
boolean firstStartEntry = true;

try ( LogEntryCursor cursor = new LogEntryCursor( logEntryReader, recoveredDataChannel ) )
{
LogEntry entry;
Expand All @@ -101,23 +100,21 @@ private LogTailInformation findLogTail() throws IOException
{
latestCheckPoint = entry.as();
}

// Collect data about latest commits
if ( entry instanceof LogEntryStart )
else if ( entry instanceof LogEntryCommit )
{
if ( oldestStartEntryTransaction == NO_TRANSACTION_ID )
{
oldestStartEntryTransaction = ((LogEntryCommit) entry).getTxId();
}
}
else if ( entry instanceof LogEntryStart )
{
LogEntryStart startEntry = entry.as();
if ( version == versionToSearchForCommits )
{
latestStartEntry = startEntry;
}

// The scan goes backwards by log version, although forward per log version
// Oldest start entry will be the first in the last log version scanned.
if ( firstStartEntry )
{
oldestStartEntry = startEntry;
firstStartEntry = false;
}
startRecordAfterCheckpoint = true;
}

// Collect data about latest entry version, only in first log file
Expand All @@ -144,14 +141,14 @@ private LogTailInformation findLogTail() throws IOException
" ", FeatureToggles.toggle( LogTailScanner.class, "force", true ) ), t );
}
}
corruptedTransactionLogs = true;
log.warn( format( "Fail to read transaction log version %d.", version ), t );
commitsAfterCheckPoint = true;
}

if ( latestCheckPoint != null )
{
return latestCheckPoint( fromVersionBackwards, version, latestStartEntry, oldestVersionFound,
latestCheckPoint, latestLogEntryVersion, commitsAfterCheckPoint );
return checkpointTailInformation( highestLogVersion, latestStartEntry, oldestVersionFound,
latestLogEntryVersion, latestCheckPoint, corruptedTransactionLogs );
}

version--;
Expand All @@ -163,59 +160,23 @@ private LogTailInformation findLogTail() throws IOException
}
}

commitsAfterCheckPoint |= oldestStartEntry != null;
long firstTxAfterPosition = (commitsAfterCheckPoint && oldestStartEntry != null)
? extractFirstTxIdAfterPosition( oldestStartEntry.getStartPosition(), fromVersionBackwards )
: LogTailInformation.NO_TRANSACTION_ID;

return new LogTailInformation( null, commitsAfterCheckPoint, firstTxAfterPosition, oldestVersionFound,
fromVersionBackwards, latestLogEntryVersion );
return new LogTailInformation( corruptedTransactionLogs || startRecordAfterCheckpoint,
oldestStartEntryTransaction, oldestVersionFound, highestLogVersion, latestLogEntryVersion );
}

protected LogTailInformation latestCheckPoint( long fromVersionBackwards, long version,
LogEntryStart latestStartEntry, long oldestVersionFound, CheckPoint latestCheckPoint,
LogEntryVersion latestLogEntryVersion, boolean commitsAfterCheckPoint ) throws IOException
protected LogTailInformation checkpointTailInformation( long highestLogVersion, LogEntryStart latestStartEntry,
long oldestVersionFound, LogEntryVersion latestLogEntryVersion, CheckPoint latestCheckPoint,
boolean corruptedTransactionLogs ) throws IOException
{
// Is the latest start entry in this log file version later than what the latest check point targets?
LogPosition target = latestCheckPoint.getLogPosition();
boolean startEntryAfterCheckPoint = ((latestStartEntry != null) &&
(latestStartEntry.getStartPosition().compareTo( target ) >= 0)) ||
commitsAfterCheckPoint;
if ( !startEntryAfterCheckPoint )
{
if ( target.getLogVersion() < version )
{
// This check point entry targets a previous log file.
// Go there and see if there's a transaction. Reader is capped to that log version.
try
{
startEntryAfterCheckPoint =
extractFirstTxIdAfterPosition( target, version ) != LogTailInformation.NO_TRANSACTION_ID;
}
catch ( LogReadingException lre )
{
startEntryAfterCheckPoint = true;
}
}
}

// Extract first transaction id after check point target position.
// Reader may continue into log files after the initial version.
long firstTxIdAfterCheckPoint = LogTailInformation.NO_TRANSACTION_ID;
if ( startEntryAfterCheckPoint )
{
try
{
firstTxIdAfterCheckPoint = extractFirstTxIdAfterPosition( target, fromVersionBackwards );
}
catch ( LogReadingException lre )
{
//
}
}

return new LogTailInformation( latestCheckPoint, startEntryAfterCheckPoint,
firstTxIdAfterCheckPoint, oldestVersionFound, fromVersionBackwards, latestLogEntryVersion );
LogPosition checkPointLogPosition = latestCheckPoint.getLogPosition();
ExtractedTransactionRecord transactionRecord = extractFirstTxIdAfterPosition( checkPointLogPosition, highestLogVersion );
long firstTxIdAfterPosition = transactionRecord.getId();
boolean startRecordAfterCheckpoint = (firstTxIdAfterPosition != NO_TRANSACTION_ID) ||
((latestStartEntry != null) &&
(latestStartEntry.getStartPosition().compareTo( latestCheckPoint.getLogPosition() ) >= 0));
boolean corruptedLogs = transactionRecord.isFailure() || corruptedTransactionLogs;
return new LogTailInformation( latestCheckPoint, corruptedLogs || startRecordAfterCheckpoint,
firstTxIdAfterPosition, oldestVersionFound, highestLogVersion, latestLogEntryVersion );
}

/**
Expand All @@ -225,13 +186,15 @@ protected LogTailInformation latestCheckPoint( long fromVersionBackwards, long v
*
* @param initialPosition {@link LogPosition} to start scan from.
* @param maxLogVersion max log version to scan.
* @return txId of closes commit entry to {@code initialPosition}, or {@link LogTailInformation#NO_TRANSACTION_ID}
* if not found.
* @throws IOException on I/O error.
* @return value object that contains first transaction id of closes commit entry to {@code initialPosition},
* or {@link LogTailInformation#NO_TRANSACTION_ID} if not found. And failure flag that will be set to true if
* there was some exception during transaction log processing.
* @throws IOException on channel close I/O error.
*/
protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion ) throws IOException
protected ExtractedTransactionRecord extractFirstTxIdAfterPosition( LogPosition initialPosition, long maxLogVersion ) throws IOException
{
LogPosition currentPosition = initialPosition;
ExtractedTransactionRecord transactionRecord = new ExtractedTransactionRecord();
while ( currentPosition.getLogVersion() <= maxLogVersion )
{
LogVersionedStoreChannel storeChannel = PhysicalLogFile.tryOpenForVersion( logFiles, fileSystem,
Expand All @@ -242,23 +205,24 @@ protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long
{
storeChannel.position( currentPosition.getByteOffset() );
try ( ReadAheadLogChannel logChannel = new ReadAheadLogChannel( storeChannel );
LogEntryCursor cursor = new LogEntryCursor( logEntryReader, logChannel ) )
LogEntryCursor cursor = new LogEntryCursor( logEntryReader, logChannel ) )
{
while ( cursor.next() )
{
LogEntry entry = cursor.get();
if ( entry instanceof LogEntryCommit )
{
return ((LogEntryCommit) entry).getTxId();
transactionRecord.setId( ((LogEntryCommit) entry).getTxId() );
return transactionRecord;
}
}
}
catch ( Throwable t )
{
log.warn( format( "Fail to read transaction log version %d.", currentPosition.getLogVersion() ), t );
throw new LogReadingException(
new LogPosition( currentPosition.getLogVersion(), storeChannel.position() ) );
}
}
catch ( Exception e )
{
// TODO: propagate error whats was wrong
transactionRecord.setFailure( true );
return transactionRecord;
}
finally
{
Expand All @@ -268,7 +232,7 @@ protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long

currentPosition = LogPosition.start( currentPosition.getLogVersion() + 1 );
}
return LogTailInformation.NO_TRANSACTION_ID;
return transactionRecord;
}

/**
Expand Down Expand Up @@ -299,41 +263,72 @@ public LogTailInformation getTailInformation() throws UnderlyingStorageException
return logTailInformation;
}

static class ExtractedTransactionRecord
{
private long id;
private boolean failure;

ExtractedTransactionRecord()
{
id = NO_TRANSACTION_ID;
failure = false;
}

public long getId()
{
return id;
}

public void setId( long id )
{
this.id = id;
}

public boolean isFailure()
{
return failure;
}

public void setFailure( boolean failure )
{
this.failure = failure;
}
}

public static class LogTailInformation
{
public static long NO_TRANSACTION_ID = -1;

public final CheckPoint lastCheckPoint;
public final boolean commitsAfterLastCheckPoint;
public final long firstTxIdAfterLastCheckPoint;
public final long oldestLogVersionFound;
public final long currentLogVersion;
public final LogEntryVersion latestLogEntryVersion;
private final boolean recordAfterCheckpoint;

public LogTailInformation( boolean recordAfterCheckpoint, long firstTxIdAfterLastCheckPoint,
long oldestLogVersionFound, long currentLogVersion,
LogEntryVersion latestLogEntryVersion )
{
this( null, recordAfterCheckpoint, firstTxIdAfterLastCheckPoint, oldestLogVersionFound, currentLogVersion,
latestLogEntryVersion );
}

public LogTailInformation( CheckPoint lastCheckPoint, boolean commitsAfterLastCheckPoint,
long firstTxIdAfterLastCheckPoint, long oldestLogVersionFound, long currentLogVersion, LogEntryVersion latestLogEntryVersion )
public LogTailInformation( CheckPoint lastCheckPoint, boolean recordAfterCheckpoint,
long firstTxIdAfterLastCheckPoint,
long oldestLogVersionFound, long currentLogVersion, LogEntryVersion latestLogEntryVersion )
{
this.lastCheckPoint = lastCheckPoint;
this.commitsAfterLastCheckPoint = commitsAfterLastCheckPoint;
this.firstTxIdAfterLastCheckPoint = firstTxIdAfterLastCheckPoint;
this.oldestLogVersionFound = oldestLogVersionFound;
this.currentLogVersion = currentLogVersion;
this.latestLogEntryVersion = latestLogEntryVersion;
this.recordAfterCheckpoint = recordAfterCheckpoint;
}
}

private class LogReadingException extends RuntimeException
{
private LogPosition logPosition;

LogReadingException( LogPosition logPosition )
public boolean commitsAfterLastCheckpoint()
{
this.logPosition = logPosition;
}

public LogPosition getLogPosition()
{
return logPosition;
return recordAfterCheckpoint;
}
}

}
Expand Up @@ -87,7 +87,7 @@ public PositionToRecoverFrom( LogTailScanner logTailScanner, Monitor monitor )
public LogPosition get() throws IOException
{
LogTailScanner.LogTailInformation logTailInformation = logTailScanner.getTailInformation();
if ( !logTailInformation.commitsAfterLastCheckPoint )
if ( !logTailInformation.commitsAfterLastCheckpoint() )
{
monitor.noCommitsAfterLastCheckPoint(
logTailInformation.lastCheckPoint != null ? logTailInformation.lastCheckPoint.getLogPosition() : null );
Expand Down
Expand Up @@ -188,7 +188,7 @@ public static void removeCheckPointFromTxLog( FileSystemAbstraction fileSystem,
LogTailScanner tailScanner = new LogTailScanner( logFiles, fileSystem, logEntryReader, NullLogService.getInstance() );
LogTailScanner.LogTailInformation logTailInformation = tailScanner.getTailInformation();

if ( logTailInformation.commitsAfterLastCheckPoint )
if ( logTailInformation.commitsAfterLastCheckpoint() )
{
// done already
return;
Expand Down

0 comments on commit e51a186

Please sign in to comment.