Skip to content

Commit

Permalink
Renamed LatestCheckPointFinder to LogTailScanner starting to generify it
Browse files Browse the repository at this point in the history
  • Loading branch information
klaren committed Sep 5, 2017
1 parent 8156799 commit 8ba01f9
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 160 deletions.
Expand Up @@ -102,6 +102,7 @@
import org.neo4j.kernel.impl.transaction.log.LogFileInformation; import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogHeaderCache; import org.neo4j.kernel.impl.transaction.log.LogHeaderCache;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor; import org.neo4j.kernel.impl.transaction.log.LoggingLogFileMonitor;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
Expand Down Expand Up @@ -148,7 +149,6 @@
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.monitoring.tracing.Tracers; import org.neo4j.kernel.monitoring.tracing.Tracers;
import org.neo4j.kernel.recovery.DefaultRecoverySPI; import org.neo4j.kernel.recovery.DefaultRecoverySPI;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.PositionToRecoverFrom; import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery; import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.spi.legacyindex.IndexImplementation; import org.neo4j.kernel.spi.legacyindex.IndexImplementation;
Expand Down Expand Up @@ -691,11 +691,11 @@ private void buildRecovery(
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader, LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader,
LogicalTransactionStore logicalTransactionStore ) LogicalTransactionStore logicalTransactionStore )
{ {
final LatestCheckPointFinder checkPointFinder = final LogTailScanner logTailScanner =
new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader ); new LogTailScanner( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI( Recovery.SPI spi = new DefaultRecoverySPI(
storageEngine, logFiles, fileSystemAbstraction, logVersionRepository, storageEngine, logFiles, fileSystemAbstraction, logVersionRepository,
checkPointFinder, transactionIdStore, logicalTransactionStore, positionMonitor ); logTailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
Recovery recovery = new Recovery( spi, recoveryMonitor ); Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor() monitors.addMonitorListener( new Recovery.Monitor()
{ {
Expand Down
Expand Up @@ -27,11 +27,11 @@
import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.PositionToRecoverFrom; import org.neo4j.kernel.recovery.PositionToRecoverFrom;


import static org.neo4j.kernel.recovery.PositionToRecoverFrom.NO_MONITOR; import static org.neo4j.kernel.recovery.PositionToRecoverFrom.NO_MONITOR;
Expand Down Expand Up @@ -67,7 +67,7 @@ public boolean isRecoveryRequiredAt( File dataDir ) throws IOException


LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>(); LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();


LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader ); LogTailScanner finder = new LogTailScanner( logFiles, fs, reader );
return new PositionToRecoverFrom( finder, NO_MONITOR ).apply( logVersion ) != LogPosition.UNSPECIFIED; return new PositionToRecoverFrom( finder, NO_MONITOR ).apply( logVersion ) != LogPosition.UNSPECIFIED;
} }
} }
Expand Up @@ -34,12 +34,11 @@
import org.neo4j.kernel.impl.storemigration.StoreUpgrader.UpgradingStoreVersionNotFoundException; import org.neo4j.kernel.impl.storemigration.StoreUpgrader.UpgradingStoreVersionNotFoundException;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result; import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result;
import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result.Outcome; import org.neo4j.kernel.impl.storemigration.StoreVersionCheck.Result.Outcome;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.recovery.LatestCheckPointFinder;
import org.neo4j.kernel.recovery.LatestCheckPointFinder.LatestCheckPoint;


/** /**
* Logic to check whether a database version is upgradable to the current version. It looks at the * Logic to check whether a database version is upgradable to the current version. It looks at the
Expand Down Expand Up @@ -139,12 +138,12 @@ private Result checkCleanShutDownByCheckPoint( File storeDirectory )
// check version // check version
PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fs ); PhysicalLogFiles logFiles = new PhysicalLogFiles( storeDirectory, fs );
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>(); LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>();
LatestCheckPointFinder latestCheckPointFinder = LogTailScanner logTailScanner =
new LatestCheckPointFinder( logFiles, fs, logEntryReader ); new LogTailScanner( logFiles, fs, logEntryReader );
try try
{ {
LatestCheckPoint latestCheckPoint = latestCheckPointFinder.find( logFiles.getHighestLogVersion() ); LogTailScanner.LogTailInformation logTailInformation = logTailScanner.find( logFiles.getHighestLogVersion() );
if ( !latestCheckPoint.commitsAfterCheckPoint ) if ( !logTailInformation.commitsAfterLastCheckPoint )
{ {
return new Result( Result.Outcome.ok, null, null ); return new Result( Result.Outcome.ok, null, null );
} }
Expand Down
Expand Up @@ -17,49 +17,52 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.kernel.recovery; package org.neo4j.kernel.impl.transaction.log;


import java.io.IOException; import java.io.IOException;


import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.impl.transaction.log.LogEntryCursor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFile;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint; import org.neo4j.kernel.impl.transaction.log.entry.CheckPoint;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntry; import org.neo4j.kernel.impl.transaction.log.entry.LogEntry;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryVersion;


import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS; import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.kernel.impl.transaction.log.LogVersionRepository.INITIAL_LOG_VERSION; import static org.neo4j.kernel.impl.transaction.log.LogVersionRepository.INITIAL_LOG_VERSION;


public class LatestCheckPointFinder /**
* This class collects information about the latest entries in the transaction log. Since the only way we have to collect
* said information is to scan the transaction log from beginning to end, which is costly, we do this once and save the
* result for others to consume.
* <p>
* Due to the nature of transaction logs and log rotation, a single transaction log file has to be scanned forward, and
* if the required data is not found we search backwards through log file versions.
*/
public class LogTailScanner
{ {
private final PhysicalLogFiles logFiles; private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fileSystem; private final FileSystemAbstraction fileSystem;
private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader; private final LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader;


public LatestCheckPointFinder( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem, public LogTailScanner( PhysicalLogFiles logFiles, FileSystemAbstraction fileSystem,
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader ) LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader )
{ {
this.logFiles = logFiles; this.logFiles = logFiles;
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.logEntryReader = logEntryReader; this.logEntryReader = logEntryReader;
} }


public LatestCheckPoint find( long fromVersionBackwards ) throws IOException public LogTailInformation find( long fromVersionBackwards ) throws IOException
{ {
long version = fromVersionBackwards; long version = fromVersionBackwards;
long versionToSearchForCommits = fromVersionBackwards; long versionToSearchForCommits = fromVersionBackwards;
LogEntryStart latestStartEntry = null; LogEntryStart latestStartEntry = null;
LogEntryStart oldestStartEntry = null; LogEntryStart oldestStartEntry = null;
long oldestVersionFound = -1; long oldestVersionFound = -1;
LogEntryVersion latestLogEntryVersion = null;

while ( version >= INITIAL_LOG_VERSION ) while ( version >= INITIAL_LOG_VERSION )
{ {
LogVersionedStoreChannel channel = LogVersionedStoreChannel channel =
Expand All @@ -82,10 +85,14 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException
while ( cursor.next() ) while ( cursor.next() )
{ {
entry = cursor.get(); entry = cursor.get();

// Collect data about latest checkpoint
if ( entry instanceof CheckPoint ) if ( entry instanceof CheckPoint )
{ {
latestCheckPoint = entry.as(); latestCheckPoint = entry.as();
} }

// Collect data about latest commits
if ( entry instanceof LogEntryStart ) if ( entry instanceof LogEntryStart )
{ {
LogEntryStart startEntry = entry.as(); LogEntryStart startEntry = entry.as();
Expand All @@ -102,13 +109,19 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException
firstStartEntry = false; firstStartEntry = false;
} }
} }

// Collect data about latest entry version
if ( latestLogEntryVersion == null || version == versionToSearchForCommits )
{
latestLogEntryVersion = entry.getVersion();
}
} }
} }


if ( latestCheckPoint != null ) if ( latestCheckPoint != null )
{ {
return latestCheckPoint( fromVersionBackwards, version, latestStartEntry, oldestVersionFound, return latestCheckPoint( fromVersionBackwards, version, latestStartEntry, oldestVersionFound,
latestCheckPoint ); latestCheckPoint, latestLogEntryVersion );
} }


version--; version--;
Expand All @@ -123,14 +136,15 @@ public LatestCheckPoint find( long fromVersionBackwards ) throws IOException
boolean commitsAfterCheckPoint = oldestStartEntry != null; boolean commitsAfterCheckPoint = oldestStartEntry != null;
long firstTxAfterPosition = commitsAfterCheckPoint long firstTxAfterPosition = commitsAfterCheckPoint
? extractFirstTxIdAfterPosition( oldestStartEntry.getStartPosition(), fromVersionBackwards ) ? extractFirstTxIdAfterPosition( oldestStartEntry.getStartPosition(), fromVersionBackwards )
: LatestCheckPoint.NO_TRANSACTION_ID; : LogTailInformation.NO_TRANSACTION_ID;


return new LatestCheckPoint( null, commitsAfterCheckPoint, firstTxAfterPosition, oldestVersionFound ); return new LogTailInformation( null, commitsAfterCheckPoint, firstTxAfterPosition, oldestVersionFound,
latestLogEntryVersion );
} }


protected LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long version, LogEntryStart protected LogTailInformation latestCheckPoint( long fromVersionBackwards, long version,
latestStartEntry, LogEntryStart latestStartEntry, long oldestVersionFound, CheckPoint latestCheckPoint,
long oldestVersionFound, CheckPoint latestCheckPoint ) throws IOException LogEntryVersion latestLogEntryVersion ) throws IOException
{ {
// Is the latest start entry in this log file version later than what the latest check point targets? // Is the latest start entry in this log file version later than what the latest check point targets?
LogPosition target = latestCheckPoint.getLogPosition(); LogPosition target = latestCheckPoint.getLogPosition();
Expand All @@ -143,17 +157,17 @@ protected LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long ver
// This check point entry targets a previous log file. // This check point entry targets a previous log file.
// Go there and see if there's a transaction. Reader is capped to that log version. // Go there and see if there's a transaction. Reader is capped to that log version.
startEntryAfterCheckPoint = extractFirstTxIdAfterPosition( target, version ) != startEntryAfterCheckPoint = extractFirstTxIdAfterPosition( target, version ) !=
LatestCheckPoint.NO_TRANSACTION_ID; LogTailInformation.NO_TRANSACTION_ID;
} }
} }


