Skip to content

Commit

Permalink
Cleanup backup code
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Oct 10, 2016
1 parent 231d37e commit 3c1b851
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 72 deletions.
59 changes: 18 additions & 41 deletions enterprise/backup/src/main/java/org/neo4j/backup/BackupService.java
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.helpers.CancellationRequest; import org.neo4j.helpers.CancellationRequest;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Service; import org.neo4j.helpers.Service;
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.helpers.progress.ProgressMonitorFactory; import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
Expand All @@ -61,6 +60,7 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifespan;
import org.neo4j.kernel.monitoring.ByteCounterMonitor; import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.FormattedLogProvider; import org.neo4j.logging.FormattedLogProvider;
Expand Down Expand Up @@ -88,7 +88,7 @@ class BackupOutcome
this.consistent = consistent; this.consistent = consistent;
} }


public long getLastCommittedTx() long getLastCommittedTx()
{ {
return lastCommittedTx; return lastCommittedTx;
} }
Expand All @@ -110,7 +110,6 @@ public boolean isConsistent()
private final LogProvider logProvider; private final LogProvider logProvider;
private final Log log; private final Log log;
private final Monitors monitors; private final Monitors monitors;
private final VersionAwareLogEntryReader entryReader;


BackupService() BackupService()
{ {
Expand All @@ -123,7 +122,6 @@ public boolean isConsistent()
this.logProvider = logProvider; this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.monitors = monitors; this.monitors = monitors;
this.entryReader = new VersionAwareLogEntryReader<>();
} }


BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePort, File targetDirectory, BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePort, File targetDirectory,
Expand All @@ -142,7 +140,7 @@ BackupOutcome doFullBackup( final String sourceHostNameOrIp, final int sourcePor
loadKernelExtensions(), logProvider, new DefaultFileSystemAbstraction(), pageCache, loadKernelExtensions(), logProvider, new DefaultFileSystemAbstraction(), pageCache,
monitors.newMonitor( StoreCopyClient.Monitor.class, getClass() ), forensics ); monitors.newMonitor( StoreCopyClient.Monitor.class, getClass() ), forensics );
FullBackupStoreCopyRequester storeCopyRequester = FullBackupStoreCopyRequester storeCopyRequester =
new FullBackupStoreCopyRequester( sourceHostNameOrIp, sourcePort, timeout, forensics ); new FullBackupStoreCopyRequester( sourceHostNameOrIp, sourcePort, timeout, forensics, monitors );
storeCopier.copyStore( storeCopyRequester, CancellationRequest.NEVER_CANCELLED ); storeCopier.copyStore( storeCopyRequester, CancellationRequest.NEVER_CANCELLED );


bumpDebugDotLogFileVersion( targetDirectory, timestamp ); bumpDebugDotLogFileVersion( targetDirectory, timestamp );
Expand Down Expand Up @@ -257,12 +255,13 @@ boolean directoryContainsDb( File targetDirectory )
return fileSystem.fileExists( new File( targetDirectory, MetaDataStore.DEFAULT_NAME ) ); return fileSystem.fileExists( new File( targetDirectory, MetaDataStore.DEFAULT_NAME ) );
} }


boolean directoryIsEmpty( File targetDirectory ) private boolean directoryIsEmpty( File targetDirectory )
{ {
return !fileSystem.isDirectory( targetDirectory ) || 0 == fileSystem.listFiles( targetDirectory ).length; return !fileSystem.isDirectory( targetDirectory ) || 0 == fileSystem.listFiles( targetDirectory ).length;
} }


