Skip to content

Commit

Permalink
Checkpoint in TransactionLogCatchupWriter should cover all log files
Browse files Browse the repository at this point in the history
After rotation of log files was introduced, the checkpoint would just cover the last log file.

This part of the code is somewhat horrible and it is hard to test properly. A minimalistic
fix has been performed, but this entire area really needs a complete refactoring.
  • Loading branch information
martinfurmanski committed Sep 24, 2018
1 parent 9cfcccf commit 9c0d8c3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
Expand Up @@ -49,6 +49,7 @@

import static java.lang.String.format;
import static org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier.EMPTY;
import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET;
import static org.neo4j.kernel.impl.store.MetaDataStore.Position.LAST_TRANSACTION_ID;
import static org.neo4j.kernel.impl.store.StoreType.META_DATA;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;
Expand Down Expand Up @@ -160,10 +161,13 @@ public synchronized void close() throws IOException
{
if ( asPartOfStoreCopy )
{
/* A checkpoint which points to the beginning of the log file, meaning that
/* A checkpoint which points to the beginning of all the log files, meaning that
all the streamed transactions will be applied as part of recovery. */
long logVersion = logFiles.getHighestLogVersion();
writer.checkPoint( new LogPosition( logVersion, LOG_HEADER_SIZE ) );
long logVersion = logFiles.getLowestLogVersion();
LogPosition checkPointPosition = new LogPosition( logVersion, LOG_HEADER_SIZE );

log.info( "Writing checkpoint as part of store copy: " + checkPointPosition );
writer.checkPoint( checkPointPosition );

// * comment copied from old StoreCopyClient *
// since we just create new log and put checkpoint into it with offset equals to
Expand All @@ -177,8 +181,8 @@ public synchronized void close() throws IOException
MetaDataStore.setRecord(
pageCache,
neoStore,
MetaDataStore.Position.LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET,
LOG_HEADER_SIZE );
LAST_CLOSED_TRANSACTION_LOG_BYTE_OFFSET,
checkPointPosition.getByteOffset() );
}

lifespan.close();
Expand Down
Expand Up @@ -31,11 +31,10 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.io.pagecache.PageCache;
Expand Down Expand Up @@ -73,8 +72,11 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.neo4j.kernel.impl.transaction.command.Commands.createNode;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_ID;
import static org.neo4j.kernel.impl.transaction.log.entry.LogHeader.LOG_HEADER_SIZE;

@RunWith( Parameterized.class )
public class TransactionLogCatchUpWriterTest
Expand All @@ -91,16 +93,18 @@ public class TransactionLogCatchUpWriterTest
@Parameterized.Parameter
public boolean partOfStoreCopy;

private PageCache pageCache;
private FileSystemAbstraction fs;
private DatabaseLayout databaseLayout;

@Parameterized.Parameters
@Parameterized.Parameters( name = "Part of store copy: {0}" )
public static List<Boolean> partOfStoreCopy()
{
return Arrays.asList( Boolean.TRUE, Boolean.FALSE );
}

private final int MANY_TRANSACTIONS = 100_000; // should be somewhere above the rotation threshold

private PageCache pageCache;
private FileSystemAbstraction fs;
private DatabaseLayout databaseLayout;

@Before
public void setup()
{
Expand All @@ -127,46 +131,49 @@ public void pullRotatesWhenThresholdCrossedAndExplicitlySet() throws IOException
{
// given
Config config = Config.defaults();
config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" );
config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte

// and
org.neo4j.storageengine.api.StoreId storeId = simulateStoreCopy();

// and
long fromTx = 0;
long fromTxId = BASE_TX_ID;
TransactionLogCatchUpWriter subject =
new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTx, partOfStoreCopy, false, true );
new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false, true );

// when 1M tx received
IntStream.range( 0, (int) ByteUnit.mebiBytes( 1 ) )
// when a bunch of transactions received
LongStream.range( fromTxId, MANY_TRANSACTIONS )
.mapToObj( TransactionLogCatchUpWriterTest::tx )
.map(tx -> new TxPullResponse( toCasualStoreId( storeId ), tx ))
.map( tx -> new TxPullResponse( toCasualStoreId( storeId ), tx ) )
.forEach( subject::onTxReceived );
subject.close();

