Skip to content

Commit

Permalink
Properly place transaction logs during backup
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Nov 16, 2017
1 parent c026f12 commit 4cb2df0
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ CatchupResult tryCatchingUp( AdvertisedSocketAddress fromAddress, StoreId expect
{
try
{
return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir );
return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir, true );
}
catch ( IOException e )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.util.Map;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
Expand All @@ -33,6 +34,7 @@ public class BackupRecoveryService
public void recoverWithDatabase( File targetDirectory, PageCache pageCache, Config config )
{
Map<String,String> configParams = config.getRaw();
configParams.put( GraphDatabaseSettings.logical_logs_location.name(), targetDirectory.getAbsolutePath() );
GraphDatabaseAPI targetDb = startTemporaryDb( targetDirectory, pageCache, configParams );
targetDb.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@
package org.neo4j.backup;

import java.io.File;
import java.util.Map;

import org.neo4j.commandline.admin.CommandFailed;
import org.neo4j.helpers.OptionalHostnamePort;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;

import static org.neo4j.backup.BackupProtocolService.startTemporaryDb;

class BackupStrategyWrapper
{
private final BackupStrategy backupStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Optional;

import org.neo4j.commandline.admin.CommandFailed;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.configuration.Config;

Expand All @@ -39,11 +38,10 @@ class OnlineBackupCommandConfigLoader
this.configDir = configDir;
}

Config loadConfig( Optional<Path> additionalConfig, Path folder ) throws CommandFailed
Config loadConfig( Optional<Path> additionalConfig ) throws CommandFailed
{
Config config = Config.fromFile( configDir.resolve( Config.DEFAULT_CONFIG_FILE_NAME ) ).withHome( homeDir )
.withConnectorsDisabled().build();
config.augment( GraphDatabaseSettings.logical_logs_location, folder.toAbsolutePath().toString() );
return withAdditionalConfig( additionalConfig, config );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public OnlineBackupContext fromCommandLineArguments( String[] commandlineArgs )
try
{
OnlineBackupRequiredArguments requiredArguments = backupCommandArgumentHandler.establishRequiredArguments( commandlineArgs );
Config config = onlineBackupCommandConfigLoader.loadConfig( requiredArguments.getAdditionalConfig(),
requiredArguments.getFolder() );
Config config = onlineBackupCommandConfigLoader.loadConfig( requiredArguments.getAdditionalConfig() );
ConsistencyFlags consistencyFlags = backupCommandArgumentHandler.readFlagsFromArgumentsOrDefaultToConfig( config );

return new OnlineBackupContext( requiredArguments, config, consistencyFlags );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void tryCatchingUpDelegatesToRemoteStore() throws org.neo4j.causalcluster
subject.tryCatchingUp( fromAddress, expectedStoreId, storeDir );

// then
verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir );
verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir, true );
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void errorHandledForNonExistingAdditionalConfigFile() throws CommandFaile

// expect
assertFalse( additionalConf.exists() );
subject.loadConfig( Optional.of( additionalConf.toPath() ), homeDir );
subject.loadConfig( Optional.of( additionalConf.toPath() ) );
}

@Test
Expand All @@ -101,7 +101,7 @@ public void prioritiseConfigDirOverHomeDir() throws IOException, CommandFailed
appendToFile( homeDirConfigFile, "causal_clustering.raft_in_queue_max_batch=21" );

// when
Config config = subject.loadConfig( Optional.empty(), homeDir );
Config config = subject.loadConfig( Optional.empty() );

// then
assertEquals( Integer.valueOf( 4 ), config.get( CausalClusteringSettings.expected_core_cluster_size ) );
Expand All @@ -121,7 +121,7 @@ public void prioritiseAdditionalOverConfigDir() throws IOException, CommandFaile
appendToFile( additionalConfigFile, "causal_clustering.expected_core_cluster_size=5" );

// when
Config config = subject.loadConfig( Optional.of( additionalConfigFile.toPath() ), homeDir );
Config config = subject.loadConfig( Optional.of( additionalConfigFile.toPath() ) );

