Skip to content

Commit

Permalink
Introduce "failOnCorruptedLogFiles" feature flag in NeoStoreDataSource
Browse files Browse the repository at this point in the history
to force recovery to fail instead in case if corrupted log in encountered
instead of trying to clean up and truncate transaction logs.
By default recovery will try to do as much as it can to recover database
and to avoid requesting extra options from users.
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent 857b1b5 commit 192dd70
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 53 deletions.
Expand Up @@ -168,6 +168,7 @@
import org.neo4j.storageengine.api.StoreFileMetadata;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.time.SystemNanoClock;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;

Expand Down Expand Up @@ -232,6 +233,8 @@ boolean applicable( DiagnosticsPhase phase )
}

public static final String DEFAULT_DATA_SOURCE_NAME = "nioneodb";
private final boolean failOnCorruptedLogFiles = FeatureToggles.flag( NeoStoreDataSource.class,
"failOnCorruptedLogFiles", false );

private final Monitors monitors;
private final Tracers tracers;
Expand Down Expand Up @@ -431,7 +434,8 @@ public void start() throws IOException
// Check the tail of transaction logs and validate version
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME, fs );
final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, monitors );

LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, monitors, failOnCorruptedLogFiles );
monitors.addMonitorListener( new LoggingLogTailScannerMonitor( logService.getInternalLog( LogTailScanner.class ) ) );
LogVersionUpgradeChecker.check( tailScanner, config );

Expand Down Expand Up @@ -693,10 +697,10 @@ private void buildRecovery(
StorageEngine storageEngine,
LogicalTransactionStore logicalTransactionStore )
{
RecoveryService
spi = new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
RecoveryService recoveryService = new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore,
logicalTransactionStore, positionMonitor );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction );
Recovery recovery = new Recovery( spi, startupStatistics, logPruner, recoveryMonitor );
Recovery recovery = new Recovery( recoveryService, startupStatistics, logPruner, recoveryMonitor, failOnCorruptedLogFiles );
life.add( recovery );
}

Expand Down
Expand Up @@ -43,9 +43,7 @@ public class DefaultRecoveryService implements RecoveryService
private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore;

