Skip to content

Commit

Permalink
core-edge: integrate new raft log terms keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski authored and jimwebber committed Jul 14, 2016
1 parent 4187e20 commit b8606cf
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 80 deletions.
Expand Up @@ -53,7 +53,6 @@ class RecoveryProtocol
private final ChannelMarshal<ReplicatedContent> contentMarshal; private final ChannelMarshal<ReplicatedContent> contentMarshal;
private final LogProvider logProvider; private final LogProvider logProvider;
private final Log log; private final Log log;
private long expectedVersion;
private ReaderPool readerPool; private ReaderPool readerPool;


RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool, RecoveryProtocol( FileSystemAbstraction fileSystem, FileNames fileNames, ReaderPool readerPool,
Expand All @@ -76,75 +75,86 @@ State run() throws IOException, DamagedLogStorageException, DisposedException
{ {
state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1 ); state.segments = new Segments( fileSystem, fileNames, readerPool, emptyList(), contentMarshal, logProvider, -1 );
state.segments.rotate( -1, -1, -1 ); state.segments.rotate( -1, -1, -1 );
state.terms = new Terms( -1, -1 );
return state; return state;
} }


List<SegmentFile> segmentFiles = new ArrayList<>(); List<SegmentFile> segmentFiles = new ArrayList<>();
SegmentFile segment = null;

long firstVersion = files.firstKey(); long firstVersion = files.firstKey();
expectedVersion = firstVersion; long expectedVersion = firstVersion;
boolean mustRecoverLastHeader = false;


for ( Map.Entry<Long,File> entry : files.entrySet() ) for ( Map.Entry<Long,File> entry : files.entrySet() )
{ {
long fileNameVersion = entry.getKey();
File file = entry.getValue();
SegmentHeader header;

checkVersionSequence( fileNameVersion, expectedVersion );

try try
{ {
long fileNameVersion = entry.getKey(); header = loadHeader( fileSystem, file );
File file = entry.getValue(); checkVersionMatches( header.version(), fileNameVersion );

}
SegmentHeader header; catch ( EndOfStreamException e )
try {
if ( files.lastKey() != fileNameVersion )
{ {
header = loadHeader( fileSystem, file ); throw new DamagedLogStorageException( e, "Intermediate file with incomplete or no header found: %s", file );
} }
catch ( EndOfStreamException e ) else if ( files.size() == 1 )
{ {
if ( files.lastKey() != fileNameVersion ) throw new DamagedLogStorageException( e, "Single file with incomplete or no header found: %s", file );
{
throw new DamagedLogStorageException( e, "File with incomplete or no header found: %s", file );
}

header = new SegmentHeader( state.appendIndex, fileNameVersion, state.appendIndex, state.currentTerm );
writeHeader( fileSystem, file, header );
} }


SegmentFile segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header ); /* Last file header must be recovered by scanning next-to-last file and writing a new header based on that. */

mustRecoverLastHeader = true;
checkVersionStrictlyMonotonic( fileNameVersion ); break;
checkVersionMatches( segment.header().version(), fileNameVersion ); }


segmentFiles.add( segment ); segment = new SegmentFile( fileSystem, file, readerPool, fileNameVersion, contentMarshal, logProvider, header );
segmentFiles.add( segment );


if ( fileNameVersion == firstVersion ) if ( fileNameVersion == firstVersion )
{
state.prevIndex = segment.header().prevIndex();
state.prevTerm = segment.header().prevTerm();
}

expectedVersion++;
// check term
}
catch ( IOException e )
{ {
log.error( "Error during recovery", e ); state.prevIndex = segment.header().prevIndex();
state.prevTerm = segment.header().prevTerm();
} }

expectedVersion++;
} }


SegmentFile last = segmentFiles.get( segmentFiles.size() - 1 ); assert segment != null;


state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, files.lastKey() ); state.segments = new Segments( fileSystem, fileNames, readerPool, segmentFiles, contentMarshal, logProvider, segment.header().version() );
state.appendIndex = last.header().prevIndex(); state.appendIndex = segment.header().prevIndex();
state.currentTerm = last.header().prevTerm(); state.terms = new Terms( segment.header().prevIndex(), segment.header().prevTerm() );


long firstIndexInLastSegmentFile = last.header().prevIndex() + 1; try ( IOCursor<EntryRecord> cursor = segment.getCursor( segment.header().prevIndex() + 1 ) )
try ( IOCursor<EntryRecord> cursor = last.getCursor( firstIndexInLastSegmentFile ) )
{ {
while ( cursor.next() ) while ( cursor.next() )
{ {
EntryRecord entry = cursor.get(); EntryRecord entry = cursor.get();
state.appendIndex = entry.logIndex(); state.appendIndex = entry.logIndex();
state.currentTerm = entry.logEntry().term(); state.terms.append( state.appendIndex, entry.logEntry().term() );
} }
} }


