diff --git a/community/kernel/pom.xml b/community/kernel/pom.xml
index d91c871fbe3af..d9bf11c7d6210 100644
--- a/community/kernel/pom.xml
+++ b/community/kernel/pom.xml
@@ -234,6 +234,11 @@ the relevant Commercial Agreement.
test-jar
test
+
+ commons-io
+ commons-io
+ test
+
org.neo4j
neo4j-io
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
index 8e403811649e4..2a85248ba37f9 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java
@@ -152,6 +152,7 @@
import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery;
+import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.kernel.spi.explicitindex.IndexImplementation;
import org.neo4j.kernel.spi.explicitindex.IndexProviders;
import org.neo4j.logging.Log;
@@ -688,9 +689,9 @@ private void buildRecovery(
StorageEngine storageEngine,
LogicalTransactionStore logicalTransactionStore )
{
- Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, logFiles, fileSystemAbstraction, tailScanner,
- transactionIdStore, logicalTransactionStore, positionMonitor );
- Recovery recovery = new Recovery( spi, recoveryMonitor, startupStatistics, logService );
+ Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
+ TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction );
+ Recovery recovery = new Recovery( spi, startupStatistics, logPruner, logService, recoveryMonitor );
life.add( recovery );
}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/recovery/DefaultRecoverySPI.java b/community/kernel/src/main/java/org/neo4j/kernel/recovery/DefaultRecoverySPI.java
index 1c73cc59f5dc2..cbae08535d5fc 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/recovery/DefaultRecoverySPI.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/recovery/DefaultRecoverySPI.java
@@ -21,7 +21,6 @@
import java.io.IOException;
-import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
@@ -29,9 +28,9 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
-import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
+import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.recovery.Recovery.RecoveryApplier;
import org.neo4j.storageengine.api.StorageEngine;
@@ -42,22 +41,17 @@
public class DefaultRecoverySPI implements Recovery.SPI
{
private final PositionToRecoverFrom positionToRecoverFrom;
- private final PhysicalLogFiles logFiles;
- private final FileSystemAbstraction fs;
private final StorageEngine storageEngine;
private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore;
public DefaultRecoverySPI(
StorageEngine storageEngine,
- PhysicalLogFiles logFiles, FileSystemAbstraction fs,
LogTailScanner logTailScanner,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
PositionToRecoverFrom.Monitor monitor )
{
this.storageEngine = storageEngine;
- this.logFiles = logFiles;
- this.fs = fs;
this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore;
this.positionToRecoverFrom = new PositionToRecoverFrom( logTailScanner, monitor );
@@ -100,20 +94,20 @@ public TransactionCursor getTransactionsInReverseOrder( LogPosition position ) t
@Override
public void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
- LogPosition positionAfterLastRecoveredTransaction ) throws Exception
+ LogPosition positionAfterLastRecoveredTransaction )
{
+ long recoveredTransactionLogVersion = positionAfterLastRecoveredTransaction.getLogVersion();
+ long recoveredTransactionOffset = positionAfterLastRecoveredTransaction.getByteOffset();
if ( lastRecoveredTransaction != null )
{
+ LogEntryCommit commitEntry = lastRecoveredTransaction.getCommitEntry();
transactionIdStore.setLastCommittedAndClosedTransactionId(
- lastRecoveredTransaction.getCommitEntry().getTxId(),
+ commitEntry.getTxId(),
LogEntryStart.checksum( lastRecoveredTransaction.getStartEntry() ),
- lastRecoveredTransaction.getCommitEntry().getTimeWritten(),
- positionAfterLastRecoveredTransaction.getByteOffset(),
- positionAfterLastRecoveredTransaction.getLogVersion() );
+ commitEntry.getTimeWritten(),
+ recoveredTransactionOffset,
+ recoveredTransactionLogVersion );
}
-
- fs.truncate( logFiles.getLogFileForVersion( positionAfterLastRecoveredTransaction.getLogVersion() ),
- positionAfterLastRecoveredTransaction.getByteOffset() );
}
static class RecoveryVisitor implements RecoveryApplier
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/recovery/Recovery.java b/community/kernel/src/main/java/org/neo4j/kernel/recovery/Recovery.java
index a07b4fc4b61fd..e86c8b6b85865 100644
--- a/community/kernel/src/main/java/org/neo4j/kernel/recovery/Recovery.java
+++ b/community/kernel/src/main/java/org/neo4j/kernel/recovery/Recovery.java
@@ -79,20 +79,23 @@ public interface SPI
RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception;
void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
- LogPosition positionAfterLastRecoveredTransaction ) throws Exception;
+ LogPosition positionAfterLastRecoveredTransaction );
}
private final SPI spi;
private final Monitor monitor;
private final StartupStatisticsProvider startupStatistics;
+ private final TransactionLogPruner logPruner;
private final Log log;
private int numberOfRecoveredTransactions;
- public Recovery( SPI spi, Monitor monitor, StartupStatisticsProvider startupStatistics, LogService logService )
+ public Recovery( SPI spi, StartupStatisticsProvider startupStatistics, TransactionLogPruner logPruner,
+ LogService logService, Monitor monitor )
{
this.spi = spi;
this.monitor = monitor;
this.startupStatistics = startupStatistics;
+ this.logPruner = logPruner;
this.log = logService.getInternalLog( Recovery.class );
}
@@ -161,6 +164,7 @@ public void init() throws Throwable
recoveryToPosition = recoveryFromPosition;
}
}
+ logPruner.prune( recoveryToPosition );
spi.transactionsRecovered( lastTransaction, recoveryToPosition );
startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions );
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/recovery/TransactionLogPruner.java b/community/kernel/src/main/java/org/neo4j/kernel/recovery/TransactionLogPruner.java
new file mode 100644
index 0000000000000..a6a512092afae
--- /dev/null
+++ b/community/kernel/src/main/java/org/neo4j/kernel/recovery/TransactionLogPruner.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.kernel.recovery;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.function.LongConsumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.neo4j.io.ByteUnit;
+import org.neo4j.io.fs.FileSystemAbstraction;
+import org.neo4j.io.fs.StoreChannel;
+import org.neo4j.kernel.impl.transaction.log.LogPosition;
+import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
+
+import static java.lang.String.format;
+
+/**
+ * Transaction log pruner used during recovery to truncate all the logs after some specified position, that
+ * recovery threats as corrupted or non-readable.
+ * Transaction log file specified by provided log position will be truncated to provided length, any
+ * subsequent files will be removed.
+ * Any removed or modified log content will be stored in separate corruption logs archive for further analysis and as
+ * an additional safety option to have the possibility to fully restore original logs in a faulty case.
+ */
+public class TransactionLogPruner
+{
+ static final String CORRUPTED_TX_LOGS_FOLDER_NAME = "corrupted-tx-logs";
+ private static final String LOG_FILE_ARCHIVE_PATTERN = "corrupted-logs-%d-%d-%d.zip";
+
+ private final File storeDir;
+ private final PhysicalLogFiles logFiles;
+ private final FileSystemAbstraction fs;
+
+ public TransactionLogPruner( File storeDir, PhysicalLogFiles logFiles, FileSystemAbstraction fs )
+ {
+ this.storeDir = storeDir;
+ this.logFiles = logFiles;
+ this.fs = fs;
+ }
+
+ /**
+ * Prune all transaction logs after provided position. Log version specified in a position will be
+ * truncated to provided byte offset, any subsequent log files will be deleted. Backup copy of removed data will
+ * be stored in separate archive.
+ * @param positionAfterLastRecoveredTransaction position after last recovered transaction
+ * @throws IOException
+ */
+ public void prune( LogPosition positionAfterLastRecoveredTransaction ) throws IOException
+ {
+ long recoveredTransactionLogVersion = positionAfterLastRecoveredTransaction.getLogVersion();
+ long recoveredTransactionOffset = positionAfterLastRecoveredTransaction.getByteOffset();
+ if ( isRecoveredLogCorrupted( recoveredTransactionLogVersion, recoveredTransactionOffset ) ||
+ isNotLastLogFile( recoveredTransactionLogVersion ) )
+ {
+ backupCorruptedContent( recoveredTransactionLogVersion, recoveredTransactionOffset );
+ truncateLogFiles( recoveredTransactionLogVersion, recoveredTransactionOffset );
+ }
+ }
+
+ private void truncateLogFiles( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
+ throws IOException
+ {
+ File lastRecoveredTransactionLog = logFiles.getLogFileForVersion( recoveredTransactionLogVersion );
+ fs.truncate( lastRecoveredTransactionLog, recoveredTransactionOffset );
+ forEachSubsequentLogFile( recoveredTransactionLogVersion,
+ fileIndex -> fs.deleteFile( logFiles.getLogFileForVersion( fileIndex ) ) );
+ }
+
+ private void forEachSubsequentLogFile( long recoveredTransactionLogVersion, LongConsumer action )
+ {
+ long highestLogVersion = logFiles.getHighestLogVersion();
+ for ( long fileIndex = recoveredTransactionLogVersion + 1; fileIndex <= highestLogVersion; fileIndex++ )
+ {
+ action.accept( fileIndex );
+ }
+ }
+
+ private void backupCorruptedContent( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
+ throws IOException
+ {
+ File corruptedLogArchive = getArchiveFile( recoveredTransactionLogVersion, recoveredTransactionOffset );
+ try ( ZipOutputStream recoveryContent = new ZipOutputStream(
+ new BufferedOutputStream( fs.openAsOutputStream( corruptedLogArchive, false ) ) ) )
+ {
+ ByteBuffer zipBuffer = ByteBuffer.allocate( (int) ByteUnit.mebiBytes( 1 ) );
+ copyTransactionLogContent( recoveredTransactionLogVersion, recoveredTransactionOffset, recoveryContent,
+ zipBuffer );
+ forEachSubsequentLogFile( recoveredTransactionLogVersion, fileIndex ->
+ {
+ try
+ {
+ copyTransactionLogContent( fileIndex, 0, recoveryContent, zipBuffer );
+ }
+ catch ( IOException io )
+ {
+ throw new UncheckedIOException( io );
+ }
+ } );
+ }
+ }
+
+ private File getArchiveFile( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
+ {
+ File corruptedLogsFolder = new File( storeDir, CORRUPTED_TX_LOGS_FOLDER_NAME );
+ assert fs.mkdir( corruptedLogsFolder );
+ return new File( corruptedLogsFolder,
+ format( LOG_FILE_ARCHIVE_PATTERN, recoveredTransactionLogVersion, recoveredTransactionOffset,
+ System.currentTimeMillis() ) );
+ }
+
+ private void copyTransactionLogContent( long logFileIndex, long logOffset, ZipOutputStream destination,
+ ByteBuffer byteBuffer ) throws IOException
+ {
+ File logFile = logFiles.getLogFileForVersion( logFileIndex );
+ ZipEntry zipEntry = new ZipEntry( logFile.getName() );
+ destination.putNextEntry( zipEntry );
+ try ( StoreChannel transactionLogChannel = fs.open( logFile, "r" ) )
+ {
+ transactionLogChannel.position( logOffset );
+ while ( transactionLogChannel.read( byteBuffer ) >= 0 )
+ {
+ byteBuffer.flip();
+ destination.write( byteBuffer.array(), byteBuffer.position(), byteBuffer.remaining() );
+ byteBuffer.clear();
+ }
+ }
+ destination.closeEntry();
+ }
+
+ private boolean isNotLastLogFile( long recoveredTransactionLogVersion )
+ {
+ return logFiles.getHighestLogVersion() > recoveredTransactionLogVersion;
+ }
+
+ private boolean isRecoveredLogCorrupted( long recoveredTransactionLogVersion, long recoveredTransactionOffset )
+ {
+ return logFiles.getLogFileForVersion( recoveredTransactionLogVersion ).length() > recoveredTransactionOffset;
+ }
+}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java b/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
index befeb4bb7a8e0..82c1fdf7b8b10 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
@@ -63,6 +63,7 @@
import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.Recovery.RecoveryApplier;
+import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.TransactionApplicationMode;
@@ -105,7 +106,8 @@ public class RecoveryTest
@Test
public void shouldRecoverExistingData() throws Exception
{
- final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fileSystemRule.get() );
+ File storeDir = this.directory.directory();
+ final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, "log", fileSystemRule.get() );
File file = logFiles.getLogFileForVersion( logVersion );
writeSomeData( file, pair ->
@@ -154,8 +156,8 @@ public void shouldRecoverExistingData() throws Exception
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );
- life.add( new Recovery( new DefaultRecoverySPI( storageEngine, logFiles, fileSystemRule.get(),
- tailScanner, transactionIdStore, txStore, NO_MONITOR )
+ TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
+ life.add( new Recovery( new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
private int nr;
@@ -203,7 +205,7 @@ public boolean visit( CommittedTransactionRepresentation tx ) throws Exception
}
};
}
- }, monitor, new StartupStatisticsProvider(), new SimpleLogService( logProvider ) ) );
+ }, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ), monitor ) );
life.start();
@@ -221,7 +223,8 @@ public boolean visit( CommittedTransactionRepresentation tx ) throws Exception
@Test
public void shouldSeeThatACleanDatabaseShouldNotRequireRecovery() throws Exception
{
- final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fileSystemRule.get() );
+ File storeDir = this.directory.directory();
+ final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, "log", fileSystemRule.get() );
File file = logFiles.getLogFileForVersion( logVersion );
writeSomeData( file, pair ->
@@ -258,15 +261,16 @@ public void shouldSeeThatACleanDatabaseShouldNotRequireRecovery() throws Excepti
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );
- life.add( new Recovery( new DefaultRecoverySPI( storageEngine, logFiles, fileSystemRule.get(),
- tailScanner, transactionIdStore, txStore, NO_MONITOR )
+ TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
+ life.add( new Recovery( new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
@Override
public void startRecovery()
{
fail( "Recovery should not be required" );
}
- }, monitor, new StartupStatisticsProvider(), new SimpleLogService( logProvider ) ));
+ }, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
+ monitor ));
life.start();
@@ -282,7 +286,8 @@ public void startRecovery()
public void shouldTruncateLogAfterSinglePartialTransaction() throws Exception
{
// GIVEN
- final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fileSystemRule.get() );
+ File storeDir = this.directory.directory();
+ final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, "log", fileSystemRule.get() );
File file = logFiles.getLogFileForVersion( logVersion );
final LogPositionMarker marker = new LogPositionMarker();
@@ -299,7 +304,7 @@ public void shouldTruncateLogAfterSinglePartialTransaction() throws Exception
} );
// WHEN
- boolean recoveryRequired = recover( logFiles );
+ boolean recoveryRequired = recover( storeDir, logFiles );
// THEN
assertTrue( recoveryRequired );
@@ -310,7 +315,8 @@ public void shouldTruncateLogAfterSinglePartialTransaction() throws Exception
public void shouldTruncateLogAfterLastCompleteTransactionAfterSuccessfullRecovery() throws Exception
{
// GIVEN
- final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fileSystemRule.get() );
+ File storeDir = this.directory.directory();
+ final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, "log", fileSystemRule.get() );
File file = logFiles.getLogFileForVersion( logVersion );
final LogPositionMarker marker = new LogPositionMarker();
@@ -331,7 +337,7 @@ public void shouldTruncateLogAfterLastCompleteTransactionAfterSuccessfullRecover
} );
// WHEN
- boolean recoveryRequired = recover( logFiles );
+ boolean recoveryRequired = recover( storeDir, logFiles );
// THEN
assertTrue( recoveryRequired );
@@ -342,7 +348,8 @@ public void shouldTruncateLogAfterLastCompleteTransactionAfterSuccessfullRecover
public void shouldTellTransactionIdStoreAfterSuccessfullRecovery() throws Exception
{
// GIVEN
- final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fileSystemRule.get() );
+ File storeDir = this.directory.directory();
+ final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, "log", fileSystemRule.get() );
File file = logFiles.getLogFileForVersion( logVersion );
final LogPositionMarker marker = new LogPositionMarker();
@@ -365,7 +372,7 @@ public void shouldTellTransactionIdStoreAfterSuccessfullRecovery() throws Except
} );
// WHEN
- boolean recoveryRequired = recover( logFiles );
+ boolean recoveryRequired = recover( storeDir, logFiles );
// THEN
assertTrue( recoveryRequired );
@@ -378,7 +385,7 @@ public void shouldTellTransactionIdStoreAfterSuccessfullRecovery() throws Except
assertEquals( marker.getByteOffset(), lastClosedTransaction[2] );
}
- private boolean recover( PhysicalLogFiles logFiles )
+ private boolean recover( File storeDir, PhysicalLogFiles logFiles )
{
LifeSupport life = new LifeSupport();
Recovery.Monitor monitor = mock( Recovery.Monitor.class );
@@ -397,15 +404,16 @@ private boolean recover( PhysicalLogFiles logFiles )
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
AssertableLogProvider logProvider = new AssertableLogProvider( true );
- life.add( new Recovery( new DefaultRecoverySPI( storageEngine, logFiles, fileSystemRule.get(),
- tailScanner, transactionIdStore, txStore, NO_MONITOR )
+ TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
+ life.add( new Recovery( new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
@Override
public void startRecovery()
{
recoveryRequired.set( true );
}
- }, monitor, new StartupStatisticsProvider(), new SimpleLogService( logProvider ) ) );
+ }, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
+ monitor ) );
life.start();
}
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogicalTransactionStoreTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogicalTransactionStoreTest.java
index 740f34010a822..e1a28223b9376 100644
--- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogicalTransactionStoreTest.java
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/PhysicalLogicalTransactionStoreTest.java
@@ -47,6 +47,7 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.Recovery.RecoveryApplier;
+import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.TransactionApplicationMode;
@@ -197,7 +198,7 @@ public void shouldOpenAndRecoverExistingData() throws Exception
life.add( new BatchingTransactionAppender( logFile, NO_ROTATION, positionCache,
transactionIdStore, BYPASS, DATABASE_HEALTH ) );
AssertableLogProvider logProvider = new AssertableLogProvider( true );
-
+ TransactionLogPruner logPruner = new TransactionLogPruner( testDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new Recovery.SPI()
{
@Override
@@ -232,10 +233,11 @@ public TransactionCursor getTransactionsInReverseOrder( LogPosition position ) t
}
public void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
- LogPosition positionAfterLastRecoveredTransaction ) throws Exception
+ LogPosition positionAfterLastRecoveredTransaction )
{
}
- }, mock( Recovery.Monitor.class ), new StartupStatisticsProvider(), new SimpleLogService( logProvider ) ) );
+ }, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
+ mock( Recovery.Monitor.class ) ) );
// WHEN
try
diff --git a/community/kernel/src/test/java/org/neo4j/kernel/recovery/TransactionLogPrunerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/recovery/TransactionLogPrunerTest.java
new file mode 100644
index 0000000000000..c3d9521de229d
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/kernel/recovery/TransactionLogPrunerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.kernel.recovery;
+
+import org.apache.commons.io.FilenameUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.neo4j.helpers.ArrayUtil;
+import org.neo4j.io.fs.FileUtils;
+import org.neo4j.kernel.impl.storemigration.LogFiles;
+import org.neo4j.kernel.impl.transaction.log.FlushablePositionAwareChannel;
+import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
+import org.neo4j.kernel.impl.transaction.log.LogPosition;
+import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
+import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
+import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
+import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
+import org.neo4j.kernel.lifecycle.Lifespan;
+import org.neo4j.test.rule.TestDirectory;
+import org.neo4j.test.rule.fs.DefaultFileSystemRule;
+import org.neo4j.test.rule.fs.FileSystemRule;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TransactionLogPrunerTest
+{
+ private static final int SINGLE_LOG_FILE_SIZE = 25;
+ private static final int TOTAL_NUMBER_OF_LOG_FILES = 11;
+ @Rule
+ public TestDirectory testDirectory = TestDirectory.testDirectory();
+ public FileSystemRule fileSystemRule = new DefaultFileSystemRule();
+ private File storeDir;
+ private PhysicalLogFiles logFiles;
+ private TransactionLogPruner logPruner;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ storeDir = testDirectory.graphDbDir();
+ logFiles = new PhysicalLogFiles( storeDir, fileSystemRule );
+ logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule );
+ }
+
+ @Test
+ public void doNotPruneEmptyLogs() throws IOException
+ {
+ logPruner.prune( LogPosition.start( 0 ) );
+ assertTrue( FileUtils.isEmptyDirectory( storeDir ) );
+ }
+
+ @Test
+ public void doNotPruneNonCorruptedLogs() throws IOException
+ {
+ generateTransactionLogFiles();
+
+ long highestLogVersion = logFiles.getHighestLogVersion();
+ long fileSizeBeforePrune = logFiles.getHighestLogFile().length();
+ LogPosition endOfLogsPosition = new LogPosition( highestLogVersion, fileSizeBeforePrune );
+ assertEquals( TOTAL_NUMBER_OF_LOG_FILES - 1, highestLogVersion );
+
+ logPruner.prune( endOfLogsPosition );
+
+ assertEquals( TOTAL_NUMBER_OF_LOG_FILES, storeDir.listFiles( LogFiles.FILENAME_FILTER ).length );
+ assertEquals( fileSizeBeforePrune, logFiles.getHighestLogFile().length() );
+ assertTrue( ArrayUtil.isEmpty( storeDir.listFiles( File::isDirectory ) ) );
+ }
+
+ @Test
+ public void pruneAndArchiveLastLog() throws IOException
+ {
+ generateTransactionLogFiles();
+
+ long highestLogVersion = logFiles.getHighestLogVersion();
+ File highestLogFile = logFiles.getHighestLogFile();
+ long fileSizeBeforePrune = highestLogFile.length();
+ int bytesToPrune = 5;
+ long byteOffset = fileSizeBeforePrune - bytesToPrune;
+ LogPosition prunePosition = new LogPosition( highestLogVersion, byteOffset );
+
+ logPruner.prune( prunePosition );
+
+ assertEquals( TOTAL_NUMBER_OF_LOG_FILES, storeDir.listFiles( LogFiles.FILENAME_FILTER ).length );
+ assertEquals( byteOffset, highestLogFile.length() );
+
+ File corruptedLogsDirectory = new File( storeDir, TransactionLogPruner.CORRUPTED_TX_LOGS_FOLDER_NAME );
+ assertTrue( corruptedLogsDirectory.exists() );
+ File[] files = corruptedLogsDirectory.listFiles();
+ assertEquals( 1, files.length );
+
+ File corruptedLogsArchive = files[0];
+ checkArchiveName( highestLogVersion, byteOffset, corruptedLogsArchive );
+ try ( ZipFile zipFile = new ZipFile( corruptedLogsArchive ) )
+ {
+ assertEquals( 1, zipFile.size() );
+ checkEntryNameAndSize( zipFile, highestLogFile.getName(), bytesToPrune );
+ }
+ }
+
+ @Test
+ public void pruneAndArchiveMultipleLogs() throws IOException
+ {
+ generateTransactionLogFiles();
+
+ long highestCorrectLogFileIndex = 5;
+ File highestCorrectLogFile = logFiles.getLogFileForVersion( highestCorrectLogFileIndex );
+ long fileSizeBeforePrune = highestCorrectLogFile.length();
+ int bytesToPrune = 7;
+ long byteOffset = fileSizeBeforePrune - bytesToPrune;
+ LogPosition prunePosition = new LogPosition( highestCorrectLogFileIndex, byteOffset );
+
+ logPruner.prune( prunePosition );
+
+ assertEquals( 6, storeDir.listFiles( LogFiles.FILENAME_FILTER ).length );
+ assertEquals( byteOffset, highestCorrectLogFile.length() );
+
+ File corruptedLogsDirectory = new File( storeDir, TransactionLogPruner.CORRUPTED_TX_LOGS_FOLDER_NAME );
+ assertTrue( corruptedLogsDirectory.exists() );
+ File[] files = corruptedLogsDirectory.listFiles();
+ assertEquals( 1, files.length );
+
+ File corruptedLogsArchive = files[0];
+ checkArchiveName( highestCorrectLogFileIndex, byteOffset, corruptedLogsArchive );
+ try ( ZipFile zipFile = new ZipFile( corruptedLogsArchive ) )
+ {
+ assertEquals( 6, zipFile.size() );
+ checkEntryNameAndSize( zipFile, highestCorrectLogFile.getName(), bytesToPrune );
+ long nextLogFileIndex = highestCorrectLogFileIndex + 1;
+ int lastFileIndex = TOTAL_NUMBER_OF_LOG_FILES - 1;
+ for ( long index = nextLogFileIndex; index < lastFileIndex; index++ )
+ {
+ checkEntryNameAndSize( zipFile, PhysicalLogFile.DEFAULT_NAME + "." + index, SINGLE_LOG_FILE_SIZE );
+ }
+ checkEntryNameAndSize( zipFile, PhysicalLogFile.DEFAULT_NAME + "." + lastFileIndex,
+ SINGLE_LOG_FILE_SIZE - 1 );
+ }
+ }
+
+ private void checkEntryNameAndSize( ZipFile zipFile, String entryName, int expectedSize ) throws IOException
+ {
+ ZipEntry entry = zipFile.getEntry( entryName );
+ InputStream inputStream = zipFile.getInputStream( entry );
+ int entryBytes = 0;
+ while ( inputStream.read() >= 0 )
+ {
+ entryBytes++;
+ }
+ assertEquals( expectedSize, entryBytes );
+ }
+
+ private void checkArchiveName( long highestLogVersion, long byteOffset, File corruptedLogsArchive )
+ {
+ String name = corruptedLogsArchive.getName();
+ assertTrue( name.startsWith( "corrupted-logs-" + highestLogVersion + "-" + byteOffset ) );
+ assertTrue( FilenameUtils.isExtension( name, "zip" ) );
+ }
+
+ private void generateTransactionLogFiles() throws IOException
+ {
+ try ( Lifespan lifespan = new Lifespan() )
+ {
+ int expectedFileSize = LogHeader.LOG_HEADER_SIZE + 9;
+ PhysicalLogFile logFile = new PhysicalLogFile( fileSystemRule, logFiles, expectedFileSize, () -> 10L,
+ new LogFilesVersionRepository(), PhysicalLogFile.NO_MONITOR, new LogHeaderCache( 100 ) );
+ lifespan.add( logFile );
+ FlushablePositionAwareChannel writer = logFile.getWriter();
+ for ( byte i = 0; i < 107; i++ )
+ {
+ writer.put( i );
+ writer.prepareForFlush();
+ if ( logFile.rotationNeeded() )
+ {
+ logFile.rotate();
+ }
+ }
+ }
+ }
+
+ private static class LogFilesVersionRepository implements LogVersionRepository
+ {
+ private int version;
+
+ @Override
+ public long getCurrentLogVersion()
+ {
+ return version;
+ }
+
+ @Override
+ public long incrementAndGetVersion() throws IOException
+ {
+ return version++;
+ }
+ }
+}