Skip to content

Commit

Permalink
StoreCopyServer checkpoint before full backup
Browse files Browse the repository at this point in the history
Instead of flushing store files on disk and cause a race with the
check pointer, it will ask the check pointer to try check point for
making sure we have flushed store files on disk, and after that it
will start streaming transactions starting from the transaction used
in the check point.

This should solve a failure caused by the race mentioned above where
we were trying to rotate the counts store while it was in a rotation
state.
  • Loading branch information
davidegrohmann committed Sep 21, 2015
1 parent 04061f1 commit 80e4e2f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 51 deletions.
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.Monitors;

Expand All @@ -51,7 +51,7 @@ public interface Dependencies

NeoStoreDataSource neoStoreDataSource();

Supplier<StoreFlusher> storeFlusherSupplier();
Supplier<CheckPointer> checkPointer();

Supplier<TransactionIdStore> transactionIdStoreSupplier();

Expand Down Expand Up @@ -79,7 +79,7 @@ public Lifecycle newKernelExtension( Dependencies dependencies ) throws Throwabl
return new OnlineBackupKernelExtension( dependencies.getConfig(), dependencies.getGraphDatabaseAPI(),
dependencies.logService().getInternalLogProvider(), dependencies.monitors(),
dependencies.neoStoreDataSource(),
dependencies.storeFlusherSupplier(),
dependencies.checkPointer(),
dependencies.transactionIdStoreSupplier(),
dependencies.logicalTransactionStoreSupplier(),
dependencies.logFileInformationSupplier(),
Expand Down
Expand Up @@ -39,9 +39,9 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.LogFileInformation;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -75,7 +75,7 @@ public interface BackupProvider

public OnlineBackupKernelExtension( Config config, final GraphDatabaseAPI graphDatabaseAPI, final LogProvider logProvider,
final Monitors monitors, final NeoStoreDataSource neoStoreDataSource,
final Supplier<StoreFlusher> storeFlusherSupplier,
final Supplier<CheckPointer> checkPointerSupplier,
final Supplier<TransactionIdStore> transactionIdStoreSupplier,
final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
final Supplier<LogFileInformation> logFileInformationSupplier,
Expand All @@ -87,12 +87,8 @@ public OnlineBackupKernelExtension( Config config, final GraphDatabaseAPI graphD
public TheBackupInterface newBackup()
{
TransactionIdStore transactionIdStore = transactionIdStoreSupplier.get();
StoreCopyServer copier = new StoreCopyServer(
transactionIdStore,
neoStoreDataSource,
storeFlusherSupplier.get(),
fileSystemAbstraction,
new File( graphDatabaseAPI.getStoreDir() ),
StoreCopyServer copier = new StoreCopyServer( neoStoreDataSource, checkPointerSupplier.get(),
fileSystemAbstraction, new File( graphDatabaseAPI.getStoreDir() ),
monitors.newMonitor( StoreCopyServer.Monitor.class ) );
LogicalTransactionStore logicalTransactionStore = logicalTransactionStoreSupplier.get();
LogFileInformation logFileInformation = logFileInformationSupplier.get();
Expand Down
Expand Up @@ -27,11 +27,11 @@
import org.neo4j.com.Response;
import org.neo4j.com.ServerFailureException;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.Format;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;

import static org.neo4j.com.RequestContext.anonymous;
import static org.neo4j.io.fs.FileUtils.getMostCanonicalFile;
Expand All @@ -47,9 +47,9 @@ public class StoreCopyServer
{
public interface Monitor
{
void startFlushingEverything();
void startTryCheckPoint();

void finishFlushingEverything();
void finishTryCheckPoint();

void startStreamingStoreFile( File file );

Expand All @@ -66,12 +66,12 @@ public interface Monitor
class Adapter implements Monitor
{
@Override
public void startFlushingEverything()
public void startTryCheckPoint()
{ // empty
}

@Override
public void finishFlushingEverything()
public void finishTryCheckPoint()
{ // empty
}

Expand Down Expand Up @@ -107,20 +107,17 @@ public void finishStreamingTransactions( long endTxId )
}
}

private final TransactionIdStore transactionIdStore;
private final NeoStoreDataSource dataSource;
private final StoreFlusher storeFlusher;
private final CheckPointer checkPointer;
private final FileSystemAbstraction fileSystem;
private final File storeDirectory;
private final Monitor monitor;

public StoreCopyServer( TransactionIdStore transactionIdStore,
NeoStoreDataSource dataSource, StoreFlusher storeFlusher, FileSystemAbstraction fileSystem,
public StoreCopyServer( NeoStoreDataSource dataSource, CheckPointer checkPointer, FileSystemAbstraction fileSystem,
File storeDirectory, Monitor monitor )
{
this.transactionIdStore = transactionIdStore;
this.dataSource = dataSource;
this.storeFlusher = storeFlusher;
this.checkPointer = checkPointer;
this.fileSystem = fileSystem;
this.storeDirectory = getMostCanonicalFile( storeDirectory );
this.monitor = monitor;
Expand All @@ -138,11 +135,10 @@ public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer, boolea
{
try
{
long lastAppliedTransaction = transactionIdStore.getLastClosedTransactionId();
monitor.startFlushingEverything();
storeFlusher.forceEverything();
monitor.finishFlushingEverything();
ByteBuffer temporaryBuffer = ByteBuffer.allocateDirect( 1024 * 1024 );
monitor.startTryCheckPoint();
long lastAppliedTransaction = checkPointer.tryCheckPoint();
monitor.finishTryCheckPoint();
ByteBuffer temporaryBuffer = ByteBuffer.allocateDirect( Format.MB );

// Copy the store files
monitor.startStreamingStoreFiles();
Expand Down
Expand Up @@ -47,8 +47,8 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.entry.LogHeader;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.state.NeoStoresSupplier;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;
Expand Down Expand Up @@ -290,17 +290,17 @@ public Response<?> copyStore( StoreWriter writer )
NeoStoreDataSource neoStoreDataSource =
original.getDependencyResolver().resolveDependency( NeoStoreDataSource.class );

TransactionIdStore transactionIdStore = original.getDependencyResolver().resolveDependency(
TransactionIdStore.class );
TransactionIdStore transactionIdStore =
original.getDependencyResolver().resolveDependency( TransactionIdStore.class );

LogicalTransactionStore logicalTransactionStore = original.getDependencyResolver().resolveDependency(
LogicalTransactionStore.class );
LogicalTransactionStore logicalTransactionStore =
original.getDependencyResolver().resolveDependency( LogicalTransactionStore.class );

StoreFlusher storeFlusher = original.getDependencyResolver().resolveDependency(
StoreFlusher.class );
CheckPointer checkPointer =
original.getDependencyResolver().resolveDependency( CheckPointer.class );

RequestContext requestContext = new StoreCopyServer(transactionIdStore, neoStoreDataSource,
storeFlusher, fs, originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ) )
RequestContext requestContext = new StoreCopyServer( neoStoreDataSource,
checkPointer, fs, originalDir, new Monitors().newMonitor( StoreCopyServer.Monitor.class ) )
.flushStoresAndStreamStoreFiles( writer, false );

final StoreId storeId =
Expand Down
Expand Up @@ -48,7 +48,7 @@
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.tracing.CommitEvent;
import org.neo4j.kernel.monitoring.Monitors;

Expand All @@ -68,9 +68,7 @@ public class DefaultMasterImplSPI implements MasterImpl.SPI
private final Monitors monitors;

private final TransactionCommitProcess transactionCommitProcess;
private final StoreFlusher storeFlusher;
private final LogicalTransactionStore txStore;
private final TransactionIdStore transactionIdStore;
private final CheckPointer checkPointer;

public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb,
FileSystemAbstraction fileSystemAbstraction,
Expand All @@ -79,7 +77,7 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb,
RelationshipTypeTokenHolder relationshipTypeTokenHolder,
IdGeneratorFactory idGeneratorFactory,
TransactionCommitProcess transactionCommitProcess,
StoreFlusher storeFlusher,
CheckPointer checkPointer,
TransactionIdStore transactionIdStore,
LogicalTransactionStore logicalTransactionStore,
NeoStoreDataSource neoStoreDataSource)
Expand All @@ -88,19 +86,17 @@ public DefaultMasterImplSPI( final GraphDatabaseAPI graphDb,

// Hmm, fetching the dependencies here instead of handing them in the constructor directly feels bad,
// but it seems like there's some intricate usage and need for the db's dependency resolver.
this.transactionIdStore = transactionIdStore;
this.fileSystem = fileSystemAbstraction;
this.labels = labels;
this.propertyKeyTokenHolder = propertyKeyTokenHolder;
this.relationshipTypeTokenHolder = relationshipTypeTokenHolder;
this.idGeneratorFactory = idGeneratorFactory;
this.transactionCommitProcess = transactionCommitProcess;
this.storeFlusher = storeFlusher;
this.checkPointer = checkPointer;
this.neoStoreDataSource = neoStoreDataSource;
this.storeDir = new File( graphDb.getStoreDir() );
this.txStore = logicalTransactionStore;
this.txChecksumLookup = new TransactionChecksumLookup( transactionIdStore, txStore );
this.responsePacker = new ResponsePacker( txStore, transactionIdStore, new Supplier<StoreId>()
this.txChecksumLookup = new TransactionChecksumLookup( transactionIdStore, logicalTransactionStore );
this.responsePacker = new ResponsePacker( logicalTransactionStore, transactionIdStore, new Supplier<StoreId>()
{
@Override
public StoreId get()
Expand Down Expand Up @@ -170,8 +166,8 @@ public long getTransactionChecksum( long txId ) throws IOException
@Override
public RequestContext flushStoresAndStreamStoreFiles( StoreWriter writer )
{
StoreCopyServer streamer = new StoreCopyServer( transactionIdStore, neoStoreDataSource,
storeFlusher, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ) );
StoreCopyServer streamer = new StoreCopyServer( neoStoreDataSource,
checkPointer, fileSystem, storeDir, monitors.newMonitor( StoreCopyServer.Monitor.class ) );
return streamer.flushStoresAndStreamStoreFiles( writer, false );
}

Expand Down
Expand Up @@ -146,7 +146,7 @@
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionAppender;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.rotation.StoreFlusher;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.state.NeoStoreInjectedTransactionValidator;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.impl.util.DependenciesProxy;
Expand Down Expand Up @@ -409,7 +409,7 @@ public MasterImpl.SPI newInstance()
platformModule.monitors,
labelTokenHolder, propertyKeyTokenHolder, relationshipTypeTokenHolder, idGeneratorFactory,
platformModule.dependencies.resolveDependency( TransactionCommitProcess.class ),
platformModule.dependencies.resolveDependency( StoreFlusher.class ),
platformModule.dependencies.resolveDependency( CheckPointer.class ),
platformModule.dependencies.resolveDependency( TransactionIdStore.class ),
platformModule.dependencies.resolveDependency( LogicalTransactionStore.class ),
platformModule.dependencies.resolveDependency( NeoStoreDataSource.class ));
Expand Down

0 comments on commit 80e4e2f

Please sign in to comment.