diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java index d2ad91f13e1d..4c5895c11ada 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriter.java @@ -49,6 +49,7 @@ import static java.lang.String.format; import static org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier.EMPTY; +import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET; import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID; import static org.neo4j.kernel.impl.store.StoreType.META_DATA; import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; @@ -160,10 +161,13 @@ public synchronized void close() throws IOException { if ( asPartOfStoreCopy ) { - /* A checkpoint which points to the beginning of the log file, meaning that + /* A checkpoint which points to the beginning of all the log files, meaning that all the streamed transactions will be applied as part of recovery. */ - long logVersion = logFiles.getHighestLogVersion(); - writer.checkPoint( new LogPosition( logVersion, LOG_HEADER_SIZE ) ); + long logVersion = logFiles.getLowestLogVersion(); + LogPosition checkPointPosition = new LogPosition( logVersion, LOG_HEADER_SIZE ); + + log.info( "Writing checkpoint as part of store copy: " + checkPointPosition ); + writer.checkPoint( checkPointPosition ); // * comment copied from old StoreCopyClient * // since we just create new log and put checkpoint into it with offset equals to @@ -177,8 +181,8 @@ public synchronized void close() throws IOException MetaDataStore.setRecord( pageCache, neoStore, - MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, - LOG_HEADER_SIZE ); + LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET, + checkPointPosition.getByteOffset() ); } lifespan.close(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java index 53c9513c3ca3..b4447918f755 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TransactionLogCatchUpWriterTest.java @@ -31,11 +31,10 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.stream.IntStream; +import java.util.stream.LongStream; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.io.ByteUnit; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.layout.DatabaseLayout; import org.neo4j.io.pagecache.PageCache; @@ -73,8 +72,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.neo4j.kernel.impl.transaction.command.Commands.createNode; +import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID; +import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE; @RunWith( Parameterized.class ) public class TransactionLogCatchUpWriterTest @@ -91,16 +93,18 @@ public class TransactionLogCatchUpWriterTest @Parameterized.Parameter public boolean partOfStoreCopy; - private PageCache pageCache; - private FileSystemAbstraction fs; - private DatabaseLayout databaseLayout; - - @Parameterized.Parameters + @Parameterized.Parameters( name = "Part of store copy: {0}" ) public static List partOfStoreCopy() { return Arrays.asList( Boolean.TRUE, Boolean.FALSE ); } + private final int MANY_TRANSACTIONS = 100_000; // should be somewhere above the rotation threshold + + private PageCache pageCache; + private FileSystemAbstraction fs; + private DatabaseLayout databaseLayout; + @Before public void setup() { @@ -127,20 +131,20 @@ public void pullRotatesWhenThresholdCrossedAndExplicitlySet() throws IOException { // given Config config = Config.defaults(); - config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" ); + config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte // and org.neo4j.storageengine.api.StoreId storeId = simulateStoreCopy(); // and - long fromTx = 0; + long fromTxId = BASE_TX_ID; TransactionLogCatchUpWriter subject = - new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTx, partOfStoreCopy, false, true ); + new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false, true ); - // when 1M tx received - IntStream.range( 0, (int) ByteUnit.mebiBytes( 1 ) ) + // when a bunch of transactions received + LongStream.range( fromTxId, MANY_TRANSACTIONS ) .mapToObj( TransactionLogCatchUpWriterTest::tx ) - .map(tx -> new TxPullResponse( toCasualStoreId( storeId ), tx )) + .map( tx -> new TxPullResponse( toCasualStoreId( storeId ), tx ) ) .forEach( subject::onTxReceived ); subject.close(); @@ -148,25 +152,28 @@ public void pullRotatesWhenThresholdCrossedAndExplicitlySet() throws IOException LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( databaseLayout, fs, pageCache ); LogFiles logFiles = logFilesBuilder.build(); assertNotEquals( logFiles.getLowestLogVersion(), logFiles.getHighestLogVersion() ); + verifyTransactionsInLog( logFiles, fromTxId, MANY_TRANSACTIONS ); + verifyCheckpointInLog( logFiles, partOfStoreCopy ); } @Test - public void pullDoesntRotateWhenThresholdCrossedAndExplicitlyOff() throws IOException + public void pullDoesNotRotateWhenThresholdCrossedAndExplicitlyOff() throws IOException { // given Config config = Config.defaults(); - config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" ); + config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte // and org.neo4j.storageengine.api.StoreId storeId = simulateStoreCopy(); // and - long fromTx = 0; + long fromTxId = BASE_TX_ID; TransactionLogCatchUpWriter subject = - new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTx, partOfStoreCopy, false, false ); + new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false, + false ); // when 1M tx received - IntStream.range( 0, (int) ByteUnit.mebiBytes( 1 ) ) + LongStream.range( fromTxId, MANY_TRANSACTIONS ) .mapToObj( TransactionLogCatchUpWriterTest::tx ) .map(tx -> new TxPullResponse( toCasualStoreId( storeId ), tx )) .forEach( subject::onTxReceived ); @@ -205,20 +212,26 @@ private void createTransactionLogWithCheckpoint( Config config, boolean logsInSt LogFiles logFiles = logFilesBuilder.build(); verifyTransactionsInLog( logFiles, fromTxId, endTxId ); - if ( partOfStoreCopy ) - { - verifyCheckpointInLog( logFiles ); - } + verifyCheckpointInLog( logFiles, partOfStoreCopy ); } - private void verifyCheckpointInLog( LogFiles logFiles ) + private void verifyCheckpointInLog( LogFiles logFiles, boolean shouldExist ) { LogEntryReader logEntryReader = new VersionAwareLogEntryReader<>( new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT ); final LogTailScanner logTailScanner = new LogTailScanner( logFiles, logEntryReader, new Monitors() ); LogTailInformation tailInformation = logTailScanner.getTailInformation(); + + if ( !shouldExist ) + { + assertNull( tailInformation.lastCheckPoint ); + return; + } + assertNotNull( tailInformation.lastCheckPoint ); + assertEquals( 0, tailInformation.lastCheckPoint.getLogPosition().getLogVersion() ); + assertEquals( LOG_HEADER_SIZE, tailInformation.lastCheckPoint.getLogPosition().getByteOffset() ); assertTrue( tailInformation.commitsAfterLastCheckpoint() ); } @@ -258,6 +271,7 @@ private org.neo4j.storageengine.api.StoreId simulateStoreCopy() throws IOExcepti // we don't have log files after a store copy LogFiles logFiles = LogFilesBuilder.logFilesBasedOnlyBuilder( databaseLayout.databaseDirectory(), fsRule.get() ).build(); + //noinspection ResultOfMethodCallIgnored logFiles.accept( ( file, version ) -> file.delete() ); return storeId; @@ -268,10 +282,10 @@ private StoreId toCasualStoreId( org.neo4j.storageengine.api.StoreId storeId ) return new StoreId( storeId.getCreationTime(), storeId.getRandomId(), storeId.getUpgradeTime(), storeId.getUpgradeId() ); } - private static CommittedTransactionRepresentation tx( int id ) + private static CommittedTransactionRepresentation tx( long txId ) { return new CommittedTransactionRepresentation( - new LogEntryStart( id, id, id, id - 1, new byte[]{}, LogPosition.UNSPECIFIED ), - Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( id, id ) ); + new LogEntryStart( 0, 0, 0, txId - 1, new byte[]{}, LogPosition.UNSPECIFIED ), + Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( txId, 0 ) ); } }