// then
assertEquals( Integer.valueOf( 5 ), config.get( CausalClusteringSettings.expected_core_cluster_size ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public class RemoteStore
private final TxPullClient txPullClient;
private final TransactionLogCatchUpFactory transactionLogFactory;

public RemoteStore( LogProvider logProvider, FileSystemAbstraction fs, PageCache pageCache, StoreCopyClient storeCopyClient, TxPullClient txPullClient,
TransactionLogCatchUpFactory transactionLogFactory, Config config, Monitors monitors )
public RemoteStore( LogProvider logProvider, FileSystemAbstraction fs, PageCache pageCache, StoreCopyClient storeCopyClient,
TxPullClient txPullClient, TransactionLogCatchUpFactory transactionLogFactory, Config config, Monitors monitors )
{
this.logProvider = logProvider;
this.storeCopyClient = storeCopyClient;
Expand Down Expand Up @@ -127,10 +127,11 @@ private long getPullIndex( File storeDir ) throws IOException
}
}

public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir ) throws StoreCopyFailedException, IOException
public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir,
boolean keepTxLogsInStoreDir ) throws StoreCopyFailedException, IOException
{
long pullIndex = getPullIndex( storeDir );
return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false );
return pullTransactions( from, expectedStoreId, storeDir, pullIndex, false, keepTxLogsInStoreDir );
}

public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File destDir )
Expand All @@ -147,7 +148,8 @@ public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File de

log.info( "Store files need to be recovered starting from: %d", lastFlushedTxId );

CatchupResult catchupResult = pullTransactions( from, expectedStoreId, destDir, lastFlushedTxId, true );
CatchupResult catchupResult =
pullTransactions( from, expectedStoreId, destDir, lastFlushedTxId, true, true );
if ( catchupResult != SUCCESS_END_OF_STREAM )
{
throw new StreamingTransactionsFailedException( "Failed to pull transactions: " + catchupResult );
Expand All @@ -159,11 +161,12 @@ public void copy( AdvertisedSocketAddress from, StoreId expectedStoreId, File de
}
}

private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, boolean asPartOfStoreCopy )
private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId,
boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir )
throws IOException, StoreCopyFailedException
{
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, config,
logProvider, fromTxId, asPartOfStoreCopy ) )
logProvider, fromTxId, asPartOfStoreCopy, keepTxLogsInStoreDir ) )
{
log.info( "Pulling transactions from: %d", fromTxId );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
public class TransactionLogCatchUpFactory
{
public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache,
Config config, LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException
Config config, LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir )
throws IOException
{
return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, logProvider, fromTxId,
asPartOfStoreCopy );
asPartOfStoreCopy, keepTxLogsInStoreDir );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ public class TransactionLogCatchUpWriter implements TxPullResponseListener, Auto
private long expectedTxId;

TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Config config,
LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy ) throws IOException
LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir ) throws IOException
{
this.pageCache = pageCache;
this.log = logProvider.getLog( getClass() );
this.asPartOfStoreCopy = asPartOfStoreCopy;
LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache )
.withLastCommittedTransactionIdSupplier( () -> fromTxId - 1 );
if ( !asPartOfStoreCopy )
if ( !keepTxLogsInStoreDir )
{
logFilesBuilder.withConfig( config );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void onCoreSnapshot( CompletableFuture<CoreSnapshot> signal, CoreSnapshot
else
{
StoreId localStoreId = localDatabase.storeId();
CatchupResult catchupResult = remoteStore.tryCatchingUp( fromAddress, localStoreId, localDatabase.storeDir() );
CatchupResult catchupResult = remoteStore.tryCatchingUp( fromAddress, localStoreId, localDatabase
.storeDir(), false );

if ( catchupResult == E_TRANSACTION_PRUNED )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private TransactionLogCatchUpFactory factory( TransactionLogCatchUpWriter writer
{
TransactionLogCatchUpFactory factory = mock( TransactionLogCatchUpFactory.class );
when( factory.create( isNull(), any( FileSystemAbstraction.class ), isNull(), any( Config.class ),
any( LogProvider.class ), anyLong(), anyBoolean() ) ).thenReturn( writer );
any( LogProvider.class ), anyLong(), anyBoolean(), anyBoolean() ) ).thenReturn( writer );
return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,25 @@ public void setup() throws IOException
@Test
public void shouldCreateTransactionLogWithCheckpoint() throws Exception
{
createTransactionLogWithCheckpoint( Config.defaults() );
createTransactionLogWithCheckpoint( Config.defaults(), true );
}

@Test
public void createTransactionLogWithCheckpointInCustomLocation() throws IOException
{
createTransactionLogWithCheckpoint( Config.defaults( GraphDatabaseSettings.logical_logs_location,
"custom-tx-logs") );
"custom-tx-logs"), false );
}

private void createTransactionLogWithCheckpoint( Config config ) throws IOException
private void createTransactionLogWithCheckpoint( Config config, boolean logsInStoreDir ) throws IOException
{
org.neo4j.kernel.impl.store.StoreId storeId = simulateStoreCopy();

int fromTxId = 37;
int endTxId = fromTxId + 5;

TransactionLogCatchUpWriter catchUpWriter = new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config,
NullLogProvider.getInstance(), fromTxId, partOfStoreCopy );
NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, logsInStoreDir );

