Skip to content

Commit

Permalink
Skip transaction log file without headers
Browse files Browse the repository at this point in the history
During concurrent access to transaction log files it's possible that
we will observe file that was already created but still do not have
any header information since it was not yet written.
PR changes behaviour of log file traversal to skip files without header
and check previous available file for desired transactions
instead of throwing exception about missing header.
  • Loading branch information
MishaDemianenko committed Feb 24, 2017
1 parent fddd16e commit 1202953
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 30 deletions.
Expand Up @@ -342,20 +342,26 @@ public void accept( LogHeaderVisitor visitor ) throws IOException
Long previousLogLastTxId = logHeaderCache.getLogHeader( logVersion );
if ( previousLogLastTxId == null )
{
LogHeader header = readLogHeader( fileSystem, logFiles.getLogFileForVersion( logVersion ) );
assert logVersion == header.logVersion;
logHeaderCache.putHeader( header.logVersion, header.lastCommittedTxId );
previousLogLastTxId = header.lastCommittedTxId;
LogHeader header = readLogHeader( fileSystem, logFiles.getLogFileForVersion( logVersion ), false );
if ( header != null )
{
assert logVersion == header.logVersion;
logHeaderCache.putHeader( header.logVersion, header.lastCommittedTxId );
previousLogLastTxId = header.lastCommittedTxId;
}
}

long lowTransactionId = previousLogLastTxId + 1;
LogPosition position = LogPosition.start( logVersion );
if ( !visitor.visit( position, lowTransactionId, highTransactionId ) )
if ( previousLogLastTxId != null )
{
break;
long lowTransactionId = previousLogLastTxId + 1;
LogPosition position = LogPosition.start( logVersion );
if ( !visitor.visit( position, lowTransactionId, highTransactionId ) )
{
break;
}
highTransactionId = previousLogLastTxId;
}
logVersion--;
highTransactionId = previousLogLastTxId;
}
}

Expand Down
Expand Up @@ -62,6 +62,33 @@ public class PhysicalLogFileTest
private final TransactionIdStore transactionIdStore =
new DeadSimpleTransactionIdStore( 5L, 0, BASE_TX_COMMIT_TIMESTAMP, 0, 0 );

@Test
public void skipLogFileWithoutHeader() throws IOException
{
LifeSupport life = new LifeSupport();
FileSystemAbstraction fs = fileSystemRule.get();
PhysicalLogFiles physicalLogFiles = new PhysicalLogFiles( directory.directory(), "logs", fs );
PhysicalLogFile physicalLogFile = new PhysicalLogFile( fs, physicalLogFiles, 1000,
() -> 1L, logVersionRepository,
mock( Monitor.class ), new LogHeaderCache( 10 ) );
life.add( physicalLogFile );
life.start();

// simulate new file without header presence
PhysicalLogFile logFileToSearchFrom = new PhysicalLogFile( fs, physicalLogFiles, 1000,
() -> 10L, logVersionRepository, mock( Monitor.class ),
new LogHeaderCache( 10 ) );
logVersionRepository.incrementAndGetVersion();
fs.create( physicalLogFiles.getLogFileForVersion( logVersionRepository.getCurrentLogVersion() ) ).close();

PhysicalLogicalTransactionStore.LogVersionLocator versionLocator =
new PhysicalLogicalTransactionStore.LogVersionLocator( 4L );
logFileToSearchFrom.accept( versionLocator );

LogPosition logPosition = versionLocator.getLogPosition();
assertEquals( 1, logPosition.getLogVersion() );
}

@Test
public void shouldOpenInFreshDirectoryAndFinallyAddHeader() throws Exception
{
Expand Down
Expand Up @@ -76,6 +76,46 @@ public void setup()
testDir = dir.graphDbDir();
}

