Skip to content

Commit

Permalink
Close LogEntryCursor after log validation. Do not use test class in c…
Browse files Browse the repository at this point in the history
…ode.

Close entry cursor to release file handle after transaction log validation.
Remove usage of test class in tools code.
Introduce utility to open entry cursors for tools.
  • Loading branch information
MishaDemianenko committed May 29, 2017
1 parent 9ded352 commit 15dd0f1
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 64 deletions.
25 changes: 3 additions & 22 deletions community/kernel/src/test/java/org/neo4j/test/LogTestUtils.java
Expand Up @@ -35,7 +35,6 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader; import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
Expand Down Expand Up @@ -129,40 +128,22 @@ public static void assertLogContains( FileSystemAbstraction fileSystem, String l
} }
} }


/**
* Opens a {@link LogEntryCursor} over all log files found in the storeDirectory
*
* @param fs {@link FileSystemAbstraction} to find {@code storeDirectory} in.
* @param logFiles the physical log files to read from
*/
public static LogEntryCursor openLogs( final FileSystemAbstraction fs, PhysicalLogFiles logFiles )
{
File firstFile = logFiles.getLogFileForVersion( logFiles.getLowestLogVersion() );
return openLogEntryCursor( fs, firstFile, new ReaderLogVersionBridge( fs, logFiles ) );
}

/** /**
* Opens a {@link LogEntryCursor} over one log file * Opens a {@link LogEntryCursor} over one log file
*/ */
public static LogEntryCursor openLog( FileSystemAbstraction fs, File log ) public static LogEntryCursor openLog( FileSystemAbstraction fs, File log )
{
return openLogEntryCursor( fs, log, LogVersionBridge.NO_MORE_CHANNELS );
}

