Skip to content

Commit

Permalink
More flexible support for reading multiple versions of log entries
Browse files Browse the repository at this point in the history
Being able to read log entries from different versions is great since it
makes migration and rolling upgrades easier, sometimes even transparent.
Currently Neo4j supports reading of multiple versions, although not quite.

This commit will clean up and greatly simplify the way log entry
versioning works as a whole, to go from this:

- Each log has a header specifying format version. A format version is
  really the format version of the commands contained in the log, where all
  commands, i.e. all transactions are assumed to have the same version.
- Each log entry (START,COMMIT,...) has a version which specifies in which
  format the entry data is in (not the command data, which is still
  controlled by the header of the log).
- Code full of log reader factories and dispatchers to try to be dynamic
  in selecting which log entry reader to use. Even though it looked
  dynamic any given log could only contain COMMAND log entries of one
  version.
- Confused class naming where log entry parsers where named after
  header format version.

to something like:

- Each log entry (START,COMMIT,...) has a version which specifies in which
  format the entry data is in (including command data).
- Get appropriate reader(s) from LogEntryVersion.byVersion()

LogEntryVersion also has a more detailed summary of how these things
work now.

After this commit any log can contain any number of log entries all with
different versions, mixed if so necessary. This allows for basically
seamless (non-)migration from one version to another if the only change in
format is in the log format. Rolling upgrades work by adhering to the
usual rules of master upgrading last and only writes through master during
rolling upgrade.
  • Loading branch information
tinwelint authored and digitalstain committed Jul 27, 2015
1 parent ffcd017 commit d7bc110
Show file tree
Hide file tree
Showing 77 changed files with 1,013 additions and 1,754 deletions.
Expand Up @@ -116,6 +116,7 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
Expand All @@ -128,8 +129,8 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.TimeCheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReaderFactory;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategy;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruningImpl;
Expand Down Expand Up @@ -758,8 +759,7 @@ public long getTimestampForVersion( long version ) throws IOException
LogPosition position = LogPosition.start( version );
try ( ReadableVersionableLogChannel channel = logFile.getReader( position ) )
{
final LogEntryReader<ReadableVersionableLogChannel> reader =
new LogEntryReaderFactory().versionable();
final LogEntryReader<ReadableVersionableLogChannel> reader = new VersionAwareLogEntryReader<>();
LogEntry entry;
while ( (entry = reader.readLogEntry( channel )) != null )
{
Expand Down Expand Up @@ -903,7 +903,7 @@ private void buildRecovery( final FileSystemAbstraction fileSystemAbstraction, C
RecoveryVisitor recoveryVisitor =
new RecoveryVisitor( neoStore, storeRecoverer, indexUpdatesValidator, recoveryVisitorMonitor );

LogEntryReader<ReadableVersionableLogChannel> logEntryReader = new LogEntryReaderFactory().versionable();
LogEntryReader<ReadableLogChannel> logEntryReader = new VersionAwareLogEntryReader<>();
final Visitor<LogVersionedStoreChannel,IOException> logFileRecoverer =
new LogFileRecoverer( logEntryReader, recoveryVisitor );

Expand Down
Expand Up @@ -30,9 +30,10 @@
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReaderFactory;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -72,7 +73,7 @@ public boolean recoveryNeededAt( File dataDir, long currentLogVersion ) throws I

PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );

LogEntryReader<ReadableVersionableLogChannel> reader = new LogEntryReaderFactory().versionable();
LogEntryReader<ReadableLogChannel> reader = new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode() );

LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );

Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.IOCursor;
import org.neo4j.kernel.impl.transaction.log.LogDeserializer;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
Expand Down Expand Up @@ -308,7 +308,7 @@ private IOCursor<LogEntry> logEntryCursor( LogFile file, ByteBuffer buffer ) thr

