Skip to content

Commit

Permalink
Made some wiggle room in the interface hierarchy to decouple buffer r…
Browse files Browse the repository at this point in the history
…eading from logs specifically.
  • Loading branch information
jimwebber committed Jan 18, 2016
1 parent dabdbcf commit 82342d3
Show file tree
Hide file tree
Showing 95 changed files with 526 additions and 511 deletions.
Expand Up @@ -44,6 +44,7 @@
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.builtinprocs.BuiltInProcedures;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.dependency.HighestSelectionStrategy;
import org.neo4j.kernel.guard.Guard;
Expand Down Expand Up @@ -80,8 +81,6 @@
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.builtinprocs.BuiltInProcedures;
import org.neo4j.proc.Procedures;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores;
Expand All @@ -105,11 +104,11 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFileInformation;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.VersionableReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointScheduler;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThreshold;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointThresholds;
Expand Down Expand Up @@ -153,6 +152,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.Logger;
import org.neo4j.proc.Procedures;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreReadLayer;

Expand Down Expand Up @@ -459,7 +459,7 @@ public void start() throws IOException
storageEngine = buildStorageEngine(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, updateableSchemaState::clear );
LogEntryReader<ReadableLogChannel> logEntryReader =
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory() );

TransactionLogModule transactionLogModule =
Expand Down Expand Up @@ -592,7 +592,7 @@ private TransactionLogModule buildTransactionLogs(
JobScheduler scheduler,
FileSystemAbstraction fileSystemAbstraction,
Iterable<IndexImplementation> indexProviders,
StorageEngine storageEngine, LogEntryReader<ReadableLogChannel> logEntryReader )
StorageEngine storageEngine, LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
{
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 1000, 100_000 );
final PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDir, PhysicalLogFile.DEFAULT_NAME,
Expand All @@ -613,7 +613,7 @@ private TransactionLogModule buildTransactionLogs(
public long getTimestampForVersion( long version ) throws IOException
{
LogPosition position = LogPosition.start( version );
try ( ReadableVersionableLogChannel channel = logFile.getReader( position ) )
try ( VersionableReadableClosablePositionAwareChannel channel = logFile.getReader( position ) )
{
LogEntry entry;
while ( (entry = logEntryReader.readLogEntry( channel )) != null )
Expand Down Expand Up @@ -738,7 +738,7 @@ private void buildRecovery(
final StoreFlusher storeFlusher,
final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine,
LogEntryReader<ReadableLogChannel> logEntryReader )
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
{
MetaDataStore metaDataStore = neoStores.getMetaDataStore();
RecoveryVisitor recoveryVisitor = new RecoveryVisitor( metaDataStore, storageEngine, recoveryVisitorMonitor );
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
Expand Down Expand Up @@ -65,8 +65,9 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException
long logVersion = MetaDataStore.getRecord( pageCache, neoStore, MetaDataStore.Position.LOG_VERSION );
PhysicalLogFiles logFiles = new PhysicalLogFiles( dataDir, fs );

LogEntryReader<ReadableLogChannel> reader = new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode(),
new RecordStorageCommandReaderFactory() );
LogEntryReader<ReadableClosablePositionAwareChannel> reader =
new VersionAwareLogEntryReader<>( LogEntryVersion.CURRENT.byteCode(),
new RecordStorageCommandReaderFactory() );

LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );
return new PositionToRecoverFrom( finder ).apply( logVersion ) != LogPosition.UNSPECIFIED;
Expand Down
Expand Up @@ -25,7 +25,7 @@

import org.neo4j.kernel.impl.store.counts.keys.CountsKey;
import org.neo4j.kernel.impl.store.counts.keys.CountsKeyType;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel;

import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey;
import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexStatisticsKey;
Expand All @@ -35,7 +35,7 @@

public class CountsSnapshotDeserializer
{
public static CountsSnapshot deserialize( ReadableLogChannel channel ) throws IOException
public static CountsSnapshot deserialize( ReadableClosableChannel channel ) throws IOException
{
long txid = channel.getLong();
int size = channel.getInt();
Expand Down
Expand Up @@ -35,8 +35,8 @@
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadPositionableReadableChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommand;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
Expand All @@ -55,9 +55,10 @@ class TransactionStream
private final FileSystemAbstraction fs;
private final boolean isContiguous;
private final LogFile firstFile;
private final LogEntryReader<ReadableLogChannel> logEntryReader;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;

public TransactionStream( FileSystemAbstraction fs, File path, LogEntryReader<ReadableLogChannel> logEntryReader )
public TransactionStream( FileSystemAbstraction fs, File path,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
throws IOException
{
this.fs = fs;
Expand Down Expand Up @@ -201,9 +202,9 @@ else if ( entry instanceof LogEntryCommit && start != null )
PhysicalTransactionRepresentation transaction = new PhysicalTransactionRepresentation(
commands );
transaction.setHeader( start.getAdditionalHeader(), start.getMasterId(),
start.getLocalId(), start.getTimeWritten(),
start.getLastCommittedTxWhenTransactionStarted(),
commit.getTimeWritten(), -1 );
start.getLocalId(), start.getTimeWritten(),
start.getLastCommittedTxWhenTransactionStarted(),
commit.getTimeWritten(), -1 );
return new CommittedTransactionRepresentation( start, transaction, commit );
}
else
Expand All @@ -223,7 +224,7 @@ private static class LogFile implements Comparable<LogFile>
long cap = Long.MAX_VALUE;

LogFile( FileSystemAbstraction fs, File file, ByteBuffer buffer,
LogEntryReader<ReadableLogChannel> logEntryReader ) throws IOException
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader ) throws IOException
{
this.file = file;
try ( StoreChannel channel = fs.open( file, "r" ) )
Expand Down Expand Up @@ -287,7 +288,7 @@ public int compareTo( LogFile that )
StringBuilder rangeString( StringBuilder target )
{
return target.append( ']' ).append( header.lastCommittedTxId )
.append( ", " ).append( lastTxId ).append( ']' );
.append( ", " ).append( lastTxId ).append( ']' );
}
}

Expand All @@ -309,10 +310,11 @@ private IOCursor<LogEntry> logEntryCursor( LogFile file, ByteBuffer buffer ) thr
return logEntryCursor( logEntryReader, channel, file.header );
}

private static IOCursor<LogEntry> logEntryCursor( LogEntryReader<ReadableLogChannel> logEntryReader,
private static IOCursor<LogEntry> logEntryCursor(
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
StoreChannel channel, LogHeader header ) throws IOException
{
return new LogEntryCursor( logEntryReader, new ReadAheadLogChannel(
return new LogEntryCursor( logEntryReader, new ReadAheadPositionableReadableChannel(
new PhysicalLogVersionedStoreChannel( channel, header.logVersion, header.logFormatVersion ),
NO_MORE_CHANNELS ) );
}
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.neo4j.kernel.impl.storemigration.legacystore.v22.Legacy22Store;
import org.neo4j.kernel.impl.storemigration.legacystore.v23.Legacy23Store;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
Expand Down Expand Up @@ -140,7 +140,7 @@ private Result checkUpgradeableFor( File storeDirectory, String version )
}

PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fs );
LogEntryReader<ReadableLogChannel> logEntryReader =
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory() );
LatestCheckPointFinder latestCheckPointFinder =
new LatestCheckPointFinder( logFiles, fs, logEntryReader );
Expand Down
Expand Up @@ -30,8 +30,8 @@
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageCommandReaderFactory;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadPositionableReadableChannel;
import org.neo4j.kernel.impl.transaction.log.VersionableReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
Expand All @@ -46,10 +46,10 @@
class LegacyLogEntryReader
{
private final FileSystemAbstraction fs;
private final Function<LogHeader,LogEntryReader<ReadableVersionableLogChannel>> readerFactory;
private final Function<LogHeader,LogEntryReader<VersionableReadableClosablePositionAwareChannel>> readerFactory;

LegacyLogEntryReader( FileSystemAbstraction fs,
Function<LogHeader,LogEntryReader<ReadableVersionableLogChannel>> readerFactory )
Function<LogHeader,LogEntryReader<VersionableReadableClosablePositionAwareChannel>> readerFactory )
{
this.fs = fs;
this.readerFactory = readerFactory;
Expand All @@ -66,14 +66,14 @@ public Pair<LogHeader, IOCursor<LogEntry>> openReadableChannel( File logFile ) t
final StoreChannel rawChannel = fs.open( logFile, "r" );

final LogHeader header = readLogHeader( ByteBuffer.allocate( LOG_HEADER_SIZE ), rawChannel, false );
LogEntryReader<ReadableVersionableLogChannel> reader = readerFactory.apply( header );
LogEntryReader<VersionableReadableClosablePositionAwareChannel> reader = readerFactory.apply( header );

// this ensures that the last committed txId field in the header is initialized properly
long lastCommittedTxId = Math.max( BASE_TX_ID, header.lastCommittedTxId );

final PhysicalLogVersionedStoreChannel channel =
new PhysicalLogVersionedStoreChannel( rawChannel, header.logVersion, header.logFormatVersion );
final ReadableVersionableLogChannel readableChannel = new ReadAheadLogChannel( channel, NO_MORE_CHANNELS );
final VersionableReadableClosablePositionAwareChannel readableChannel = new ReadAheadPositionableReadableChannel( channel, NO_MORE_CHANNELS );
final IOCursor<LogEntry> cursor = new LogEntrySortingCursor( reader, readableChannel );

return Pair.of( new LogHeader( CURRENT_LOG_VERSION, header.logVersion, lastCommittedTxId ), cursor );
Expand Down
Expand Up @@ -26,24 +26,24 @@
import java.util.Map;

import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.log.ReadableVersionableLogChannel;
import org.neo4j.kernel.impl.transaction.log.VersionableReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.IdentifiableLogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;

class LogEntrySortingCursor implements IOCursor<LogEntry>
{
private final ReadableVersionableLogChannel channel;
private final LogEntryReader<ReadableVersionableLogChannel> reader;
private final VersionableReadableClosablePositionAwareChannel channel;
private final LogEntryReader<VersionableReadableClosablePositionAwareChannel> reader;
// identifier -> log entry
private final Map<Integer, List<LogEntry>> idToEntries = new HashMap<>();

private LogEntry toReturn;
private int idToFetchFrom = -1;

LogEntrySortingCursor( LogEntryReader<ReadableVersionableLogChannel> reader,
ReadableVersionableLogChannel channel )
LogEntrySortingCursor( LogEntryReader<VersionableReadableClosablePositionAwareChannel> reader,
VersionableReadableClosablePositionAwareChannel channel )
{
this.reader = reader;
this.channel = channel;
Expand Down
Expand Up @@ -23,7 +23,8 @@

import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.PositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel;
import org.neo4j.storageengine.api.CommandReader;
import org.neo4j.storageengine.api.ReadableChannel;

Expand Down Expand Up @@ -56,7 +57,7 @@ public final Command read( ReadableChannel channel ) throws IOException
* Reads the next {@link Command} from {@code channel}.
*
* @param commandType type of command to read, f.ex. node command, relationship command a.s.o.
* @param channel {@link ReadableLogChannel} to read from.
* @param channel {@link ReadableClosableChannel} to read from.
* @return {@link Command} or {@code null} if end reached.
* @throws IOException if channel throws exception.
*/
Expand All @@ -65,9 +66,9 @@ public final Command read( ReadableChannel channel ) throws IOException
protected IOException unknownCommandType( byte commandType, ReadableChannel channel ) throws IOException
{
String message = "Unknown command type[" + commandType + "]";
if ( channel instanceof ReadableLogChannel )
if ( channel instanceof PositionAwareChannel )
{
ReadableLogChannel logChannel = (ReadableLogChannel) channel;
PositionAwareChannel logChannel = (PositionAwareChannel) channel;
LogPositionMarker position = new LogPositionMarker();
logChannel.getCurrentPosition( position );
message += " near " + position.newPosition();
Expand Down
Expand Up @@ -27,11 +27,12 @@

public class LogEntryCursor implements IOCursor<LogEntry>
{
private final LogEntryReader<ReadableLogChannel> logEntryReader;
private final ReadableLogChannel channel;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
private final ReadableClosablePositionAwareChannel channel;
private LogEntry entry;

public LogEntryCursor( LogEntryReader<ReadableLogChannel> logEntryReader, ReadableLogChannel channel )
public LogEntryCursor( LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
ReadableClosablePositionAwareChannel channel )
{
this.logEntryReader = logEntryReader;
this.channel = channel;
Expand Down
Expand Up @@ -29,7 +29,7 @@ public interface LogFile
{
interface LogFileVisitor
{
boolean visit( LogPosition position, ReadableLogChannel channel ) throws IOException;
boolean visit( LogPosition position, ReadableClosablePositionAwareChannel channel ) throws IOException;
}

/**
Expand All @@ -39,10 +39,10 @@ interface LogFileVisitor

/**
* @param position {@link LogPosition} to position the returned reader at.
* @return {@link ReadableLogChannel} capable of reading log data, starting from {@link LogPosition position}.
* @return {@link ReadableClosableChannel} capable of reading log data, starting from {@link LogPosition position}.
* @throws IOException
*/
ReadableLogChannel getReader( LogPosition position ) throws IOException;
VersionableReadableClosablePositionAwareChannel getReader( LogPosition position ) throws IOException;

void accept( LogFileVisitor visitor, LogPosition startingFromPosition ) throws IOException;

Expand Down

0 comments on commit 82342d3

Please sign in to comment.