Skip to content

Commit

Permalink
Allows for optional lenient reading of broken transaction log entries
Browse files Browse the repository at this point in the history
Disabled by default and can only be enabled in DumpLogicalLog for the time being.
Very useful when coming across a transaction log has broken entries or bad sections
in the file. Failing to read an entry will revert back to start of previous read
and one byte forward and try again. This will continue until next successful read
of a log entry happens. Getting to a successful read after a bad secion will perform
additional checks so that data in the read log entry makes sense. These checks
are made on transaction id and time stamps and similar data fields.
  • Loading branch information
tinwelint committed Mar 16, 2017
1 parent 5005910 commit 88f8240
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 25 deletions.
Expand Up @@ -158,6 +158,7 @@
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreReadLayer;

import static org.neo4j.kernel.impl.transaction.log.entry.InvalidLogEntryHandler.STRICT;
import static org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategyFactory.fromConfigValue;

public class NeoStoreDataSource implements Lifecycle, IndexProviders
Expand Down Expand Up @@ -448,7 +449,7 @@ public void start() throws IOException
indexConfigStore, updateableSchemaState::clear, legacyIndexTransactionOrdering );

LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory() );
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT );

TransactionIdStore transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class );
LogVersionRepository logVersionRepository = dependencies.resolveDependency( LogVersionRepository.class );
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.storageengine.api.ReadPastEndException;

import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.lang.System.arraycopy;

/**
Expand Down Expand Up @@ -185,4 +186,19 @@ private void compactToBeginningOfBuffer( int remaining )
aheadBuffer.clear();
aheadBuffer.position( remaining );
}

public void setCurrentPosition( long byteOffset ) throws IOException
{
long positionRelativeToAheadBuffer = byteOffset - (channel.position() - aheadBuffer.limit());
if ( positionRelativeToAheadBuffer >= aheadBuffer.limit() || positionRelativeToAheadBuffer < 0 )
{
// Beyond what we currently have buffered
aheadBuffer.position( aheadBuffer.limit() );
channel.position( byteOffset );
}
else
{
aheadBuffer.position( toIntExact( positionRelativeToAheadBuffer ) );
}
}
}
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.entry;

import org.neo4j.kernel.impl.transaction.log.LogPosition;

/**
* Decides what happens to invalid log entries read by {@link LogEntryReader}.
*/
public interface InvalidLogEntryHandler
{
/**
* Allows no invalid log entries.
*/
public static final InvalidLogEntryHandler STRICT = new InvalidLogEntryHandler()
{
};

/**
* Log entry couldn't be read correctly. Could be invalid log entry in the log.
*
* @param e error during reading a log entry.
* @param position {@link LogPosition} of the start of the log entry attempted to be read.
* @return {@code true} if this error is accepted, otherwise {@code false} which means the exception
* causing this will be thrown by the caller.
*/
default boolean handleInvalidEntry( Exception e, LogPosition position )
{ // consider invalid by default
return false;
}

/**
* Tells this handler that, given that there were invalid entries, handler thinks they are OK
* to skip and that one or more entries after a bad section could be read then a certain number
* of bytes contained invalid log data and were therefore skipped. Log entry reading continues
* after this call.
*
* @param bytesSkipped number of bytes skipped.
*/
default void bytesSkipped( long bytesSkipped )
{ // do nothing by default
}
}
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.entry;

import java.util.concurrent.TimeUnit;

import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;

import static java.lang.Math.abs;
import static java.lang.System.currentTimeMillis;

