Skip to content

Commit

Permalink
Corrupted logs resilient recovery
Browse files Browse the repository at this point in the history
Allow recovery process to restore store with corrupted transaction logs.
Transaction logs can be corrupted in various scenarios, most frequently
that happens when out of disk space error occurred.
This PR introduce additional resilience in log reading during recovery
that will allow to restore as much transactions as database can read.
Any further logs after first non readable transaction are ignored and non readable as well.
All unreadable transactions logs will be truncated in the end of recovery.

To avoid cases when recovery can remove transactions logs during downgrade
that can happen by mistake additional "force" option introduced.
That will not gonna allow to remove logs with unknown version by default.
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent dea7245 commit 4842282
Show file tree
Hide file tree
Showing 48 changed files with 983 additions and 374 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory;
import org.neo4j.kernel.impl.recovery.RecoveryRequiredChecker;
import org.neo4j.logging.FormattedLogProvider;
Expand Down Expand Up @@ -223,7 +224,8 @@ private void checkDbState( File storeDir, Config additionalConfiguration ) throw
PageCache pageCache = ConfigurableStandalonePageCacheFactory
.createPageCache( fileSystem, additionalConfiguration ) )
{
if ( new RecoveryRequiredChecker( fileSystem, pageCache ).isRecoveryRequiredAt( storeDir ) )
RecoveryRequiredChecker requiredChecker = new RecoveryRequiredChecker( fileSystem, pageCache, NullLogService.getInstance() );
if ( requiredChecker.isRecoveryRequiredAt( storeDir ) )
{
throw new CommandFailed(
Strings.joinAsLines( "Active logical log detected, this might be a source of inconsistencies.",
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory;
import org.neo4j.kernel.impl.recovery.RecoveryRequiredChecker;
import org.neo4j.logging.FormattedLogProvider;
Expand Down Expand Up @@ -133,7 +134,8 @@ private void checkDbState( File storeDir, Config tuningConfiguration ) throws To
{
try ( PageCache pageCache = ConfigurableStandalonePageCacheFactory.createPageCache( fs, tuningConfiguration ) )
{
if ( new RecoveryRequiredChecker( fs, pageCache ).isRecoveryRequiredAt( storeDir ) )
RecoveryRequiredChecker requiredChecker = new RecoveryRequiredChecker( fs, pageCache, NullLogService.getInstance() );
if ( requiredChecker.isRecoveryRequiredAt( storeDir ) )
{
throw new ToolFailureException( Strings.joinAsLines(
"Active logical log detected, this might be a source of inconsistencies.",
Expand Down
Expand Up @@ -427,7 +427,7 @@ public void start() throws IOException
// Check the tail of transaction logs and validate version
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME, fs );
final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader );
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, logService );
LogVersionUpgradeChecker.check( tailScanner, config );

// Upgrade the store before we begin
Expand Down Expand Up @@ -650,7 +650,7 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
logFile, logRotation, transactionMetadataCache, transactionIdStore, explicitIndexTransactionOrdering,
databaseHealth ) );
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader, logService );

int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
Expand Down Expand Up @@ -688,18 +688,9 @@ private void buildRecovery(
StorageEngine storageEngine,
LogicalTransactionStore logicalTransactionStore )
{
Recovery.SPI spi =
new DefaultRecoverySPI( storageEngine, logFiles, fileSystemAbstraction, tailScanner, transactionIdStore,
logicalTransactionStore, positionMonitor );
Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor()
{
@Override
public void recoveryCompleted( int numberOfRecoveredTransactions )
{
startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions );
}
} );
Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, logFiles, fileSystemAbstraction, tailScanner,
transactionIdStore, logicalTransactionStore, positionMonitor );
Recovery recovery = new Recovery( spi, recoveryMonitor, startupStatistics, logService );
life.add( recovery );
}

Expand Down
Expand Up @@ -27,6 +27,12 @@ public class SimpleLogService extends AbstractLogService
private final LogProvider userLogProvider;
private final LogProvider internalLogProvider;

public SimpleLogService( LogProvider commonLogProvider )
{
this.userLogProvider = commonLogProvider;
this.internalLogProvider = commonLogProvider;
}

