Skip to content

Commit

Permalink
Monitors store copy on server and client individually
Browse files Browse the repository at this point in the history
so that
BackupServiceIT#shouldContainTransactionsThatHappenDuringBackupProcess can
be implemented correctly and test what it was initially designed to test,
namely that a store copy gets correct even in the face of changing store
files while copying.

Also fixed a transaction log header issue where the txId would be 0
instead of 1 (base txId) if no transaction streamed.

thanks to @davidegrohmann
  • Loading branch information
tinwelint committed Mar 12, 2015
1 parent 115b101 commit c8f305c
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 234 deletions.

This file was deleted.

39 changes: 39 additions & 0 deletions community/kernel/src/test/java/org/neo4j/test/Barrier.java
Expand Up @@ -21,6 +21,17 @@

import java.util.concurrent.CountDownLatch;

/**
* Controls two threads that would otherwise race and produce non-deterministic outcome.
* (ascii-art looks odd in source but lines up in fixed-size generated javadoc).
* <pre>
* {@link Control#await() T1 await()} {@link Control#release() T1 release()}
* | |
* -T1/T2--------|-T2-----------|-T1------------|-T1/T2------------------>
* |
* {@link #reached() T2 reached()}
* </pre>
*/
public interface Barrier
{
Barrier NONE = new Barrier()
Expand All @@ -37,6 +48,7 @@ class Control implements Barrier
{
private final CountDownLatch reached = new CountDownLatch( 1 ), released = new CountDownLatch( 1 );

@Override
public void reached()
{
try
Expand All @@ -55,6 +67,33 @@ public void await() throws InterruptedException
reached.await();
}

public void awaitUninterruptibly()
{
boolean interrupted = false;
try
{
while ( true )
{
try
{
await();
return;
}
catch ( InterruptedException e )
{
interrupted = true;
}
}
}
finally
{
if ( interrupted )
{
Thread.currentThread().interrupt();
}
}
}

public void release()
{
released.countDown();
Expand Down
Expand Up @@ -30,13 +30,11 @@
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.monitoring.StoreCopyMonitor;

import static org.neo4j.com.RequestContext.anonymous;

class BackupImpl implements TheBackupInterface
{
private final StoreCopyMonitor storeCopyMonitor;
private final StoreCopyServer storeCopyServer;
private final ResponsePacker incrementalResponsePacker;
private final LogicalTransactionStore logicalTransactionStore;
Expand All @@ -53,19 +51,18 @@ public BackupImpl( StoreCopyServer storeCopyServer, Monitors monitors,
this.transactionIdStore = transactionIdStore;
this.logFileInformation = logFileInformation;
this.storeId = storeId;
this.storeCopyMonitor = monitors.newMonitor( StoreCopyMonitor.class, getClass() );
this.incrementalResponsePacker = new ResponsePacker( logicalTransactionStore, transactionIdStore, storeId );
}

@Override
public Response<Void> fullBackup( StoreWriter writer, boolean forensics )
{
try ( StoreWriter storeWriter = writer )
{
storeCopyMonitor.startCopyingFiles();
RequestContext copyStartContext = storeCopyServer.flushStoresAndStreamStoreFiles( storeWriter, forensics );
ResponsePacker responsePacker = new StoreCopyResponsePacker( logicalTransactionStore,
transactionIdStore, logFileInformation, storeId,
copyStartContext.lastAppliedTransaction() + 1 ); // mandatory transaction id
copyStartContext.lastAppliedTransaction() + 1, storeCopyServer.monitor() ); // mandatory transaction id
long optionalTransactionId = copyStartContext.lastAppliedTransaction();
return responsePacker.packTransactionStreamResponse( anonymous( optionalTransactionId ), null/*no response object*/ );
}
Expand Down
Expand Up @@ -66,7 +66,6 @@
import org.neo4j.kernel.logging.Logging;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.kernel.monitoring.StoreCopyMonitor;

import static org.neo4j.com.RequestContext.anonymous;
import static org.neo4j.kernel.impl.pagecache.StandalonePageCacheFactory.createPageCache;
Expand Down Expand Up @@ -107,21 +106,18 @@ public boolean isConsistent()

private final FileSystemAbstraction fileSystem;
private final StringLogger logger;
private final Monitors monitors;

BackupService()
{
this( new DefaultFileSystemAbstraction(), StringLogger.SYSTEM );
this( new DefaultFileSystemAbstraction(), StringLogger.SYSTEM, new Monitors() );
}

BackupService( FileSystemAbstraction fileSystem )
{
this( fileSystem, StringLogger.SYSTEM );
}

BackupService( FileSystemAbstraction fileSystem, StringLogger logger )
BackupService( FileSystemAbstraction fileSystem, StringLogger logger, Monitors monitors )
{
this.fileSystem = fileSystem;
this.logger = logger;
this.monitors = monitors;
}

BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePort, String targetDirectory,
Expand All @@ -137,21 +133,19 @@ BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePor
long timestamp = System.currentTimeMillis();
long lastCommittedTx = -1;
boolean consistent = !checkConsistency; // default to true if we're not checking consistency
GraphDatabaseAPI targetDb = null;
try ( StandalonePageCache pageCache = createPageCache( fileSystem, "BackupService PageCache" ) )
{
StoreCopyClient storeCopier = new StoreCopyClient( tuningConfiguration, loadKernelExtensions(),
new ConsoleLogger( StringLogger.SYSTEM ), new DevNullLoggingService(),
new DefaultFileSystemAbstraction(), pageCache, new Monitors().newMonitor( StoreCopyMonitor.class, getClass() ) );
new DefaultFileSystemAbstraction(), pageCache,
monitors.newMonitor( StoreCopyClient.Monitor.class, getClass() ) );
storeCopier.copyStore( new StoreCopyClient.StoreCopyRequester()

{
private BackupClient client;

@Override
public Response<?> copyStore( StoreWriter writer )
{
Monitors monitors = new Monitors();
client = new BackupClient( sourceHostNameOrIp, sourcePort, new DevNullLoggingService(),
StoreId.DEFAULT, timeout, ResponseUnpacker.NO_OP_RESPONSE_UNPACKER,
monitors.newMonitor( ByteCounterMonitor.class ),
Expand All @@ -167,19 +161,11 @@ public void done()
}
}, CancellationRequest.NEVER_CANCELLED );

targetDb = startTemporaryDb( targetDirectory );
}
catch ( IOException e )
{
throw new RuntimeException( e );
}
finally
{
if ( targetDb != null )
{
targetDb.shutdown();
}
}
bumpMessagesDotLogFile( targetDirectory, timestamp );
if ( checkConsistency )
{
Expand Down
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.backup;

import ch.qos.logback.classic.LoggerContext;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
Expand Down Expand Up @@ -50,13 +52,11 @@
import org.neo4j.kernel.logging.SystemOutLogging;
import org.neo4j.kernel.monitoring.Monitors;

import ch.qos.logback.classic.LoggerContext;
import static org.slf4j.impl.StaticLoggerBinder.getSingleton;

import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.impl.storemigration.FileOperation.MOVE;

import static org.slf4j.impl.StaticLoggerBinder.getSingleton;

public class BackupTool
{
private static final String TO = "to";
Expand Down Expand Up @@ -89,7 +89,7 @@ public class BackupTool

public static void main( String[] args )
{
BackupTool tool = new BackupTool( new BackupService( new DefaultFileSystemAbstraction() ), System.out );
BackupTool tool = new BackupTool( new BackupService(), System.out );
try
{
BackupOutcome backupOutcome = tool.run( args );
Expand Down
Expand Up @@ -87,7 +87,8 @@ public TheBackupInterface newBackup()
resolver.resolveDependency( DataSourceManager.class ).getDataSource(),
resolver.resolveDependency( LogRotationControl.class ),
resolver.resolveDependency( FileSystemAbstraction.class ),
new File( graphDatabaseAPI.getStoreDir() ) );
new File( graphDatabaseAPI.getStoreDir() ),
monitors.newMonitor( StoreCopyServer.Monitor.class ) );
LogicalTransactionStore logicalTransactionStore = resolver.resolveDependency( LogicalTransactionStore.class );
LogFileInformation logFileInformation = resolver.resolveDependency( LogFileInformation.class );
return new BackupImpl( copier, monitors,
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TransactionStreamResponse;
import org.neo4j.com.storecopy.ResponsePacker;
import org.neo4j.com.storecopy.StoreCopyServer;
import org.neo4j.com.storecopy.StoreCopyServer.Monitor;
import org.neo4j.helpers.Provider;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.impl.store.StoreId;
Expand All @@ -49,15 +51,17 @@ public class StoreCopyResponsePacker extends ResponsePacker
private final long mandatoryStartTransactionId;
private final LogFileInformation logFileInformation;
private final TransactionIdStore transactionIdStore;
private final Monitor monitor;

public StoreCopyResponsePacker( LogicalTransactionStore transactionStore,
TransactionIdStore transactionIdStore, LogFileInformation logFileInformation,
Provider<StoreId> storeId, long mandatoryStartTransactionId )
Provider<StoreId> storeId, long mandatoryStartTransactionId, StoreCopyServer.Monitor monitor )
{
super( transactionStore, transactionIdStore, storeId );
this.transactionIdStore = transactionIdStore;
this.mandatoryStartTransactionId = mandatoryStartTransactionId;
this.logFileInformation = logFileInformation;
this.monitor = monitor;
}

@Override
Expand All @@ -73,7 +77,9 @@ public void accept( Visitor<CommittedTransactionRepresentation,IOException> visi
// Check so that it's even worth thinking about extracting any transactions at all
if ( toStartFrom > BASE_TX_ID && toStartFrom <= toEndAt )
{
monitor.startStreamingTransactions( toStartFrom );
extractTransactions( toStartFrom, filterVisitor( visitor, toEndAt ) );
monitor.finishStreamingTransactions( toEndAt );
}
}
};
Expand Down

0 comments on commit c8f305c

Please sign in to comment.