Skip to content

Commit

Permalink
Switch recovery to use monitors to perform logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent 364b4f1 commit 7f07792
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 26 deletions.
Expand Up @@ -693,7 +693,7 @@ private void buildRecovery(
{
Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction );
Recovery recovery = new Recovery( spi, startupStatistics, logPruner, logService, recoveryMonitor );
Recovery recovery = new Recovery( spi, startupStatistics, logPruner, recoveryMonitor );
life.add( recovery );
}

Expand Down Expand Up @@ -725,7 +725,7 @@ private NeoStoreKernelModule buildKernel( TransactionAppender appender,

StatementOperationParts statementOperationParts = dependencies.satisfyDependency(
buildStatementOperations( storeLayer, autoIndexing,
constraintIndexCreator, databaseSchemaState, guard, explicitIndexStore ) );
constraintIndexCreator, databaseSchemaState, explicitIndexStore ) );

TransactionHooks hooks = new TransactionHooks();
KernelTransactions kernelTransactions = life.add( new KernelTransactions( statementLocksFactory,
Expand Down Expand Up @@ -851,10 +851,9 @@ public DependencyResolver getDependencyResolver()
return dependencies;
}

private StatementOperationParts buildStatementOperations(
StoreReadLayer storeReadLayer, AutoIndexing autoIndexing,
private StatementOperationParts buildStatementOperations( StoreReadLayer storeReadLayer, AutoIndexing autoIndexing,
ConstraintIndexCreator constraintIndexCreator, DatabaseSchemaState databaseSchemaState,
Guard guard, ExplicitIndexStore explicitIndexStore )
ExplicitIndexStore explicitIndexStore )
{
// The passed in StoreReadLayer is the bottom most layer: Read-access to committed data.
// To it we add:
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.File;

import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery;
Expand Down Expand Up @@ -63,6 +64,21 @@ public void recoveryCompleted( int numberOfRecoveredTransactions )
}
}

@Override
public void failToRecoverTransactionsAfterCommit( Throwable t, LogEntryCommit commitEntry, LogPosition recoveryToPosition )
{
log.warn( format( "Fail to recover all transactions. Last recoverable transaction id:%d, committed " +
"at:%d. Any later transaction after %s are unreadable and will be truncated.",
commitEntry.getTxId(), commitEntry.getTimeWritten(), recoveryToPosition ), t );
}

@Override
public void failToRecoverTransactionsAfterPosition( Throwable t, LogPosition recoveryFromPosition )
{
log.warn( format( "Fail to recover all transactions. Any later transactions after position %s are " +
"unreadable and will be truncated.", recoveryFromPosition ), t );
}

@Override
public void startedRotating( long currentVersion )
{
Expand Down
Expand Up @@ -319,8 +319,7 @@ public LogTailInformation( boolean recordAfterCheckpoint, long firstTxIdAfterLas
latestLogEntryVersion );
}

public LogTailInformation( CheckPoint lastCheckPoint, boolean recordAfterCheckpoint,
long firstTxIdAfterLastCheckPoint,
LogTailInformation( CheckPoint lastCheckPoint, boolean recordAfterCheckpoint, long firstTxIdAfterLastCheckPoint,
long oldestLogVersionFound, long currentLogVersion, LogEntryVersion latestLogEntryVersion )
{
this.lastCheckPoint = lastCheckPoint;
Expand Down
Expand Up @@ -23,17 +23,14 @@

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.core.StartupStatisticsProvider;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.storageengine.api.TransactionApplicationMode;

import static java.lang.String.format;
import static org.neo4j.storageengine.api.TransactionApplicationMode.RECOVERY;
import static org.neo4j.storageengine.api.TransactionApplicationMode.REVERSE_RECOVERY;

Expand All @@ -43,6 +40,7 @@
*/
public class Recovery extends LifecycleAdapter
{
// TODO
public interface Monitor
{
default void recoveryRequired( LogPosition recoveryPosition )
Expand All @@ -60,6 +58,17 @@ default void recoveryCompleted( int numberOfRecoveredTransactions )
default void reverseStoreRecoveryCompleted( long lowestRecoveredTxId )
{ // no-op by default
}

default void failToRecoverTransactionsAfterCommit( Throwable t, LogEntryCommit commitEntry,
LogPosition recoveryToPosition )
{
// no-op
}

default void failToRecoverTransactionsAfterPosition( Throwable t, LogPosition recoveryFromPosition )
{
// no-op
}
}

public interface RecoveryApplier extends Visitor<CommittedTransactionRepresentation,Exception>, AutoCloseable
Expand All @@ -86,17 +95,15 @@ void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTran
private final Monitor monitor;
private final StartupStatisticsProvider startupStatistics;
private final TransactionLogPruner logPruner;
private final Log log;
private int numberOfRecoveredTransactions;

public Recovery( SPI spi, StartupStatisticsProvider startupStatistics, TransactionLogPruner logPruner,
LogService logService, Monitor monitor )
Monitor monitor )
{
this.spi = spi;
this.monitor = monitor;
this.startupStatistics = startupStatistics;
this.logPruner = logPruner;
this.log = logService.getInternalLog( Recovery.class );
}

@Override
Expand Down Expand Up @@ -153,14 +160,11 @@ public void init() throws Throwable
if ( lastTransaction != null )
{
LogEntryCommit commitEntry = lastTransaction.getCommitEntry();
log.warn( format( "Fail to recover all transactions. Last recoverable transaction id:%d, committed " +
"at:%d. Any later transaction after %s are unreadable and will be truncated.",
commitEntry.getTxId(), commitEntry.getTimeWritten(), recoveryToPosition ), e );
monitor.failToRecoverTransactionsAfterCommit( e, commitEntry, recoveryToPosition );
}
else
{
log.warn( format( "Fail to recover all transactions. Any later transactions after position %s are " +
"unreadable and will be truncated.", recoveryFromPosition ), e );
monitor.failToRecoverTransactionsAfterPosition( e, recoveryFromPosition );
recoveryToPosition = recoveryFromPosition;
}
}
Expand Down
Expand Up @@ -207,7 +207,7 @@ public boolean visit( CommittedTransactionRepresentation tx ) throws Exception
}
};
}
}, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ), monitor ) );
}, new StartupStatisticsProvider(), logPruner, monitor ) );

life.start();

Expand Down Expand Up @@ -271,8 +271,7 @@ public void startRecovery()
{
fail( "Recovery should not be required" );
}
}, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
monitor ));
}, new StartupStatisticsProvider(), logPruner, monitor ));

life.start();

Expand Down Expand Up @@ -414,8 +413,7 @@ public void startRecovery()
{
recoveryRequired.set( true );
}
}, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
monitor ) );
}, new StartupStatisticsProvider(), logPruner, monitor ) );

life.start();
}
Expand Down
Expand Up @@ -33,7 +33,6 @@
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.core.StartupStatisticsProvider;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore;
Expand Down Expand Up @@ -236,8 +235,7 @@ public void transactionsRecovered( CommittedTransactionRepresentation lastRecove
LogPosition positionAfterLastRecoveredTransaction )
{
}
}, new StartupStatisticsProvider(), logPruner, new SimpleLogService( logProvider ),
mock( Recovery.Monitor.class ) ) );
}, new StartupStatisticsProvider(), logPruner, mock( Recovery.Monitor.class ) ) );

// WHEN
try
Expand Down

0 comments on commit 7f07792

Please sign in to comment.