Skip to content

Commit

Permalink
Separated recovery visitor from DefaultRecoverySPI
Browse files Browse the repository at this point in the history
and puts more weight on getRecoveryVisitor() method by renaming to
startRecovery(). This way it makes more sense to add more responsibilities to it.
  • Loading branch information
tinwelint committed Sep 12, 2016
1 parent e8a785a commit 6f8996d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 34 deletions.
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException; import java.io.IOException;


import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.RecoveryLabelScanWriterProvider; import org.neo4j.kernel.RecoveryLabelScanWriterProvider;
import org.neo4j.kernel.impl.api.RecoveryLegacyIndexApplierLookup; import org.neo4j.kernel.impl.api.RecoveryLegacyIndexApplierLookup;
import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier; import org.neo4j.kernel.impl.api.TransactionRepresentationStoreApplier;
Expand All @@ -36,15 +35,13 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository; import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher; import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;

import static org.neo4j.kernel.impl.api.TransactionApplicationMode.RECOVERY; import static org.neo4j.kernel.impl.api.TransactionApplicationMode.RECOVERY;


public class DefaultRecoverySPI implements Recovery.SPI, Visitor<CommittedTransactionRepresentation,Exception> public class DefaultRecoverySPI implements Recovery.SPI
{ {
private final RecoveryLabelScanWriterProvider labelScanWriters; private final RecoveryLabelScanWriterProvider labelScanWriters;
private final RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup; private final RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup;
Expand All @@ -55,12 +52,11 @@ public class DefaultRecoverySPI implements Recovery.SPI, Visitor<CommittedTransa
private final RecoveryIndexingUpdatesValidator indexUpdatesValidator; private final RecoveryIndexingUpdatesValidator indexUpdatesValidator;
private final TransactionIdStore transactionIdStore; private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore; private final LogicalTransactionStore logicalTransactionStore;
private final TransactionRepresentationStoreApplier storeApplier; private final Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor;


public DefaultRecoverySPI( RecoveryLabelScanWriterProvider labelScanWriters, public DefaultRecoverySPI( RecoveryLabelScanWriterProvider labelScanWriters,
RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup, RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup,
StoreFlusher storeFlusher, NeoStores neoStores, StoreFlusher storeFlusher, NeoStores neoStores,
PhysicalLogFiles logFiles, FileSystemAbstraction fileSystemAbstraction,
LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder, LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder,
RecoveryIndexingUpdatesValidator indexUpdatesValidator, RecoveryIndexingUpdatesValidator indexUpdatesValidator,
TransactionIdStore transactionIdStore, TransactionIdStore transactionIdStore,
Expand All @@ -75,8 +71,8 @@ public DefaultRecoverySPI( RecoveryLabelScanWriterProvider labelScanWriters,
this.indexUpdatesValidator = indexUpdatesValidator; this.indexUpdatesValidator = indexUpdatesValidator;
this.transactionIdStore = transactionIdStore; this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore; this.logicalTransactionStore = logicalTransactionStore;
this.storeApplier = storeApplier;
this.positionToRecoverFrom = new PositionToRecoverFrom( checkPointFinder ); this.positionToRecoverFrom = new PositionToRecoverFrom( checkPointFinder );
this.recoveryVisitor = new RecoveryVisitor( storeApplier, indexUpdatesValidator );
} }


@Override @Override
Expand All @@ -102,15 +98,15 @@ public LogPosition getPositionToRecoverFrom() throws IOException
} }


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
// Calling this method means that recovery is required, tell storage engine about it // Calling this method means that recovery is required, tell storage engine about it
// This method will be called before recovery actually starts and so will ensure that // This method will be called before recovery actually starts and so will ensure that
// each store is aware that recovery will be performed. At this point all the stores have // each store is aware that recovery will be performed. At this point all the stores have
// already started btw. // already started btw.
// Go and read more at {@link CommonAbstractStore#deleteIdGenerator()} // Go and read more at {@link CommonAbstractStore#deleteIdGenerator()}
neoStores.deleteIdGenerators(); neoStores.deleteIdGenerators();
return this; return recoveryVisitor;
} }


@Override @Override
Expand All @@ -132,28 +128,41 @@ public void allTransactionsRecovered( CommittedTransactionRepresentation lastRec
// TODO: Also truncate last log after last known position // TODO: Also truncate last log after last known position
} }


