From 88f8240288b371b187838ad9b3fc48333fbedb08 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 6 Sep 2016 10:56:35 +0200 Subject: [PATCH] Allows for optional lenient reading of broken transaction log entries Disabled by default and can only be enabled in DumpLogicalLog for the time being. Very useful when coming across a transaction log that has broken entries or bad sections in the file. Failing to read an entry will revert to the start of the previous read, move one byte forward and try again. This will continue until the next successful read of a log entry happens. A successful read after a bad section is followed by additional checks that the data in the read log entry makes sense. These checks are made on transaction id, time stamps and similar data fields. --- .../org/neo4j/kernel/NeoStoreDataSource.java | 3 +- .../transaction/log/ReadAheadChannel.java | 16 ++++ .../log/entry/InvalidLogEntryHandler.java | 60 +++++++++++++ .../transaction/log/entry/LogEntrySanity.java | 87 +++++++++++++++++++ .../log/entry/LogEntryVersion.java | 2 +- .../log/entry/VersionAwareLogEntryReader.java | 70 +++++++++++---- .../org/neo4j/tools/dump/DumpLogicalLog.java | 34 ++++++-- .../dump/LenientInvalidLogEntryHandler.java | 60 +++++++++++++ 8 files changed, 307 insertions(+), 25 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/InvalidLogEntryHandler.java create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntrySanity.java create mode 100644 tools/src/main/java/org/neo4j/tools/dump/LenientInvalidLogEntryHandler.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index 4aae6d6bfa418..d9d09332815ad 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -158,6
+158,7 @@ import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StoreReadLayer; +import static org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler.STRICT; import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue; public class NeoStoreDataSource implements Lifecycle, IndexProviders @@ -448,7 +449,7 @@ public void start() throws IOException indexConfigStore, updateableSchemaState::clear, legacyIndexTransactionOrdering ); LogEntryReader logEntryReader = - new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory() ); + new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT ); TransactionIdStore transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class ); LogVersionRepository logVersionRepository = dependencies.resolveDependency( LogVersionRepository.class ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadAheadChannel.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadAheadChannel.java index 5a5a4190d3919..ea4bd4a8d45fb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadAheadChannel.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/ReadAheadChannel.java @@ -26,6 +26,7 @@ import org.neo4j.storageengine.api.ReadPastEndException; import static java.lang.Math.min; +import static java.lang.Math.toIntExact; import static java.lang.System.arraycopy; /** @@ -185,4 +186,19 @@ private void compactToBeginningOfBuffer( int remaining ) aheadBuffer.clear(); aheadBuffer.position( remaining ); } + + public void setCurrentPosition( long byteOffset ) throws IOException + { + long positionRelativeToAheadBuffer = byteOffset - (channel.position() - aheadBuffer.limit()); + if ( positionRelativeToAheadBuffer >= aheadBuffer.limit() || positionRelativeToAheadBuffer < 0 ) + { + // Beyond what we currently have buffered + 
aheadBuffer.position( aheadBuffer.limit() ); + channel.position( byteOffset ); + } + else + { + aheadBuffer.position( toIntExact( positionRelativeToAheadBuffer ) ); + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/InvalidLogEntryHandler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/InvalidLogEntryHandler.java new file mode 100644 index 0000000000000..1133b8e510dd3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/InvalidLogEntryHandler.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.transaction.log.entry; + +import org.neo4j.kernel.impl.transaction.log.LogPosition; + +/** + * Decides what happens to invalid log entries read by {@link LogEntryReader}. + */ +public interface InvalidLogEntryHandler +{ + /** + * Allows no invalid log entries. + */ + public static final InvalidLogEntryHandler STRICT = new InvalidLogEntryHandler() + { + }; + + /** + * Log entry couldn't be read correctly. Could be invalid log entry in the log. + * + * @param e error during reading a log entry. + * @param position {@link LogPosition} of the start of the log entry attempted to be read. 
+ * @return {@code true} if this error is accepted, otherwise {@code false} which means the exception + * causing this will be thrown by the caller. + */ + default boolean handleInvalidEntry( Exception e, LogPosition position ) + { // consider invalid by default + return false; + } + + /** + * Tells this handler that, given that there were invalid entries, handler thinks they are OK + * to skip and that one or more entries after a bad section could be read then a certain number + * of bytes contained invalid log data and were therefore skipped. Log entry reading continues + * after this call. + * + * @param bytesSkipped number of bytes skipped. + */ + default void bytesSkipped( long bytesSkipped ) + { // do nothing by default + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntrySanity.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntrySanity.java new file mode 100644 index 0000000000000..2172dfd74ed4f --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntrySanity.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */
+package org.neo4j.kernel.impl.transaction.log.entry;
+
+import java.util.concurrent.TimeUnit;
+
+import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
+
+import static java.lang.Math.abs;
+import static java.lang.System.currentTimeMillis;
+
+/**
+ * Sanity checking for read {@link LogEntry log entries}: plausibility of time stamps, server and transaction ids.
+ */
+class LogEntrySanity
+{
+    private static final long UNREASONABLY_LONG_TIME = TimeUnit.DAYS.toMillis( 30 /*years*/ * 365 /*days*/ );
+    private static final int UNREASONABLY_HIGH_SERVER_ID = 10_000_000;
+
+    static boolean logEntryMakesSense( LogEntry entry )
+    { // null is never sane; entries other than start/commit are not inspected and pass
+        if ( entry == null )
+        {
+            return false;
+        }
+
+        if ( entry instanceof IdentifiableLogEntry )
+        { // unwrap so that the wrapped entry is the one being checked below
+            IdentifiableLogEntry iEntry = (IdentifiableLogEntry) entry;
+            entry = iEntry.getEntry();
+        }
+
+        if ( entry instanceof LogEntryStart )
+        {
+            return startEntryMakesSense( (LogEntryStart) entry );
+        }
+        else if ( entry instanceof LogEntryCommit )
+        {
+            return commitEntryMakesSense( (LogEntryCommit) entry );
+        }
+        return true;
+    }
+
+    static boolean commitEntryMakesSense( LogEntryCommit entry )
+    { // plausible write time and a transaction id above the base transaction id
+        return timeMakesSense( entry.getTimeWritten() ) && transactionIdMakesSense( entry );
+    }
+
+    private static boolean transactionIdMakesSense( LogEntryCommit entry )
+    {
+        return entry.getTxId() > TransactionIdStore.BASE_TX_ID;
+    }
+
+    static boolean startEntryMakesSense( LogEntryStart entry )
+    { // plausible local and master ids and a plausible write time
+        return serverIdMakesSense( entry.getLocalId() ) &&
+                serverIdMakesSense( entry.getMasterId() ) &&
+                timeMakesSense( entry.getTimeWritten() );
+    }
+
+    private static boolean serverIdMakesSense( int serverId )
+    { // non-negative and below UNREASONABLY_HIGH_SERVER_ID
+        return serverId >= 0 && serverId < UNREASONABLY_HIGH_SERVER_ID;
+    }
+
+    static boolean timeMakesSense( long time )
+    { // less than UNREASONABLY_LONG_TIME away from the current wall clock time, in either direction
+        return abs( currentTimeMillis() - time ) < UNREASONABLY_LONG_TIME;
+    }
+}
diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntryVersion.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntryVersion.java
index 8ecf1e6a98e78..c2727bdccf5b5 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntryVersion.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/LogEntryVersion.java @@ -155,7 +155,7 @@ public static LogEntryVersion byVersion( byte version ) { byte flattenedVersion = (byte) -version; - if ( flattenedVersion < LOOKUP_BY_VERSION.length) + if ( flattenedVersion >= 0 && flattenedVersion < LOOKUP_BY_VERSION.length ) { return LOOKUP_BY_VERSION[flattenedVersion]; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/VersionAwareLogEntryReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/VersionAwareLogEntryReader.java index 46e9390a8063e..b8919f1439d30 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/VersionAwareLogEntryReader.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/entry/VersionAwareLogEntryReader.java @@ -20,16 +20,17 @@ package org.neo4j.kernel.impl.transaction.log.entry; import java.io.IOException; - import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPositionMarker; +import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.storageengine.api.CommandReaderFactory; import org.neo4j.storageengine.api.ReadPastEndException; import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Exceptions.withMessage; +import static org.neo4j.kernel.impl.transaction.log.entry.LogEntrySanity.logEntryMakesSense; import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion.byVersion; /** @@ -44,15 +45,18 @@ public class VersionAwareLogEntryReader implements 
LogEntryReader { private final CommandReaderFactory commandReaderFactory; + private final InvalidLogEntryHandler invalidLogEntryHandler; public VersionAwareLogEntryReader() { - this( new RecordStorageCommandReaderFactory() ); + this( new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT ); } - public VersionAwareLogEntryReader( CommandReaderFactory commandReaderFactory ) + public VersionAwareLogEntryReader( CommandReaderFactory commandReaderFactory, + InvalidLogEntryHandler invalidLogEntryHandler ) { this.commandReaderFactory = commandReaderFactory; + this.invalidLogEntryHandler = invalidLogEntryHandler; } @Override @@ -61,41 +65,70 @@ public LogEntry readLogEntry( SOURCE channel ) throws IOException try { LogPositionMarker positionMarker = new LogPositionMarker(); - channel.getCurrentPosition( positionMarker ); + long skipped = 0; while ( true ) { - LogEntryVersion version = null; - LogEntryParser entryReader; + channel.getCurrentPosition( positionMarker ); + + /* + * if the read type is negative than it is actually the log entry version + * so we need to read an extra byte which will contain the type + */ + byte typeCode; + byte versionCode; try { - /* - * if the read type is negative than it is actually the log entry version - * so we need to read an extra byte which will contain the type - */ - byte typeCode = channel.get(); - byte versionCode = 0; + typeCode = channel.get(); + versionCode = 0; if ( typeCode < 0 ) { versionCode = typeCode; typeCode = channel.get(); } - - version = byVersion( versionCode ); - entryReader = version.entryParser( typeCode ); } catch ( ReadPastEndException e ) { // Make these exceptions slip by straight out to the outer handler throw e; } + + LogEntryVersion version = null; + LogEntryParser entryReader; + LogEntry entry; + try + { + version = byVersion( versionCode ); + entryReader = version.entryParser( typeCode ); + entry = entryReader.parse( version, channel, positionMarker, commandReaderFactory ); + if ( entry 
!= null && skipped > 0 ) + { + // Take extra care when reading an entry in a bad section. Just because entry reading + // didn't throw exception doesn't mean that it's a sane entry. + if ( !logEntryMakesSense( entry ) ) + { + throw new IllegalArgumentException( "Log entry " + entry + " which was read after " + + "a bad section of " + skipped + " bytes was read successfully, but " + + "its contents is unrealistic, so treating as part of bad section" ); + } + invalidLogEntryHandler.bytesSkipped( skipped ); + skipped = 0; + } + } catch ( Exception e ) { // Tag all other exceptions with log position and other useful information LogPosition position = positionMarker.newPosition(); e = withMessage( e, e.getMessage() + ". At position " + position + " and entry version " + version ); + + if ( channelSupportsPositioning( channel ) && + invalidLogEntryHandler.handleInvalidEntry( e, position ) ) + { + ((ReadAheadLogChannel)channel).setCurrentPosition( positionMarker.getByteOffset() + 1 ); + skipped++; + continue; + } throw launderedException( IOException.class, e ); } - LogEntry entry = entryReader.parse( version, channel, positionMarker, commandReaderFactory ); if ( !entryReader.skip() ) { return entry; @@ -107,4 +140,9 @@ public LogEntry readLogEntry( SOURCE channel ) throws IOException return null; } } + + private boolean channelSupportsPositioning( SOURCE channel ) + { + return channel instanceof ReadAheadLogChannel; + } } diff --git a/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java b/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java index 76fc39a56e14d..a1dbf97201a88 100644 --- a/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java +++ b/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java @@ -36,6 +36,7 @@ import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.StoreChannel; +import 
org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory; import org.neo4j.kernel.impl.transaction.command.Command.NodeCommand; import org.neo4j.kernel.impl.transaction.command.Command.PropertyCommand; import org.neo4j.kernel.impl.transaction.command.Command.RelationshipCommand; @@ -50,6 +51,7 @@ import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.ReaderLogVersionBridge; import org.neo4j.kernel.impl.transaction.log.TransactionLogEntryCursor; +import org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; @@ -74,6 +76,7 @@ public class DumpLogicalLog private static final String TO_FILE = "tofile"; private static final String TX_FILTER = "txfilter"; private static final String CC_FILTER = "ccfilter"; + private static final String LENIENT = "lenient"; private final FileSystemAbstraction fileSystem; @@ -83,7 +86,8 @@ public DumpLogicalLog( FileSystemAbstraction fileSystem ) } public void dump( String filenameOrDirectory, PrintStream out, - Predicate filter, Function serializer ) throws IOException + Predicate filter, Function serializer, + InvalidLogEntryHandler invalidLogEntryHandler ) throws IOException { File file = new File( filenameOrDirectory ); printFile( file, out ); @@ -135,8 +139,10 @@ public LogVersionedStoreChannel next( LogVersionedStoreChannel channel ) throws PhysicalLogVersionedStoreChannel channel = new PhysicalLogVersionedStoreChannel( fileChannel, logHeader.logVersion, logHeader.logFormatVersion ); - ReadableClosablePositionAwareChannel logChannel = new ReadAheadLogChannel( channel, bridge, DEFAULT_READ_AHEAD_SIZE ); - LogEntryReader entryReader = new VersionAwareLogEntryReader<>(); + ReadableClosablePositionAwareChannel logChannel = new 
ReadAheadLogChannel( channel, bridge, + DEFAULT_READ_AHEAD_SIZE ); + LogEntryReader entryReader = new VersionAwareLogEntryReader<>( + new RecordStorageCommandReaderFactory(), invalidLogEntryHandler ); IOCursor entryCursor = new LogEntryCursor( entryReader, logChannel ); TransactionLogEntryCursor transactionCursor = new TransactionLogEntryCursor( entryCursor ); @@ -281,7 +287,7 @@ public String apply( LogEntry logEntry ) } /** - * Usage: [--txfilter "regex"] [--ccfilter cc-report-file] [--tofile] storeDirOrFile1 storeDirOrFile2 ... + * Usage: [--txfilter "regex"] [--ccfilter cc-report-file] [--tofile] [--lenient] storeDirOrFile1 storeDirOrFile2 ... * * --txfilter * Will match regex against each {@link LogEntry} and if there is a match, @@ -294,23 +300,37 @@ public String apply( LogEntry logEntry ) * * --tofile * Redirects output to dump-logical-log.txt in the store directory + * + * --lenient + * Will attempt to read log entries even if some look broken along the way */ public static void main( String args[] ) throws IOException { - Args arguments = Args.withFlags( TO_FILE ).parse( args ); + Args arguments = Args.withFlags( TO_FILE, LENIENT ).parse( args ); TimeZone timeZone = parseTimeZoneConfig( arguments ); Predicate filter = parseFilter( arguments, timeZone ); Function serializer = parseSerializer( filter, timeZone ); + Function invalidLogEntryHandler = parseInvalidLogEntryHandler( arguments ); try ( Printer printer = getPrinter( arguments ) ) { for ( String fileAsString : arguments.orphans() ) { - new DumpLogicalLog( new DefaultFileSystemAbstraction() ) - .dump( fileAsString, printer.getFor( fileAsString ), filter, serializer ); + PrintStream out = printer.getFor( fileAsString ); + new DumpLogicalLog( new DefaultFileSystemAbstraction() ).dump( fileAsString, out, filter, serializer, + invalidLogEntryHandler.apply( out ) ); } } } + private static Function parseInvalidLogEntryHandler( Args arguments ) + { + if ( arguments.getBoolean( LENIENT ) ) + { + return out 
-> new LenientInvalidLogEntryHandler( out ); + } + return out -> InvalidLogEntryHandler.STRICT; + } + @SuppressWarnings( "unchecked" ) private static Function parseSerializer( Predicate filter, TimeZone timeZone ) { diff --git a/tools/src/main/java/org/neo4j/tools/dump/LenientInvalidLogEntryHandler.java b/tools/src/main/java/org/neo4j/tools/dump/LenientInvalidLogEntryHandler.java new file mode 100644 index 0000000000000..65f2d3bf03db2 --- /dev/null +++ b/tools/src/main/java/org/neo4j/tools/dump/LenientInvalidLogEntryHandler.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.tools.dump; + +import java.io.PrintStream; + +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler; + +/** + * Is less strict with invalid log entries, allowing reader to try and read through bad sections. + * Prints problems along the way. 
+ */
+class LenientInvalidLogEntryHandler implements InvalidLogEntryHandler
+{
+    private final PrintStream out;
+
+    LenientInvalidLogEntryHandler( PrintStream out )
+    {
+        this.out = out;
+    }
+
+    // Not declared by InvalidLogEntryHandler, hence no @Override (it would fail to compile)
+    public boolean handleInvalidVersion( byte typeCode, byte versionCode )
+    {
+        out.println( "Read header of an entry which doesn't make sense version:" + versionCode +
+                ", type:" + typeCode + " will go one byte ahead and try again" );
+        return true;
+    }
+
+    @Override
+    public boolean handleInvalidEntry( Exception e, LogPosition position )
+    { // report the failure and accept it, letting the reader retry one byte further on
+        out.println( "Read broken entry error:" + e + " will go one byte ahead and try again" );
+        return true;
+    }
+
+    @Override
+    public void bytesSkipped( long bytesSkipped )
+    {
+        out.println( "... skipped " + bytesSkipped + " bytes of weird tx log data" );
+    }
+}