public SimpleLogService( LogProvider userLogProvider, LogProvider internalLogProvider )
{
this.userLogProvider = new DuplicatingLogProvider( userLogProvider, internalLogProvider );
Expand Down
Expand Up @@ -24,7 +24,7 @@

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
Expand All @@ -43,30 +43,27 @@ public class RecoveryRequiredChecker
{
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final LogService logService;

public RecoveryRequiredChecker( FileSystemAbstraction fs, PageCache pageCache )
public RecoveryRequiredChecker( FileSystemAbstraction fs, PageCache pageCache, LogService logService )
{
this.fs = fs;
this.pageCache = pageCache;
this.logService = logService;
}

public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
{
File neoStore = new File( dataDir, MetaDataStore.DEFAULT_NAME );
boolean noStoreFound = !NeoStores.isStorePresent( pageCache, dataDir );

// We need config to determine where the logical log files are
if ( noStoreFound )
if ( !NeoStores.isStorePresent( pageCache, dataDir ) )
{
// No database in the specified directory.
return false;
}

PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );

LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();

LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, reader );
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, reader, logService );
return new PositionToRecoverFrom( tailScanner, NO_MONITOR ).get() != LogPosition.UNSPECIFIED;
}
}
Expand Up @@ -82,7 +82,7 @@ public void migrate( File storeDir )
{
LogProvider logProvider = logService.getInternalLogProvider();
UpgradableDatabase upgradableDatabase =
new UpgradableDatabase( new StoreVersionCheck( pageCache ), format, tailScanner );
new UpgradableDatabase( new StoreVersionCheck( pageCache ), format, tailScanner, logService );
StoreUpgrader storeUpgrader = new StoreUpgrader( upgradableDatabase, progressMonitor, config, fs, pageCache,
logProvider );

Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.File;

import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.format.FormatFamily;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
Expand All @@ -43,13 +44,15 @@ public class UpgradableDatabase
private final StoreVersionCheck storeVersionCheck;
private final RecordFormats format;
private final LogTailScanner tailScanner;
private final LogService logService;

public UpgradableDatabase( StoreVersionCheck storeVersionCheck, RecordFormats format,
LogTailScanner tailScanner )
LogTailScanner tailScanner, LogService logService )
{
this.storeVersionCheck = storeVersionCheck;
this.format = format;
this.tailScanner = tailScanner;
this.logService = logService;
}

