Skip to content

Commit

Permalink
Implemented truncation of last log after recovery
Browse files Browse the repository at this point in the history
which was sitting as a TODO after previous change to recovery code
  • Loading branch information
tinwelint authored and MishaDemianenko committed Sep 15, 2016
1 parent 911a549 commit 60f10bc
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ private void buildRecovery(
final LatestCheckPointFinder checkPointFinder =
new LatestCheckPointFinder( logFiles, fileSystemAbstraction, logEntryReader );
Recovery.SPI spi = new DefaultRecoverySPI(
storageEngine, logVersionRepository,
storageEngine, logFiles, fileSystemAbstraction, logVersionRepository,
checkPointFinder, transactionIdStore, logicalTransactionStore );
Recovery recovery = new Recovery( spi, recoveryMonitor );
monitors.addMonitorListener( new Recovery.Monitor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.kernel.impl.api.TransactionQueue;
import org.neo4j.kernel.impl.api.TransactionToApply;
Expand All @@ -30,6 +31,7 @@
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogFiles;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
Expand All @@ -42,6 +44,8 @@ public class DefaultRecoverySPI implements Recovery.SPI
{
private final LogVersionRepository logVersionRepository;
private final PositionToRecoverFrom positionToRecoverFrom;
private final PhysicalLogFiles logFiles;
private final FileSystemAbstraction fs;
private final StorageEngine storageEngine;
private final TransactionIdStore transactionIdStore;
private final LogicalTransactionStore logicalTransactionStore;
Expand All @@ -50,10 +54,13 @@ public class DefaultRecoverySPI implements Recovery.SPI

public DefaultRecoverySPI(
StorageEngine storageEngine,
PhysicalLogFiles logFiles, FileSystemAbstraction fs,
LogVersionRepository logVersionRepository, LatestCheckPointFinder checkPointFinder,
TransactionIdStore transactionIdStore, LogicalTransactionStore logicalTransactionStore )
{
this.storageEngine = storageEngine;
this.logFiles = logFiles;
this.fs = fs;
this.logVersionRepository = logVersionRepository;
this.transactionIdStore = transactionIdStore;
this.logicalTransactionStore = logicalTransactionStore;
Expand Down Expand Up @@ -106,7 +113,9 @@ public void allTransactionsRecovered( CommittedTransactionRepresentation lastRec
lastRecoveredTransaction.getCommitEntry().getTimeWritten(),
positionAfterLastRecoveredTransaction.getByteOffset(),
positionAfterLastRecoveredTransaction.getLogVersion() );
// TODO: Also truncate last log after last known position

fs.truncate( logFiles.getLogFileForVersion( positionAfterLastRecoveredTransaction.getLogVersion() ),
positionAfterLastRecoveredTransaction.getByteOffset() );
}

static class RecoveryVisitor implements Visitor<CommittedTransactionRepresentation,Exception>
Expand Down
146 changes: 141 additions & 5 deletions community/kernel/src/test/java/org/neo4j/kernel/RecoveryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class RecoveryTest
private final LogVersionRepository logVersionRepository = new DeadSimpleLogVersionRepository( 1L );
private final TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore( 5L, 0,
BASE_TX_COMMIT_TIMESTAMP, 0, 0 );
private final int logVersion = 1;
private final int logVersion = 0;

private LogEntry lastCommittedTxStartEntry;
private LogEntry lastCommittedTxCommitEntry;
Expand Down Expand Up @@ -150,7 +150,7 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logVersionRepository, finder, transactionIdStore, txStore )
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
{
private int nr = 0;

Expand Down Expand Up @@ -240,7 +240,7 @@ public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) th
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logVersionRepository, finder, transactionIdStore, txStore )
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
{
@Override
public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
Expand All @@ -260,9 +260,145 @@ public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
}
}

// TODO: Test about truncate file
@Test
public void shouldTruncateLogAfterLastCompleteTransactionAfterSuccessfullRecovery() throws Exception
{
// GIVEN
final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fs );
File file = logFiles.getLogFileForVersion( logVersion );
final LogPositionMarker marker = new LogPositionMarker();

writeSomeData( file, new Visitor<Pair<LogEntryWriter, Consumer<LogPositionMarker>>,IOException>()
{
@Override
public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) throws IOException
{
LogEntryWriter writer = pair.first();
Consumer<LogPositionMarker> consumer = pair.other();

// last committed tx
writer.writeStartEntry( 0, 1, 2l, 3l, new byte[0] );
writer.writeCommitEntry( 4l, 5l );

// incomplete tx
consumer.accept( marker ); // <-- marker has the last good position
writer.writeStartEntry( 0, 1, 5l, 4l, new byte[0] );

return true;
}
} );