static GraphDatabaseAPI startTemporaryDb( File targetDirectory, PageCache pageCache, Map<String,String> config ) private static GraphDatabaseAPI startTemporaryDb( File targetDirectory, PageCache pageCache,
Map<String,String> config )
{ {
GraphDatabaseFactory factory = ExternallyManagedPageCache.graphDatabaseFactoryWithPageCache( pageCache ); GraphDatabaseFactory factory = ExternallyManagedPageCache.graphDatabaseFactoryWithPageCache( pageCache );
return (GraphDatabaseAPI) factory.newEmbeddedDatabaseBuilder( targetDirectory ).setConfig( config ) return (GraphDatabaseAPI) factory.newEmbeddedDatabaseBuilder( targetDirectory ).setConfig( config )
Expand Down Expand Up @@ -292,20 +291,14 @@ private BackupOutcome incrementalWithContext( String sourceHostNameOrIp, int sou
LogProvider logProvider = resolver.resolveDependency( LogService.class ).getInternalLogProvider(); LogProvider logProvider = resolver.resolveDependency( LogService.class ).getInternalLogProvider();
BackupClient client = new BackupClient( sourceHostNameOrIp, sourcePort, null, logProvider, targetDb.storeId(), BackupClient client = new BackupClient( sourceHostNameOrIp, sourcePort, null, logProvider, targetDb.storeId(),
timeout, unpacker, monitors.newMonitor( ByteCounterMonitor.class, BackupClient.class ), timeout, unpacker, monitors.newMonitor( ByteCounterMonitor.class, BackupClient.class ),
monitors.newMonitor( RequestMonitor.class, BackupClient.class ), entryReader ); monitors.newMonitor( RequestMonitor.class, BackupClient.class ), new VersionAwareLogEntryReader<>() );


boolean consistent = false; try ( Lifespan lifespan = new Lifespan( unpacker, client ) )
try
{ {
client.start();
unpacker.start();

try ( Response<Void> response = client.incrementalBackup( context ) ) try ( Response<Void> response = client.incrementalBackup( context ) )
{ {
unpacker.unpackResponse( response, handler ); unpacker.unpackResponse( response, handler );
} }

consistent = true;
} }
catch ( MismatchingStoreIdException e ) catch ( MismatchingStoreIdException e )
{ {
Expand All @@ -327,19 +320,8 @@ private BackupOutcome incrementalWithContext( String sourceHostNameOrIp, int sou
{ {
throw new RuntimeException( "Unexpected error", throwable ); throw new RuntimeException( "Unexpected error", throwable );
} }
finally
{ return new BackupOutcome( handler.getLastSeenTransactionId(), true );
try
{
client.stop();
unpacker.stop();
}
catch ( Throwable throwable )
{
log.warn( "Unable to stop backup client", throwable );
}
}
return new BackupOutcome( handler.getLastSeenTransactionId(), consistent );
} }


private static boolean bumpDebugDotLogFileVersion( File dbDirectory, long toTimestamp ) private static boolean bumpDebugDotLogFileVersion( File dbDirectory, long toTimestamp )
Expand Down Expand Up @@ -381,52 +363,47 @@ private void clearIdFiles( File targetDirectory ) throws IOException


private static class ProgressTxHandler implements TxHandler private static class ProgressTxHandler implements TxHandler
{ {
private final ProgressListener progress = ProgressMonitorFactory.textual( System.out ).openEnded(
"Transactions applied", 1000 );
private long lastSeenTransactionId; private long lastSeenTransactionId;


@Override @Override
public void accept( long transactionId ) public void accept( long transactionId )
{ {
progress.add( 1 );
lastSeenTransactionId = transactionId; lastSeenTransactionId = transactionId;
} }


@Override long getLastSeenTransactionId()
public void done()
{
progress.done();
}

public long getLastSeenTransactionId()
{ {
return lastSeenTransactionId; return lastSeenTransactionId;
} }
} }


private class FullBackupStoreCopyRequester implements StoreCopyClient.StoreCopyRequester private static class FullBackupStoreCopyRequester implements StoreCopyClient.StoreCopyRequester
{ {
private final String sourceHostNameOrIp; private final String sourceHostNameOrIp;
private final int sourcePort; private final int sourcePort;
private final long timeout; private final long timeout;
private final boolean forensics; private final boolean forensics;
private final Monitors monitors;

private BackupClient client; private BackupClient client;


private FullBackupStoreCopyRequester( String sourceHostNameOrIp, int sourcePort, long timeout, private FullBackupStoreCopyRequester( String sourceHostNameOrIp, int sourcePort, long timeout,
boolean forensics ) boolean forensics, Monitors monitors )
{ {
this.sourceHostNameOrIp = sourceHostNameOrIp; this.sourceHostNameOrIp = sourceHostNameOrIp;
this.sourcePort = sourcePort; this.sourcePort = sourcePort;
this.timeout = timeout; this.timeout = timeout;
this.forensics = forensics; this.forensics = forensics;
this.monitors = monitors;
} }


@Override @Override
public Response<?> copyStore( StoreWriter writer ) public Response<?> copyStore( StoreWriter writer )
{ {
client = new BackupClient( sourceHostNameOrIp, sourcePort, null, NullLogProvider.getInstance(), client = new BackupClient( sourceHostNameOrIp, sourcePort, null, NullLogProvider.getInstance(),
StoreId.DEFAULT, timeout, ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, monitors.newMonitor( StoreId.DEFAULT, timeout, ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, monitors.newMonitor(
ByteCounterMonitor.class ), monitors.newMonitor( RequestMonitor.class ), entryReader ); ByteCounterMonitor.class ), monitors.newMonitor( RequestMonitor.class ),
new VersionAwareLogEntryReader<>() );
client.start(); client.start();
return client.fullBackup( writer, forensics ); return client.fullBackup( writer, forensics );
} }
Expand Down
2 changes: 1 addition & 1 deletion enterprise/com/src/main/java/org/neo4j/com/Client.java
Expand Up @@ -56,7 +56,7 @@
import static org.neo4j.com.Protocol.addLengthFieldPipes; import static org.neo4j.com.Protocol.addLengthFieldPipes;
import static org.neo4j.com.Protocol.assertChunkSizeIsWithinFrameSize; import static org.neo4j.com.Protocol.assertChunkSizeIsWithinFrameSize;
import static org.neo4j.com.ResourcePool.DEFAULT_CHECK_INTERVAL; import static org.neo4j.com.ResourcePool.DEFAULT_CHECK_INTERVAL;
import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_TX_HANDLER; import static org.neo4j.com.storecopy.ResponseUnpacker.TxHandler.NO_OP_TX_HANDLER;
import static org.neo4j.helpers.NamedThreadFactory.daemon; import static org.neo4j.helpers.NamedThreadFactory.daemon;


/** /**
Expand Down
Expand Up @@ -28,32 +28,12 @@ public interface ResponseUnpacker
*/ */
void unpackResponse( Response<?> response, TxHandler txHandler ) throws Exception; void unpackResponse( Response<?> response, TxHandler txHandler ) throws Exception;


public static final ResponseUnpacker NO_OP_RESPONSE_UNPACKER = new ResponseUnpacker() ResponseUnpacker NO_OP_RESPONSE_UNPACKER = ( response, txHandler ) -> { /* Do nothing */ };
{
@Override
public void unpackResponse( Response<?> response, TxHandler txHandler )
{
txHandler.done();
}
};


public interface TxHandler interface TxHandler
{ {
void accept( long transactionId ); TxHandler NO_OP_TX_HANDLER = transactionId -> { /* Do nothing */ };


void done(); void accept( long transactionId );
} }

public static final TxHandler NO_OP_TX_HANDLER = new TxHandler()
{
@Override
public void accept( long transactionId )
{ // Do nothing
}

@Override
public void done()
{ // Do nothing
}
};
} }
Expand Up @@ -50,9 +50,9 @@
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.verifyZeroInteractions;
import static org.neo4j.com.MadeUpServer.FRAME_LENGTH; import static org.neo4j.com.MadeUpServer.FRAME_LENGTH;
import static org.neo4j.com.TxChecksumVerifier.ALWAYS_MATCH;
import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_TX_HANDLER;
import static org.neo4j.com.StoreIdTestFactory.newStoreIdForCurrentVersion; import static org.neo4j.com.StoreIdTestFactory.newStoreIdForCurrentVersion;
import static org.neo4j.com.TxChecksumVerifier.ALWAYS_MATCH;
import static org.neo4j.com.storecopy.ResponseUnpacker.TxHandler.NO_OP_TX_HANDLER;


public class TestCommunication public class TestCommunication
{ {
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Commands;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit; import org.neo4j.kernel.impl.transaction.log.entry.OnePhaseCommit;
Expand All @@ -56,7 +55,7 @@
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.com.storecopy.ResponseUnpacker.NO_OP_TX_HANDLER; import static org.neo4j.com.storecopy.ResponseUnpacker.TxHandler.NO_OP_TX_HANDLER;
import static org.neo4j.kernel.impl.transaction.log.LogPosition.UNSPECIFIED; import static org.neo4j.kernel.impl.transaction.log.LogPosition.UNSPECIFIED;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;


Expand Down
Expand Up @@ -193,7 +193,7 @@ public Response<Void> endLockSession( RequestContext context, final boolean succ
@Override @Override
public Response<Void> pullUpdates( RequestContext context ) public Response<Void> pullUpdates( RequestContext context )
{ {
return pullUpdates( context, ResponseUnpacker.NO_OP_TX_HANDLER ); return pullUpdates( context, ResponseUnpacker.TxHandler.NO_OP_TX_HANDLER );
} }


@Override @Override
Expand All @@ -210,7 +210,7 @@ public Response<HandshakeResult> handshake( final long txId, StoreId storeId )
Deserializer<HandshakeResult> deserializer = Deserializer<HandshakeResult> deserializer =
( buffer, temporaryBuffer ) -> new HandshakeResult( buffer.readLong(), buffer.readLong() ); ( buffer, temporaryBuffer ) -> new HandshakeResult( buffer.readLong(), buffer.readLong() );
return sendRequest( requestTypes.type( HaRequestTypes.Type.HANDSHAKE ), RequestContext.EMPTY, return sendRequest( requestTypes.type( HaRequestTypes.Type.HANDSHAKE ), RequestContext.EMPTY,
serializer, deserializer, storeId, ResponseUnpacker.NO_OP_TX_HANDLER ); serializer, deserializer, storeId, ResponseUnpacker.TxHandler.NO_OP_TX_HANDLER );
} }


@Override @Override
Expand Down

0 comments on commit 3c1b851

Please sign in to comment.