Skip to content

Commit

Permalink
Rename and extract recovery related interfaces and services.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Oct 3, 2017
1 parent 7f07792 commit eb04e08
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 97 deletions.
Expand Up @@ -148,11 +148,13 @@
import org.neo4j.kernel.lifecycle.Lifecycles; import org.neo4j.kernel.lifecycle.Lifecycles;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.monitoring.tracing.Tracers; import org.neo4j.kernel.monitoring.tracing.Tracers;
import org.neo4j.kernel.recovery.DefaultRecoverySPI; import org.neo4j.kernel.recovery.DefaultRecoveryService;
import org.neo4j.kernel.recovery.LogTailScanner; import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.recovery.LoggingLogTailScannerMonitor; import org.neo4j.kernel.recovery.LoggingLogTailScannerMonitor;
import org.neo4j.kernel.recovery.PositionToRecoverFrom; import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery; import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.kernel.recovery.RecoveryMonitor;
import org.neo4j.kernel.recovery.RecoveryService;
import org.neo4j.kernel.recovery.TransactionLogPruner; import org.neo4j.kernel.recovery.TransactionLogPruner;
import org.neo4j.kernel.spi.explicitindex.IndexImplementation; import org.neo4j.kernel.spi.explicitindex.IndexImplementation;
import org.neo4j.kernel.spi.explicitindex.IndexProviders; import org.neo4j.kernel.spi.explicitindex.IndexProviders;
Expand Down Expand Up @@ -463,7 +465,7 @@ public void start() throws IOException
buildRecovery( fs, buildRecovery( fs,
transactionIdStore, transactionIdStore,
tailScanner, tailScanner,
monitors.newMonitor( Recovery.Monitor.class ), monitors.newMonitor( RecoveryMonitor.class ),
monitors.newMonitor( PositionToRecoverFrom.Monitor.class ), monitors.newMonitor( PositionToRecoverFrom.Monitor.class ),
logFiles, startupStatistics, logFiles, startupStatistics,
storageEngine, transactionLogModule.logicalTransactionStore() storageEngine, transactionLogModule.logicalTransactionStore()
Expand Down Expand Up @@ -684,14 +686,15 @@ private void buildRecovery(
final FileSystemAbstraction fileSystemAbstraction, final FileSystemAbstraction fileSystemAbstraction,
TransactionIdStore transactionIdStore, TransactionIdStore transactionIdStore,
LogTailScanner tailScanner, LogTailScanner tailScanner,
Recovery.Monitor recoveryMonitor, RecoveryMonitor recoveryMonitor,
PositionToRecoverFrom.Monitor positionMonitor, PositionToRecoverFrom.Monitor positionMonitor,
final PhysicalLogFiles logFiles, final PhysicalLogFiles logFiles,
final StartupStatisticsProvider startupStatistics, final StartupStatisticsProvider startupStatistics,
StorageEngine storageEngine, StorageEngine storageEngine,
LogicalTransactionStore logicalTransactionStore ) LogicalTransactionStore logicalTransactionStore )
{ {
Recovery.SPI spi = new DefaultRecoverySPI( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor ); RecoveryService
spi = new DefaultRecoveryService( storageEngine, tailScanner, transactionIdStore, logicalTransactionStore, positionMonitor );
TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction ); TransactionLogPruner logPruner = new TransactionLogPruner( storeDir, logFiles, fileSystemAbstraction );
Recovery recovery = new Recovery( spi, startupStatistics, logPruner, recoveryMonitor ); Recovery recovery = new Recovery( spi, startupStatistics, logPruner, recoveryMonitor );
life.add( recovery ); life.add( recovery );
Expand Down
Expand Up @@ -24,15 +24,14 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation; import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.recovery.PositionToRecoverFrom; import org.neo4j.kernel.recovery.PositionToRecoverFrom;
import org.neo4j.kernel.recovery.Recovery; import org.neo4j.kernel.recovery.RecoveryMonitor;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.lang.String.format; import static java.lang.String.format;


public class LoggingLogFileMonitor implements public class LoggingLogFileMonitor implements
PhysicalLogFile.Monitor, PhysicalLogFile.Monitor,
LogRotation.Monitor, LogRotation.Monitor, RecoveryMonitor,
Recovery.Monitor,
PositionToRecoverFrom.Monitor PositionToRecoverFrom.Monitor
{ {
private long firstTransactionRecovered = -1; private long firstTransactionRecovered = -1;
Expand Down
Expand Up @@ -31,20 +31,19 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.recovery.Recovery.RecoveryApplier;
import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.TransactionApplicationMode;


import static org.neo4j.kernel.impl.transaction.log.Commitment.NO_COMMITMENT; import static org.neo4j.kernel.impl.transaction.log.Commitment.NO_COMMITMENT;


public class DefaultRecoverySPI implements Recovery.SPI public class DefaultRecoveryService implements RecoveryService
{ {
private final PositionToRecoverFrom positionToRecoverFrom; private final PositionToRecoverFrom positionToRecoverFrom;
private final StorageEngine storageEngine; private final StorageEngine storageEngine;
private final TransactionIdStore transactionIdStore; private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore; private final LogicalTransactionStore logicalTransactionStore;


public DefaultRecoverySPI( public DefaultRecoveryService(
StorageEngine storageEngine, StorageEngine storageEngine,
LogTailScanner logTailScanner, LogTailScanner logTailScanner,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore, TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore,
Expand Down
Expand Up @@ -19,17 +19,13 @@
*/ */
package org.neo4j.kernel.recovery; package org.neo4j.kernel.recovery;


import java.io.IOException;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.core.StartupStatisticsProvider; import org.neo4j.kernel.impl.core.StartupStatisticsProvider;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.storageengine.api.TransactionApplicationMode;


import static org.neo4j.storageengine.api.TransactionApplicationMode.RECOVERY; import static org.neo4j.storageengine.api.TransactionApplicationMode.RECOVERY;
import static org.neo4j.storageengine.api.TransactionApplicationMode.REVERSE_RECOVERY; import static org.neo4j.storageengine.api.TransactionApplicationMode.REVERSE_RECOVERY;
Expand All @@ -40,67 +36,17 @@
*/ */
public class Recovery extends LifecycleAdapter public class Recovery extends LifecycleAdapter
{ {
// TODO
public interface Monitor
{
default void recoveryRequired( LogPosition recoveryPosition )
{ // no-op by default
}

default void transactionRecovered( long txId )
{ // no-op by default
}

default void recoveryCompleted( int numberOfRecoveredTransactions )
{ // no-op by default
}

default void reverseStoreRecoveryCompleted( long lowestRecoveredTxId )
{ // no-op by default
}

default void failToRecoverTransactionsAfterCommit( Throwable t, LogEntryCommit commitEntry,
LogPosition recoveryToPosition )
{
// no-op
}

default void failToRecoverTransactionsAfterPosition( Throwable t, LogPosition recoveryFromPosition )
{
// no-op
}
}

public interface RecoveryApplier extends Visitor<CommittedTransactionRepresentation,Exception>, AutoCloseable
{
}

public interface SPI
{
TransactionCursor getTransactions( LogPosition recoveryFromPosition ) throws IOException;

TransactionCursor getTransactionsInReverseOrder( LogPosition recoveryFromPosition ) throws IOException;

LogPosition getPositionToRecoverFrom() throws IOException;

void startRecovery();

RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception;

void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction );
}


private final SPI spi; private final RecoveryService recoveryService;
private final Monitor monitor; private final RecoveryMonitor monitor;
private final StartupStatisticsProvider startupStatistics; private final StartupStatisticsProvider startupStatistics;
private final TransactionLogPruner logPruner; private final TransactionLogPruner logPruner;
private int numberOfRecoveredTransactions; private int numberOfRecoveredTransactions;


public Recovery( SPI spi, StartupStatisticsProvider startupStatistics, TransactionLogPruner logPruner, public Recovery( RecoveryService recoveryService, StartupStatisticsProvider startupStatistics,
Monitor monitor ) TransactionLogPruner logPruner, RecoveryMonitor monitor )
{ {
this.spi = spi; this.recoveryService = recoveryService;
this.monitor = monitor; this.monitor = monitor;
this.startupStatistics = startupStatistics; this.startupStatistics = startupStatistics;
this.logPruner = logPruner; this.logPruner = logPruner;
Expand All @@ -109,23 +55,23 @@ public Recovery( SPI spi, StartupStatisticsProvider startupStatistics, Transacti
@Override @Override
public void init() throws Throwable public void init() throws Throwable
{ {
LogPosition recoveryFromPosition = spi.getPositionToRecoverFrom(); LogPosition recoveryFromPosition = recoveryService.getPositionToRecoverFrom();
if ( LogPosition.UNSPECIFIED.equals( recoveryFromPosition ) ) if ( LogPosition.UNSPECIFIED.equals( recoveryFromPosition ) )
{ {
return; return;
} }


monitor.recoveryRequired( recoveryFromPosition ); monitor.recoveryRequired( recoveryFromPosition );
spi.startRecovery(); recoveryService.startRecovery();


LogPosition recoveryToPosition = recoveryFromPosition; LogPosition recoveryToPosition = recoveryFromPosition;
CommittedTransactionRepresentation lastTransaction = null; CommittedTransactionRepresentation lastTransaction = null;
CommittedTransactionRepresentation lastReversedTransaction = null; CommittedTransactionRepresentation lastReversedTransaction = null;
try try
{ {
long lowestRecoveredTxId = TransactionIdStore.BASE_TX_ID; long lowestRecoveredTxId = TransactionIdStore.BASE_TX_ID;
try ( TransactionCursor transactionsToRecover = spi.getTransactionsInReverseOrder( recoveryFromPosition ); try ( TransactionCursor transactionsToRecover = recoveryService.getTransactionsInReverseOrder( recoveryFromPosition );
RecoveryApplier recoveryVisitor = spi.getRecoveryApplier( REVERSE_RECOVERY ) ) RecoveryApplier recoveryVisitor = recoveryService.getRecoveryApplier( REVERSE_RECOVERY ) )
{ {
while ( transactionsToRecover.next() ) while ( transactionsToRecover.next() )
{ {
Expand All @@ -141,8 +87,8 @@ public void init() throws Throwable


monitor.reverseStoreRecoveryCompleted( lowestRecoveredTxId ); monitor.reverseStoreRecoveryCompleted( lowestRecoveredTxId );


try ( TransactionCursor transactionsToRecover = spi.getTransactions( recoveryFromPosition ); try ( TransactionCursor transactionsToRecover = recoveryService.getTransactions( recoveryFromPosition );
RecoveryApplier recoveryVisitor = spi.getRecoveryApplier( RECOVERY ) ) RecoveryApplier recoveryVisitor = recoveryService.getRecoveryApplier( RECOVERY ) )
{ {
while ( transactionsToRecover.next() ) while ( transactionsToRecover.next() )
{ {
Expand Down Expand Up @@ -170,7 +116,7 @@ public void init() throws Throwable
} }
logPruner.prune( recoveryToPosition ); logPruner.prune( recoveryToPosition );


spi.transactionsRecovered( lastTransaction, recoveryToPosition ); recoveryService.transactionsRecovered( lastTransaction, recoveryToPosition );
startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions ); startupStatistics.setNumberOfRecoveredTransactions( numberOfRecoveredTransactions );
monitor.recoveryCompleted( numberOfRecoveredTransactions ); monitor.recoveryCompleted( numberOfRecoveredTransactions );
} }
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;

/**
* Recovery transaction applier that will apply all recovered transaction to underlying store.
*/
public interface RecoveryApplier extends Visitor<CommittedTransactionRepresentation,Exception>, AutoCloseable
{
}
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryCommit;

public interface RecoveryMonitor
{
default void recoveryRequired( LogPosition recoveryPosition )
{
// noop
}

default void transactionRecovered( long txId )
{
//noop
}

default void recoveryCompleted( int numberOfRecoveredTransactions )
{
//noop
}

default void reverseStoreRecoveryCompleted( long lowestRecoveredTxId )
{
//noop
}

default void failToRecoverTransactionsAfterCommit( Throwable t, LogEntryCommit commitEntry,
LogPosition recoveryToPosition )
{
//noop
}

default void failToRecoverTransactionsAfterPosition( Throwable t, LogPosition recoveryFromPosition )
{
//noop
}
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.recovery;

import java.io.IOException;

import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.storageengine.api.TransactionApplicationMode;

public interface RecoveryService
{
void startRecovery();

TransactionCursor getTransactions( LogPosition recoveryFromPosition ) throws IOException;

TransactionCursor getTransactionsInReverseOrder( LogPosition recoveryFromPosition ) throws IOException;

LogPosition getPositionToRecoverFrom() throws IOException;

RecoveryApplier getRecoveryApplier( TransactionApplicationMode mode ) throws Exception;

void transactionsRecovered( CommittedTransactionRepresentation lastRecoveredTransaction,
LogPosition positionAfterLastRecoveredTransaction );
}
Expand Up @@ -68,7 +68,6 @@
import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.recovery.LogTailScanner; import org.neo4j.kernel.recovery.LogTailScanner;
import org.neo4j.kernel.recovery.Recovery;
import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.TestGraphDatabaseFactory;
Expand Down Expand Up @@ -490,7 +489,7 @@ public void writeStartEntry( int masterId, int authorId, long timeWritten, long
} }
} }


private static class RecoveryMonitor implements Recovery.Monitor private static class RecoveryMonitor implements org.neo4j.kernel.recovery.RecoveryMonitor
{ {
private List<Long> recoveredTransactions = new ArrayList<>(); private List<Long> recoveredTransactions = new ArrayList<>();
private int numberOfRecoveredTransactions; private int numberOfRecoveredTransactions;
Expand Down

0 comments on commit eb04e08

Please sign in to comment.