Skip to content

Commit

Permalink
Use monitors for reversed transaction reader cursor as well.
Browse files Browse the repository at this point in the history
Reorganised reversed cursors into their own packages.
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent 192dd70 commit 5b8f005
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 54 deletions.
Expand Up @@ -131,6 +131,8 @@
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruneStrategy;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruning;
import org.neo4j.kernel.impl.transaction.log.pruning.LogPruningImpl;
import org.neo4j.kernel.impl.transaction.log.reverse.ReverseTransactionCursorLoggingMonitor;
import org.neo4j.kernel.impl.transaction.log.reverse.ReversedSingleFileTransactionCursor;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotationImpl;
import org.neo4j.kernel.impl.transaction.state.DefaultSchemaIndexProviderMap;
Expand Down Expand Up @@ -437,6 +439,7 @@ public void start() throws IOException

LogTailScanner tailScanner = new LogTailScanner( logFiles, fs, logEntryReader, monitors, failOnCorruptedLogFiles );
monitors.addMonitorListener( new LoggingLogTailScannerMonitor( logService.getInternalLog( LogTailScanner.class ) ) );
monitors.addMonitorListener( new ReverseTransactionCursorLoggingMonitor( logService.getInternalLog( ReversedSingleFileTransactionCursor.class ) ) );
LogVersionUpgradeChecker.check( tailScanner, config );

// Upgrade the store before we begin
Expand Down Expand Up @@ -659,7 +662,7 @@ private NeoStoreTransactionLogModule buildTransactionLogs(
logFile, logRotation, transactionMetadataCache, transactionIdStore, explicitIndexTransactionOrdering,
databaseHealth ) );
final LogicalTransactionStore logicalTransactionStore =
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader, logService );
new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader, monitors );

int txThreshold = config.get( GraphDatabaseSettings.check_point_interval_tx );
final CountCommittedTransactionThreshold countCommittedTransactionThreshold =
Expand Down
Expand Up @@ -23,14 +23,16 @@
import java.io.IOException;

import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache.TransactionMetadata;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.impl.transaction.log.reverse.ReversedMultiFileTransactionCursor;
import org.neo4j.kernel.impl.transaction.log.reverse.ReversedTransactionCursorMonitor;
import org.neo4j.kernel.monitoring.Monitors;

import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_CHECKSUM;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
Expand All @@ -47,15 +49,15 @@ public class PhysicalLogicalTransactionStore implements LogicalTransactionStore
private final LogFile logFile;
private final TransactionMetadataCache transactionMetadataCache;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;
private final LogService logService;
private final Monitors monitors;

public PhysicalLogicalTransactionStore( LogFile logFile, TransactionMetadataCache transactionMetadataCache,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, LogService logService )
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, Monitors monitors )
{
this.logFile = logFile;
this.transactionMetadataCache = transactionMetadataCache;
this.logEntryReader = logEntryReader;
this.logService = logService;
this.monitors = monitors;
}

@Override
Expand All @@ -68,7 +70,8 @@ public TransactionCursor getTransactions( LogPosition position ) throws IOExcept
public TransactionCursor getTransactionsInReverseOrder( LogPosition backToPosition ) throws
IOException
{
return ReversedMultiFileTransactionCursor.fromLogFile( logFile, backToPosition, logService );
return ReversedMultiFileTransactionCursor
.fromLogFile( logFile, backToPosition, monitors.newMonitor( ReversedTransactionCursorMonitor.class ) );
}

@Override
Expand Down
Expand Up @@ -24,7 +24,6 @@

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache.TransactionMetadata;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
Expand All @@ -40,8 +39,8 @@ public class ReadOnlyTransactionStore extends LifecycleAdapter implements Logica
private final LifeSupport life = new LifeSupport();
private final LogicalTransactionStore physicalStore;

