Skip to content

Commit

Permalink
Lets StorageEngine be in charge of providing CommandReader
Browse files Browse the repository at this point in the history
For this that aspect was broken out from LogEntryVersion into a
CommandReaderFactory interface. LogEntryVersion is now responsible of
providing parser for log entries, command reading is just one aspect of
one of the entries, the COMMAND entry. It's OK to have neo4j, regardless of
which StorageEngine is used (hypothetical since there's currently only
one), be in charge of log entry layout, but StorageEngine must be in
charge of the command part.

Since LogEntryVersion was a public enum and provided CommandReader most
users of log entry reading didn't have a concept of a specific log entry
parser since it just used whatever LogEntryVersion provided. Now
LogEntryReader is injected in many places and even HaRequestType, which
was enums had to be changed to be able to accept a LogEntryReader
instance.is injected in many places and even HaRequestType, which was
enums had to be changed to be able to accept a LogEntryReader instance.
  • Loading branch information
tinwelint authored and davidegrohmann committed Dec 18, 2015
1 parent 0e69b03 commit 1c7c466
Show file tree
Hide file tree
Showing 64 changed files with 851 additions and 526 deletions.
Expand Up @@ -451,17 +451,18 @@ public void start() throws IOException
storageEngine = buildStorageEngine(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, updateableSchemaState::clear );
LogEntryReader<ReadableLogChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory() );

TransactionLogModule transactionLogModule =
buildTransactionLogs( storeDir, config, logProvider, scheduler,
fs,
indexProviders.values(), storageEngine );
buildTransactionLogs( storeDir, config, logProvider, scheduler, fs,
indexProviders.values(), storageEngine, logEntryReader );

buildRecovery( fs,
storageEngine.neoStores(),
monitors.newMonitor( RecoveryVisitor.Monitor.class ), monitors.newMonitor( Recovery.Monitor.class ),
transactionLogModule.logFiles(), transactionLogModule.storeFlusher(), startupStatistics,
storageEngine );
storageEngine, logEntryReader );

KernelModule kernelModule = buildKernel(
transactionLogModule.transactionAppender(),
Expand Down Expand Up @@ -490,6 +491,7 @@ public void start() throws IOException
dependencies.satisfyDependency( storageEngine.schemaIndexProviderMap() );
dependencies.satisfyDependency( storageEngine.legacyIndexApplierLookup() );
dependencies.satisfyDependency( storageEngine.storeReadLayer() );
dependencies.satisfyDependency( logEntryReader );
dependencies.satisfyDependency( storageEngine );
satisfyDependencies( transactionLogModule, kernelModule );
}
Expand Down Expand Up @@ -585,7 +587,7 @@ private TransactionLogModule buildTransactionLogs(
JobScheduler scheduler,
FileSystemAbstraction fileSystemAbstraction,
Iterable<IndexImplementation> indexProviders,
StorageEngine storageEngine )
StorageEngine storageEngine, LogEntryReader<ReadableLogChannel> logEntryReader )
{
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 );
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME,
Expand All @@ -607,9 +609,8 @@ public long getTimestampForVersion( long version ) throws IOException
LogPosition position = LogPosition.start( version );
try ( ReadableVersionableLogChannel channel = logFile.getReader( position ) )
{
final LogEntryReader<ReadableVersionableLogChannel> reader = new VersionAwareLogEntryReader<>();
LogEntry entry;
while ( (entry = reader.readLogEntry( channel )) != null )
while ( (entry = logEntryReader.readLogEntry( channel )) != null )
{
if ( entry instanceof LogEntryStart )
{
Expand Down Expand Up @@ -641,7 +642,7 @@ public long getTimestampForVersion( long version ) throws IOException
logFile, logRotation, transactionMetadataCache, transactionIdStore, legacyIndexTransactionOrdering,
databaseHealth ) );
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache );
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader );

int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
Expand Down Expand Up @@ -730,15 +731,12 @@ private void buildRecovery(
final PhysicalLogFiles logFiles,
final StoreFlusher storeFlusher,
final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine )
StorageEngine storageEngine,
LogEntryReader<ReadableLogChannel> logEntryReader )
{
@SuppressWarnings( "resource" )
MetaDataStore metaDataStore = neoStores.getMetaDataStore();

@SuppressWarnings( "resource" )
RecoveryVisitor recoveryVisitor = new RecoveryVisitor( metaDataStore, storageEngine, recoveryVisitorMonitor );

LogEntryReader<ReadableLogChannel> logEntryReader = new VersionAwareLogEntryReader<>();
final Visitor<LogVersionedStoreChannel,Exception> logFileRecoverer =
new LogFileRecoverer( logEntryReader, recoveryVisitor );

Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
Expand Down Expand Up @@ -64,7 +65,8 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
long logVersion = MetaDataStore.getRecord( pageCache, neoStore, MetaDataStore.Position.LOG_VERSION );
PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );

