From 15dd0f16b0e84371da8c8a2a79a8482990f3c5f9 Mon Sep 17 00:00:00 2001 From: MishaDemianenko Date: Mon, 29 May 2017 20:19:21 +0200 Subject: [PATCH] Close LogEntryCursor after log validation. Do not use test class in code. Close entry cursor to release file handle after transaction log validation. Remove usage of test class in tools code. Introduce utility to open entry cursors for tools. --- .../java/org/neo4j/test/LogTestUtils.java | 25 +----- tools/pom.xml | 13 ++-- .../neo4j/tools/rawstorereader/RsdrMain.java | 24 +----- .../org/neo4j/tools/txlog/CheckTxLogs.java | 37 +++++---- .../neo4j/tools/util/TransactionLogUtils.java | 76 +++++++++++++++++++ .../neo4j/tools/txlog/CheckTxLogsTest.java | 39 ++++++++++ 6 files changed, 150 insertions(+), 64 deletions(-) create mode 100644 tools/src/main/java/org/neo4j/tools/util/TransactionLogUtils.java diff --git a/community/kernel/src/test/java/org/neo4j/test/LogTestUtils.java b/community/kernel/src/test/java/org/neo4j/test/LogTestUtils.java index 89c491b4fa10f..beb6af0f60f65 100644 --- a/community/kernel/src/test/java/org/neo4j/test/LogTestUtils.java +++ b/community/kernel/src/test/java/org/neo4j/test/LogTestUtils.java @@ -35,7 +35,6 @@ import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; -import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; @@ -129,40 +128,22 @@ public static void assertLogContains( FileSystemAbstraction fileSystem, String l } } - /** - * Opens a {@link LogEntryCursor} over all log files found in the storeDirectory - * - * @param fs {@link FileSystemAbstraction} to find {@code storeDirectory} in. - * @param logFiles the physical log files to read from - */ - public static LogEntryCursor openLogs( final FileSystemAbstraction fs, PhysicalLogFiles logFiles ) - { - File firstFile = logFiles.getLogFileForVersion( logFiles.getLowestLogVersion() ); - return openLogEntryCursor( fs, firstFile, new ReaderLogVersionBridge( fs, logFiles ) ); - } - /** * Opens a {@link LogEntryCursor} over one log file */ public static LogEntryCursor openLog( FileSystemAbstraction fs, File log ) - { - return openLogEntryCursor( fs, log, LogVersionBridge.NO_MORE_CHANNELS ); - } - - private static LogEntryCursor openLogEntryCursor( FileSystemAbstraction fs, File firstFile, - LogVersionBridge versionBridge ) { StoreChannel channel = null; try { - channel = fs.open( firstFile, "r" ); + channel = fs.open( log, "r" ); ByteBuffer buffer = ByteBuffer.allocate( LogHeader.LOG_HEADER_SIZE ); - LogHeader header = LogHeaderReader.readLogHeader( buffer, channel, true, firstFile ); + LogHeader header = LogHeaderReader.readLogHeader( buffer, channel, true, log ); PhysicalLogVersionedStoreChannel logVersionedChannel = new PhysicalLogVersionedStoreChannel( channel, header.logVersion, header.logFormatVersion ); ReadableLogChannel logChannel = new ReadAheadLogChannel( logVersionedChannel, - versionBridge, ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE ); + LogVersionBridge.NO_MORE_CHANNELS, ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE ); return new LogEntryCursor( new VersionAwareLogEntryReader<>(), logChannel ); } diff --git a/tools/pom.xml b/tools/pom.xml index a38eab8fe30c4..bf6da2adb900c 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -62,12 +62,6 @@ ${project.version} test-jar - - org.neo4j - neo4j-kernel - ${project.version} - test-jar - org.neo4j neo4j-io @@ -118,6 +112,13 @@ mockito-core test + + org.neo4j + neo4j-kernel + ${project.version} + test + test-jar + diff --git a/tools/src/main/java/org/neo4j/tools/rawstorereader/RsdrMain.java b/tools/src/main/java/org/neo4j/tools/rawstorereader/RsdrMain.java index f544b5a7202b2..274840c05f19b 100644 --- a/tools/src/main/java/org/neo4j/tools/rawstorereader/RsdrMain.java +++ b/tools/src/main/java/org/neo4j/tools/rawstorereader/RsdrMain.java @@ -44,22 +44,13 @@ import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; -import org.neo4j.kernel.impl.transaction.log.LogEntryCursor; -import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; -import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; -import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; -import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; -import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; -import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; -import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; import org.neo4j.logging.NullLogProvider; +import org.neo4j.tools.util.TransactionLogUtils; import static org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory.createPageCache; import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS; -import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; -import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader; /** * Tool to read raw data from various stores. @@ -288,17 +279,8 @@ private static RecordStore getStore( String fname, NeoStores neoStores ) private static IOCursor getLogCursor( FileSystemAbstraction fileSystem, String fname, NeoStores neoStores ) throws IOException { - File file = new File( neoStores.getStoreDir(), fname ); - StoreChannel fileChannel = fileSystem.open( file, "r" ); - LogHeader logHeader = readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), fileChannel, false, file ); - console.printf( "Logical log version: %s with prev committed tx[%s]%n", - logHeader.logVersion, logHeader.lastCommittedTxId ); - - PhysicalLogVersionedStoreChannel channel = - new PhysicalLogVersionedStoreChannel( fileChannel, logHeader.logVersion, logHeader.logFormatVersion ); - ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, NO_MORE_CHANNELS ); - LogEntryReader logEntryReader = new VersionAwareLogEntryReader<>(); - return new LogEntryCursor( logEntryReader, logChannel ); + return TransactionLogUtils + .openLogEntryCursor( fileSystem, new File( neoStores.getStoreDir(), fname ), NO_MORE_CHANNELS ); } private static void readLog( diff --git a/tools/src/main/java/org/neo4j/tools/txlog/CheckTxLogs.java b/tools/src/main/java/org/neo4j/tools/txlog/CheckTxLogs.java index efcc5098fbdbc..e5dbf533ffe13 100644 --- a/tools/src/main/java/org/neo4j/tools/txlog/CheckTxLogs.java +++ b/tools/src/main/java/org/neo4j/tools/txlog/CheckTxLogs.java @@ -43,11 +43,11 @@ import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.storageengine.api.StorageCommand; -import org.neo4j.test.LogTestUtils; import org.neo4j.tools.txlog.checktypes.CheckType; import org.neo4j.tools.txlog.checktypes.CheckTypes; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; +import static org.neo4j.tools.util.TransactionLogUtils.openLogs; /** * Tool that verifies consistency of transaction logs. @@ -133,24 +133,26 @@ boolean validateCheckPoints( PhysicalLogFiles logFiles, InconsistenciesHandler h logFileSizes.put( i, fs.getFileSize( logFiles.getLogFileForVersion( i ) ) ); } - LogEntryCursor logEntryCursor = LogTestUtils.openLogs( fs, logFiles ); - while ( logEntryCursor.next() ) + try ( LogEntryCursor logEntryCursor = openLogEntryCursor( logFiles ) ) { - LogEntry logEntry = logEntryCursor.get(); - if ( logEntry instanceof CheckPoint ) + while ( logEntryCursor.next() ) { - LogPosition logPosition = logEntry.as().getLogPosition(); - // if the file has been pruned we cannot validate the check point - if ( logPosition.getLogVersion() >= lowestLogVersion ) + LogEntry logEntry = logEntryCursor.get(); + if ( logEntry instanceof CheckPoint ) { - long size = logFileSizes.get( logPosition.getLogVersion() ); - if ( logPosition.getByteOffset() < 0 || size < 0 || logPosition.getByteOffset() > size ) + LogPosition logPosition = logEntry.as().getLogPosition(); + // if the file has been pruned we cannot validate the check point + if ( logPosition.getLogVersion() >= lowestLogVersion ) { - long currentLogVersion = logEntryCursor.getCurrentLogVersion(); - handler.reportInconsistentCheckPoint( currentLogVersion, logPosition, size ); - success = false; - } + long size = logFileSizes.get( logPosition.getLogVersion() ); + if ( logPosition.getByteOffset() < 0 || size < 0 || logPosition.getByteOffset() > size ) + { + long currentLogVersion = logEntryCursor.getCurrentLogVersion(); + handler.reportInconsistentCheckPoint( currentLogVersion, logPosition, size ); + success = false; + } + } } } } @@ -158,6 +160,11 @@ boolean validateCheckPoints( PhysicalLogFiles logFiles, InconsistenciesHandler h return success; } + LogEntryCursor openLogEntryCursor( PhysicalLogFiles logFiles ) throws IOException + { + return openLogs( fs, logFiles ); + } + boolean scan( PhysicalLogFiles logFiles, InconsistenciesHandler handler, CheckType... checkTypes ) throws IOException { @@ -193,7 +200,7 @@ private boolean scan( Physical boolean validLogs = true; long commandsRead = 0; long lastSeenTxId = BASE_TX_ID; - try ( LogEntryCursor logEntryCursor = LogTestUtils.openLogs( fs, logFiles ) ) + try ( LogEntryCursor logEntryCursor = openLogEntryCursor( logFiles ) ) { while ( logEntryCursor.next() ) { diff --git a/tools/src/main/java/org/neo4j/tools/util/TransactionLogUtils.java b/tools/src/main/java/org/neo4j/tools/util/TransactionLogUtils.java new file mode 100644 index 0000000000000..cad19b2396d2c --- /dev/null +++ b/tools/src/main/java/org/neo4j/tools/util/TransactionLogUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.tools.util; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.fs.StoreChannel; +import org.neo4j.kernel.impl.transaction.log.LogEntryCursor; +import org.neo4j.kernel.impl.transaction.log.LogVersionBridge; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; +import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; +import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; +import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge; +import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; +import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; +import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; + +import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; +import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader; + +public class TransactionLogUtils +{ + /** + * Opens a {@link LogEntryCursor} over all log files found in the storeDirectory + * + * @param fs {@link FileSystemAbstraction} to find {@code storeDirectory} in. + * @param logFiles the physical log files to read from + */ + public static LogEntryCursor openLogs( final FileSystemAbstraction fs, PhysicalLogFiles logFiles ) + throws IOException + { + File firstFile = logFiles.getLogFileForVersion( logFiles.getLowestLogVersion() ); + return openLogEntryCursor( fs, firstFile, new ReaderLogVersionBridge( fs, logFiles ) ); + } + + /** + * Opens a {@link LogEntryCursor} for requested file + * + * @param fileSystem to find {@code file} in. + * @param file file to open + * @param readerLogVersionBridge log version bridge to use + */ + public static LogEntryCursor openLogEntryCursor( FileSystemAbstraction fileSystem, File file, + LogVersionBridge readerLogVersionBridge ) throws IOException + { + StoreChannel fileChannel = fileSystem.open( file, "r" ); + LogHeader logHeader = readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), fileChannel, true, file ); + PhysicalLogVersionedStoreChannel channel = + new PhysicalLogVersionedStoreChannel( fileChannel, logHeader.logVersion, logHeader.logFormatVersion ); + ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, readerLogVersionBridge ); + LogEntryReader logEntryReader = new VersionAwareLogEntryReader<>(); + return new LogEntryCursor( logEntryReader, logChannel ); + } +} diff --git a/tools/src/test/java/org/neo4j/tools/txlog/CheckTxLogsTest.java b/tools/src/test/java/org/neo4j/tools/txlog/CheckTxLogsTest.java index 2ef8c56a53573..d230acc1f595e 100644 --- a/tools/src/test/java/org/neo4j/tools/txlog/CheckTxLogsTest.java +++ b/tools/src/test/java/org/neo4j/tools/txlog/CheckTxLogsTest.java @@ -21,9 +21,11 @@ import org.junit.Rule; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -41,6 +43,7 @@ import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.kernel.impl.transaction.command.Command; +import org.neo4j.kernel.impl.transaction.log.LogEntryCursor; import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel; @@ -59,6 +62,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; import static org.neo4j.tools.txlog.checktypes.CheckTypes.CHECK_TYPES; import static org.neo4j.tools.txlog.checktypes.CheckTypes.NEO_STORE; @@ -905,6 +909,23 @@ public void shouldReportNoInconsistenciesIfTxIdSequenceIsStriclyIncreasingAndHas assertTrue( handler.txIdSequenceInconsistencies.isEmpty() ); } + @Test + public void closeLogEntryCursorAfterValidation() throws IOException + { + ensureLogExists( logFile( 1 ) ); + writeCheckPoint( logFile( 2 ), 1, 42 ); + LogEntryCursor entryCursor = Mockito.mock( LogEntryCursor.class ); + + CheckTxLogsWithCustomLogEntryCursor checkTxLogs = + new CheckTxLogsWithCustomLogEntryCursor( System.out, fsRule.get(), entryCursor ); + CapturingInconsistenciesHandler handler = new CapturingInconsistenciesHandler(); + PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fsRule.get() ); + boolean logsValidity = checkTxLogs.validateCheckPoints( logFiles, handler ); + + assertTrue("empty logs should be valid", logsValidity); + verify( entryCursor ).close(); + } + private File logFile( long version ) { fsRule.get().mkdirs( storeDirectory ); @@ -975,6 +996,24 @@ private FileSystemAbstraction ensureLogExists( File log ) throws IOException return fs; } + private class CheckTxLogsWithCustomLogEntryCursor extends CheckTxLogs + { + + private final LogEntryCursor logEntryCursor; + + CheckTxLogsWithCustomLogEntryCursor( PrintStream out, FileSystemAbstraction fs, LogEntryCursor logEntryCursor ) + { + super( out, fs ); + this.logEntryCursor = logEntryCursor; + } + + @Override + LogEntryCursor openLogEntryCursor( PhysicalLogFiles logFiles ) throws IOException + { + return logEntryCursor; + } + } + private static class CapturingInconsistenciesHandler implements InconsistenciesHandler { List txIdSequenceInconsistencies = new ArrayList<>();