public DefaultRecoveryService(
StorageEngine storageEngine,
LogTailScanner logTailScanner,
public DefaultRecoveryService( StorageEngine storageEngine, LogTailScanner logTailScanner,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
PositionToRecoverFrom.Monitor monitor )
{
Expand Down
Expand Up @@ -38,11 +38,8 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.UnsupportedLogVersionException;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static java.lang.String.format;
import static org.neo4j.kernel.impl.transaction.log.LogVersionRepository.INITIAL_LOG_VERSION;

/**
Expand All @@ -61,14 +58,23 @@ public class LogTailScanner
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
private LogTailInformation logTailInformation;
private final LogTailScannerMonitor monitor;
private final boolean failOnCorruptedLogFiles;

public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, Monitors monitors )
{
this( logFiles, fileSystem, logEntryReader, monitors, false );
}

public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, Monitors monitors,
boolean failOnCorruptedLogFiles )
{
this.logFiles = logFiles;
this.fileSystem = fileSystem;
this.logEntryReader = logEntryReader;
this.monitor = monitors.newMonitor( LogTailScannerMonitor.class );
this.failOnCorruptedLogFiles = failOnCorruptedLogFiles;
}

private LogTailInformation findLogTail() throws IOException
Expand Down Expand Up @@ -133,22 +139,12 @@ else if ( entry instanceof LogEntryStart )
}
catch ( Throwable t )
{
if ( Exceptions.contains( t, UnsupportedLogVersionException.class ) )
monitor.corruptedLogFile( version, t );
if ( failOnCorruptedLogFiles )
{
if ( FeatureToggles.flag( LogTailScanner.class, "force", false ) )
{
monitor.forced( t );
}
else
{
throw new RuntimeException( format( "Unsupported transaction log version found. " +
"To force transactional processing anyway and trip non recognised transactions please " +
"use %s. By using this flag you can lose part of your transactions log. This operation is irretrievable." +
" ", FeatureToggles.toggle( LogTailScanner.class, "force", true ) ), t );
}
throw Exceptions.launderedException( t );
}
corruptedTransactionLogs = true;
monitor.corruptedLogFile( version, t );
}

if ( latestCheckPoint != null )
Expand Down
Expand Up @@ -21,7 +21,5 @@

public interface LogTailScannerMonitor
{
void forced( Throwable t );

void corruptedLogFile( long version, Throwable t );
}
Expand Up @@ -30,12 +30,6 @@ public LoggingLogTailScannerMonitor( Log log )
this.log = log;
}

@Override
public void forced( Throwable t )
{
log.warn( "Unsupported log version was found in transactional logs, but log processing was forced.", t );
}

@Override
public void corruptedLogFile( long version, Throwable t )
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.recovery;

import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.impl.core.StartupStatisticsProvider;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
Expand All @@ -41,15 +42,17 @@ public class Recovery extends LifecycleAdapter
private final RecoveryMonitor monitor;
private final StartupStatisticsProvider startupStatistics;
private final TransactionLogPruner logPruner;
private final boolean failOnCorruptedLogFiles;
private int numberOfRecoveredTransactions;

public Recovery( RecoveryService recoveryService, StartupStatisticsProvider startupStatistics,
TransactionLogPruner logPruner, RecoveryMonitor monitor )
TransactionLogPruner logPruner, RecoveryMonitor monitor, boolean failOnCorruptedLogFiles )
{
this.recoveryService = recoveryService;
this.monitor = monitor;
this.startupStatistics = startupStatistics;
this.logPruner = logPruner;
this.failOnCorruptedLogFiles = failOnCorruptedLogFiles;
}

@Override
Expand Down Expand Up @@ -101,16 +104,20 @@ public void init() throws Throwable
}
}
}
catch ( Exception e )
catch ( Throwable t )
{
if ( failOnCorruptedLogFiles )
{
throw Exceptions.launderedException( t );
}
if ( lastTransaction != null )
{
LogEntryCommit commitEntry = lastTransaction.getCommitEntry();
monitor.failToRecoverTransactionsAfterCommit( e, commitEntry, recoveryToPosition );
monitor.failToRecoverTransactionsAfterCommit( t, commitEntry, recoveryToPosition );
}
else
{
monitor.failToRecoverTransactionsAfterPosition( e, recoveryFromPosition );
monitor.failToRecoverTransactionsAfterPosition( t, recoveryFromPosition );
recoveryToPosition = recoveryFromPosition;
}
}
Expand Down
Expand Up @@ -66,7 +66,6 @@
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.test.TestGraphDatabaseFactory;
Expand Down Expand Up @@ -107,11 +106,33 @@ public void setUp() throws Exception
@After
public void tearDown()
{
FeatureToggles.set( LogTailScanner.class, "force", false );
FeatureToggles.set( NeoStoreDataSource.class, "failOnCorruptedLogFiles", false );
}

@Test
public void doNotTruncateNewerTransactionLogFile() throws IOException
public void evenTruncateNewerTransactionLogFile() throws IOException
{
GraphDatabaseAPI database = (GraphDatabaseAPI) databaseFactory.newEmbeddedDatabase( storeDir );
TransactionIdStore transactionIdStore = getTransactionIdStore( database );
long lastClosedTrandactionBeforeStart = transactionIdStore.getLastClosedTransactionId();
for ( int i = 0; i < 10; i++ )
{
generateTransaction( database );
}
long numberOfClosedTransactions = getTransactionIdStore( database ).getLastClosedTransactionId() -
lastClosedTrandactionBeforeStart;
database.shutdown();
removeLastCheckpointRecordFromLastLogFile();
addRandomLongsToLastLogFile();

database = (GraphDatabaseAPI) databaseFactory.newEmbeddedDatabase( storeDir );
database.shutdown();

assertEquals( numberOfClosedTransactions, recoveryMonitor.getNumberOfRecoveredTransactions() );
}

@Test
public void doNotTruncateNewerTransactionLogFileWhenFailOnError() throws IOException
{
GraphDatabaseAPI database = (GraphDatabaseAPI) databaseFactory.newEmbeddedDatabase( storeDir );
for ( int i = 0; i < 10; i++ )
Expand All @@ -122,6 +143,7 @@ public void doNotTruncateNewerTransactionLogFile() throws IOException
removeLastCheckpointRecordFromLastLogFile();
addRandomLongsToLastLogFile();

FeatureToggles.set( NeoStoreDataSource.class, "failOnCorruptedLogFiles", true );
expectedException.expectCause( new RootCauseMatcher<>( UnsupportedLogVersionException.class ) );

database = (GraphDatabaseAPI) databaseFactory.newEmbeddedDatabase( storeDir );
Expand All @@ -143,13 +165,11 @@ public void truncateNewerTransactionLogFileWhenForced() throws IOException
removeLastCheckpointRecordFromLastLogFile();
addRandomLongsToLastLogFile();

FeatureToggles.set( LogTailScanner.class, "force", true );
database = (GraphDatabaseAPI) databaseFactory.newEmbeddedDatabase( storeDir );
database.shutdown();

logProvider.assertContainsMessageContaining( "Fail to read transaction log version 0." );
logProvider.assertContainsMessageContaining( "Fail to read transaction log version 0. Last valid transaction start offset is: 5668." );
logProvider.assertContainsMessageContaining( "Unsupported log version was found in transactional logs, but log processing was forced." );
assertEquals( numberOfClosedTransactions, recoveryMonitor.getNumberOfRecoveredTransactions() );
}

Expand All @@ -174,6 +194,18 @@ public void recoverFirstCorruptedTransactionSingleFileNoCheckpoint() throws IOEx
assertEquals( 1, logEntriesDistribution.count( CheckPoint.class ) );
}

@Test
public void failToRecoverFirstCorruptedTransactionSingleFileNoCheckpointIfFailOnCorruption() throws IOException
{
PositiveLogFilesBasedLogVersionRepository versionRepository = addCorruptedCommandsToLastLogFile();

FeatureToggles.set( NeoStoreDataSource.class, "failOnCorruptedLogFiles", true );
expectedException.expectCause( new RootCauseMatcher<>( NegativeArraySizeException.class ) );

GraphDatabaseService recoveredDatabase = databaseFactory.newEmbeddedDatabase( storeDir );
recoveredDatabase.shutdown();
}

@Test
public void recoverNotAFirstCorruptedTransactionSingleFileNoCheckpoint() throws IOException
{
Expand Down
14 changes: 4 additions & 10 deletions community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
Expand Up @@ -157,8 +157,6 @@ public void shouldRecoverExistingData() throws Exception
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );

TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand Down Expand Up @@ -208,7 +206,7 @@ public boolean visit( CommittedTransactionRepresentation tx ) throws Exception
}
};
}
}, new StartupStatisticsProvider(), logPruner, monitor ) );
}, new StartupStatisticsProvider(), logPruner, monitor, false ) );

life.start();

Expand Down Expand Up @@ -262,8 +260,6 @@ public void shouldSeeThatACleanDatabaseShouldNotRequireRecovery() throws Excepti
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );

TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand All @@ -272,7 +268,7 @@ public void startRecovery()
{
fail( "Recovery should not be required" );
}
}, new StartupStatisticsProvider(), logPruner, monitor ));
}, new StartupStatisticsProvider(), logPruner, monitor, false ));

life.start();

Expand Down Expand Up @@ -404,8 +400,6 @@ private boolean recover( File storeDir, PhysicalLogFiles logFiles )
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );

TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand All @@ -414,7 +408,7 @@ public void startRecovery()
{
recoveryRequired.set( true );
}
}, new StartupStatisticsProvider(), logPruner, monitor ) );
}, new StartupStatisticsProvider(), logPruner, monitor, false ) );

life.start();
}
Expand All @@ -428,7 +422,7 @@ public void startRecovery()
private LogTailScanner getTailScanner( PhysicalLogFiles logFiles,
LogEntryReader<ReadableClosablePositionAwareChannel> reader )
{
return new LogTailScanner( logFiles, fileSystemRule.get(), reader, monitors );
return new LogTailScanner( logFiles, fileSystemRule.get(), reader, monitors, false );
}

private void writeSomeData( File file,
Expand Down
Expand Up @@ -49,7 +49,6 @@
import org.neo4j.kernel.recovery.RecoveryMonitor;
import org.neo4j.kernel.recovery.RecoveryService;
import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.test.rule.TestDirectory;
Expand Down Expand Up @@ -198,7 +197,6 @@ public void shouldOpenAndRecoverExistingData() throws Exception

life.add( new BatchingTransactionAppender( logFile, NO_ROTATION, positionCache,
transactionIdStore, BYPASS, DATABASE_HEALTH ) );
AssertableLogProvider logProvider = new AssertableLogProvider( true );
TransactionLogPruner logPruner = new TransactionLogPruner( testDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new RecoveryService()
{
Expand Down Expand Up @@ -237,7 +235,7 @@ public void transactionsRecovered( CommittedTransactionRepresentation lastRecove
LogPosition positionAfterLastRecoveredTransaction )
{
}
}, new StartupStatisticsProvider(), logPruner, mock( RecoveryMonitor.class ) ) );
}, new StartupStatisticsProvider(), logPruner, mock( RecoveryMonitor.class ), false ) );

// WHEN
try
Expand Down

0 comments on commit 192dd70

Please sign in to comment.