private static LogEntryCursor openLogEntryCursor( FileSystemAbstraction fs, File firstFile,
LogVersionBridge versionBridge )
{ {
StoreChannel channel = null; StoreChannel channel = null;
try try
{ {
channel = fs.open( firstFile, "r" ); channel = fs.open( log, "r" );
ByteBuffer buffer = ByteBuffer.allocate( LogHeader.LOG_HEADER_SIZE ); ByteBuffer buffer = ByteBuffer.allocate( LogHeader.LOG_HEADER_SIZE );
LogHeader header = LogHeaderReader.readLogHeader( buffer, channel, true, firstFile ); LogHeader header = LogHeaderReader.readLogHeader( buffer, channel, true, log );


PhysicalLogVersionedStoreChannel logVersionedChannel = new PhysicalLogVersionedStoreChannel( channel, PhysicalLogVersionedStoreChannel logVersionedChannel = new PhysicalLogVersionedStoreChannel( channel,
header.logVersion, header.logFormatVersion ); header.logVersion, header.logFormatVersion );
ReadableLogChannel logChannel = new ReadAheadLogChannel( logVersionedChannel, ReadableLogChannel logChannel = new ReadAheadLogChannel( logVersionedChannel,
versionBridge, ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE ); LogVersionBridge.NO_MORE_CHANNELS, ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE );


return new LogEntryCursor( new VersionAwareLogEntryReader<>(), logChannel ); return new LogEntryCursor( new VersionAwareLogEntryReader<>(), logChannel );
} }
Expand Down
13 changes: 7 additions & 6 deletions tools/pom.xml
Expand Up @@ -62,12 +62,6 @@
<version>${project.version}</version> <version>${project.version}</version>
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.neo4j</groupId> <groupId>org.neo4j</groupId>
<artifactId>neo4j-io</artifactId> <artifactId>neo4j-io</artifactId>
Expand Down Expand Up @@ -118,6 +112,13 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies> </dependencies>


<build> <build>
Expand Down
24 changes: 3 additions & 21 deletions tools/src/main/java/org/neo4j/tools/rawstorereader/RsdrMain.java
Expand Up @@ -44,22 +44,13 @@
import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.logging.NullLogProvider; import org.neo4j.logging.NullLogProvider;
import org.neo4j.tools.util.TransactionLogUtils;


import static org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory.createPageCache; import static org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory.createPageCache;
import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK;
import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS; import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;


/** /**
* Tool to read raw data from various stores. * Tool to read raw data from various stores.
Expand Down Expand Up @@ -288,17 +279,8 @@ private static RecordStore getStore( String fname, NeoStores neoStores )
private static IOCursor<LogEntry> getLogCursor( FileSystemAbstraction fileSystem, String fname, private static IOCursor<LogEntry> getLogCursor( FileSystemAbstraction fileSystem, String fname,
NeoStores neoStores ) throws IOException NeoStores neoStores ) throws IOException
{ {
File file = new File( neoStores.getStoreDir(), fname ); return TransactionLogUtils
StoreChannel fileChannel = fileSystem.open( file, "r" ); .openLogEntryCursor( fileSystem, new File( neoStores.getStoreDir(), fname ), NO_MORE_CHANNELS );
LogHeader logHeader = readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), fileChannel, false, file );
console.printf( "Logical log version: %s with prev committed tx[%s]%n",
logHeader.logVersion, logHeader.lastCommittedTxId );

PhysicalLogVersionedStoreChannel channel =
new PhysicalLogVersionedStoreChannel( fileChannel, logHeader.logVersion, logHeader.logFormatVersion );
ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, NO_MORE_CHANNELS );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
return new LogEntryCursor( logEntryReader, logChannel );
} }


private static void readLog( private static void readLog(
Expand Down
37 changes: 22 additions & 15 deletions tools/src/main/java/org/neo4j/tools/txlog/CheckTxLogs.java
Expand Up @@ -43,11 +43,11 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.test.LogTestUtils;
import org.neo4j.tools.txlog.checktypes.CheckType; import org.neo4j.tools.txlog.checktypes.CheckType;
import org.neo4j.tools.txlog.checktypes.CheckTypes; import org.neo4j.tools.txlog.checktypes.CheckTypes;


import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.tools.util.TransactionLogUtils.openLogs;


/** /**
* Tool that verifies consistency of transaction logs. * Tool that verifies consistency of transaction logs.
Expand Down Expand Up @@ -133,31 +133,38 @@ boolean validateCheckPoints( PhysicalLogFiles logFiles, InconsistenciesHandler h
logFileSizes.put( i, fs.getFileSize( logFiles.getLogFileForVersion( i ) ) ); logFileSizes.put( i, fs.getFileSize( logFiles.getLogFileForVersion( i ) ) );
} }


LogEntryCursor logEntryCursor = LogTestUtils.openLogs( fs, logFiles ); try ( LogEntryCursor logEntryCursor = openLogEntryCursor( logFiles ) )
while ( logEntryCursor.next() )
{ {
LogEntry logEntry = logEntryCursor.get(); while ( logEntryCursor.next() )
if ( logEntry instanceof CheckPoint )
{ {
LogPosition logPosition = logEntry.<CheckPoint>as().getLogPosition(); LogEntry logEntry = logEntryCursor.get();
// if the file has been pruned we cannot validate the check point if ( logEntry instanceof CheckPoint )
if ( logPosition.getLogVersion() >= lowestLogVersion )
{ {
long size = logFileSizes.get( logPosition.getLogVersion() ); LogPosition logPosition = logEntry.<CheckPoint>as().getLogPosition();
if ( logPosition.getByteOffset() < 0 || size < 0 || logPosition.getByteOffset() > size ) // if the file has been pruned we cannot validate the check point
if ( logPosition.getLogVersion() >= lowestLogVersion )
{ {
long currentLogVersion = logEntryCursor.getCurrentLogVersion(); long size = logFileSizes.get( logPosition.getLogVersion() );
handler.reportInconsistentCheckPoint( currentLogVersion, logPosition, size ); if ( logPosition.getByteOffset() < 0 || size < 0 || logPosition.getByteOffset() > size )
success = false; {
} long currentLogVersion = logEntryCursor.getCurrentLogVersion();
handler.reportInconsistentCheckPoint( currentLogVersion, logPosition, size );
success = false;
}


}
} }
} }
} }
} }
return success; return success;
} }


LogEntryCursor openLogEntryCursor( PhysicalLogFiles logFiles ) throws IOException
{
return openLogs( fs, logFiles );
}

boolean scan( PhysicalLogFiles logFiles, InconsistenciesHandler handler, CheckType<?,?>... checkTypes ) boolean scan( PhysicalLogFiles logFiles, InconsistenciesHandler handler, CheckType<?,?>... checkTypes )
throws IOException throws IOException
{ {
Expand Down Expand Up @@ -193,7 +200,7 @@ private <C extends Command, R extends AbstractBaseRecord> boolean scan( Physical
boolean validLogs = true; boolean validLogs = true;
long commandsRead = 0; long commandsRead = 0;
long lastSeenTxId = BASE_TX_ID; long lastSeenTxId = BASE_TX_ID;
try ( LogEntryCursor logEntryCursor = LogTestUtils.openLogs( fs, logFiles ) ) try ( LogEntryCursor logEntryCursor = openLogEntryCursor( logFiles ) )
{ {
while ( logEntryCursor.next() ) while ( logEntryCursor.next() )
{ {
Expand Down
76 changes: 76 additions & 0 deletions tools/src/main/java/org/neo4j/tools/util/TransactionLogUtils.java
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.tools.util;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;

import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderReader.readLogHeader;

public class TransactionLogUtils
{
/**
* Opens a {@link LogEntryCursor} over all log files found in the storeDirectory
*
* @param fs {@link FileSystemAbstraction} to find {@code storeDirectory} in.
* @param logFiles the physical log files to read from
*/
public static LogEntryCursor openLogs( final FileSystemAbstraction fs, PhysicalLogFiles logFiles )
throws IOException
{
File firstFile = logFiles.getLogFileForVersion( logFiles.getLowestLogVersion() );
return openLogEntryCursor( fs, firstFile, new ReaderLogVersionBridge( fs, logFiles ) );
}

/**
* Opens a {@link LogEntryCursor} for requested file
*
* @param fileSystem to find {@code file} in.
* @param file file to open
* @param readerLogVersionBridge log version bridge to use
*/
public static LogEntryCursor openLogEntryCursor( FileSystemAbstraction fileSystem, File file,
LogVersionBridge readerLogVersionBridge ) throws IOException
{
StoreChannel fileChannel = fileSystem.open( file, "r" );
LogHeader logHeader = readLogHeader( ByteBuffer.allocateDirect( LOG_HEADER_SIZE ), fileChannel, true, file );
PhysicalLogVersionedStoreChannel channel =
new PhysicalLogVersionedStoreChannel( fileChannel, logHeader.logVersion, logHeader.logFormatVersion );
ReadableLogChannel logChannel = new ReadAheadLogChannel( channel, readerLogVersionBridge );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
return new LogEntryCursor( logEntryReader, logChannel );
}
}
39 changes: 39 additions & 0 deletions tools/src/test/java/org/neo4j/tools/txlog/CheckTxLogsTest.java
Expand Up @@ -21,9 +21,11 @@


import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;


import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
Expand All @@ -41,6 +43,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel; import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel;
Expand All @@ -59,6 +62,7 @@
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
import static org.neo4j.tools.txlog.checktypes.CheckTypes.CHECK_TYPES; import static org.neo4j.tools.txlog.checktypes.CheckTypes.CHECK_TYPES;
import static org.neo4j.tools.txlog.checktypes.CheckTypes.NEO_STORE; import static org.neo4j.tools.txlog.checktypes.CheckTypes.NEO_STORE;
Expand Down Expand Up @@ -905,6 +909,23 @@ public void shouldReportNoInconsistenciesIfTxIdSequenceIsStriclyIncreasingAndHas
assertTrue( handler.txIdSequenceInconsistencies.isEmpty() ); assertTrue( handler.txIdSequenceInconsistencies.isEmpty() );
} }