@Test
public void extractTransactionFromLogFilesSkippingLastLogFileWithoutHeader() throws IOException
{
TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore();
TransactionMetadataCache positionCache = new TransactionMetadataCache( 100 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 10 );
final byte[] additionalHeader = new byte[]{1, 2, 5};
final int masterId = 2, authorId = 1;
final long timeStarted = 12345, latestCommittedTxWhenStarted = 4545, timeCommitted = timeStarted + 10;
LifeSupport life = new LifeSupport();
final PhysicalLogFiles logFiles = new PhysicalLogFiles( testDir, DEFAULT_NAME, fileSystemRule.get() );
Monitor monitor = new Monitors().newMonitor( PhysicalLogFile.Monitor.class );
LogFile logFile = life.add( new PhysicalLogFile( fileSystemRule.get(), logFiles, 1000,
transactionIdStore::getLastCommittedTransactionId, mock( LogVersionRepository.class ), monitor,
logHeaderCache ) );

life.start();
try
{
addATransactionAndRewind(life, logFile, positionCache, transactionIdStore,
additionalHeader, masterId, authorId, timeStarted, latestCommittedTxWhenStarted, timeCommitted );
}
finally
{
life.shutdown();
}

PhysicalLogFile emptyLogFile = new PhysicalLogFile( fileSystemRule.get(), logFiles, 1000,
transactionIdStore::getLastCommittedTransactionId, mock( LogVersionRepository.class ), monitor,
logHeaderCache );
// create empty transaction log file and clear transaction cache to force re-read
fileSystemRule.get().create( logFiles.getLogFileForVersion( logFiles.getHighestLogVersion() + 1 ) ).close();
positionCache.clear();

final LogicalTransactionStore store =
new PhysicalLogicalTransactionStore( emptyLogFile, positionCache, new VersionAwareLogEntryReader<>() );
verifyTransaction( transactionIdStore, positionCache, additionalHeader, masterId, authorId, timeStarted,
latestCommittedTxWhenStarted, timeCommitted, store );
}

@Test
public void shouldOpenCleanStore() throws Exception
{
Expand Down Expand Up @@ -233,27 +273,8 @@ public void shouldExtractMetadataFromExistingTransaction() throws Exception
life.start();
try
{
TransactionMetadata expectedMetadata;
try ( TransactionCursor cursor = store.getTransactions( TransactionIdStore.BASE_TX_ID + 1 ) )
{
boolean hasNext = cursor.next();
assertTrue( hasNext );
CommittedTransactionRepresentation tx = cursor.get();
TransactionRepresentation transaction = tx.getTransactionRepresentation();
assertArrayEquals( additionalHeader, transaction.additionalHeader() );
assertEquals( masterId, transaction.getMasterId() );
assertEquals( authorId, transaction.getAuthorId() );
assertEquals( timeStarted, transaction.getTimeStarted() );
assertEquals( timeCommitted, transaction.getTimeCommitted() );
assertEquals( latestCommittedTxWhenStarted, transaction.getLatestCommittedTxWhenStarted() );
expectedMetadata = new TransactionMetadata( masterId, authorId,
tx.getStartEntry().getStartPosition(), tx.getStartEntry().checksum(), timeCommitted );
}

positionCache.clear();

TransactionMetadata actualMetadata = store.getMetadataFor( txIdStore.getLastCommittedTransactionId() );
assertEquals( expectedMetadata, actualMetadata );
verifyTransaction( txIdStore, positionCache, additionalHeader, masterId, authorId, timeStarted,
latestCommittedTxWhenStarted, timeCommitted, store );
}
finally
{
Expand Down Expand Up @@ -356,6 +377,33 @@ private Collection<StorageCommand> singleCreateNodeCommand()
return commands;
}

private void verifyTransaction( TransactionIdStore transactionIdStore, TransactionMetadataCache positionCache,
byte[] additionalHeader, int masterId, int authorId, long timeStarted, long latestCommittedTxWhenStarted,
long timeCommitted, LogicalTransactionStore store ) throws IOException
{
TransactionMetadata expectedMetadata;
try ( TransactionCursor cursor = store.getTransactions( TransactionIdStore.BASE_TX_ID + 1 ) )
{
boolean hasNext = cursor.next();
assertTrue( hasNext );
CommittedTransactionRepresentation tx = cursor.get();
TransactionRepresentation transaction = tx.getTransactionRepresentation();
assertArrayEquals( additionalHeader, transaction.additionalHeader() );
assertEquals( masterId, transaction.getMasterId() );
assertEquals( authorId, transaction.getAuthorId() );
assertEquals( timeStarted, transaction.getTimeStarted() );
assertEquals( timeCommitted, transaction.getTimeCommitted() );
assertEquals( latestCommittedTxWhenStarted, transaction.getLatestCommittedTxWhenStarted() );
expectedMetadata = new TransactionMetadata( masterId, authorId,
tx.getStartEntry().getStartPosition(), tx.getStartEntry().checksum(), timeCommitted );
}

positionCache.clear();

TransactionMetadata actualMetadata = store.getMetadataFor( transactionIdStore.getLastCommittedTransactionId() );
assertEquals( expectedMetadata, actualMetadata );
}

private static class FakeRecoveryVisitor implements Visitor<CommittedTransactionRepresentation,Exception>
{
private final byte[] additionalHeader;
Expand Down

0 comments on commit 1202953

Please sign in to comment.