Backport incremental backup tx log rotation with config override
Backporting 9647396, which allows backup users to override configuration and enable transaction log rotation.
phughk committed Oct 5, 2018
1 parent 36cb23e commit 3b77b20
Showing 14 changed files with 374 additions and 52 deletions.
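
In short: the backup tool can now be given an additional config file that lowers the transaction log rotation threshold, and a new rotation flag is threaded from the backup client down to the transaction log writer so incremental backups rotate the logs they pull. A minimal sketch of the override step (not part of the commit: the class name and the "1m" value are illustrative, and the package is assumed to match OnlineBackupCommandBuilder, whose writeConfigToFile is package-private):

package org.neo4j.backup.impl; // assumption: same package as OnlineBackupCommandBuilder

import java.io.File;
import java.io.IOException;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.configuration.Config;

class RotationOverrideSketch
{
    // Build a config that lowers the tx log rotation threshold and write it to a file,
    // reusing the writeConfigToFile method this commit widens from private to package-visible static.
    static File writeRotationOverride( File configOverrideFile ) throws IOException
    {
        Config config = Config
                .builder()
                .withSetting( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" )
                .build();
        OnlineBackupCommandBuilder.writeConfigToFile( config, configOverrideFile );
        return configOverrideFile;
    }
}

The resulting file is then handed to the backup tool via --additional-config, exactly as the new tests below do.
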
@@ -55,14 +55,14 @@ class BackupDelegator extends LifecycleAdapter

void copy( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path destDir ) throws StoreCopyFailedException
{
- remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile() );
+ remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile(), true );
}

CatchupResult tryCatchingUp( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path storeDir ) throws StoreCopyFailedException
{
try
{
- return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true );
+ return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true, true );
}
catch ( IOException e )
{
@@ -311,7 +311,7 @@ private String argAdditionalConf( File backupTarget ) throws IOException
return format( "--additional-config=%s", configFile );
}

- private void writeConfigToFile( Config config, File file ) throws IOException
+ static void writeConfigToFile( Config config, File file ) throws IOException
{
try ( Writer fileWriter = new BufferedWriter( new FileWriter( file ) ) )
{
@@ -77,7 +77,7 @@ public void tryCatchingUpDelegatesToRemoteStore() throws StoreCopyFailedException
subject.tryCatchingUp( fromAddress, expectedStoreId, storeDir );

// then
- verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true );
+ verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true, true );
}

@Test
@@ -130,7 +130,7 @@ public void retrieveStoreDelegatesToStoreCopyService()

// then
ArgumentCaptor<CatchupAddressProvider> argumentCaptor = ArgumentCaptor.forClass( CatchupAddressProvider.class );
- verify( remoteStore ).copy( argumentCaptor.capture(), eq( storeId ), eq( anyFile.toFile() ) );
+ verify( remoteStore ).copy( argumentCaptor.capture(), eq( storeId ), eq( anyFile.toFile() ), eq( true ) );

//and
assertEquals( anyAddress, argumentCaptor.getValue().primary() );
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.backup.impl;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;

class BackupTransactionLogFilesHelper
{
LogFiles readLogFiles( File backupDir ) throws IOException
{
FileSystemAbstraction fileSystemAbstraction = new DefaultFileSystemAbstraction();
PageCache pageCache = ConfigurableStandalonePageCacheFactory.createPageCache( fileSystemAbstraction );
return LogFilesBuilder.activeFilesBuilder( backupDir, fileSystemAbstraction, pageCache ).build();
}
}
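
The tests further down use this new helper to verify that rotation actually happened inside a finished backup. A standalone usage sketch (the wrapper class and printed wording are illustrative; the helper and the LogFiles calls are the ones shown in this commit):

package org.neo4j.backup.impl; // same package as the helper above

import java.io.File;
import java.io.IOException;

import org.neo4j.kernel.impl.transaction.log.files.LogFiles;

class BackupLogInspectionSketch
{
    // Print the range of transaction log versions present in a backup directory; after one
    // forced rotation the tests expect the highest version to reach 2 while the lowest stays 0.
    static void printLogVersionRange( File backupDir ) throws IOException
    {
        LogFiles logFiles = new BackupTransactionLogFilesHelper().readLogFiles( backupDir );
        System.out.println( "lowest log version:  " + logFiles.getLowestLogVersion() );
        System.out.println( "highest log version: " + logFiles.getHighestLogVersion() );
    }
}
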
@@ -46,6 +46,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
import java.util.stream.LongStream;

import org.neo4j.causalclustering.ClusterHelper;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
@@ -57,12 +58,16 @@
import org.neo4j.causalclustering.discovery.IpFamily;
import org.neo4j.causalclustering.discovery.SharedDiscoveryServiceFactory;
import org.neo4j.causalclustering.helpers.CausalClusteringTestHelpers;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.ByteUnit;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.impl.store.format.highlimit.HighLimit;
import org.neo4j.kernel.impl.store.format.standard.Standard;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.test.DbRepresentation;
import org.neo4j.test.causalclustering.ClusterRule;
@@ -72,10 +77,11 @@

import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.joining;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
- import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertNotEquals;
+ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

@@ -279,6 +285,54 @@ public void reportsProgress() throws Exception
assertTrue( output.contains( "Finished receiving index snapshots" ) );
}

@Test
public void onlyTheLatestTransactionIsKeptAfterIncrementalBackup() throws Exception
{
// given database exists with data
Cluster cluster = startCluster( recordFormat );
createSomeData( cluster );

// and backup client is told to rotate conveniently
Config config = Config
.builder()
.withSetting( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" )
.build();
File configOverrideFile = testDirectory.file( "neo4j-backup.conf" );
OnlineBackupCommandBuilder.writeConfigToFile( config, configOverrideFile );

// and we have a full backup
final String backupName = "backupName" + recordFormat;
String address = CausalClusteringTestHelpers.backupAddress( clusterLeader( cluster ).database() );
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode(
"--from", address,
"--cc-report-dir=" + backupDir,
"--backup-dir=" + backupDir,
"--additional-config=" + configOverrideFile,
"--name=" + backupName ) );

// and the database contains a few more transactions
transactions1M( cluster );
transactions1M( cluster ); // rotation, second tx log file

// when we perform an incremental backup
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode(
"--from", address,
"--cc-report-dir=" + backupDir,
"--backup-dir=" + backupDir,
"--additional-config=" + configOverrideFile,
"--name=" + backupName ) );

// then there has been a rotation
BackupTransactionLogFilesHelper backupTransactionLogFilesHelper = new BackupTransactionLogFilesHelper();
LogFiles logFiles = backupTransactionLogFilesHelper.readLogFiles( backupDir.toPath().resolve( backupName ).toFile() );
long highestTxIdInLogFiles = logFiles.getHighestLogVersion();
assertEquals( 2, highestTxIdInLogFiles );

// and the original log has not been removed since the transactions are applied at start
long lowestTxIdInLogFiles = logFiles.getLowestLogVersion();
assertEquals( 0, lowestTxIdInLogFiles );
}

@Test
public void ipv6Enabled() throws Exception
{
@@ -463,6 +517,23 @@ sharedParams, emptyMap(),
return cluster;
}

private static void transactions1M( Cluster cluster ) throws Exception
{
int numberOfTransactions = 500;
long sizeOfTransaction = (ByteUnit.mebiBytes( 1 ) / numberOfTransactions) + 1;
for ( int txId = 0; txId < numberOfTransactions; txId++ )
{
cluster.coreTx( ( coreGraphDatabase, transaction ) ->
{
Node node = coreGraphDatabase.createNode();
String longString = LongStream.range( 0, sizeOfTransaction ).map( l -> l % 10 ).mapToObj( Long::toString ).collect( joining( "" ) );
node.setProperty( "name", longString );
coreGraphDatabase.createNode().createRelationshipTo( node, RelationshipType.withName( "KNOWS" ) );
transaction.success();
} );
}
}

public static DbRepresentation createSomeData( Cluster cluster )
{
try
@@ -41,6 +41,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.LongStream;

import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.GraphDatabaseService;
@@ -51,6 +52,7 @@
import org.neo4j.graphdb.factory.GraphDatabaseBuilder;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.Settings;
@@ -65,6 +67,7 @@
import org.neo4j.test.rule.TestDirectory;

import static java.lang.String.format;
import static java.util.stream.Collectors.joining;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -347,6 +350,74 @@ public void backupRenamesWork() throws Exception
db2.shutdown();
}

@Test
public void onlyTheLatestTransactionIsKeptAfterIncrementalBackup() throws Exception
{
// given database exists with data
int port = PortAuthority.allocatePort();
startDb( port );
createSomeData( db );

// and backup client is told to rotate conveniently
Config config = Config
.builder()
.withSetting( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" )
.build();
File configOverrideFile = testDirectory.file( "neo4j-backup.conf" );
OnlineBackupCommandBuilder.writeConfigToFile( config, configOverrideFile );

// and we have a full backup
String backupName = "backupName" + recordFormat;
String address = "localhost:" + port;
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( backupDir,
"--from", address,
"--cc-report-dir=" + backupDir,
"--backup-dir=" + backupDir,
"--protocol=common",
"--additional-config=" + configOverrideFile,
"--name=" + backupName ) );

// and the database contains a few more transactions
transactions1M( db );
transactions1M( db ); // rotation, second tx log file

// when we perform an incremental backup
assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( backupDir,
"--from", address,
"--cc-report-dir=" + backupDir,
"--backup-dir=" + backupDir,
"--protocol=common",
"--additional-config=" + configOverrideFile,
"--name=" + backupName ) );

// then there has been a rotation
BackupTransactionLogFilesHelper backupTransactionLogFilesHelper = new BackupTransactionLogFilesHelper();
LogFiles logFiles = backupTransactionLogFilesHelper.readLogFiles( backupDir.toPath().resolve( backupName ).toFile() );
long highestTxIdInLogFiles = logFiles.getHighestLogVersion();
assertEquals( 2, highestTxIdInLogFiles );

// and the original log has not been removed since the transactions are applied at start
long lowestTxIdInLogFiles = logFiles.getLowestLogVersion();
assertEquals( 0, lowestTxIdInLogFiles );
}

private static void transactions1M( GraphDatabaseService db )
{
int numberOfTransactions = 500;
long sizeOfTransaction = (ByteUnit.mebiBytes( 1 ) / numberOfTransactions) + 1;
for ( int txId = 0; txId < numberOfTransactions; txId++ )
{
try ( Transaction tx = db.beginTx() )
{
Node node = db.createNode();
String longString = LongStream.range( 0, sizeOfTransaction ).map( l -> l % 10 ).mapToObj( Long::toString ).collect( joining( "" ) );
node.setProperty( "name", longString );
db.createNode().createRelationshipTo( node, RelationshipType.withName( "KNOWS" ) );
tx.success();
}
}
}

private void repeatedlyPopulateDatabase( GraphDatabaseService db, AtomicBoolean continueFlagReference )
{
while ( continueFlagReference.get() )
@@ -93,36 +93,41 @@ public RemoteStore( LogProvider logProvider, FileSystemAbstraction fs, PageCache
* they end and pull from there, excluding the last one so that we do not
* get duplicate entries.
*/
- public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, boolean keepTxLogsInDir )
+ public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, boolean keepTxLogsInDir,
+         boolean forceTransactionLogRotation )
throws StoreCopyFailedException, IOException
{
CommitState commitState = commitStateHelper.getStoreState( storeDir );
log.info( "Store commit state: " + commitState );

if ( commitState.transactionLogIndex().isPresent() )
{
- return pullTransactions( from, expectedStoreId, storeDir, commitState.transactionLogIndex().get() + 1, false, keepTxLogsInDir );
+ return pullTransactions( from, expectedStoreId, storeDir, commitState.transactionLogIndex().get() + 1, false, keepTxLogsInDir,
+         forceTransactionLogRotation );
}
else
{
CatchupResult catchupResult;
if ( commitState.metaDataStoreIndex() == BASE_TX_ID )
{
- return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir );
+ return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir,
+         forceTransactionLogRotation );
}
else
{
- catchupResult = pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex(), false, keepTxLogsInDir );
+ catchupResult = pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex(), false, keepTxLogsInDir,
+         forceTransactionLogRotation );
if ( catchupResult == E_TRANSACTION_PRUNED )
{
- return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir );
+ return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir,
+         forceTransactionLogRotation );
}
}
return catchupResult;
}
}

- public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId, File destDir )
+ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId, File destDir, boolean rotateTransactionsManually )
throws StoreCopyFailedException
{
try
@@ -138,7 +143,7 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId
// Even for cluster store copy, we still write the transaction logs into the store directory itself
// because the destination directory is temporary. We will copy them to the correct place later.
CatchupResult catchupResult = pullTransactions( addressProvider.primary(), expectedStoreId, destDir,
- lastFlushedTxId, true, true );
+ lastFlushedTxId, true, true, rotateTransactionsManually );
if ( catchupResult != SUCCESS_END_OF_STREAM )
{
throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult );
@@ -151,15 +156,15 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId
}

private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId,
- boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir )
+ boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir, boolean rotateTransactionsManually )
throws IOException, StoreCopyFailedException
{
StoreCopyClientMonitor storeCopyClientMonitor =
monitors.newMonitor( StoreCopyClientMonitor.class );
storeCopyClientMonitor.startReceivingTransactions( fromTxId );
long previousTxId = fromTxId - 1;
try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, config,
- logProvider, fromTxId, asPartOfStoreCopy, keepTxLogsInStoreDir ) )
+ logProvider, fromTxId, asPartOfStoreCopy, keepTxLogsInStoreDir, rotateTransactionsManually ) )
{
log.info( "Pulling transactions from %s starting with txId: %d", from, fromTxId );
CatchupResult lastStatus;
@@ -56,7 +56,7 @@ public void replaceWithStoreFrom( CatchupAddressProvider addressProvider, StoreId
{
try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache, localDatabase.storeDir() ) )
{
- remoteStore.copy( addressProvider, expectedStoreId, tempStore.storeDir() );
+ remoteStore.copy( addressProvider, expectedStoreId, tempStore.storeDir(), false );
copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() );
localDatabase.replaceWith( tempStore.storeDir() );
}
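
Read together, the two call sites above pin down the meaning of the new boolean on RemoteStore.copy: the backup path passes true, cluster store copy passes false. A condensed sketch of that contrast (not code from the commit; the package and the two imports are assumptions inferred from the types in the diff):

package org.neo4j.causalclustering.catchup.storecopy; // assumed package of RemoteStore and StoreCopyFailedException

import java.io.File;

import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.identity.StoreId;

class RotationFlagSketch
{
    // Backup pulls logs into a long-lived backup directory, so it requests rotation while
    // pulling; the new tests rely on that rotation honouring the lowered threshold from the override.
    static void backupCopy( RemoteStore remoteStore, CatchupAddressProvider provider, StoreId storeId, File backupDir )
            throws StoreCopyFailedException
    {
        remoteStore.copy( provider, storeId, backupDir, true );
    }

    // Cluster store copy writes into a temporary directory that is recovered and swapped in
    // right away, so it does not request manual rotation.
    static void clusterStoreCopy( RemoteStore remoteStore, CatchupAddressProvider provider, StoreId storeId, File tempStoreDir )
            throws StoreCopyFailedException
    {
        remoteStore.copy( provider, storeId, tempStoreDir, false );
    }
}
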
