Skip to content

Commit

Permalink
core-edge: end of stream, marshals and cursors
Browse files Browse the repository at this point in the history
Uses EndOfStreamException everywhere to circumvent the issue
of ReadPastEndException inheriting from IOException, which is
problematic since in general you want to handle end of stream
differently than arbitrary IOExceptions. Also the marshallers
now do not return null to signify the end of stream condition,
but rather actually throw the EndOfStreamException for proper
handling. This also allows the few cases were null actually
was a valid state to be marshalled correctly and without
special case handling.

Introduces abstract marshal wrappers for taking easy advantage
of the EndOfStreamException and catching ReadPastEndExceptions.

Renames cursors to cursor (instead of reader) all over the place
in test and production code, where applicable.

Pushes more code into EntryRecordCursor instead of overriding
the behaviour in the SegmentFile, making things cleaner, even
though the two classes interact tightly.

Fixes an issue where channels were not being closed correctly
for cursors reading past the end.

Fixes an issue where channels experiencing errors would still
be returned to the pool.
  • Loading branch information
martinfurmanski committed Jun 30, 2016
1 parent 361756b commit 62bd4f5
Show file tree
Hide file tree
Showing 39 changed files with 498 additions and 376 deletions.
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.storageengine.api.ReadPastEndException;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;
Expand Down Expand Up @@ -51,33 +52,24 @@ public long logIndex()
return logIndex;
}

public static EntryRecord read(
ReadableChannel channel,
ChannelMarshal<ReplicatedContent> contentMarshal ) throws IOException
public static EntryRecord read( ReadableChannel channel, ChannelMarshal<ReplicatedContent> contentMarshal )
throws IOException, EndOfStreamException
{
try
{
long appendIndex = channel.getLong();
long term = channel.getLong();
ReplicatedContent content = contentMarshal.unmarshal( channel );
if (content == null )
{
return null;
}
return new EntryRecord( appendIndex, new RaftLogEntry( term, content ) );
}
catch ( ReadPastEndException e )
{
return null;
throw EndOfStreamException.INSTANCE;
}
}

