Skip to content

Commit

Permalink
Introduce transaction log pruner used during recovery to truncate all
Browse files Browse the repository at this point in the history
the logs after some specified position, that
recovery treats as corrupted or non-readable.

Transaction log file specified by provided log position will be truncated
to provided length, any subsequent files will be removed.
Any removed or modified log content will be stored in separate
corruption logs archive for further analysis and as an additional safety
option to have the possibility to fully restore original logs in a faulty case.
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent 4842282 commit df8bdc7
Show file tree
Hide file tree
Showing 8 changed files with 436 additions and 41 deletions.
5 changes: 5 additions & 0 deletions community/kernel/pom.xml
Expand Up @@ -234,6 +234,11 @@ the relevant Commercial Agreement.
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.neo4j</groupId> <groupId>org.neo4j</groupId>
<artifactId>neo4j-io</artifactId> <artifactId>neo4j-io</artifactId>
Expand Down
Expand Up @@ -152,6 +152,7 @@
import org.neo4j.kernel.recovery.DefaultRecoverySPI; import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.PositionToRecoverFrom; import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery; import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.kernel.spi.explicitindex.IndexImplementation; import org.neo4j.kernel.spi.explicitindex.IndexImplementation;
import org.neo4j.kernel.spi.explicitindex.IndexProviders; import org.neo4j.kernel.spi.explicitindex.IndexProviders;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
Expand Down Expand Up @@ -688,9 +689,9 @@ private void buildRecovery(
StorageEngine storageEngine, StorageEngine storageEngine,
LogicalTransactionStore logicalTransactionStore ) LogicalTransactionStore logicalTransactionStore )
{ {
Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, logFiles, fileSystemAbstraction, tailScanner, Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
transactionIdStore, logicalTransactionStore, positionMonitor ); TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction );
Recovery recovery = new Recovery( spi, recoveryMonitor, startupStatistics, logService ); Recovery recovery = new Recovery( spi, startupStatistics, logPruner, logService, recoveryMonitor );
life.add( recovery ); life.add( recovery );
} }


Expand Down
Expand Up @@ -21,17 +21,16 @@


import java.io.IOException; import java.io.IOException;


import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.api.TransactionQueue; import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner; import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.recovery.Recovery.RecoveryApplier; import org.neo4j.kernel.recovery.Recovery.RecoveryApplier;
import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageEngine;
Expand All @@ -42,22 +41,17 @@
public class DefaultRecoverySPI implements Recovery.SPI public class DefaultRecoverySPI implements Recovery.SPI
{ {
private final PositionToRecoverFrom positionToRecoverFrom; private final PositionToRecoverFrom positionToRecoverFrom;
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fs;
private final StorageEngine storageEngine; private final StorageEngine storageEngine;
private final TransactionIdStore transactionIdStore; private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore; private final LogicalTransactionStore logicalTransactionStore;


public DefaultRecoverySPI( public DefaultRecoverySPI(
StorageEngine storageEngine, StorageEngine storageEngine,
PhysicalLogFiles logFiles, FileSystemAbstraction fs,
LogTailScanner logTailScanner, LogTailScanner logTailScanner,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore, TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
PositionToRecoverFrom.Monitor monitor ) PositionToRecoverFrom.Monitor monitor )
{ {
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
this.logFiles = logFiles;
this.fs = fs;
this.transactionIdStore = transactionIdStore; this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore; this.logicalTransactionStore = logicalTransactionStore;
this.positionToRecoverFrom = new PositionToRecoverFrom( logTailScanner, monitor ); this.positionToRecoverFrom = new PositionToRecoverFrom( logTailScanner, monitor );
Expand Down Expand Up @@ -100,20 +94,20 @@ public TransactionCursor getTransactionsInReverseOrder( LogPosition position ) t


@Override @Override
public void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction, public void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction ) throws Exception LogPosition positionAfterLastRecoveredTransaction )
{ {
long recoveredTransactionLogVersion = positionAfterLastRecoveredTransaction.getLogVersion();
long recoveredTransactionOffset = positionAfterLastRecoveredTransaction.getByteOffset();
if ( lastRecoveredTransaction != null ) if ( lastRecoveredTransaction != null )
{ {
LogEntryCommit commitEntry = lastRecoveredTransaction.getCommitEntry();
transactionIdStore.setLastCommittedAndClosedTransactionId( transactionIdStore.setLastCommittedAndClosedTransactionId(
lastRecoveredTransaction.getCommitEntry().getTxId(), commitEntry.getTxId(),
LogEntryStart.checksum( lastRecoveredTransaction.getStartEntry() ), LogEntryStart.checksum( lastRecoveredTransaction.getStartEntry() ),
lastRecoveredTransaction.getCommitEntry().getTimeWritten(), commitEntry.getTimeWritten(),
positionAfterLastRecoveredTransaction.getByteOffset(), recoveredTransactionOffset,
positionAfterLastRecoveredTransaction.getLogVersion() ); recoveredTransactionLogVersion );
} }

fs.truncate( logFiles.getLogFileForVersion( positionAfterLastRecoveredTransaction.getLogVersion() ),
positionAfterLastRecoveredTransaction.getByteOffset() );
} }


static class RecoveryVisitor implements RecoveryApplier static class RecoveryVisitor implements RecoveryApplier
Expand Down
Expand Up @@ -79,20 +79,23 @@ public interface SPI
RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception; RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception;


void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction, void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction ) throws Exception; LogPosition positionAfterLastRecoveredTransaction );
} }