public ReadOnlyTransactionStore( PageCache pageCache, FileSystemAbstraction fs, File fromPath, Monitors monitors,
LogService logService ) throws IOException
public ReadOnlyTransactionStore( PageCache pageCache, FileSystemAbstraction fs, File fromPath, Monitors monitors )
throws IOException
{
PhysicalLogFiles logFiles = new PhysicalLogFiles( fromPath, fs );
TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache( 100 );
Expand All @@ -53,7 +52,7 @@ public ReadOnlyTransactionStore( PageCache pageCache, FileSystemAbstraction fs,
monitors.newMonitor( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
physicalStore = new PhysicalLogicalTransactionStore( logFile, transactionMetadataCache, logEntryReader,
logService );
monitors );
}

@Override
Expand Down
Expand Up @@ -17,13 +17,15 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;
package org.neo4j.kernel.impl.transaction.log.reverse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;

/**
* Eagerly exhausts a {@link TransactionCursor} and allows moving through it in reverse order.
Expand All @@ -35,13 +37,13 @@
*
* @see ReversedMultiFileTransactionCursor
*/
class EagerlyReversedTransactionCursor implements TransactionCursor
public class EagerlyReversedTransactionCursor implements TransactionCursor
{
private final List<CommittedTransactionRepresentation> txs = new ArrayList<>();
private final TransactionCursor cursor;
private int indexToReturn;

EagerlyReversedTransactionCursor( TransactionCursor cursor ) throws IOException
public EagerlyReversedTransactionCursor( TransactionCursor cursor ) throws IOException
{
this.cursor = cursor;
while ( cursor.next() )
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.reverse;

import org.neo4j.logging.Log;

import static java.lang.String.format;

public class ReverseTransactionCursorLoggingMonitor implements ReversedTransactionCursorMonitor
{
private final Log log;

public ReverseTransactionCursorLoggingMonitor( Log log )
{
this.log = log;
}

@Override
public void transactionalLogRecordReadFailure( Throwable t, long[] transactionOffsets, int transactionIndex, long logVersion )
{
log.warn( transactionIndex > 0 ?
format( "Fail to read transaction log version %d. Last valid transaction start offset is: %d.",
logVersion, transactionOffsets[transactionIndex - 1] ) :
format( "Fail to read first transaction of log version %d.", logVersion) );
}
}
Expand Up @@ -17,19 +17,25 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;
package org.neo4j.kernel.impl.transaction.log.reverse;

import java.io.IOException;

import org.neo4j.function.ThrowingFunction;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;

import static org.neo4j.kernel.impl.transaction.log.EagerlyReversedTransactionCursor.eagerlyReverse;
import static org.neo4j.kernel.impl.transaction.log.LogPosition.start;
import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.reverse.EagerlyReversedTransactionCursor.eagerlyReverse;

/**
* Similar to {@link PhysicalTransactionCursor} and actually uses it internally. This main difference is that transactions
Expand All @@ -43,7 +49,7 @@
*
* @see ReversedSingleFileTransactionCursor
*/
class ReversedMultiFileTransactionCursor implements TransactionCursor
public class ReversedMultiFileTransactionCursor implements TransactionCursor
{
private final LogPosition backToPosition;
private final ThrowingFunction<LogPosition,TransactionCursor,IOException> cursorFactory;
Expand All @@ -57,13 +63,13 @@ class ReversedMultiFileTransactionCursor implements TransactionCursor
*
* @param logFile {@link LogFile} to supply log entries forming transactions.
* @param backToPosition {@link LogPosition} to read backwards to.
* @param logService logging service used to to create log.
* @param monitor reverse transaction cursor monitor
* @return a {@link TransactionCursor} which returns transactions from the end of the log stream and backwards to
* and including transaction starting at {@link LogPosition}.
* @throws IOException on I/O error.
*/
static TransactionCursor fromLogFile( LogFile logFile, LogPosition backToPosition, LogService logService ) throws
IOException
public static TransactionCursor fromLogFile( LogFile logFile, LogPosition backToPosition,
ReversedTransactionCursorMonitor monitor ) throws IOException
{
long highestVersion = logFile.currentLogVersion();
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
Expand All @@ -74,7 +80,7 @@ static TransactionCursor fromLogFile( LogFile logFile, LogPosition backToPositio
{
// This is a channel which can be positioned explicitly and is the typical case for such channels
// Let's take advantage of this fact and use a bit smarter reverse implementation
return new ReversedSingleFileTransactionCursor( (ReadAheadLogChannel) channel, logEntryReader, logService );
return new ReversedSingleFileTransactionCursor( (ReadAheadLogChannel) channel, logEntryReader, monitor );
}

// Fall back to simply eagerly reading each single log file and reversing in memory
Expand Down
Expand Up @@ -17,17 +17,22 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log;
package org.neo4j.kernel.impl.transaction.log.reverse;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionBridge;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionCursor;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.logging.Log;

import static java.lang.String.format;

Expand Down Expand Up @@ -57,16 +62,16 @@
*
* @see ReversedMultiFileTransactionCursor
*/
class ReversedSingleFileTransactionCursor implements TransactionCursor
public class ReversedSingleFileTransactionCursor implements TransactionCursor
{
// Should this be passed in or extracted from the read-ahead channel instead?
private static final int CHUNK_SIZE = ReadAheadChannel.DEFAULT_READ_AHEAD_SIZE;

private final ReadAheadLogChannel channel;
private final ReversedTransactionCursorMonitor monitor;
private final TransactionCursor transactionCursor;
// Should be generally large enough to hold transactions in a chunk, where one chunk is the read-ahead size of ReadAheadLogChannel
private final Deque<CommittedTransactionRepresentation> chunkTransactions = new ArrayDeque<>( 20 );
private final Log log;
private CommittedTransactionRepresentation currentChunkTransaction;
// May be longer than required, offsetLength holds the actual length.
private final long[] offsets;
Expand All @@ -75,19 +80,18 @@ class ReversedSingleFileTransactionCursor implements TransactionCursor
private long totalSize;

ReversedSingleFileTransactionCursor( ReadAheadLogChannel channel,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, LogService logService )
throws IOException
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
ReversedTransactionCursorMonitor monitor ) throws IOException
{
this.channel = channel;
this.monitor = monitor;
// There's an assumption here: that the underlying channel can move in between calls and that the
// transaction cursor will just happily read from the new position.
this.transactionCursor = new PhysicalTransactionCursor<>( channel, logEntryReader );
this.log = logService.getInternalLog( ReversedSingleFileTransactionCursor.class );
this.offsets = sketchOutTransactionStartOffsets();
}

// Also initializes offset indexes
// This method could use some way of reading log entries w/o creating objects. Would be great
private long[] sketchOutTransactionStartOffsets() throws IOException
{
// Grows on demand. Initially sized to be able to hold all transaction start offsets for a single log file
Expand All @@ -110,7 +114,7 @@ private long[] sketchOutTransactionStartOffsets() throws IOException
}
catch ( Throwable t )
{
log.warn( buildReadErrorMessage( offsets, offsetCursor, logVersion ), t );
monitor.transactionalLogRecordReadFailure( t, offsets, offsetCursor, logVersion );
}

if ( channel.getVersion() != logVersion )
Expand Down
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.transaction.log.reverse;

public interface ReversedTransactionCursorMonitor
{
void transactionalLogRecordReadFailure( Throwable t, long[] offsets, int offsetCursor, long logVersion );
}
Expand Up @@ -156,7 +156,8 @@ public void shouldRecoverExistingData() throws Exception
LogFile logFile = life.add( new PhysicalLogFile( fileSystemRule.get(), logFiles, 50,
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader,
monitors );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand Down Expand Up @@ -259,7 +260,7 @@ public void shouldSeeThatACleanDatabaseShouldNotRequireRecovery() throws Excepti
LogFile logFile = life.add( new PhysicalLogFile( fileSystemRule.get(), logFiles, 50,
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, monitors );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand Down Expand Up @@ -399,7 +400,7 @@ private boolean recover( File storeDir, PhysicalLogFiles logFiles )
LogFile logFile = life.add( new PhysicalLogFile( fileSystemRule.get(), logFiles, 50,
transactionIdStore::getLastCommittedTransactionId, logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, logService );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader, monitors );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemRule.get() );
life.add( new Recovery( new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, txStore, NO_MONITOR )
{
Expand Down

0 comments on commit 5b8f005

Please sign in to comment.