// Extract first transaction id after check point target position. // Extract first transaction id after check point target position.
// Reader may continue into log files after the initial version. // Reader may continue into log files after the initial version.
long firstTxIdAfterCheckPoint = startEntryAfterCheckPoint long firstTxIdAfterCheckPoint = startEntryAfterCheckPoint
? extractFirstTxIdAfterPosition( target, fromVersionBackwards ) ? extractFirstTxIdAfterPosition( target, fromVersionBackwards )
: LatestCheckPoint.NO_TRANSACTION_ID; : LogTailInformation.NO_TRANSACTION_ID;
return new LatestCheckPoint( latestCheckPoint, startEntryAfterCheckPoint, return new LogTailInformation( latestCheckPoint, startEntryAfterCheckPoint,
firstTxIdAfterCheckPoint, oldestVersionFound ); firstTxIdAfterCheckPoint, oldestVersionFound, latestLogEntryVersion );
} }


/** /**
Expand All @@ -163,7 +177,7 @@ protected LatestCheckPoint latestCheckPoint( long fromVersionBackwards, long ver
* *
* @param initialPosition {@link LogPosition} to start scan from. * @param initialPosition {@link LogPosition} to start scan from.
* @param maxLogVersion max log version to scan. * @param maxLogVersion max log version to scan.
* @return txId of closes commit entry to {@code initialPosition}, or {@link LatestCheckPoint#NO_TRANSACTION_ID} * @return txId of closes commit entry to {@code initialPosition}, or {@link LogTailInformation#NO_TRANSACTION_ID}
* if not found. * if not found.
* @throws IOException on I/O error. * @throws IOException on I/O error.
*/ */
Expand Down Expand Up @@ -200,25 +214,27 @@ protected long extractFirstTxIdAfterPosition( LogPosition initialPosition, long


currentPosition = LogPosition.start( currentPosition.getLogVersion() + 1 ); currentPosition = LogPosition.start( currentPosition.getLogVersion() + 1 );
} }
return LatestCheckPoint.NO_TRANSACTION_ID; return LogTailInformation.NO_TRANSACTION_ID;
} }