if ( mustRecoverLastHeader )
{
SegmentHeader header = new SegmentHeader( state.appendIndex, expectedVersion, state.appendIndex, state.terms.latest() );
log.warn( "Recovering last file based on next-to-last file. " + header );

File file = fileNames.getForVersion( expectedVersion );
writeHeader( fileSystem, file, header );

segment = new SegmentFile( fileSystem, file, readerPool, expectedVersion, contentMarshal, logProvider, header );
segmentFiles.add( segment );
}

return state; return state;
} }


Expand Down Expand Up @@ -172,15 +182,15 @@ private static void writeHeader(
} }
} }


private void checkVersionStrictlyMonotonic( long fileNameVersion ) throws DamagedLogStorageException private static void checkVersionSequence( long fileNameVersion, long expectedVersion ) throws DamagedLogStorageException
{ {
if ( fileNameVersion != expectedVersion ) if ( fileNameVersion != expectedVersion )
{ {
throw new DamagedLogStorageException( "File versions not strictly monotonic. Expected: %d but found: %d", expectedVersion, fileNameVersion ); throw new DamagedLogStorageException( "File versions not strictly monotonic. Expected: %d but found: %d", expectedVersion, fileNameVersion );
} }
} }


private void checkVersionMatches( long headerVersion, long fileNameVersion ) throws DamagedLogStorageException private static void checkVersionMatches( long headerVersion, long fileNameVersion ) throws DamagedLogStorageException
{ {
if ( headerVersion != fileNameVersion ) if ( headerVersion != fileNameVersion )
{ {
Expand Down
Expand Up @@ -35,8 +35,6 @@
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.lang.String.format;

/** /**
* The segmented RAFT log is an append only log supporting the operations required to support * The segmented RAFT log is an append only log supporting the operations required to support
* the RAFT consensus algorithm. * the RAFT consensus algorithm.
Expand Down Expand Up @@ -118,7 +116,7 @@ public synchronized long append( RaftLogEntry... entries ) throws IOException
for ( RaftLogEntry entry : entries ) for ( RaftLogEntry entry : entries )
{ {
state.appendIndex++; state.appendIndex++;
updateTerm( entry ); state.terms.append( state.appendIndex, entry.term() );
state.segments.last().write( state.appendIndex, entry ); state.segments.last().write( state.appendIndex, entry );
} }
state.segments.last().flush(); state.segments.last().flush();
Expand All @@ -131,7 +129,7 @@ public synchronized long append( RaftLogEntry... entries ) throws IOException


if ( state.segments.last().position() >= rotateAtSize ) if ( state.segments.last().position() >= rotateAtSize )
{ {
rotateSegment( state.appendIndex, state.appendIndex, state.currentTerm ); rotateSegment( state.appendIndex, state.appendIndex, state.terms.latest() );
} }


return state.appendIndex; return state.appendIndex;
Expand All @@ -145,20 +143,6 @@ private void ensureOk()
} }
} }


private void updateTerm( RaftLogEntry entry )
{
if ( entry.term() >= state.currentTerm )
{
state.currentTerm = entry.term();
}
else
{
throw new IllegalStateException(
format( "Non-monotonic term %d for entry %s in term %d", entry.term(), entry.toString(),
state.currentTerm ) );
}
}

@Override @Override
public synchronized void truncate( long fromIndex ) throws IOException public synchronized void truncate( long fromIndex ) throws IOException
{ {
Expand All @@ -173,7 +157,7 @@ public synchronized void truncate( long fromIndex ) throws IOException
truncateSegment( state.appendIndex, newAppendIndex, newTerm ); truncateSegment( state.appendIndex, newAppendIndex, newTerm );


state.appendIndex = newAppendIndex; state.appendIndex = newAppendIndex;
state.currentTerm = newTerm; state.terms.truncate( fromIndex );
} }


private void rotateSegment( long prevFileLastIndex, long prevIndex, long prevTerm ) throws IOException private void rotateSegment( long prevFileLastIndex, long prevIndex, long prevTerm ) throws IOException
Expand Down Expand Up @@ -216,15 +200,14 @@ public RaftLogCursor getEntryCursor( long fromIndex ) throws IOException
@Override @Override
public synchronized long skip( long newIndex, long newTerm ) throws IOException public synchronized long skip( long newIndex, long newTerm ) throws IOException
{ {
log.info( "Skipping from {index: %d, term: %d} to {index: %d, term: %d}", state.appendIndex, state.currentTerm, newIndex, newTerm ); log.info( "Skipping from {index: %d, term: %d} to {index: %d, term: %d}", state.appendIndex, state.terms.latest(), newIndex, newTerm );
if ( state.appendIndex < newIndex ) if ( state.appendIndex < newIndex )
{ {
skipSegment( state.appendIndex, newIndex, newTerm ); skipSegment( state.appendIndex, newIndex, newTerm );

state.terms.skip( newIndex, newTerm );
state.prevTerm = newTerm;
state.currentTerm = newTerm;


state.prevIndex = newIndex; state.prevIndex = newIndex;
state.prevTerm = newTerm;
state.appendIndex = newIndex; state.appendIndex = newIndex;
} }


Expand All @@ -242,17 +225,13 @@ private RaftLogEntry readLogEntry( long logIndex ) throws IOException
@Override @Override
public long readEntryTerm( long logIndex ) throws IOException public long readEntryTerm( long logIndex ) throws IOException
{ {
if ( logIndex == state.prevIndex ) long term = state.terms.get( logIndex );
if ( term == -1 && logIndex >= state.prevIndex )
{ {
return state.prevTerm; RaftLogEntry entry = readLogEntry( logIndex );
term = (entry != null) ? entry.term() : -1;
} }
else if ( logIndex < state.prevIndex || logIndex > state.appendIndex ) return term;
{
return -1;
}

RaftLogEntry entry = readLogEntry( logIndex );
return entry == null ? -1 : entry.term();
} }


@Override @Override
Expand All @@ -274,6 +253,8 @@ public long prune( long safeIndex ) throws IOException
state.prevTerm = newPrevTerm; state.prevTerm = newPrevTerm;
} }


state.terms.prune( state.prevIndex );

return state.prevIndex; return state.prevIndex;
} }
} }
Expand Up @@ -25,21 +25,19 @@
public class State public class State
{ {
Segments segments; Segments segments;
Terms terms;


long prevIndex = -1; long prevIndex = -1;
long prevTerm = -1; long prevTerm = -1;
long appendIndex = -1; long appendIndex = -1;
long currentTerm = -1;


@Override @Override
public String toString() public String toString()
{ {
return "State{" + return "State{" +
"segments=" + segments + "prevIndex=" + prevIndex +
", prevIndex=" + prevIndex +
", prevTerm=" + prevTerm + ", prevTerm=" + prevTerm +
", appendIndex=" + appendIndex + ", appendIndex=" + appendIndex +
", currentTerm=" + currentTerm +
'}'; '}';
} }
} }
Expand Up @@ -21,6 +21,8 @@


import java.util.Arrays; import java.util.Arrays;


import static java.lang.Math.max;

/** /**
* Keeps track of all the terms in memory for efficient lookup. * Keeps track of all the terms in memory for efficient lookup.
* The implementation favours lookup of recent entries. * The implementation favours lookup of recent entries.
Expand Down Expand Up @@ -100,11 +102,11 @@ synchronized void truncate( long fromIndex )
/** /**
* Prune up to specified index. * Prune up to specified index.
* *
* @param upToIndex The last index to prune (inclusive). * @param upToIndex The last index to prune (exclusive).
*/ */
synchronized void prune( long upToIndex ) synchronized void prune( long upToIndex )
{ {
min = upToIndex + 1; min = max( upToIndex, min );
// could also prune out array // could also prune out array
} }


Expand Down Expand Up @@ -135,4 +137,9 @@ synchronized long get( long logIndex )


throw new RuntimeException( "Should be possible to find index >= min" ); throw new RuntimeException( "Should be possible to find index >= min" );
} }

synchronized long latest()
{
return terms[size - 1];
}
} }
Expand Up @@ -64,7 +64,7 @@ public void shouldReturnEmptyStateOnEmptyDirectory() throws Exception


// then // then
assertEquals( -1, state.appendIndex ); assertEquals( -1, state.appendIndex );
assertEquals( -1, state.currentTerm ); assertEquals( -1, state.terms.latest() );
assertEquals( -1, state.prevIndex ); assertEquals( -1, state.prevIndex );
assertEquals( -1, state.prevTerm ); assertEquals( -1, state.prevTerm );
assertEquals( 0, state.segments.last().header().version() ); assertEquals( 0, state.segments.last().header().version() );
Expand Down

0 comments on commit b8606cf

Please sign in to comment.