@Test
public void closeLogEntryCursorAfterValidation() throws IOException
{
ensureLogExists( logFile( 1 ) );
writeCheckPoint( logFile( 2 ), 1, 42 );
LogEntryCursor entryCursor = Mockito.mock( LogEntryCursor.class );

CheckTxLogsWithCustomLogEntryCursor checkTxLogs =
new CheckTxLogsWithCustomLogEntryCursor( System.out, fsRule.get(), entryCursor );
CapturingInconsistenciesHandler handler = new CapturingInconsistenciesHandler();
PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fsRule.get() );
boolean logsValidity = checkTxLogs.validateCheckPoints( logFiles, handler );

assertTrue("empty logs should be valid", logsValidity);
verify( entryCursor ).close();
}

private File logFile( long version ) private File logFile( long version )
{ {
fsRule.get().mkdirs( storeDirectory ); fsRule.get().mkdirs( storeDirectory );
Expand Down Expand Up @@ -975,6 +996,24 @@ private FileSystemAbstraction ensureLogExists( File log ) throws IOException
return fs; return fs;
} }


private class CheckTxLogsWithCustomLogEntryCursor extends CheckTxLogs
{

private final LogEntryCursor logEntryCursor;

CheckTxLogsWithCustomLogEntryCursor( PrintStream out, FileSystemAbstraction fs, LogEntryCursor logEntryCursor )
{
super( out, fs );
this.logEntryCursor = logEntryCursor;
}

@Override
LogEntryCursor openLogEntryCursor( PhysicalLogFiles logFiles ) throws IOException
{
return logEntryCursor;
}
}

private static class CapturingInconsistenciesHandler implements InconsistenciesHandler private static class CapturingInconsistenciesHandler implements InconsistenciesHandler
{ {
List<TxIdSequenceInconsistency> txIdSequenceInconsistencies = new ArrayList<>(); List<TxIdSequenceInconsistency> txIdSequenceInconsistencies = new ArrayList<>();
Expand Down

0 comments on commit 15dd0f1

Please sign in to comment.