// when
for ( int i = fromTxId; i <= endTxId; i++ )
Expand All @@ -136,7 +136,7 @@ private void createTransactionLogWithCheckpoint( Config config ) throws IOExcept

// then
LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache );
if ( !partOfStoreCopy )
if ( !logsInStoreDir )
{
logFilesBuilder.withConfig( config );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void shouldDownloadCompleteStoreWhenEmpty() throws Throwable
downloader.downloadSnapshot( remoteMember );

// then
verify( remoteStore, never() ).tryCatchingUp( any(), any(), any() );
verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean() );
verify( storeCopyProcess ).replaceWithStoreFrom( remoteAddress, remoteStoreId );
}

Expand Down Expand Up @@ -136,7 +137,7 @@ public void shouldNotOverwriteNonEmptyMismatchingStore() throws Exception

// then
verify( remoteStore, never() ).copy( any(), any(), any() );
verify( remoteStore, never() ).tryCatchingUp( any(), any(), any() );
verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean() );
}

@Test
Expand All @@ -145,13 +146,14 @@ public void shouldCatchupIfPossible() throws Exception
// given
when( localDatabase.isEmpty() ).thenReturn( false );
when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId );
when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir ) ).thenReturn( SUCCESS_END_OF_STREAM );
when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir, false ) )
.thenReturn( SUCCESS_END_OF_STREAM );

// when
downloader.downloadSnapshot( remoteMember );

// then
verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir );
verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false );
verify( remoteStore, never() ).copy( any(), any(), any() );
}

Expand All @@ -161,13 +163,13 @@ public void shouldDownloadWholeStoreIfCannotCatchUp() throws Exception
// given
when( localDatabase.isEmpty() ).thenReturn( false );
when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId );
when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir ) ).thenReturn( E_TRANSACTION_PRUNED );
when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir, false ) ).thenReturn( E_TRANSACTION_PRUNED );

// when
downloader.downloadSnapshot( remoteMember );

// then
verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir );
verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false );
verify( storeCopyProcess ).replaceWithStoreFrom( remoteAddress, storeId );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ private GraphDatabaseService newTempDatabase( File tempStore )
.setConfig( "dbms.backup.enabled", Settings.FALSE )
.setConfig( GraphDatabaseSettings.logs_directory, tempStore.getAbsolutePath() )
.setConfig( GraphDatabaseSettings.keep_logical_logs, Settings.TRUE )
.setConfig( GraphDatabaseSettings.logical_logs_location,
config.get( GraphDatabaseSettings.logical_logs_location ).toString() )
.setConfig( GraphDatabaseSettings.logical_logs_location, tempStore.getAbsolutePath() )
.setConfig( GraphDatabaseSettings.allow_upgrade,
config.get( GraphDatabaseSettings.allow_upgrade ).toString() )
.newGraphDatabase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void finishRecoveringStore()
@Test
public void storeCopyClientUseCustomTransactionLogLocationWhenConfigured() throws Exception
{
final File copyDir = new File( directory.directory(), "copy" );
final File originalDir = new File( directory.directory(), "original" );
final File copyDir = new File( directory.directory(), "copyCustomLocation" );
final File originalDir = new File( directory.directory(), "originalCustomLocation" );
PageCache pageCache = pageCacheRule.getPageCache( fileSystem );
File copyCustomLogFilesLocation = new File( copyDir, "CopyCustomLogFilesLocation" );
File originalCustomLogFilesLocation = new File( originalDir, "originalCustomLogFilesLocation" );
Expand All @@ -183,6 +183,7 @@ copyDir, config, loadKernelExtensions(), NullLogProvider.getInstance(), fileSyst
fileSystem, true );

copier.copyStore( storeCopyRequest, CancellationRequest.NEVER_CANCELLED, MoveAfterCopy.moveReplaceExisting() );
original.shutdown();

assertFalse( new File( copyDir, TEMP_COPY_DIRECTORY_NAME ).exists() );

Expand Down

0 comments on commit 4cb2df0

Please sign in to comment.