LifeSupport life = new LifeSupport();
Recovery.Monitor monitor = mock( Recovery.Monitor.class );
final AtomicBoolean recoveryRequired = new AtomicBoolean();
try
{
StorageEngine storageEngine = mock( StorageEngine.class );
final LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();
LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );

TransactionMetadataCache metadataCache = new TransactionMetadataCache( 100 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 10 );
LogFile logFile = life.add( new PhysicalLogFile( fs, logFiles, 50,
() -> transactionIdStore.getLastCommittedTransactionId(), logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
{
@Override
public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{
recoveryRequired.set( true );
return super.startRecovery();
}
}, monitor ) );

life.start();
}
finally
{
life.shutdown();
}

assertTrue( recoveryRequired.get() );
assertEquals( marker.getByteOffset(), file.length() );
}

// TODO: Test about calling TransactionIdStore method
@Test
public void shouldTellTransactionIdStoreAfterSuccessfullRecovery() throws Exception
{
// GIVEN
final PhysicalLogFiles logFiles = new PhysicalLogFiles( directory.directory(), "log", fs );
File file = logFiles.getLogFileForVersion( logVersion );
final LogPositionMarker marker = new LogPositionMarker();

final byte[] additionalHeaderData = new byte[0];
final int masterId = 0;
final int authorId = 1;
final long transactionId = 4;
final long commitTimestamp = 5;
writeSomeData( file, new Visitor<Pair<LogEntryWriter, Consumer<LogPositionMarker>>,IOException>()
{
@Override
public boolean visit( Pair<LogEntryWriter,Consumer<LogPositionMarker>> pair ) throws IOException
{
LogEntryWriter writer = pair.first();
Consumer<LogPositionMarker> consumer = pair.other();

// last committed tx
writer.writeStartEntry( masterId, authorId, 2l, 3l, additionalHeaderData );
writer.writeCommitEntry( transactionId, commitTimestamp );
consumer.accept( marker );

return true;
}
} );

LifeSupport life = new LifeSupport();
Recovery.Monitor monitor = mock( Recovery.Monitor.class );
final AtomicBoolean recoveryRequired = new AtomicBoolean();
try
{
StorageEngine storageEngine = mock( StorageEngine.class );
final LogEntryReader<ReadableClosablePositionAwareChannel> reader = new VersionAwareLogEntryReader<>();
LatestCheckPointFinder finder = new LatestCheckPointFinder( logFiles, fs, reader );

TransactionMetadataCache metadataCache = new TransactionMetadataCache( 100 );
LogHeaderCache logHeaderCache = new LogHeaderCache( 10 );
LogFile logFile = life.add( new PhysicalLogFile( fs, logFiles, 50,
() -> transactionIdStore.getLastCommittedTransactionId(), logVersionRepository,
mock( PhysicalLogFile.Monitor.class ), logHeaderCache ) );
LogicalTransactionStore txStore = new PhysicalLogicalTransactionStore( logFile, metadataCache, reader );

life.add( new Recovery( new DefaultRecoverySPI( storageEngine,
logFiles, fs, logVersionRepository, finder, transactionIdStore, txStore )
{
@Override
public Visitor<CommittedTransactionRepresentation,Exception> startRecovery()
{
recoveryRequired.set( true );
return super.startRecovery();
}
}, monitor ) );

life.start();
}
finally
{
life.shutdown();
}

assertTrue( recoveryRequired.get() );
long[] lastClosedTransaction = transactionIdStore.getLastClosedTransaction();
assertEquals( transactionId, lastClosedTransaction[0] );
assertEquals( LogEntryStart.checksum( additionalHeaderData, masterId, authorId ),
transactionIdStore.getLastCommittedTransaction().checksum() );
assertEquals( commitTimestamp, transactionIdStore.getLastCommittedTransaction().commitTimestamp() );
assertEquals( logVersion, lastClosedTransaction[1] );
assertEquals( marker.getByteOffset(), lastClosedTransaction[2] );
}

private void writeSomeData( File file, Visitor<Pair<LogEntryWriter,Consumer<LogPositionMarker>>,IOException> visitor ) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void setLastCommittedAndClosedTransactionId( long transactionId, long che
{
committingTransactionId.set( transactionId );
committedTransactionId.set( new TransactionId( transactionId, checksum, commitTimestamp ) );
closedTransactionId.set( transactionId, new long[]{checksum, logVersion, byteOffset} );
closedTransactionId.set( transactionId, new long[]{logVersion, byteOffset} );
}

@Override
Expand Down

0 comments on commit 60f10bc

Please sign in to comment.