Skip to content

Commit

Permalink
Separated recovery visitor from DefaultRecoverySPI
Browse files Browse the repository at this point in the history
and puts more weight on getRecoveryVisitor() method by renaming to
startRecovery(). This way it makes more sense to add more responsibilities to it.
  • Loading branch information
tinwelint committed Sep 13, 2016
1 parent 5df3194 commit 6426197
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
Expand Up @@ -738,7 +738,7 @@ private void buildRecovery(
final LatestCheckPointFinder checkPointFinder = final LatestCheckPointFinder checkPointFinder =
new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader ); new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI( Recovery.SPI spi = new DefaultRecoverySPI(
storageEngine, logFiles, fileSystemAbstraction, logVersionRepository, storageEngine, logVersionRepository,
checkPointFinder, transactionIdStore, logicalTransactionStore ); checkPointFinder, transactionIdStore, logicalTransactionStore );
Recovery recovery = new Recovery( spi, recoveryMonitor ); Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor() monitors.addMonitorListener( new Recovery.Monitor()
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.impl.api.TransactionQueue; import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
Expand All @@ -31,7 +30,6 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
Expand All @@ -40,26 +38,22 @@
import static org.neo4j.kernel.impl.transaction.log.Commitment.NO_COMMITMENT; import static org.neo4j.kernel.impl.transaction.log.Commitment.NO_COMMITMENT;
import static org.neo4j.storageengine.api.TransactionApplicationMode.RECOVERY; import static org.neo4j.storageengine.api.TransactionApplicationMode.RECOVERY;


public class DefaultRecoverySPI implements Recovery.SPI, Visitor<CommittedTransactionRepresentation,Exception> public class DefaultRecoverySPI implements Recovery.SPI
{ {
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fileSystemAbstraction;
private final LogVersionRepository logVersionRepository; private final LogVersionRepository logVersionRepository;
private final PositionToRecoverFrom positionToRecoverFrom; private final PositionToRecoverFrom positionToRecoverFrom;
private final StorageEngine storageEngine; private final StorageEngine storageEngine;
private final TransactionIdStore transactionIdStore; private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore; private final LogicalTransactionStore logicalTransactionStore;
private Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor;
private TransactionQueue transactionsToApply; private TransactionQueue transactionsToApply;


public DefaultRecoverySPI( public DefaultRecoverySPI(
StorageEngine storageEngine, StorageEngine storageEngine,
PhysicalLogFiles logFiles, FileSystemAbstraction fileSystemAbstraction,
LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder, LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore ) TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore )
{ {
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
this.logFiles = logFiles;
this.fileSystemAbstraction = fileSystemAbstraction;
this.logVersionRepository = logVersionRepository; this.logVersionRepository = logVersionRepository;
this.transactionIdStore = transactionIdStore; this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore; this.logicalTransactionStore = logicalTransactionStore;
Expand All @@ -80,7 +74,7 @@ public LogPosition getPositionToRecoverFrom() throws IOException
} }


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
// Calling this method means that recovery is required, tell storage engine about it // Calling this method means that recovery is required, tell storage engine about it
// This method will be called before recovery actually starts and so will ensure that // This method will be called before recovery actually starts and so will ensure that
Expand All @@ -90,8 +84,9 @@ public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor(
storageEngine.prepareForRecoveryRequired(); storageEngine.prepareForRecoveryRequired();


transactionsToApply = new TransactionQueue( 10_000, (first,last) -> storageEngine.apply( first, RECOVERY ) ); transactionsToApply = new TransactionQueue( 10_000, (first,last) -> storageEngine.apply( first, RECOVERY ) );
recoveryVisitor = new RecoveryVisitor( transactionsToApply );


return this; return recoveryVisitor;
} }


@Override @Override
Expand All @@ -114,14 +109,24 @@ public void allTransactionsRecovered( CommittedTransactionRepresentation lastRec
// TODO: Also truncate last log after last known position // TODO: Also truncate last log after last known position
} }


@Override static class RecoveryVisitor implements Visitor<CommittedTransactionRepresentation,Exception>
public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception
{ {
TransactionRepresentation txRepresentation = transaction.getTransactionRepresentation(); private final TransactionQueue transactionsToApply;
long txId = transaction.getCommitEntry().getTxId();
TransactionToApply tx = new TransactionToApply( txRepresentation, txId ); public RecoveryVisitor( TransactionQueue transactionsToApply )
tx.commitment( NO_COMMITMENT, txId ); {
transactionsToApply.queue( tx ); this.transactionsToApply = transactionsToApply;
return false; }

@Override
public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception
{
TransactionRepresentation txRepresentation = transaction.getTransactionRepresentation();
long txId = transaction.getCommitEntry().getTxId();
TransactionToApply tx = new TransactionToApply( txRepresentation, txId );
tx.commitment( NO_COMMITMENT, txId );
transactionsToApply.queue( tx );
return false;
}
} }
} }
Expand Up @@ -56,7 +56,7 @@ public interface SPI


LogPosition getPositionToRecoverFrom() throws IOException; LogPosition getPositionToRecoverFrom() throws IOException;


Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor(); Visitor<CommittedTransactionRepresentation,Exception> startRecovery();


void allTransactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction, void allTransactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction ) throws Exception; LogPosition positionAfterLastRecoveredTransaction ) throws Exception;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void init() throws Throwable


LogPosition recoveryToPosition; LogPosition recoveryToPosition;
CommittedTransactionRepresentation lastTransaction = null; CommittedTransactionRepresentation lastTransaction = null;
Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor = spi.getRecoveryVisitor(); Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor = spi.startRecovery();
try ( TransactionCursor transactionsToRecover = spi.getTransactions( recoveryFromPosition ) ) try ( TransactionCursor transactionsToRecover = spi.getTransactions( recoveryFromPosition ) )
{ {
while ( transactionsToRecover.next() ) while ( transactionsToRecover.next() )
Expand Down
Expand Up @@ -150,15 +150,15 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader ); LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );


life.add( new Recovery( new DefaultRecoverySPI( storageEngine, life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore ) logVersionRepository, finder, transactionIdStore, txStore )
{ {
private int nr = 0; private int nr = 0;


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
recoveryRequired.set( true ); recoveryRequired.set( true );
final Visitor<CommittedTransactionRepresentation,Exception> actual = super.getRecoveryVisitor(); final Visitor<CommittedTransactionRepresentation,Exception> actual = super.startRecovery();
return new Visitor<CommittedTransactionRepresentation,Exception>() return new Visitor<CommittedTransactionRepresentation,Exception>()
{ {
@Override @Override
Expand Down Expand Up @@ -240,10 +240,10 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader ); LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );


life.add( new Recovery( new DefaultRecoverySPI( storageEngine, life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore ) logVersionRepository, finder, transactionIdStore, txStore )
{ {
@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
fail( "Recovery should not be required" ); fail( "Recovery should not be required" );
return null; // <-- to satisfy the compiler return null; // <-- to satisfy the compiler
Expand Down
Expand Up @@ -154,7 +154,7 @@ public void forceEverything()
} }


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
recoveryRequired.set( true ); recoveryRequired.set( true );
return visitor; return visitor;
Expand Down

0 comments on commit 6426197

Please sign in to comment.