// then there was a rotation
LogFilesBuilder logFilesBuilder = LogFilesBuilder.activeFilesBuilder( databaseLayout, fs, pageCache );
LogFiles logFiles = logFilesBuilder.build();
assertNotEquals( logFiles.getLowestLogVersion(), logFiles.getHighestLogVersion() );
verifyTransactionsInLog( logFiles, fromTxId, MANY_TRANSACTIONS );
verifyCheckpointInLog( logFiles, partOfStoreCopy );
}

@Test
public void pullDoesntRotateWhenThresholdCrossedAndExplicitlyOff() throws IOException
public void pullDoesNotRotateWhenThresholdCrossedAndExplicitlyOff() throws IOException
{
// given
Config config = Config.defaults();
config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1m" );
config.augment( GraphDatabaseSettings.logical_log_rotation_threshold, "1M" ); // 1 mebibyte

// and
org.neo4j.storageengine.api.StoreId storeId = simulateStoreCopy();

// and
long fromTx = 0;
long fromTxId = BASE_TX_ID;
TransactionLogCatchUpWriter subject =
new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTx, partOfStoreCopy, false, false );
new TransactionLogCatchUpWriter( databaseLayout, fs, pageCache, config, NullLogProvider.getInstance(), fromTxId, partOfStoreCopy, false,
false );

// when 1M tx received
IntStream.range( 0, (int) ByteUnit.mebiBytes( 1 ) )
LongStream.range( fromTxId, MANY_TRANSACTIONS )
.mapToObj( TransactionLogCatchUpWriterTest::tx )
.map(tx -> new TxPullResponse( toCasualStoreId( storeId ), tx ))
.forEach( subject::onTxReceived );
Expand Down Expand Up @@ -205,20 +212,26 @@ private void createTransactionLogWithCheckpoint( Config config, boolean logsInSt
LogFiles logFiles = logFilesBuilder.build();

verifyTransactionsInLog( logFiles, fromTxId, endTxId );
if ( partOfStoreCopy )
{
verifyCheckpointInLog( logFiles );
}
verifyCheckpointInLog( logFiles, partOfStoreCopy );
}

private void verifyCheckpointInLog( LogFiles logFiles )
private void verifyCheckpointInLog( LogFiles logFiles, boolean shouldExist )
{
LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader = new VersionAwareLogEntryReader<>(
new RecordStorageCommandReaderFactory(), InvalidLogEntryHandler.STRICT );
final LogTailScanner logTailScanner = new LogTailScanner( logFiles, logEntryReader, new Monitors() );

LogTailInformation tailInformation = logTailScanner.getTailInformation();

if ( !shouldExist )
{
assertNull( tailInformation.lastCheckPoint );
return;
}

assertNotNull( tailInformation.lastCheckPoint );
assertEquals( 0, tailInformation.lastCheckPoint.getLogPosition().getLogVersion() );
assertEquals( LOG_HEADER_SIZE, tailInformation.lastCheckPoint.getLogPosition().getByteOffset() );
assertTrue( tailInformation.commitsAfterLastCheckpoint() );
}

Expand Down Expand Up @@ -258,6 +271,7 @@ private org.neo4j.storageengine.api.StoreId simulateStoreCopy() throws IOExcepti

// we don't have log files after a store copy
LogFiles logFiles = LogFilesBuilder.logFilesBasedOnlyBuilder( databaseLayout.databaseDirectory(), fsRule.get() ).build();
//noinspection ResultOfMethodCallIgnored
logFiles.accept( ( file, version ) -> file.delete() );

return storeId;
Expand All @@ -268,10 +282,10 @@ private StoreId toCasualStoreId( org.neo4j.storageengine.api.StoreId storeId )
return new StoreId( storeId.getCreationTime(), storeId.getRandomId(), storeId.getUpgradeTime(), storeId.getUpgradeId() );
}

private static CommittedTransactionRepresentation tx( int id )
private static CommittedTransactionRepresentation tx( long txId )
{
return new CommittedTransactionRepresentation(
new LogEntryStart( id, id, id, id - 1, new byte[]{}, LogPosition.UNSPECIFIED ),
Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( id, id ) );
new LogEntryStart( 0, 0, 0, txId - 1, new byte[]{}, LogPosition.UNSPECIFIED ),
Commands.transactionRepresentation( createNode( 0 ) ), new LogEntryCommit( txId, 0 ) );
}
}

0 comments on commit 9c0d8c3

Please sign in to comment.