private final SPI spi; private final SPI spi;
private final Monitor monitor; private final Monitor monitor;
private final StartupStatisticsProvider startupStatistics; private final StartupStatisticsProvider startupStatistics;
private final TransactionLogPruner logPruner;
private final Log log; private final Log log;
private int numberOfRecoveredTransactions; private int numberOfRecoveredTransactions;


public Recovery( SPI spi, Monitor monitor, StartupStatisticsProvider startupStatistics, LogService logService ) public Recovery( SPI spi, StartupStatisticsProvider startupStatistics, TransactionLogPruner logPruner,
LogService logService, Monitor monitor )
{ {
this.spi = spi; this.spi = spi;
this.monitor = monitor; this.monitor = monitor;
this.startupStatistics = startupStatistics; this.startupStatistics = startupStatistics;
this.logPruner = logPruner;
this.log = logService.getInternalLog( Recovery.class ); this.log = logService.getInternalLog( Recovery.class );
} }


Expand Down Expand Up @@ -161,6 +164,7 @@ public void init() throws Throwable
recoveryToPosition = recoveryFromPosition; recoveryToPosition = recoveryFromPosition;
} }
} }
logPruner.prune( recoveryToPosition );


spi.transactionsRecovered( lastTransaction, recoveryToPosition ); spi.transactionsRecovered( lastTransaction, recoveryToPosition );
startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions ); startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions );
Expand Down
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.function.LongConsumer;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;

import static java.lang.String.format;

/**
 * Transaction log pruner used during recovery to truncate all the logs after some specified position, that
 * recovery treats as corrupted or non-readable.
 * Transaction log file specified by provided log position will be truncated to provided length, any
 * subsequent files will be removed.
 * Any removed or modified log content will be stored in separate corruption logs archive for further analysis and as
 * an additional safety option to have the possibility to fully restore original logs in a faulty case.
 */
public class TransactionLogPruner
{
    /** Name of the folder, created under the store directory, that holds the backup archives. */
    static final String CORRUPTED_TX_LOGS_FOLDER_NAME = "corrupted-tx-logs";
    /** Archive file name: log version, byte offset and creation timestamp (millis), in that order. */
    private static final String LOG_FILE_ARCHIVE_PATTERN = "corrupted-logs-%d-%d-%d.zip";

    private final File storeDir;
    private final PhysicalLogFiles logFiles;
    private final FileSystemAbstraction fs;

    /**
     * @param storeDir directory under which the {@value #CORRUPTED_TX_LOGS_FOLDER_NAME} archive folder is created
     * @param logFiles used to resolve log files by version and to find the highest existing log version
     * @param fs file system used for all reads, truncation and deletion performed by this pruner
     */
    public TransactionLogPruner( File storeDir, PhysicalLogFiles logFiles, FileSystemAbstraction fs )
    {
        this.storeDir = storeDir;
        this.logFiles = logFiles;
        this.fs = fs;
    }

    /**
     * Prune all transaction logs after provided position. Log version specified in a position will be
     * truncated to provided byte offset, any subsequent log files will be deleted. Backup copy of removed data will
     * be stored in separate archive.
     * Nothing is touched when the log file of the given version is not longer than the given offset and no log
     * file with a higher version exists.
     *
     * @param positionAfterLastRecoveredTransaction position after last recovered transaction
     * @throws IOException if the backup archive cannot be written or the log file cannot be truncated
     */
    public void prune( LogPosition positionAfterLastRecoveredTransaction ) throws IOException
    {
        long recoveredTransactionLogVersion = positionAfterLastRecoveredTransaction.getLogVersion();
        long recoveredTransactionOffset = positionAfterLastRecoveredTransaction.getByteOffset();
        if ( isRecoveredLogCorrupted( recoveredTransactionLogVersion, recoveredTransactionOffset ) ||
                isNotLastLogFile( recoveredTransactionLogVersion ) )
        {
            // Back up first: truncation and deletion below are destructive.
            backupCorruptedContent( recoveredTransactionLogVersion, recoveredTransactionOffset );
            truncateLogFiles( recoveredTransactionLogVersion, recoveredTransactionOffset );
        }
    }

