diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupDelegator.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupDelegator.java index 40560ec9c39d3..425e7ea413fcf 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupDelegator.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupDelegator.java @@ -55,14 +55,14 @@ class BackupDelegator extends LifecycleAdapter void copy( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path destDir ) throws StoreCopyFailedException { - remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile() ); + remoteStore.copy( new CatchupAddressProvider.SingleAddressProvider( fromAddress ), expectedStoreId, destDir.toFile(), true ); } CatchupResult tryCatchingUp( AdvertisedSocketAddress fromAddress, StoreId expectedStoreId, Path storeDir ) throws StoreCopyFailedException { try { - return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true ); + return remoteStore.tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true, true ); } catch ( IOException e ) { diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/OnlineBackupCommandBuilder.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/OnlineBackupCommandBuilder.java index 33fe82ba7ede3..413e7b9d07fe9 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/OnlineBackupCommandBuilder.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/OnlineBackupCommandBuilder.java @@ -311,7 +311,7 @@ private String argAdditionalConf( File backupTarget ) throws IOException return format( "--additional-config=%s", configFile ); } - private void writeConfigToFile( Config config, File file ) throws IOException + static void writeConfigToFile( Config config, File file ) throws IOException { try ( Writer fileWriter = new BufferedWriter( new FileWriter( file ) ) ) { diff --git 
a/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupDelegatorTest.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupDelegatorTest.java index afcaa85c33a71..2f66b7baabacf 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupDelegatorTest.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupDelegatorTest.java @@ -77,7 +77,7 @@ public void tryCatchingUpDelegatesToRemoteStore() throws StoreCopyFailedExceptio subject.tryCatchingUp( fromAddress, expectedStoreId, storeDir ); // then - verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true ); + verify( remoteStore ).tryCatchingUp( fromAddress, expectedStoreId, storeDir.toFile(), true, true ); } @Test @@ -130,7 +130,7 @@ public void retrieveStoreDelegatesToStoreCopyService() // then ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass( CatchupAddressProvider.class ); - verify( remoteStore ).copy( argumentCaptor.capture(), eq( storeId ), eq( anyFile.toFile() ) ); + verify( remoteStore ).copy( argumentCaptor.capture(), eq( storeId ), eq( anyFile.toFile() ), eq( true ) ); //and assertEquals( anyAddress, argumentCaptor.getValue().primary() ); diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupTransactionLogFilesHelper.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupTransactionLogFilesHelper.java new file mode 100644 index 0000000000000..b0354f3e4be92 --- /dev/null +++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/BackupTransactionLogFilesHelper.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.backup.impl; + +import java.io.File; +import java.io.IOException; + +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.impl.pagecache.ConfigurableStandalonePageCacheFactory; +import org.neo4j.kernel.impl.transaction.log.files.LogFiles; +import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder; + +class BackupTransactionLogFilesHelper +{ + LogFiles readLogFiles( File backupDir ) throws IOException + { + FileSystemAbstraction fileSystemAbstraction = new DefaultFileSystemAbstraction(); + PageCache pageCache = ConfigurableStandalonePageCacheFactory.createPageCache( fileSystemAbstraction ); + return LogFilesBuilder.activeFilesBuilder( backupDir, fileSystemAbstraction, pageCache ).build(); + } +} diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandCcIT.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandCcIT.java index e88ab066181be..6ced5483b0173 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandCcIT.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandCcIT.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntFunction; +import java.util.stream.LongStream; import org.neo4j.causalclustering.ClusterHelper; 
import org.neo4j.causalclustering.core.CausalClusteringSettings; @@ -57,12 +58,16 @@ import org.neo4j.causalclustering.discovery.IpFamily; import org.neo4j.causalclustering.discovery.SharedDiscoveryServiceFactory; import org.neo4j.causalclustering.helpers.CausalClusteringTestHelpers; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.RelationshipType; import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.ByteUnit; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.impl.store.format.highlimit.HighLimit; import org.neo4j.kernel.impl.store.format.standard.Standard; +import org.neo4j.kernel.impl.transaction.log.files.LogFiles; import org.neo4j.ports.allocation.PortAuthority; import org.neo4j.test.DbRepresentation; import org.neo4j.test.causalclustering.ClusterRule; @@ -72,10 +77,11 @@ import static java.lang.String.format; import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.joining; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; @@ -279,6 +285,54 @@ public void reportsProgress() throws Exception assertTrue( output.contains( "Finished receiving index snapshots" ) ); } + @Test + public void onlyTheLatestTransactionIsKeptAfterIncrementalBackup() throws Exception + { + // given database exists with data + Cluster cluster = startCluster( recordFormat ); + createSomeData( cluster ); + + // and backup client is told to rotate conveniently + Config config = Config + .builder() + .withSetting( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" ) + .build(); + File configOverrideFile = 
testDirectory.file( "neo4j-backup.conf" ); + OnlineBackupCommandBuilder.writeConfigToFile( config, configOverrideFile ); + + // and we have a full backup + final String backupName = "backupName" + recordFormat; + String address = CausalClusteringTestHelpers.backupAddress( clusterLeader( cluster ).database() ); + assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( + "--from", address, + "--cc-report-dir=" + backupDir, + "--backup-dir=" + backupDir, + "--additional-config=" + configOverrideFile, + "--name=" + backupName ) ); + + // and the database contains a few more transactions + transactions1M( cluster ); + transactions1M( cluster ); // rotation, second tx log file + + // when we perform an incremental backup + assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( + "--from", address, + "--cc-report-dir=" + backupDir, + "--backup-dir=" + backupDir, + "--additional-config=" + configOverrideFile, + "--name=" + backupName ) ); + + // then there has been a rotation + BackupTransactionLogFilesHelper backupTransactionLogFilesHelper = new BackupTransactionLogFilesHelper(); + LogFiles logFiles = backupTransactionLogFilesHelper.readLogFiles( backupDir.toPath().resolve( backupName ).toFile() ); + long highestTxIdInLogFiles = logFiles.getHighestLogVersion(); + assertEquals( 2, highestTxIdInLogFiles ); + + // and the original log has not been removed since the transactions are applied at start + long lowestTxIdInLogFiles = logFiles.getLowestLogVersion(); + assertEquals( 0, lowestTxIdInLogFiles ); + } + @Test public void ipv6Enabled() throws Exception { @@ -463,6 +517,23 @@ sharedParams, emptyMap(), return cluster; } + private static void transactions1M( Cluster cluster ) throws Exception + { + int numberOfTransactions = 500; + long sizeOfTransaction = (ByteUnit.mebiBytes( 1 ) / numberOfTransactions) + 1; + for ( int txId = 0; txId < numberOfTransactions; txId++ ) + { + cluster.coreTx( ( coreGraphDatabase, transaction ) -> + { + Node node = 
coreGraphDatabase.createNode(); + String longString = LongStream.range( 0, sizeOfTransaction ).map( l -> l % 10 ).mapToObj( Long::toString ).collect( joining( "" ) ); + node.setProperty( "name", longString ); + coreGraphDatabase.createNode().createRelationshipTo( node, RelationshipType.withName( "KNOWS" ) ); + transaction.success(); + } ); + } + } + public static DbRepresentation createSomeData( Cluster cluster ) { try diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandHaIT.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandHaIT.java index 909f0cbf40972..a3a9dc4e85bad 100644 --- a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandHaIT.java +++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandHaIT.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.LongStream; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.graphdb.GraphDatabaseService; @@ -51,6 +52,7 @@ import org.neo4j.graphdb.factory.GraphDatabaseBuilder; import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.io.ByteUnit; import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Settings; @@ -65,6 +67,7 @@ import org.neo4j.test.rule.TestDirectory; import static java.lang.String.format; +import static java.util.stream.Collectors.joining; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -347,6 +350,74 @@ public void backupRenamesWork() throws Exception db2.shutdown(); } + @Test + public void onlyTheLatestTransactionIsKeptAfterIncrementalBackup() throws Exception + { + // given database exists with data + int port = PortAuthority.allocatePort(); + 
startDb( port ); + createSomeData( db ); + + // and backup client is told to rotate conveniently + Config config = Config + .builder() + .withSetting( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" ) + .build(); + File configOverrideFile = testDirectory.file( "neo4j-backup.conf" ); + OnlineBackupCommandBuilder.writeConfigToFile( config, configOverrideFile ); + + // and we have a full backup + String backupName = "backupName" + recordFormat; + String address = "localhost:" + port; + assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( backupDir, + "--from", address, + "--cc-report-dir=" + backupDir, + "--backup-dir=" + backupDir, + "--protocol=common", + "--additional-config=" + configOverrideFile, + "--name=" + backupName ) ); + + // and the database contains a few more transactions + transactions1M( db ); + transactions1M( db ); // rotation, second tx log file + + // when we perform an incremental backup + assertEquals( 0, runBackupToolFromOtherJvmToGetExitCode( backupDir, + "--from", address, + "--cc-report-dir=" + backupDir, + "--backup-dir=" + backupDir, + "--protocol=common", + "--additional-config=" + configOverrideFile, + "--name=" + backupName ) ); + + // then there has been a rotation + BackupTransactionLogFilesHelper backupTransactionLogFilesHelper = new BackupTransactionLogFilesHelper(); + LogFiles logFiles = backupTransactionLogFilesHelper.readLogFiles( backupDir.toPath().resolve( backupName ).toFile() ); + long highestTxIdInLogFiles = logFiles.getHighestLogVersion(); + assertEquals( 2, highestTxIdInLogFiles ); + + // and the original log has not been removed since the transactions are applied at start + long lowestTxIdInLogFiles = logFiles.getLowestLogVersion(); + assertEquals( 0, lowestTxIdInLogFiles ); + } + + private static void transactions1M( GraphDatabaseService db ) + { + int numberOfTransactions = 500; + long sizeOfTransaction = (ByteUnit.mebiBytes( 1 ) / numberOfTransactions) + 1; + for ( int txId = 0; txId < 
numberOfTransactions; txId++ ) + { + try ( Transaction tx = db.beginTx() ) + { + Node node = db.createNode(); + String longString = LongStream.range( 0, sizeOfTransaction ).map( l -> l % 10 ).mapToObj( Long::toString ).collect( joining( "" ) ); + node.setProperty( "name", longString ); + db.createNode().createRelationshipTo( node, RelationshipType.withName( "KNOWS" ) ); + tx.success(); + } + } + } + private void repeatedlyPopulateDatabase( GraphDatabaseService db, AtomicBoolean continueFlagReference ) { while ( continueFlagReference.get() ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java index 8635c138238a2..bcdf62d1152d8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStore.java @@ -93,7 +93,8 @@ public RemoteStore( LogProvider logProvider, FileSystemAbstraction fs, PageCache * they end and pull from there, excluding the last one so that we do not * get duplicate entries. 
*/ - public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, boolean keepTxLogsInDir ) + public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, boolean keepTxLogsInDir, + boolean forceTransactionLogRotation ) throws StoreCopyFailedException, IOException { CommitState commitState = commitStateHelper.getStoreState( storeDir ); @@ -101,28 +102,32 @@ public CatchupResult tryCatchingUp( AdvertisedSocketAddress from, StoreId expect if ( commitState.transactionLogIndex().isPresent() ) { - return pullTransactions( from, expectedStoreId, storeDir, commitState.transactionLogIndex().get() + 1, false, keepTxLogsInDir ); + return pullTransactions( from, expectedStoreId, storeDir, commitState.transactionLogIndex().get() + 1, false, keepTxLogsInDir, + forceTransactionLogRotation ); } else { CatchupResult catchupResult; if ( commitState.metaDataStoreIndex() == BASE_TX_ID ) { - return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir ); + return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir, + forceTransactionLogRotation ); } else { - catchupResult = pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex(), false, keepTxLogsInDir ); + catchupResult = pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex(), false, keepTxLogsInDir, + forceTransactionLogRotation ); if ( catchupResult == E_TRANSACTION_PRUNED ) { - return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir ); + return pullTransactions( from, expectedStoreId, storeDir, commitState.metaDataStoreIndex() + 1, false, keepTxLogsInDir, + forceTransactionLogRotation ); } } return catchupResult; } } - public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId, File 
destDir ) + public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreId, File destDir, boolean rotateTransactionsManually ) throws StoreCopyFailedException { try @@ -138,7 +143,7 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI // Even for cluster store copy, we still write the transaction logs into the store directory itself // because the destination directory is temporary. We will copy them to the correct place later. CatchupResult catchupResult = pullTransactions( addressProvider.primary(), expectedStoreId, destDir, - lastFlushedTxId, true, true ); + lastFlushedTxId, true, true, rotateTransactionsManually ); if ( catchupResult != SUCCESS_END_OF_STREAM ) { throw new StoreCopyFailedException( "Failed to pull transactions: " + catchupResult ); @@ -151,7 +156,7 @@ public void copy( CatchupAddressProvider addressProvider, StoreId expectedStoreI } private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId expectedStoreId, File storeDir, long fromTxId, - boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir ) + boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir, boolean rotateTransactionsManually ) throws IOException, StoreCopyFailedException { StoreCopyClientMonitor storeCopyClientMonitor = @@ -159,7 +164,7 @@ private CatchupResult pullTransactions( AdvertisedSocketAddress from, StoreId ex storeCopyClientMonitor.startReceivingTransactions( fromTxId ); long previousTxId = fromTxId - 1; try ( TransactionLogCatchUpWriter writer = transactionLogFactory.create( storeDir, fs, pageCache, config, - logProvider, fromTxId, asPartOfStoreCopy, keepTxLogsInStoreDir ) ) + logProvider, fromTxId, asPartOfStoreCopy, keepTxLogsInStoreDir, rotateTransactionsManually ) ) { log.info( "Pulling transactions from %s starting with txId: %d", from, fromTxId ); CatchupResult lastStatus; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java 
b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java index ee4dc9fb570ad..48dc395d471c1 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyProcess.java @@ -56,7 +56,7 @@ public void replaceWithStoreFrom( CatchupAddressProvider addressProvider, StoreI { try ( TemporaryStoreDirectory tempStore = new TemporaryStoreDirectory( fs, pageCache, localDatabase.storeDir() ) ) { - remoteStore.copy( addressProvider, expectedStoreId, tempStore.storeDir() ); + remoteStore.copy( addressProvider, expectedStoreId, tempStore.storeDir(), false ); copiedStoreRecovery.recoverCopiedStore( tempStore.storeDir() ); localDatabase.replaceWith( tempStore.storeDir() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpFactory.java index 99d03017a6a93..4cf15e8586334 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpFactory.java @@ -33,10 +33,11 @@ public class TransactionLogCatchUpFactory { public TransactionLogCatchUpWriter create( File storeDir, FileSystemAbstraction fs, PageCache pageCache, - Config config, LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir ) + Config config, LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir, + boolean rotateTransactionsManually ) throws IOException { return new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, logProvider, fromTxId, - asPartOfStoreCopy, 
keepTxLogsInStoreDir ); + asPartOfStoreCopy, keepTxLogsInStoreDir, rotateTransactionsManually ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java index c7bc0ea6e2de3..71bae2b8ec56b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java @@ -25,22 +25,32 @@ import java.io.File; import java.io.IOException; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.store.MetaDataStore; +import org.neo4j.kernel.impl.store.NeoStores; +import org.neo4j.kernel.impl.store.StoreFactory; +import org.neo4j.kernel.impl.store.format.RecordFormatSelector; +import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.LogPosition; import org.neo4j.kernel.impl.transaction.log.TransactionLogWriter; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriter; import org.neo4j.kernel.impl.transaction.log.files.LogFiles; import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder; +import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import static java.lang.String.format; +import static org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier.EMPTY; +import static 
org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID; +import static org.neo4j.kernel.impl.store.StoreType.META_DATA; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; public class TransactionLogCatchUpWriter implements TxPullResponseListener, AutoCloseable @@ -52,22 +62,31 @@ public class TransactionLogCatchUpWriter implements TxPullResponseListener, Auto private final TransactionLogWriter writer; private final LogFiles logFiles; private final File storeDir; + private final NeoStores stores; + private final boolean rotateTransactionsManually; private long lastTxId = -1; private long expectedTxId; TransactionLogCatchUpWriter( File storeDir, FileSystemAbstraction fs, PageCache pageCache, Config config, - LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir ) throws IOException + LogProvider logProvider, long fromTxId, boolean asPartOfStoreCopy, boolean keepTxLogsInStoreDir, + boolean forceTransactionRotations ) throws IOException { this.pageCache = pageCache; this.log = logProvider.getLog( getClass() ); this.asPartOfStoreCopy = asPartOfStoreCopy; - LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache ) - .withLastCommittedTransactionIdSupplier( () -> fromTxId - 1 ); - if ( !keepTxLogsInStoreDir ) - { - logFilesBuilder.withConfig( config ); - } + this.rotateTransactionsManually = forceTransactionRotations; + RecordFormats recordFormats = RecordFormatSelector.selectForStoreOrConfig( Config.defaults(), storeDir, pageCache, logProvider ); + this.stores = new StoreFactory( storeDir, config, new DefaultIdGeneratorFactory( fs ), pageCache, fs, recordFormats, logProvider, EMPTY ) + .openNeoStores( META_DATA ); + Dependencies dependencies = new Dependencies(); + dependencies.satisfyDependency( stores.getMetaDataStore() ); + 
LogFilesBuilder logFilesBuilder = LogFilesBuilder + .builder( storeDir, fs ) + .withDependencies( dependencies ) + .withLastCommittedTransactionIdSupplier( () -> fromTxId - 1 ) + .withConfig( customisedConfig( config, keepTxLogsInStoreDir, forceTransactionRotations ) ) + .withLogVersionRepository( stores.getMetaDataStore() ); this.logFiles = logFilesBuilder.build(); this.lifespan.add( logFiles ); this.writer = new TransactionLogWriter( new LogEntryWriter( logFiles.getLogFile().getWriter() ) ); @@ -75,12 +94,37 @@ public class TransactionLogCatchUpWriter implements TxPullResponseListener, Auto this.expectedTxId = fromTxId; } + private Config customisedConfig( Config original, boolean keepTxLogsInStoreDir, boolean forceTransactionRotations ) + { + Config config = Config.builder() + .build(); + if ( !keepTxLogsInStoreDir ) + { + original.getRaw( GraphDatabaseSettings.logical_logs_location.name() ) + .ifPresent( v -> config.augment( GraphDatabaseSettings.logical_logs_location, v ) ); + } + if ( forceTransactionRotations ) + { + original.getRaw( GraphDatabaseSettings.logical_log_rotation_threshold.name() ) + .ifPresent( v -> config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, v ) ); + } + return config; + } + @Override public synchronized void onTxReceived( TxPullResponse txPullResponse ) { CommittedTransactionRepresentation tx = txPullResponse.tx(); long receivedTxId = tx.getCommitEntry().getTxId(); + // neo4j admin backup clients pull transactions indefinitely and have no monitoring mechanism for tx log rotation + // Other cases, ex. 
Read Replicas have an external mechanism that rotates independently of this process and don't need to + // manually rotate while pulling + if ( rotateTransactionsManually && logFiles.getLogFile().rotationNeeded() ) + { + rotateTransactionLogs( logFiles ); + } + if ( receivedTxId != expectedTxId ) { throw new RuntimeException( format( "Expected txId: %d but got: %d", expectedTxId, receivedTxId ) ); @@ -99,15 +143,30 @@ public synchronized void onTxReceived( TxPullResponse txPullResponse ) } } + private static void rotateTransactionLogs( LogFiles logFiles ) + { + try + { + logFiles.getLogFile().rotate(); + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + } + @Override public synchronized void close() throws IOException { if ( asPartOfStoreCopy ) { - /* A checkpoint which points to the beginning of the log file, meaning that + /* A checkpoint which points to the beginning of all the log files, meaning that all the streamed transactions will be applied as part of recovery. 
*/ - long logVersion = logFiles.getHighestLogVersion(); - writer.checkPoint( new LogPosition( logVersion, LOG_HEADER_SIZE ) ); + long logVersion = logFiles.getLowestLogVersion(); + LogPosition checkPointPosition = new LogPosition( logVersion, LOG_HEADER_SIZE ); + + log.info( "Writing checkpoint as part of store copy: " + checkPointPosition ); + writer.checkPoint( checkPointPosition ); // * comment copied from old StoreCopyClient * // since we just create new log and put checkpoint into it with offset equals to @@ -121,8 +180,8 @@ public synchronized void close() throws IOException MetaDataStore.setRecord( pageCache, neoStore, - MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, - LOG_HEADER_SIZE ); + LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, + checkPointPosition.getByteOffset() ); } lifespan.close(); @@ -132,5 +191,6 @@ public synchronized void close() throws IOException File neoStoreFile = new File( storeDir, MetaDataStore.DEFAULT_NAME ); MetaDataStore.setRecord( pageCache, neoStoreFile, LAST_TRANSACTION_ID, lastTxId ); } + stores.close(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java index 581a534ea3fd3..29288dc1d2afb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloader.java @@ -169,7 +169,7 @@ public void onCoreSnapshot( CompletableFuture signal, CoreSnapshot CatchupResult catchupResult; try { - catchupResult = remoteStore.tryCatchingUp( primary, localStoreId, localDatabase.storeDir(), false ); + catchupResult = remoteStore.tryCatchingUp( primary, localStoreId, localDatabase.storeDir(), false, false ); } catch ( StoreCopyFailedException e ) { diff --git 
a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java index 6ddd59becb131..aa0bce21ce55b 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/RemoteStoreTest.java @@ -71,7 +71,7 @@ public void shouldCopyStoreFilesAndPullTransactions() throws Exception // when AdvertisedSocketAddress localhost = new AdvertisedSocketAddress( "127.0.0.1", 1234 ); CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress( localhost ); - remoteStore.copy( catchupAddressProvider, storeId, new File( "destination" ) ); + remoteStore.copy( catchupAddressProvider, storeId, new File( "destination" ), true ); // then verify( storeCopyClient ).copyStoreFiles( eq( catchupAddressProvider ), eq( storeId ), any( StoreFileStreamProvider.class ), any(), any() ); @@ -101,7 +101,7 @@ public void shouldSetLastPulledTransactionId() throws Exception null, storeCopyClient, txPullClient, factory( writer ), Config.defaults(), new Monitors() ); // when - remoteStore.copy( catchupAddressProvider, wantedStoreId, new File( "destination" ) ); + remoteStore.copy( catchupAddressProvider, wantedStoreId, new File( "destination" ), true ); // then long previousTxId = lastFlushedTxId - 1; // the interface is defined as asking for the one preceding @@ -129,7 +129,7 @@ public void shouldCloseDownTxLogWriterIfTxStreamingFails() throws Exception // when try { - remoteStore.copy( catchupAddressProvider, storeId, null ); + remoteStore.copy( catchupAddressProvider, storeId, null, true ); } catch ( StoreCopyFailedException e ) { @@ -144,7 +144,7 @@ private TransactionLogCatchUpFactory factory( TransactionLogCatchUpWriter writer { TransactionLogCatchUpFactory factory = 
mock( TransactionLogCatchUpFactory.class ); when( factory.create( isNull(), any( FileSystemAbstraction.class ), isNull(), any( Config.class ), - any( LogProvider.class ), anyLong(), anyBoolean(), anyBoolean() ) ).thenReturn( writer ); + any( LogProvider.class ), anyLong(), anyBoolean(), anyBoolean(), anyBoolean() ) ).thenReturn( writer ); return factory; } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java index 08d3c2e0ee514..827dc1e3da071 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.stream.LongStream; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.graphdb.factory.GraphDatabaseSettings; @@ -69,9 +70,13 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.neo4j.kernel.impl.transaction.command.Commands.createNode; +import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; +import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; @RunWith( Parameterized.class ) public class TransactionLogCatchUpWriterTest @@ -98,6 +103,8 @@ public static List partOfStoreCopy() return Arrays.asList( Boolean.TRUE, Boolean.FALSE ); } + private final int MANY_TRANSACTIONS = 100_000; // should be 
somewhere above the rotation threshold + + @Before public void setup() { @@ -119,6 +126,63 @@ public void createTransactionLogWithCheckpointInCustomLocation() throws IOExcept "custom-tx-logs"), false ); } + @Test + public void pullRotatesWhenThresholdCrossedAndExplicitlySet() throws IOException + { + // given + Config config = Config.defaults(); + config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte + + // and + org.neo4j.kernel.impl.store.StoreId storeId = simulateStoreCopy(); + + // and + long fromTxId = BASE_TX_ID; + TransactionLogCatchUpWriter subject = + new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false, true ); + + // when a bunch of transactions received + LongStream.range( fromTxId, MANY_TRANSACTIONS ) + .mapToObj( TransactionLogCatchUpWriterTest::tx ) + .map( tx -> new TxPullResponse( toCasualStoreId( storeId ), tx ) ) + .forEach( subject::onTxReceived ); + subject.close(); + + // then there was a rotation + LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache ); + LogFiles logFiles = logFilesBuilder.build(); + assertNotEquals( logFiles.getLowestLogVersion(), logFiles.getHighestLogVersion() ); + verifyTransactionsInLog( logFiles, fromTxId, MANY_TRANSACTIONS ); + verifyCheckpointInLog( logFiles, partOfStoreCopy ); + } + + @Test + public void pullDoesntRotateWhenThresholdCrossedAndExplicitlyOff() throws IOException + { + // given + Config config = Config.defaults(); + config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte + + // and + org.neo4j.kernel.impl.store.StoreId storeId = simulateStoreCopy(); + // and + long fromTxId = BASE_TX_ID; + TransactionLogCatchUpWriter subject = + new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false, false ); + + // when many transactions received + LongStream.range(
fromTxId, MANY_TRANSACTIONS ) + .mapToObj( TransactionLogCatchUpWriterTest::tx ) + .map(tx -> new TxPullResponse( toCasualStoreId( storeId ), tx )) + .forEach( subject::onTxReceived ); + subject.close(); + + // then there was no rotation + LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( storeDir, fs, pageCache ); + LogFiles logFiles = logFilesBuilder.build(); + assertEquals( logFiles.getLowestLogVersion(), logFiles.getHighestLogVersion() ); + } + private void createTransactionLogWithCheckpoint( Config config, boolean logsInStoreDir ) throws IOException { org.neo4j.kernel.impl.store.StoreId storeId = simulateStoreCopy(); @@ -127,7 +191,7 @@ private void createTransactionLogWithCheckpoint( Config config, boolean logsInSt int endTxId = fromTxId + 5; TransactionLogCatchUpWriter catchUpWriter = new TransactionLogCatchUpWriter( storeDir, fs, pageCache, config, - NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, logsInStoreDir ); + NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, logsInStoreDir, true ); // when for ( int i = fromTxId; i <= endTxId; i++ ) @@ -146,20 +210,26 @@ private void createTransactionLogWithCheckpoint( Config config, boolean logsInSt LogFiles logFiles = logFilesBuilder.build(); verifyTransactionsInLog( logFiles, fromTxId, endTxId ); - if ( partOfStoreCopy ) - { - verifyCheckpointInLog( logFiles ); - } + verifyCheckpointInLog( logFiles, partOfStoreCopy ); } - private void verifyCheckpointInLog( LogFiles logFiles ) + private void verifyCheckpointInLog( LogFiles logFiles, boolean shouldExist ) { LogEntryReader logEntryReader = new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT ); final LogTailScanner logTailScanner = new LogTailScanner( logFiles, logEntryReader, new Monitors() ); LogTailInformation tailInformation = logTailScanner.getTailInformation(); + + if ( !shouldExist ) + { + assertNull( tailInformation.lastCheckPoint ); + return; + } + assertNotNull(
tailInformation.lastCheckPoint ); + assertEquals( 0, tailInformation.lastCheckPoint.getLogPosition().getLogVersion() ); + assertEquals( LOG_HEADER_SIZE, tailInformation.lastCheckPoint.getLogPosition().getByteOffset() ); assertTrue( tailInformation.commitsAfterLastCheckpoint() ); } @@ -199,6 +269,7 @@ private org.neo4j.kernel.impl.store.StoreId simulateStoreCopy() throws IOExcepti // we don't have log files after a store copy LogFiles logFiles = LogFilesBuilder.logFilesBasedOnlyBuilder( storeDir, fsRule.get() ).build(); + //noinspection ResultOfMethodCallIgnored logFiles.accept( ( file, version ) -> file.delete() ); return storeId; @@ -209,10 +280,10 @@ private StoreId toCasualStoreId( org.neo4j.kernel.impl.store.StoreId storeId ) return new StoreId( storeId.getCreationTime(), storeId.getRandomId(), storeId.getUpgradeTime(), storeId.getUpgradeId() ); } - private static CommittedTransactionRepresentation tx( int id ) + private static CommittedTransactionRepresentation tx( long txId ) { return new CommittedTransactionRepresentation( - new LogEntryStart( id, id, id, id - 1, new byte[]{}, LogPosition.UNSPECIFIED ), - Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( id, id ) ); + new LogEntryStart( 0, 0, 0, txId - 1, new byte[]{}, LogPosition.UNSPECIFIED ), + Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( txId, 0 ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java index e926d6a64cf6d..7813178b6b39f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/snapshot/CoreStateDownloaderTest.java @@ -98,7 +98,7 @@ public void 
shouldDownloadCompleteStoreWhenEmpty() throws Throwable downloader.downloadSnapshot( catchupAddressProvider ); // then - verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean() ); + verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean(), anyBoolean() ); verify( storeCopyProcess ).replaceWithStoreFrom( catchupAddressProvider, remoteStoreId ); } @@ -130,8 +130,8 @@ public void shouldNotOverwriteNonEmptyMismatchingStore() throws Exception assertFalse( downloader.downloadSnapshot( catchupAddressProvider ) ); // then - verify( remoteStore, never() ).copy( any(), any(), any() ); - verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean() ); + verify( remoteStore, never() ).copy( any(), any(), any(), anyBoolean() ); + verify( remoteStore, never() ).tryCatchingUp( any(), any(), any(), anyBoolean(), anyBoolean() ); } @Test @@ -140,14 +140,14 @@ public void shouldCatchupIfPossible() throws Exception // given when( localDatabase.isEmpty() ).thenReturn( false ); when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId ); - when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir, false ) ).thenReturn( SUCCESS_END_OF_STREAM ); + when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir, false, false ) ).thenReturn( SUCCESS_END_OF_STREAM ); // when downloader.downloadSnapshot( catchupAddressProvider ); // then - verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false ); - verify( remoteStore, never() ).copy( any(), any(), any() ); + verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false, false ); + verify( remoteStore, never() ).copy( any(), any(), any(), anyBoolean() ); } @Test @@ -156,13 +156,13 @@ public void shouldDownloadWholeStoreIfCannotCatchUp() throws Exception // given when( localDatabase.isEmpty() ).thenReturn( false ); when( remoteStore.getStoreId( remoteAddress ) ).thenReturn( storeId ); - when( remoteStore.tryCatchingUp( 
remoteAddress, storeId, storeDir, false ) ).thenReturn( E_TRANSACTION_PRUNED ); + when( remoteStore.tryCatchingUp( remoteAddress, storeId, storeDir, false, false ) ).thenReturn( E_TRANSACTION_PRUNED ); // when downloader.downloadSnapshot( catchupAddressProvider ); // then - verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false ); + verify( remoteStore ).tryCatchingUp( remoteAddress, storeId, storeDir, false, false ); verify( storeCopyProcess ).replaceWithStoreFrom( catchupAddressProvider, storeId ); } }