private static IOCursor<LogEntry> logEntryCursor( StoreChannel channel, LogHeader header ) throws IOException
{
return new LogDeserializer().logEntries( new ReadAheadLogChannel(
return new LogEntryCursor( new ReadAheadLogChannel(
new PhysicalLogVersionedStoreChannel( channel, header.logVersion, header.logFormatVersion ),
NO_MORE_CHANNELS, DEFAULT_READ_AHEAD_SIZE ) );
}
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalWritableLogChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriterV1;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;

import static org.neo4j.kernel.impl.transaction.log.entry.LogHeaderWriter.writeLogHeader;

Expand Down Expand Up @@ -71,7 +71,7 @@ public void checkPoint( long logVersion, long lastCommittedTx) throws IOExceptio
try ( PhysicalWritableLogChannel channel = new PhysicalWritableLogChannel( storeChannel ) )
{
final TransactionLogWriter writer =
new TransactionLogWriter( new LogEntryWriterV1( channel, new CommandWriter( channel ) ) );
new TransactionLogWriter( new LogEntryWriter( channel, new CommandWriter( channel ) ) );
writer.checkPoint( new LogPosition( logVersion, offset ) );
}
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.neo4j.function.Function;
import org.neo4j.helpers.Pair;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
Expand All @@ -32,8 +33,8 @@
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReaderFactory;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;

import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel.DEFAULT_READ_AHEAD_SIZE;
Expand All @@ -45,17 +46,25 @@
class LegacyLogEntryReader
{
private final FileSystemAbstraction fs;
private final LogEntryReader<ReadableVersionableLogChannel> reader;
private final Function<LogHeader,LogEntryReader<ReadableVersionableLogChannel>> readerFactory;

LegacyLogEntryReader( FileSystemAbstraction fs, LogEntryReader<ReadableVersionableLogChannel> reader )
LegacyLogEntryReader( FileSystemAbstraction fs,
Function<LogHeader,LogEntryReader<ReadableVersionableLogChannel>> readerFactory )
{
this.fs = fs;
this.reader = reader;
this.readerFactory = readerFactory;
}

LegacyLogEntryReader( FileSystemAbstraction fs )
{
this( fs, new LogEntryReaderFactory().versionable() );
this( fs, new Function<LogHeader,LogEntryReader<ReadableVersionableLogChannel>>()
{
@Override
public LogEntryReader<ReadableVersionableLogChannel> apply( LogHeader from ) throws RuntimeException
{
return new VersionAwareLogEntryReader<>( from.logFormatVersion );
}
} );
}

public Pair<LogHeader, IOCursor<LogEntry>> openReadableChannel( File logFile ) throws IOException
Expand All @@ -64,6 +73,7 @@ public Pair<LogHeader, IOCursor<LogEntry>> openReadableChannel( File logFile ) t
final ByteBuffer buffer = ByteBuffer.allocate( LOG_HEADER_SIZE );

final LogHeader header = readLogHeader( buffer, rawChannel, false );
LogEntryReader<ReadableVersionableLogChannel> reader = readerFactory.apply( header );

// this ensures that the last committed txId field in the header is initialized properly
long lastCommittedTxId = Math.max( BASE_TX_ID, header.lastCommittedTxId );
Expand Down
Expand Up @@ -40,7 +40,6 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriterV1;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeaderWriter;

Expand All @@ -55,7 +54,7 @@ class LegacyLogEntryWriter
@Override
public LogEntryWriter apply( WritableLogChannel channel )
{
return new LogEntryWriterV1( channel, new CommandWriter( channel ) );
return new LogEntryWriter( channel, new CommandWriter( channel ) );
}
};

Expand Down
Expand Up @@ -26,76 +26,8 @@
import org.neo4j.kernel.impl.store.record.PropertyKeyTokenRecord;
import org.neo4j.kernel.impl.store.record.PropertyRecord;

import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersions.LEGACY_LOG_ENTRY_VERSION;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersions.LOG_ENTRY_VERSION_2_1;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersions.LOG_ENTRY_VERSION_2_2;
import static org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersions.LOG_ENTRY_VERSION_2_3;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_1_9;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_2_0;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_2_1;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_2_2;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_2_3;
import static org.neo4j.kernel.impl.transaction.log.entry.LogVersions.LOG_VERSION_2_2_4;

public abstract class CommandReaderFactory
{
public abstract CommandReader newInstance( byte logFormatVersion, byte logEntryVersion );

public static class Default extends CommandReaderFactory
{
// Remember the latest log formant and log entry versions of the last reader returned.
// Don't use a map because of the overhead of it.
// Typically lots of log entries of the same version comes together.
private byte lastFormatVersion;
private byte lastEntryVersion;
private CommandReader lastReader;

@Override
public CommandReader newInstance( byte logFormatVersion, byte logEntryVersion )
{
if ( logFormatVersion == lastFormatVersion && logEntryVersion == lastEntryVersion && lastReader != null )
{
return lastReader;
}

lastFormatVersion = logFormatVersion;
lastEntryVersion = logEntryVersion;
return (lastReader = figureOutCorrectReader( logFormatVersion, logEntryVersion ));
}

private CommandReader figureOutCorrectReader( byte logFormatVersion, byte logEntryVersion )
{
switch ( logEntryVersion )
{
// These are not thread safe, so if they are to be cached it has to be done in an object pool
case LEGACY_LOG_ENTRY_VERSION:
switch ( logFormatVersion )
{
case LOG_VERSION_2_0:
return new PhysicalLogNeoCommandReaderV0_20();
case LOG_VERSION_1_9:
return new PhysicalLogNeoCommandReaderV0_19();
}
case LOG_ENTRY_VERSION_2_1:
case LOG_ENTRY_VERSION_2_2:
case LOG_ENTRY_VERSION_2_3:
switch ( logFormatVersion )
{
case LOG_VERSION_2_1:
return new PhysicalLogNeoCommandReaderV1();
case LOG_VERSION_2_2:
case LOG_VERSION_2_3:
return new PhysicalLogNeoCommandReaderV2();
case LOG_VERSION_2_2_4:
return new PhysicalLogNeoCommandReaderV2_2_4();
}
}
throw new IllegalArgumentException( "Unknown log format version (" + logFormatVersion + ") and " +
"log entry version (" + logEntryVersion + ")" );

}
}

public interface DynamicRecordAdder<T>
{
void add( T target, DynamicRecord record );
Expand Down
Expand Up @@ -49,7 +49,7 @@
import static org.neo4j.kernel.impl.transaction.command.CommandReaderFactory.PROPERTY_DELETED_DYNAMIC_RECORD_ADDER;
import static org.neo4j.kernel.impl.transaction.command.CommandReaderFactory.PROPERTY_INDEX_DYNAMIC_RECORD_ADDER;

public class PhysicalLogNeoCommandReaderV0_19 implements CommandReader
public class PhysicalLogNeoCommandReaderV1_9 implements CommandReader
{
private final PhysicalNeoCommandReader reader = new PhysicalNeoCommandReader();
private ReadableLogChannel channel;
Expand Down
Expand Up @@ -49,7 +49,7 @@
import static org.neo4j.kernel.impl.transaction.command.CommandReaderFactory.PROPERTY_DELETED_DYNAMIC_RECORD_ADDER;
import static org.neo4j.kernel.impl.transaction.command.CommandReaderFactory.PROPERTY_INDEX_DYNAMIC_RECORD_ADDER;

public class PhysicalLogNeoCommandReaderV0_20 implements CommandReader
public class PhysicalLogNeoCommandReaderV2_0 implements CommandReader
{
private final PhysicalNeoCommandReader reader = new PhysicalNeoCommandReader();
private ReadableLogChannel channel;
Expand Down
Expand Up @@ -66,7 +66,7 @@
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.read2bMap;
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.read3bLengthAndString;

public class PhysicalLogNeoCommandReaderV1 implements CommandReader
public class PhysicalLogNeoCommandReaderV2_1 implements CommandReader
{
private final PhysicalNeoCommandReader reader = new PhysicalNeoCommandReader();
private ReadableLogChannel channel;
Expand Down
Expand Up @@ -66,7 +66,7 @@
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.read2bMap;
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.read3bLengthAndString;

public class PhysicalLogNeoCommandReaderV2 implements CommandReader
public class PhysicalLogNeoCommandReaderV2_2 implements CommandReader
{
private final PhysicalNeoCommandReader reader = new PhysicalNeoCommandReader();
private ReadableLogChannel channel;
Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.neo4j.helpers.ThisShouldNotHappenError;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriterV1;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogCheckPointEvent;
Expand Down Expand Up @@ -110,7 +110,7 @@ public void start() throws Throwable
{
this.writer = logFile.getWriter();
this.indexCommandDetector = new IndexCommandDetector( new CommandWriter( writer ) );
this.transactionLogWriter = new TransactionLogWriter( new LogEntryWriterV1( writer, indexCommandDetector ) );
this.transactionLogWriter = new TransactionLogWriter( new LogEntryWriter( writer, indexCommandDetector ) );
}

@Override
Expand Down

This file was deleted.

0 comments on commit d7bc110

Please sign in to comment.