public static void write(
WritableChannel channel,
ChannelMarshal<ReplicatedContent> contentMarshal,
long logIndex,
long term,
ReplicatedContent content ) throws IOException
public static void write( WritableChannel channel, ChannelMarshal<ReplicatedContent> contentMarshal,
long logIndex, long term, ReplicatedContent content ) throws IOException
{
channel.putLong( logIndex );
channel.putLong( term );
Expand Down
Expand Up @@ -68,7 +68,7 @@ private int dump( String filenameOrDirectory, PrintStream out )

out.println( header.toString() );

try ( IOCursor<EntryRecord> cursor = segment.getReader( header.prevIndex() + 1 ) )
try ( IOCursor<EntryRecord> cursor = segment.getCursor( header.prevIndex() + 1 ) )
{
while ( cursor.next() )
{
Expand Down
Expand Up @@ -35,7 +35,7 @@
class EntryCursor implements IOCursor<EntryRecord>
{
private final Segments segments;
private IOCursor<EntryRecord> reader;
private IOCursor<EntryRecord> cursor;
private ValueRange<Long,SegmentFile> segmentRange = null;
private long currentIndex;

Expand All @@ -60,9 +60,9 @@ public boolean next() throws IOException
}
}

if ( reader.next() )
if ( cursor.next() )
{
currentRecord.set( reader.get() );
currentRecord.set( cursor.get() );
return true;
}

Expand All @@ -84,20 +84,20 @@ private boolean nextSegment() throws IOException
SegmentFile file = optionalFile.get();

/* Open new reader before closing old, so that pruner cannot overtake us. */
IOCursor<EntryRecord> oldReader = reader;
IOCursor<EntryRecord> oldCursor = cursor;
try
{
reader = file.getReader( currentIndex );
cursor = file.getCursor( currentIndex );
}
catch ( DisposedException e )
{
currentRecord.invalidate();
return false;
}

if ( oldReader != null )
if ( oldCursor != null )
{
oldReader.close();
oldCursor.close();
}

limit = segmentRange.limit().orElse( Long.MAX_VALUE );
Expand All @@ -108,9 +108,9 @@ private boolean nextSegment() throws IOException
@Override
public void close() throws IOException
{
if ( reader != null )
if ( cursor != null )
{
reader.close();
cursor.close();
}
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.coreedge.raft.log.LogPosition;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.cursor.CursorValue;
import org.neo4j.cursor.IOCursor;
import org.neo4j.io.fs.StoreChannel;
Expand All @@ -42,49 +43,84 @@ class EntryRecordCursor implements IOCursor<EntryRecord>

private final LogPosition position;
private final CursorValue<EntryRecord> currentRecord = new CursorValue<>();
private final Reader reader;
private ChannelMarshal<ReplicatedContent> contentMarshal;
private final SegmentFile segment;

EntryRecordCursor( ReadAheadChannel<StoreChannel> bufferedReader,
ChannelMarshal<ReplicatedContent> contentMarshal, long startIndex ) throws IOException
private boolean hadError;
private boolean closed;

EntryRecordCursor( Reader reader, ChannelMarshal<ReplicatedContent> contentMarshal,
long currentIndex, long wantedIndex, SegmentFile segment ) throws IOException, EndOfStreamException
{
this.bufferedReader = bufferedReader;
this.bufferedReader = new ReadAheadChannel<>( reader.channel() );
this.reader = reader;
this.contentMarshal = contentMarshal;
position = new LogPosition( startIndex, bufferedReader.position() );
this.segment = segment;

/* The cache lookup might have given us an earlier position, scan forward to the exact position. */
while ( currentIndex < wantedIndex )
{
read( bufferedReader, contentMarshal );
currentIndex++;
}

this.position = new LogPosition( currentIndex, bufferedReader.position() );
}

@Override
public boolean next() throws IOException
{
EntryRecord entryRecord = read( bufferedReader, contentMarshal );
if ( entryRecord != null )
EntryRecord entryRecord;
try
{
currentRecord.set( entryRecord );
position.byteOffset = bufferedReader.position();
position.logIndex++;
return true;
entryRecord = read( bufferedReader, contentMarshal );
}
else
catch ( EndOfStreamException e )
{
currentRecord.invalidate();
return false;
}
catch ( IOException e )
{
hadError = true;
throw e;
}

currentRecord.set( entryRecord );
position.byteOffset = bufferedReader.position();
position.logIndex++;
return true;
}

@Override
public void close() throws IOException
{
// the cursor does not own any resources, the channel is owned by the pooled Reader
if ( closed )
{
/* This is just a defensive measure, for catching user errors from messing up the refCount. */
throw new IllegalStateException( "Already closed" );
}

bufferedReader = null;
closed = true;
segment.refCount().decrease();

if ( hadError )
{
/* If the reader had en error, then it should be closed instead of returned to the pool. */
reader.close();
}
else
{
segment.positionCache().put( position );
segment.readerPool().release( reader );
}
}

@Override
public EntryRecord get()
{
return currentRecord.get();
}

public LogPosition position()
{
return position;
}
}
Expand Up @@ -30,7 +30,7 @@
import org.neo4j.coreedge.raft.log.EntryRecord;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.raft.state.UnexpectedEndOfStreamException;
import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.cursor.IOCursor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
Expand Down Expand Up @@ -103,7 +103,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException
{
header = loadHeader( fileSystem, file );
}
catch ( UnexpectedEndOfStreamException e )
catch ( EndOfStreamException e )
{
if ( files.lastKey() != fileNameVersion )
{
Expand Down Expand Up @@ -143,11 +143,11 @@ State run() throws IOException, DamagedLogStorageException, DisposedException
state.currentTerm = last.header().prevTerm();

long firstIndexInLastSegmentFile = last.header().prevIndex() + 1;
try ( IOCursor<EntryRecord> reader = last.getReader( firstIndexInLastSegmentFile ) )
try ( IOCursor<EntryRecord> cursor = last.getCursor( firstIndexInLastSegmentFile ) )
{
while ( reader.next() )
while ( cursor.next() )
{
EntryRecord entry = reader.get();
EntryRecord entry = cursor.get();
state.appendIndex = entry.logIndex();
state.currentTerm = entry.logEntry().term();
}
Expand All @@ -158,7 +158,7 @@ State run() throws IOException, DamagedLogStorageException, DisposedException

private static SegmentHeader loadHeader(
FileSystemAbstraction fileSystem,
File file ) throws IOException, UnexpectedEndOfStreamException
File file ) throws IOException, EndOfStreamException
{
try ( StoreChannel channel = fileSystem.open( file, "r" ) )
{
Expand Down
Expand Up @@ -28,17 +28,15 @@
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.cursor.IOCursor;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalFlushableChannel;
import org.neo4j.kernel.impl.transaction.log.ReadAheadChannel;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.ReadPastEndException;

import static java.lang.String.format;
import static org.neo4j.coreedge.raft.log.EntryRecord.read;

/**
* Keeps track of a segment of the RAFT log, i.e. a consecutive set of entries.
Expand Down Expand Up @@ -111,7 +109,7 @@ static SegmentFile create( FileSystemAbstraction fileSystem, File file, ReaderPo
/**
* Channels must be closed when no longer used, so that they are released back to the pool of readers.
*/
IOCursor<EntryRecord> getReader( long logIndex ) throws IOException, DisposedException
IOCursor<EntryRecord> getCursor( long logIndex ) throws IOException, DisposedException
{
assert logIndex > header.prevIndex();

Expand All @@ -128,47 +126,18 @@ IOCursor<EntryRecord> getReader( long logIndex ) throws IOException, DisposedExc

try
{
ReadAheadChannel<StoreChannel> bufferedReader = new ReadAheadChannel<>( reader.channel() );
long currentIndex = position.logIndex;
if ( scanStats != null )
{
scanStats.collect( offsetIndex - currentIndex );
}

try
{
/* The cache lookup might have given us an earlier position, scan forward to the exact position. */
while ( currentIndex < offsetIndex )
{
read( bufferedReader, contentMarshal );
currentIndex++;
}
}
catch ( ReadPastEndException e )
{
bufferedReader.close();
return IOCursor.getEmpty();
}

return new EntryRecordCursor( bufferedReader, contentMarshal, currentIndex )
{
boolean closed = false; /* This is just a defensive measure, for catching user errors from messing up the refCount. */

@Override
public void close()
{
if ( closed )
{
throw new IllegalStateException( "Already closed" );
}

/* The reader owns the channel and it is returned to the pool with it open. */
closed = true;
positionCache.put( this.position() );
readerPool.release( reader );
refCount.decrease();
}
};
return new EntryRecordCursor( reader, contentMarshal, currentIndex, offsetIndex, this );
}
catch ( EndOfStreamException e )
{
readerPool.release( reader );
refCount.decrease();
return IOCursor.getEmpty();
}
catch ( IOException e )
{
Expand Down Expand Up @@ -286,4 +255,19 @@ public String toString()
", header=" + header +
'}';
}

ReferenceCounter refCount()
{
return refCount;
}

PositionCache positionCache()
{
return positionCache;
}

public ReaderPool readerPool()
{
return readerPool;
}
}

0 comments on commit 62bd4f5

Please sign in to comment.