Skip to content

Commit

Permalink
Rotate on truncate for physical RAFT log.
Browse files Browse the repository at this point in the history
The log will now rotate when truncating and write a continuation
record at the beginning of the log to aid in skipping truncated
records.
  • Loading branch information
martinfurmanski committed Mar 1, 2016
1 parent f8e444a commit 1022d93
Show file tree
Hide file tree
Showing 16 changed files with 344 additions and 502 deletions.
Expand Up @@ -20,97 +20,104 @@
package org.neo4j.coreedge.raft.log;

import java.io.IOException;
import java.util.Stack;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.LogHeaderVisitor;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel;
import org.neo4j.logging.LogProvider;

public class PhysicalRaftEntryStore implements RaftEntryStore
class PhysicalRaftEntryStore implements RaftEntryStore
{
private final LogFile logFile;
private final RaftLogMetadataCache metadataCache;
private final ChannelMarshal<ReplicatedContent> marshal;

public PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache,
ChannelMarshal<ReplicatedContent> marshal )
PhysicalRaftEntryStore( LogFile logFile, RaftLogMetadataCache metadataCache,
ChannelMarshal<ReplicatedContent> marshal )
{
this.logFile = logFile;
this.metadataCache = metadataCache;
this.marshal = marshal;
}

@Override
public IOCursor<RaftLogAppendRecord> getEntriesFrom( final long indexToStartFrom ) throws IOException
public IOCursor<RaftLogAppendRecord> getEntriesFrom( long fromIndex ) throws IOException
{
// look up in position cache
RaftLogMetadataCache.RaftLogEntryMetadata metadata = metadataCache.getMetadata( indexToStartFrom );
LogPosition startPosition;
boolean positionedCorrectly = false;
if ( metadata != null )
{
startPosition = metadata.getStartPosition();
positionedCorrectly = true;
}
else
{
// ask LogFile about the version it may be in
LogVersionLocator headerVisitor = new LogVersionLocator( indexToStartFrom );
logFile.accept( headerVisitor );
startPosition = headerVisitor.foundPosition;
if ( headerVisitor.firstLogIndexForFoundFile == indexToStartFrom )
{
/*
* we need to know if the first entry (the one the cursor will return next) is the one we are looking
* for, because if it isn't, then we need to skip forward until we find it
*/
positionedCorrectly = true;
}
}
// generate skip stack and get starting position
Stack<Long> skipStack = new Stack<>();
SkipStackGenerator skipStackGenerator = new SkipStackGenerator( fromIndex, skipStack );
logFile.accept( skipStackGenerator );

// the skip stack generator scans through the headers and gives us the logs starting position as a side-effect
LogPosition startPosition = skipStackGenerator.logStartPosition;

if ( startPosition == null )
{
return IOCursor.getEmpty();
}
ReadableLogChannel reader = logFile.getReader( startPosition );

PhysicalRaftLogEntryCursor physicalRaftLogEntryCursor = new PhysicalRaftLogEntryCursor( new
RaftRecordCursor<>( reader, marshal ) );

if ( !positionedCorrectly )
RaftLogMetadataCache.RaftLogEntryMetadata logEntryInfo = metadataCache.getMetadata( fromIndex );
if( logEntryInfo != null && logEntryInfo.getStartPosition().getLogVersion() == startPosition.getLogVersion() )
{
/*
* At this point we know the first entry is not the entry we look for. Iterate until we find it.
*/
while( physicalRaftLogEntryCursor.next() && physicalRaftLogEntryCursor.get().getLogIndex() < indexToStartFrom - 1 );
// then metadata is valid for this log version, read from there
startPosition = logEntryInfo.getStartPosition();
}
return physicalRaftLogEntryCursor;

return new PhysicalRaftLogEntryCursor( new RaftRecordCursor<>( logFile.getReader( startPosition ), marshal ),
skipStack, fromIndex );
}

private static final class LogVersionLocator implements LogHeaderVisitor
private static final class SkipStackGenerator implements LogHeaderVisitor
{
private final long logEntryIndex;
private LogPosition foundPosition;
private long firstLogIndexForFoundFile;
private final Stack<Long> skipStack;
private LogPosition logStartPosition;
private long nextContinuation = -1;

LogVersionLocator( long logEntryIndex )
private SkipStackGenerator( long logEntryIndex, Stack<Long> skipStack )
{
this.logEntryIndex = logEntryIndex;
this.skipStack = skipStack;
}

/**
* Visits all log files backwards in order and creates a stack defining where a record traversal
* should skip forward to the next continuation record. */
@Override
public boolean visit( LogPosition position, long firstLogIndex, long lastLogIndex )
public boolean visit( LogPosition position, long firstLogIndex, long ignored )
{
boolean foundIt = logEntryIndex >= firstLogIndex && logEntryIndex <= lastLogIndex;
if ( foundIt )
if( nextContinuation != -1 )
{
this.firstLogIndexForFoundFile = firstLogIndex;
foundPosition = position;
if( !skipStack.empty() && skipStack.peek() < nextContinuation )
{
// This happens if you truncate again past a previous truncation. For example
// truncating to 7 first and later truncating to 3. Thus the older truncation becomes
// irrelevant and must be ignored.
//
// So one must do a double-skip to reach the start of the latest 3 and this is
// implemented by setting the older skip point to the value of the newer one. Thus
// instead of skipping to 7 and then to 3, we will skip to 3 and again to 3.
//
// The skip points mark the indexes where we should start skipping from until the next
// continuation record. Thus if we skip starting from 3, reach the continuation record,
// and yet again find that we should skip to 3 (by popping the skip stack) then we will
// simply keep skipping.
nextContinuation = skipStack.peek();
}
skipStack.push( nextContinuation );
}
return !foundIt; // continue as long we don't find it

if ( logEntryIndex >= firstLogIndex )
{
logStartPosition = position;
return false;
}

nextContinuation = firstLogIndex;
return true;
}
}
}

0 comments on commit 1022d93

Please sign in to comment.