/**
* Sanity checking for read {@link LogEntry log entries}.
*/
class LogEntrySanity
{
private static final long UNREASONABLY_LONG_TIME = TimeUnit.DAYS.toMillis( 30 * 356 /*years*/ );
private static final int UNREASONABLY_HIGH_SERVER_ID = 10_000_000;

static boolean logEntryMakesSense( LogEntry entry )
{
if ( entry == null )
{
return false;
}

if ( entry instanceof IdentifiableLogEntry )
{
IdentifiableLogEntry iEntry = (IdentifiableLogEntry) entry;
entry = iEntry.getEntry();
}

if ( entry instanceof LogEntryStart )
{
return startEntryMakesSense( (LogEntryStart) entry );
}
else if ( entry instanceof LogEntryCommit )
{
return commitEntryMakesSense( (LogEntryCommit) entry );
}
return true;
}

static boolean commitEntryMakesSense( LogEntryCommit entry )
{
return timeMakesSense( entry.getTimeWritten() ) && transactionIdMakesSense( entry );
}

private static boolean transactionIdMakesSense( LogEntryCommit entry )
{
return entry.getTxId() > TransactionIdStore.BASE_TX_ID;
}

static boolean startEntryMakesSense( LogEntryStart entry )
{
return serverIdMakesSense( entry.getLocalId() ) &&
serverIdMakesSense( entry.getMasterId() ) &&
timeMakesSense( entry.getTimeWritten() );
}

private static boolean serverIdMakesSense( int serverId )
{
return serverId >= 0 && serverId < UNREASONABLY_HIGH_SERVER_ID;
}

static boolean timeMakesSense( long time )
{
return abs( currentTimeMillis() - time ) < UNREASONABLY_LONG_TIME;
}
}
Expand Up @@ -155,7 +155,7 @@ public static LogEntryVersion byVersion( byte version )
{
byte flattenedVersion = (byte) -version;

if ( flattenedVersion < LOOKUP_BY_VERSION.length)
if ( flattenedVersion >= 0 && flattenedVersion < LOOKUP_BY_VERSION.length )
{
return LOOKUP_BY_VERSION[flattenedVersion];
}
Expand Down
Expand Up @@ -20,16 +20,17 @@
package org.neo4j.kernel.impl.transaction.log.entry;

import java.io.IOException;

import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.storageengine.api.CommandReaderFactory;
import org.neo4j.storageengine.api.ReadPastEndException;

import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.Exceptions.withMessage;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntrySanity.logEntryMakesSense;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion.byVersion;

/**
Expand All @@ -44,15 +45,18 @@
public class VersionAwareLogEntryReader<SOURCE extends ReadableClosablePositionAwareChannel> implements LogEntryReader<SOURCE>
{
private final CommandReaderFactory commandReaderFactory;
private final InvalidLogEntryHandler invalidLogEntryHandler;

public VersionAwareLogEntryReader()
{
this( new RecordStorageCommandReaderFactory() );
this( new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT );
}

public VersionAwareLogEntryReader( CommandReaderFactory commandReaderFactory )
public VersionAwareLogEntryReader( CommandReaderFactory commandReaderFactory,
InvalidLogEntryHandler invalidLogEntryHandler )
{
this.commandReaderFactory = commandReaderFactory;
this.invalidLogEntryHandler = invalidLogEntryHandler;
}

@Override
Expand All @@ -61,41 +65,70 @@ public LogEntry readLogEntry( SOURCE channel ) throws IOException
try
{
LogPositionMarker positionMarker = new LogPositionMarker();
channel.getCurrentPosition( positionMarker );
long skipped = 0;
while ( true )
{
LogEntryVersion version = null;
LogEntryParser<LogEntry> entryReader;
channel.getCurrentPosition( positionMarker );

/*
* if the read type is negative than it is actually the log entry version
* so we need to read an extra byte which will contain the type
*/
byte typeCode;
byte versionCode;
try
{
/*
* if the read type is negative than it is actually the log entry version
* so we need to read an extra byte which will contain the type
*/
byte typeCode = channel.get();
byte versionCode = 0;
typeCode = channel.get();
versionCode = 0;
if ( typeCode < 0 )
{
versionCode = typeCode;
typeCode = channel.get();
}

version = byVersion( versionCode );
entryReader = version.entryParser( typeCode );
}
catch ( ReadPastEndException e )
{ // Make these exceptions slip by straight out to the outer handler
throw e;
}

LogEntryVersion version = null;
LogEntryParser<LogEntry> entryReader;
LogEntry entry;
try
{
version = byVersion( versionCode );
entryReader = version.entryParser( typeCode );
entry = entryReader.parse( version, channel, positionMarker, commandReaderFactory );
if ( entry != null && skipped > 0 )
{
// Take extra care when reading an entry in a bad section. Just because entry reading
// didn't throw exception doesn't mean that it's a sane entry.
if ( !logEntryMakesSense( entry ) )
{
throw new IllegalArgumentException( "Log entry " + entry + " which was read after " +
"a bad section of " + skipped + " bytes was read successfully, but " +
"its contents is unrealistic, so treating as part of bad section" );
}
invalidLogEntryHandler.bytesSkipped( skipped );
skipped = 0;
}
}
catch ( Exception e )
{ // Tag all other exceptions with log position and other useful information
LogPosition position = positionMarker.newPosition();
e = withMessage( e, e.getMessage() + ". At position " + position +
" and entry version " + version );

if ( channelSupportsPositioning( channel ) &&
invalidLogEntryHandler.handleInvalidEntry( e, position ) )
{
((ReadAheadLogChannel)channel).setCurrentPosition( positionMarker.getByteOffset() + 1 );
skipped++;
continue;
}
throw launderedException( IOException.class, e );
}

LogEntry entry = entryReader.parse( version, channel, positionMarker, commandReaderFactory );
if ( !entryReader.skip() )
{
return entry;
Expand All @@ -107,4 +140,9 @@ public LogEntry readLogEntry( SOURCE channel ) throws IOException
return null;
}
}

private boolean channelSupportsPositioning( SOURCE channel )
{
return channel instanceof ReadAheadLogChannel;
}
}

0 comments on commit 88f8240

Please sign in to comment.