@Override static class RecoveryVisitor implements Visitor<CommittedTransactionRepresentation,Exception>
public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception
{ {
TransactionRepresentation txRepresentation = transaction.getTransactionRepresentation(); private final TransactionRepresentationStoreApplier storeApplier;
long txId = transaction.getCommitEntry().getTxId(); private final RecoveryIndexingUpdatesValidator indexUpdatesValidator;
try ( LockGroup locks = new LockGroup();
ValidatedIndexUpdates indexUpdates = prepareIndexUpdates( txRepresentation ) ) public RecoveryVisitor( TransactionRepresentationStoreApplier storeApplier,
RecoveryIndexingUpdatesValidator indexUpdatesValidator )
{ {
storeApplier.apply( txRepresentation, indexUpdates, locks, txId, RECOVERY ); this.storeApplier = storeApplier;
this.indexUpdatesValidator = indexUpdatesValidator;
} }
return false;
}


/** @Override
* Recovery operates under a condition that all index updates are valid because otherwise they have no chance to public boolean visit( CommittedTransactionRepresentation transaction ) throws Exception
* appear in write ahead log. {
* This step is still needed though, because it is not only about validation of index sizes but also about TransactionRepresentation txRepresentation = transaction.getTransactionRepresentation();
* inferring {@link org.neo4j.kernel.api.index.NodePropertyUpdate}s from commands in transaction state and long txId = transaction.getCommitEntry().getTxId();
* grouping those by {@link org.neo4j.kernel.api.index.IndexUpdater}s. try ( LockGroup locks = new LockGroup();
*/ ValidatedIndexUpdates indexUpdates = prepareIndexUpdates( txRepresentation ) )
private ValidatedIndexUpdates prepareIndexUpdates( TransactionRepresentation txRepresentation ) throws IOException {
{ storeApplier.apply( txRepresentation, indexUpdates, locks, txId, RECOVERY );
return indexUpdatesValidator.validate( txRepresentation ); }
return false;
}

/**
* Recovery operates under a condition that all index updates are valid because otherwise they have no chance to
* appear in write ahead log.
* This step is still needed though, because it is not only about validation of index sizes but also about
* inferring {@link org.neo4j.kernel.api.index.NodePropertyUpdate}s from commands in transaction state and
* grouping those by {@link org.neo4j.kernel.api.index.IndexUpdater}s.
*/
private ValidatedIndexUpdates prepareIndexUpdates( TransactionRepresentation txRepresentation ) throws IOException
{
return indexUpdatesValidator.validate( txRepresentation );
}
} }
} }
Expand Up @@ -50,7 +50,7 @@ public interface SPI


LogPosition getPositionToRecoverFrom() throws IOException; LogPosition getPositionToRecoverFrom() throws IOException;


Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor(); Visitor<CommittedTransactionRepresentation,Exception> startRecovery();


void allTransactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction, void allTransactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction ) throws Exception; LogPosition positionAfterLastRecoveredTransaction ) throws Exception;
Expand Down Expand Up @@ -80,7 +80,7 @@ public void init() throws Throwable


LogPosition recoveryToPosition; LogPosition recoveryToPosition;
CommittedTransactionRepresentation lastTransaction = null; CommittedTransactionRepresentation lastTransaction = null;
Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor = spi.getRecoveryVisitor(); Visitor<CommittedTransactionRepresentation,Exception> recoveryVisitor = spi.startRecovery();
try ( TransactionCursor transactionsToRecover = spi.getTransactions( recoveryFromPosition ) ) try ( TransactionCursor transactionsToRecover = spi.getTransactions( recoveryFromPosition ) )
{ {
while ( transactionsToRecover.next() ) while ( transactionsToRecover.next() )
Expand Down
Expand Up @@ -164,10 +164,10 @@ transactionIdStore, logVersionRepository, mock( PhysicalLogFile.Monitor.class ),
private int nr = 0; private int nr = 0;


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
recoveryRequired.set( true ); recoveryRequired.set( true );
final Visitor<CommittedTransactionRepresentation,Exception> actual = super.getRecoveryVisitor(); final Visitor<CommittedTransactionRepresentation,Exception> actual = super.startRecovery();
return new Visitor<CommittedTransactionRepresentation,Exception>() return new Visitor<CommittedTransactionRepresentation,Exception>()
{ {
@Override @Override
Expand Down Expand Up @@ -257,7 +257,7 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
logVersionRepository, finder, validator, transactionIdStore, txStore, storeApplier ) logVersionRepository, finder, validator, transactionIdStore, txStore, storeApplier )
{ {
@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
fail( "Recovery should not be required" ); fail( "Recovery should not be required" );
return null; // <-- to satisfy the compiler return null; // <-- to satisfy the compiler
Expand Down
Expand Up @@ -146,7 +146,7 @@ public void forceEverything()
} }


@Override @Override
public Visitor<CommittedTransactionRepresentation,Exception> getRecoveryVisitor() public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{ {
recoveryRequired.set( true ); recoveryRequired.set( true );
return visitor; return visitor;
Expand Down

0 comments on commit 6f8996d

Please sign in to comment.