public static class LatestCheckPoint public static class LogTailInformation
{ {
public static long NO_TRANSACTION_ID = -1; public static long NO_TRANSACTION_ID = -1;


public final CheckPoint checkPoint; public final CheckPoint lastCheckPoint;
public final boolean commitsAfterCheckPoint; public final boolean commitsAfterLastCheckPoint;
public final long firstTxIdAfterLastCheckPoint; public final long firstTxIdAfterLastCheckPoint;
public final long oldestLogVersionFound; public final long oldestLogVersionFound;
public final LogEntryVersion latestLogEntryVersion;


public LatestCheckPoint( CheckPoint checkPoint, boolean commitsAfterCheckPoint, public LogTailInformation( CheckPoint lastCheckPoint, boolean commitsAfterLastCheckPoint,
long firstTxIdAfterLastCheckPoint, long oldestLogVersionFound ) long firstTxIdAfterLastCheckPoint, long oldestLogVersionFound, LogEntryVersion latestLogEntryVersion )
{ {
this.checkPoint = checkPoint; this.lastCheckPoint = lastCheckPoint;
this.commitsAfterCheckPoint = commitsAfterCheckPoint; this.commitsAfterLastCheckPoint = commitsAfterLastCheckPoint;
this.firstTxIdAfterLastCheckPoint = firstTxIdAfterLastCheckPoint; this.firstTxIdAfterLastCheckPoint = firstTxIdAfterLastCheckPoint;
this.oldestLogVersionFound = oldestLogVersionFound; this.oldestLogVersionFound = oldestLogVersionFound;
this.latestLogEntryVersion = latestLogEntryVersion;
} }


@Override @Override
Expand All @@ -233,34 +249,37 @@ public boolean equals( Object o )
return false; return false;
} }