    /**
     * Truncates the log file of the given version to the given offset and deletes every log file with a
     * higher version.
     */
    private void truncateLogFiles( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
            throws IOException
    {
        File lastRecoveredTransactionLog = logFiles.getLogFileForVersion( recoveredTransactionLogVersion );
        fs.truncate( lastRecoveredTransactionLog, recoveredTransactionOffset );
        forEachSubsequentLogFile( recoveredTransactionLogVersion,
                fileIndex -> fs.deleteFile( logFiles.getLogFileForVersion( fileIndex ) ) );
    }

    /**
     * Invokes the action for each log version strictly greater than the given one, up to and including the
     * highest log version reported by the log files, in ascending order.
     */
    private void forEachSubsequentLogFile( long recoveredTransactionLogVersion, LongConsumer action )
    {
        long highestLogVersion = logFiles.getHighestLogVersion();
        for ( long fileIndex = recoveredTransactionLogVersion + 1; fileIndex <= highestLogVersion; fileIndex++ )
        {
            action.accept( fileIndex );
        }
    }

    /**
     * Writes a zip archive containing the tail of the recovered log file (from the given offset) and the full
     * content of every subsequent log file, one zip entry per log file.
     */
    private void backupCorruptedContent( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
            throws IOException
    {
        File corruptedLogArchive = getArchiveFile( recoveredTransactionLogVersion, recoveredTransactionOffset );
        try ( ZipOutputStream recoveryContent = new ZipOutputStream(
                new BufferedOutputStream( fs.openAsOutputStream( corruptedLogArchive, false ) ) ) )
        {
            // One buffer is reused for every copied file.
            ByteBuffer zipBuffer = ByteBuffer.allocate( (int) ByteUnit.mebiBytes( 1 ) );
            copyTransactionLogContent( recoveredTransactionLogVersion, recoveredTransactionOffset, recoveryContent,
                    zipBuffer );
            forEachSubsequentLogFile( recoveredTransactionLogVersion, fileIndex ->
            {
                try
                {
                    copyTransactionLogContent( fileIndex, 0, recoveryContent, zipBuffer );
                }
                catch ( IOException io )
                {
                    // LongConsumer cannot throw checked exceptions; tunnel it and unwrap below.
                    throw new UncheckedIOException( io );
                }
            } );
        }
        catch ( UncheckedIOException e )
        {
            // Surface the failure as the IOException that callers of prune are told to expect.
            throw e.getCause();
        }
    }

    /**
     * Ensures the archive folder exists and builds the archive file for the given position; the current time
     * in milliseconds is part of the file name.
     */
    private File getArchiveFile( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
            throws IOException
    {
        File corruptedLogsFolder = new File( storeDir, CORRUPTED_TX_LOGS_FOLDER_NAME );
        // Must run unconditionally: doing this inside an assert would skip folder creation whenever
        // assertions are disabled. NOTE(review): mkdirs is expected to tolerate an already existing
        // folder (left by a previous prune) - confirm against the FileSystemAbstraction implementations.
        fs.mkdirs( corruptedLogsFolder );
        return new File( corruptedLogsFolder,
                format( LOG_FILE_ARCHIVE_PATTERN, recoveredTransactionLogVersion, recoveredTransactionOffset,
                        System.currentTimeMillis() ) );
    }

    /**
     * Adds a zip entry named after the log file of the given version and copies the file content into it,
     * starting from the given offset.
     */
    private void copyTransactionLogContent( long logFileIndex, long logOffset, ZipOutputStream destination,
            ByteBuffer byteBuffer ) throws IOException
    {
        File logFile = logFiles.getLogFileForVersion( logFileIndex );
        ZipEntry zipEntry = new ZipEntry( logFile.getName() );
        destination.putNextEntry( zipEntry );
        try ( StoreChannel transactionLogChannel = fs.open( logFile, "r" ) )
        {
            transactionLogChannel.position( logOffset );
            while ( transactionLogChannel.read( byteBuffer ) >= 0 )
            {
                byteBuffer.flip();
                destination.write( byteBuffer.array(), byteBuffer.position(), byteBuffer.remaining() );
                byteBuffer.clear();
            }
        }
        destination.closeEntry();
    }

    /** @return {@code true} when a log file with a version higher than the given one exists. */
    private boolean isNotLastLogFile( long recoveredTransactionLogVersion )
    {
        return logFiles.getHighestLogVersion() > recoveredTransactionLogVersion;
    }

    /**
     * @return {@code true} when the log file of the given version holds content past the given offset.
     */
    private boolean isRecoveredLogCorrupted( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
    {
        // Ask the injected file system rather than java.io.File, like every other file operation here.
        File recoveredLog = logFiles.getLogFileForVersion( recoveredTransactionLogVersion );
        return fs.getFileSize( recoveredLog ) > recoveredTransactionOffset;
    }
}

0 comments on commit df8bdc7

Please sign in to comment.