/**
Expand Down
Expand Up @@ -331,7 +331,7 @@ private LogPosition extractTransactionLogPosition( File neoStore, File storeDir,
{
return new LogPosition( BASE_TX_LOG_VERSION, BASE_TX_LOG_BYTE_OFFSET );
}
long offset = fileSystem.getFileSize( logFiles.getLogFileForVersion( logVersion ) );
long offset = fileSystem.getFileSize( logFiles.getHighestLogFile() );
return new LogPosition( logVersion, offset );

}
Expand Down
Expand Up @@ -21,16 +21,21 @@

import java.io.IOException;

import org.neo4j.helpers.Exceptions;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.UnsupportedLogVersionException;
import org.neo4j.logging.Log;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static java.lang.String.format;
import static org.neo4j.kernel.impl.transaction.log.LogVersionRepository.INITIAL_LOG_VERSION;

/**
Expand All @@ -47,16 +52,18 @@ public class LogTailScanner
private final FileSystemAbstraction fileSystem;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
private LogTailInformation logTailInformation;
private final Log log;

public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, LogService logService )
{
this.logFiles = logFiles;
this.fileSystem = fileSystem;
this.logEntryReader = logEntryReader;
this.log = logService.getInternalLog( LogTailScanner.class );
}

private LogTailInformation update() throws IOException
private LogTailInformation findLogTail() throws IOException
{
final long fromVersionBackwards = logFiles.getHighestLogVersion();
long version = fromVersionBackwards;
Expand All @@ -65,6 +72,7 @@ private LogTailInformation update() throws IOException
LogEntryStart oldestStartEntry = null;
long oldestVersionFound = -1;
LogEntryVersion latestLogEntryVersion = null;
boolean commitsAfterCheckPoint = false;

while ( version >= INITIAL_LOG_VERSION )
{
Expand All @@ -78,8 +86,7 @@ private LogTailInformation update() throws IOException
oldestVersionFound = version;

CheckPoint latestCheckPoint = null;
ReadableLogChannel recoveredDataChannel =
new ReadAheadLogChannel( channel, NO_MORE_CHANNELS );
ReadableLogChannel recoveredDataChannel = new ReadAheadLogChannel( channel );
boolean firstStartEntry = true;

try ( LogEntryCursor cursor = new LogEntryCursor( logEntryReader, recoveredDataChannel ) )
Expand Down Expand Up @@ -120,11 +127,31 @@ private LogTailInformation update() throws IOException
}
}
}
catch ( Throwable t )
{
if ( Exceptions.contains( t, UnsupportedLogVersionException.class ) )
{
if ( FeatureToggles.flag( LogTailScanner.class, "force", false ) )
{
log.warn( "Unsupported log version was found in transactional logs, but log processing was " +
"forced.", t );
}
else
{
throw new RuntimeException( format( "Unsupported transaction log version found. " +
"To force transactional processing anyway and trip non recognised transactions please " +
"use %s. By using this flag you can lose part of your transactions log. This operation is irretrievable." +
" ", FeatureToggles.toggle( LogTailScanner.class, "force", true ) ), t );
}
}
log.warn( format( "Fail to read transaction log version %d.", version ), t );
commitsAfterCheckPoint = true;
}

if ( latestCheckPoint != null )
{
return latestCheckPoint( fromVersionBackwards, version, latestStartEntry, oldestVersionFound,
latestCheckPoint, latestLogEntryVersion );
latestCheckPoint, latestLogEntryVersion, commitsAfterCheckPoint );
}

version--;
Expand All @@ -136,8 +163,8 @@ private LogTailInformation update() throws IOException
}
}

boolean commitsAfterCheckPoint = oldestStartEntry != null;
long firstTxAfterPosition = commitsAfterCheckPoint
commitsAfterCheckPoint |= oldestStartEntry != null;
long firstTxAfterPosition = (commitsAfterCheckPoint && oldestStartEntry != null)
? extractFirstTxIdAfterPosition( oldestStartEntry.getStartPosition(), fromVersionBackwards )
: LogTailInformation.NO_TRANSACTION_ID;

Expand All @@ -147,28 +174,46 @@ private LogTailInformation update() throws IOException

protected LogTailInformation latestCheckPoint( long fromVersionBackwards, long version,
LogEntryStart latestStartEntry, long oldestVersionFound, CheckPoint latestCheckPoint,
LogEntryVersion latestLogEntryVersion ) throws IOException
LogEntryVersion latestLogEntryVersion, boolean commitsAfterCheckPoint ) throws IOException
{
// Is the latest start entry in this log file version later than what the latest check point targets?
LogPosition target = latestCheckPoint.getLogPosition();
boolean startEntryAfterCheckPoint = latestStartEntry != null &&
latestStartEntry.getStartPosition().compareTo( target ) >= 0;
boolean startEntryAfterCheckPoint = ((latestStartEntry != null) &&
(latestStartEntry.getStartPosition().compareTo( target ) >= 0)) ||
commitsAfterCheckPoint;
if ( !startEntryAfterCheckPoint )
{
if ( target.getLogVersion() < version )
{
// This check point entry targets a previous log file.
// Go there and see if there's a transaction. Reader is capped to that log version.
startEntryAfterCheckPoint = extractFirstTxIdAfterPosition( target, version ) !=
LogTailInformation.NO_TRANSACTION_ID;
try
{
startEntryAfterCheckPoint =
extractFirstTxIdAfterPosition( target, version ) != LogTailInformation.NO_TRANSACTION_ID;
}
catch ( LogReadingException lre )
{
startEntryAfterCheckPoint = true;
}
}
}

// Extract first transaction id after check point target position.
// Reader may continue into log files after the initial version.
long firstTxIdAfterCheckPoint = startEntryAfterCheckPoint
? extractFirstTxIdAfterPosition( target, fromVersionBackwards )
: LogTailInformation.NO_TRANSACTION_ID;
long firstTxIdAfterCheckPoint = LogTailInformation.NO_TRANSACTION_ID;
if ( startEntryAfterCheckPoint )
{
try
{
firstTxIdAfterCheckPoint = extractFirstTxIdAfterPosition( target, fromVersionBackwards );
}
catch ( LogReadingException lre )
{
//
}
}

return new LogTailInformation( latestCheckPoint, startEntryAfterCheckPoint,
firstTxIdAfterCheckPoint, oldestVersionFound, fromVersionBackwards, latestLogEntryVersion );
}
Expand Down Expand Up @@ -196,7 +241,7 @@ protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long
try
{
storeChannel.position( currentPosition.getByteOffset() );
try ( ReadAheadLogChannel logChannel = new ReadAheadLogChannel( storeChannel, NO_MORE_CHANNELS );
try ( ReadAheadLogChannel logChannel = new ReadAheadLogChannel( storeChannel );
LogEntryCursor cursor = new LogEntryCursor( logEntryReader, logChannel ) )
{
while ( cursor.next() )
Expand All @@ -208,6 +253,12 @@ protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long
}
}
}
catch ( Throwable t )
{
log.warn( format( "Fail to read transaction log version %d.", currentPosition.getLogVersion() ), t );
throw new LogReadingException(
new LogPosition( currentPosition.getLogVersion(), storeChannel.position() ) );
}
}
finally
{
Expand Down Expand Up @@ -237,7 +288,7 @@ public LogTailInformation getTailInformation() throws UnderlyingStorageException
{
try
{
logTailInformation = update();
logTailInformation = findLogTail();
}
catch ( IOException e )
{
Expand Down Expand Up @@ -270,4 +321,19 @@ public LogTailInformation( CheckPoint lastCheckPoint, boolean commitsAfterLastCh
this.latestLogEntryVersion = latestLogEntryVersion;
}
}

private class LogReadingException extends RuntimeException
{
private LogPosition logPosition;

LogReadingException( LogPosition logPosition )
{
this.logPosition = logPosition;
}

public LogPosition getLogPosition()
{
return logPosition;
}
}
}
Expand Up @@ -58,8 +58,7 @@ TransactionCursor getTransactions( long transactionIdToStartFrom )
* or if the transaction has been committed, but information about it is no longer available for some reason.
* @throws IOException if there was an I/O related error looking for the start transaction.
*/
TransactionCursor getTransactions( LogPosition position )
throws IOException;
TransactionCursor getTransactions( LogPosition position ) throws IOException;

/**
* Acquires a {@link TransactionCursor cursor} which will provide {@link CommittedTransactionRepresentation}
Expand Down

0 comments on commit 4842282

Please sign in to comment.