LatestCheckPoint that = (LatestCheckPoint) o; LogTailInformation that = (LogTailInformation) o;


return commitsAfterCheckPoint == that.commitsAfterCheckPoint && return commitsAfterLastCheckPoint == that.commitsAfterLastCheckPoint &&
firstTxIdAfterLastCheckPoint == that.firstTxIdAfterLastCheckPoint && firstTxIdAfterLastCheckPoint == that.firstTxIdAfterLastCheckPoint &&
oldestLogVersionFound == that.oldestLogVersionFound && oldestLogVersionFound == that.oldestLogVersionFound &&
(checkPoint == null ? that.checkPoint == null : checkPoint.equals( that.checkPoint )); (lastCheckPoint == null ? that.lastCheckPoint == null : lastCheckPoint
.equals( that.lastCheckPoint )) &&
latestLogEntryVersion.equals( that.latestLogEntryVersion );
} }


@Override @Override
public int hashCode() public int hashCode()
{ {
int result = checkPoint != null ? checkPoint.hashCode() : 0; int result = lastCheckPoint != null ? lastCheckPoint.hashCode() : 0;
result = 31 * result + (commitsAfterCheckPoint ? 1 : 0); result = 31 * result + (commitsAfterLastCheckPoint ? 1 : 0);
if ( commitsAfterCheckPoint ) if ( commitsAfterLastCheckPoint )
{ {
result = 31 * result + Long.hashCode( firstTxIdAfterLastCheckPoint ); result = 31 * result + Long.hashCode( firstTxIdAfterLastCheckPoint );
} }
result = 31 * result + Long.hashCode( oldestLogVersionFound ); result = 31 * result + Long.hashCode( oldestLogVersionFound );
result = 31 * result + latestLogEntryVersion.hashCode();
return result; return result;
} }


@Override @Override
public String toString() public String toString()
{ {
return "LatestCheckPoint{" + return "LogTailInformation{" +
"checkPoint=" + checkPoint + "lastCheckPoint=" + lastCheckPoint +
", commitsAfterCheckPoint=" + commitsAfterCheckPoint + ", commitsAfterLastCheckPoint=" + commitsAfterLastCheckPoint +
(commitsAfterCheckPoint ? ", firstTxIdAfterLastCheckPoint=" + firstTxIdAfterLastCheckPoint : "") + (commitsAfterLastCheckPoint ? ", firstTxIdAfterLastCheckPoint=" + firstTxIdAfterLastCheckPoint : "") +
", oldestLogVersionFound=" + oldestLogVersionFound + ", oldestLogVersionFound=" + oldestLogVersionFound +
'}'; '}';
} }
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogTailScanner;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles; import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
Expand All @@ -52,7 +53,7 @@ public class DefaultRecoverySPI implements Recovery.SPI
public DefaultRecoverySPI( public DefaultRecoverySPI(
StorageEngine storageEngine, StorageEngine storageEngine,
PhysicalLogFiles logFiles, FileSystemAbstraction fs, PhysicalLogFiles logFiles, FileSystemAbstraction fs,
LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder, LogVersionRepository logVersionRepository, LogTailScanner logTailScanner,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore, TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
PositionToRecoverFrom.Monitor monitor ) PositionToRecoverFrom.Monitor monitor )
{ {
Expand All @@ -62,7 +63,7 @@ public DefaultRecoverySPI(
this.logVersionRepository = logVersionRepository; this.logVersionRepository = logVersionRepository;
this.transactionIdStore = transactionIdStore; this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore; this.logicalTransactionStore = logicalTransactionStore;
this.positionToRecoverFrom = new PositionToRecoverFrom( checkPointFinder, monitor ); this.positionToRecoverFrom = new PositionToRecoverFrom( logTailScanner, monitor );
} }


@Override @Override
Expand Down

0 comments on commit 8ba01f9

Please sign in to comment.