LogEntryReader<ReadableLogChannel> reader = new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode() );
LogEntryReader<ReadableLogChannel> reader = new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode(),
new RecordStorageCommandReaderFactory() );

LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );
return new PositionToRecoverFrom( finder ).apply( logVersion ) != LogPosition.UNSPECIFIED;
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.storageengine;

import org.neo4j.kernel.impl.transaction.command.CommandReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;

/**
* Provides {@link CommandReader} instances for specific entry versions.
*/
public interface CommandReaderFactory
{
/**
* Given {@code version} give back a {@link CommandReader} capable of reading such commands.
*
* @param version {@link LogEntry} version.
* @return {@link CommandReader} for reading commands of that version.
*/
CommandReader byVersion( LogEntryVersion version );
}
Expand Up @@ -112,7 +112,7 @@ Collection<Command> createCommands(
*/
void apply( TransactionToApply batch, TransactionApplicationMode mode ) throws Exception;

// Stream<Command> deserialise( SequenceOfBytes source );
CommandReaderFactory commandReaderFactory();

// ====================================================================
// All these methods below are temporary while in the process of
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.storageengine.impl.recordstorage;

import org.neo4j.kernel.impl.storageengine.CommandReaderFactory;
import org.neo4j.kernel.impl.transaction.command.CommandReader;
import org.neo4j.kernel.impl.transaction.command.PhysicalLogCommandReaderV1_9;
import org.neo4j.kernel.impl.transaction.command.PhysicalLogCommandReaderV2_0;
import org.neo4j.kernel.impl.transaction.command.PhysicalLogCommandReaderV2_1;
import org.neo4j.kernel.impl.transaction.command.PhysicalLogCommandReaderV2_2_4;
import org.neo4j.kernel.impl.transaction.command.PhysicalLogNeoCommandReaderV2_2;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;

public class RecordStorageCommandReaderFactory implements CommandReaderFactory
{
// All supported readers. Key/index is LogEntryVersion ordinal.
private final CommandReader[] readers;

public RecordStorageCommandReaderFactory()
{
LogEntryVersion[] versions = LogEntryVersion.values();
readers = new CommandReader[versions.length];
readers[LogEntryVersion.V1_9.ordinal()] = new PhysicalLogCommandReaderV1_9();
readers[LogEntryVersion.V2_0.ordinal()] = new PhysicalLogCommandReaderV2_0();
readers[LogEntryVersion.V2_1.ordinal()] = new PhysicalLogCommandReaderV2_1();
readers[LogEntryVersion.V2_2.ordinal()] = new PhysicalLogNeoCommandReaderV2_2();
readers[LogEntryVersion.V2_2_4.ordinal()] = new PhysicalLogCommandReaderV2_2_4();
readers[LogEntryVersion.V2_3.ordinal()] = new PhysicalLogCommandReaderV2_2_4();
for ( int i = 0; i < versions.length; i++ )
{
if ( versions[i] == null )
{
throw new IllegalStateException( "Version " + versions[i] + " not handled" );
}
}
}

@Override
public CommandReader byVersion( LogEntryVersion version )
{
int key = version.ordinal();
if ( key >= readers.length )
{
throw new IllegalArgumentException( "Unsupported entry version " + version );
}
return readers[key];
}
}
Expand Up @@ -80,6 +80,7 @@
import org.neo4j.kernel.impl.locking.LockGroup;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.storageengine.CommandReaderFactory;
import org.neo4j.kernel.impl.storageengine.StorageEngine;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
Expand Down Expand Up @@ -146,6 +147,7 @@ public class RecordStorageEngine implements StorageEngine, Lifecycle
private final IdOrderingQueue legacyIndexTransactionOrdering;
private final LockService lockService;
private final WorkSync<Supplier<LabelScanWriter>,LabelUpdateWork> labelScanStoreSync;
private final CommandReaderFactory commandReaderFactory;

public RecordStorageEngine(
File storeDir,
Expand Down Expand Up @@ -210,6 +212,8 @@ neoStores, databaseHealth, new PropertyLoader( neoStores ),

indexUpdatesValidatorForRecovery = new RecoveryIndexingUpdatesValidator( indexingService );
labelScanStoreSync = new WorkSync<>( labelScanStore::newWriter );

this.commandReaderFactory = new RecordStorageCommandReaderFactory();
}
catch ( Throwable failure )
{
Expand All @@ -232,6 +236,12 @@ public StoreReadLayer storeReadLayer()
return storeLayer;
}

@Override
public CommandReaderFactory commandReaderFactory()
{
return commandReaderFactory;
}

@SuppressWarnings( "resource" )
@Override
public Collection<Command> createCommands(
Expand Down
Expand Up @@ -37,9 +37,11 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;

Expand All @@ -53,17 +55,20 @@ class TransactionStream
private final FileSystemAbstraction fs;
private final boolean isContiguous;
private final LogFile firstFile;
private final LogEntryReader<ReadableLogChannel> logEntryReader;

public TransactionStream( FileSystemAbstraction fs, File path ) throws IOException
public TransactionStream( FileSystemAbstraction fs, File path, LogEntryReader<ReadableLogChannel> logEntryReader )
throws IOException
{
this.fs = fs;
this.logEntryReader = logEntryReader;
{ // read metadata from all files in the directory
ByteBuffer buffer = ByteBuffer.allocateDirect( LOG_HEADER_SIZE );
File[] files = fs.listFiles( path, new TxFileFilter( fs ) );
ArrayList<LogFile> logFiles = new ArrayList<>( files.length );
for ( File file : files )
{
LogFile logFile = new LogFile( fs, file, buffer );
LogFile logFile = new LogFile( fs, file, buffer, logEntryReader );
if ( logFile.lastTxId > 0 )
{
logFiles.add( logFile );
Expand Down Expand Up @@ -106,7 +111,7 @@ public long firstTransactionId()
return firstFile.header.lastCommittedTxId;
}

public IOCursor<CommittedTransactionRepresentation> cursor() throws IOException
public IOCursor<CommittedTransactionRepresentation> cursor()
{
return new Cursor();
}
Expand Down Expand Up @@ -217,14 +222,15 @@ private static class LogFile implements Comparable<LogFile>
private final long lastTxId;
long cap = Long.MAX_VALUE;

LogFile( FileSystemAbstraction fs, File file, ByteBuffer buffer ) throws IOException
LogFile( FileSystemAbstraction fs, File file, ByteBuffer buffer,
LogEntryReader<ReadableLogChannel> logEntryReader ) throws IOException
{
this.file = file;
try ( StoreChannel channel = fs.open( file, "r" ) )
{
header = readLogHeader( buffer, channel, false );

try ( IOCursor<LogEntry> cursor = logEntryCursor( channel, header ) )
try ( IOCursor<LogEntry> cursor = logEntryCursor( logEntryReader, channel, header ) )
{
long txId = header.lastCommittedTxId;
while ( cursor.next() )
Expand Down Expand Up @@ -300,12 +306,13 @@ private IOCursor<LogEntry> logEntryCursor( LogFile file, ByteBuffer buffer ) thr
{
buffer.clear();
}
return logEntryCursor( channel, file.header );
return logEntryCursor( logEntryReader, channel, file.header );
}

private static IOCursor<LogEntry> logEntryCursor( StoreChannel channel, LogHeader header ) throws IOException
private static IOCursor<LogEntry> logEntryCursor( LogEntryReader<ReadableLogChannel> logEntryReader,
StoreChannel channel, LogHeader header ) throws IOException
{
return new LogEntryCursor( new ReadAheadLogChannel(
return new LogEntryCursor( logEntryReader, new ReadAheadLogChannel(
new PhysicalLogVersionedStoreChannel( channel, header.logVersion, header.logFormatVersion ),
NO_MORE_CHANNELS ) );
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result;
Expand All @@ -33,6 +34,8 @@
import org.neo4j.kernel.impl.storemigration.legacystore.v22.Legacy22Store;
import org.neo4j.kernel.impl.storemigration.legacystore.v23.Legacy23Store;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.LatestCheckPointFinder.LatestCheckPoint;
Expand Down Expand Up @@ -137,8 +140,10 @@ private Result checkUpgradeableFor( File storeDirectory, String version )
}

PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fs );
LogEntryReader<ReadableLogChannel> logEntryReader =
new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory() );
LatestCheckPointFinder latestCheckPointFinder =
new LatestCheckPointFinder( logFiles, fs, new VersionAwareLogEntryReader<>() );
new LatestCheckPointFinder( logFiles, fs, logEntryReader );
try
{
LatestCheckPoint latestCheckPoint = latestCheckPointFinder.find( logFiles.getHighestLogVersion() );
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.helpers.Pair;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.transaction.log.IOCursor;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
Expand Down Expand Up @@ -56,7 +57,8 @@ class LegacyLogEntryReader

LegacyLogEntryReader( FileSystemAbstraction fs )
{
this( fs, from -> new VersionAwareLogEntryReader<>( from.logFormatVersion ) );
this( fs, from -> new VersionAwareLogEntryReader<>( from.logFormatVersion,
new RecordStorageCommandReaderFactory() ) );
}

public Pair<LogHeader, IOCursor<LogEntry>> openReadableChannel( File logFile ) throws IOException
Expand Down

0 comments on commit 1c7c466

Please sign in to comment.