Skip to content

Commit

Permalink
Updates to ReadAheadChannel after review.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jun 28, 2016
1 parent f8010a6 commit a5112fc
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
Expand Up @@ -54,9 +54,16 @@ public ReadAheadChannel( T channel, int readAheadSize )
this.readAheadSize = readAheadSize;
}

/**
* This is the position within the buffered stream (and not the
* underlying channel, which will generally be further ahead).
*
* @return The position within the buffered stream.
* @throws IOException
*/
public long position() throws IOException
{
return offset();
return channel.position() - aheadBuffer.remaining();
}

@Override
Expand Down Expand Up @@ -173,11 +180,6 @@ protected T next( T channel ) throws IOException
return channel;
}

protected long offset() throws IOException
{
return channel.position() - aheadBuffer.remaining();
}

/*
* Moves bytes between aheadBuffer.position() and aheadBuffer.capacity() to the beginning of aheadBuffer. At the
* end of this call the aheadBuffer is positioned in end of that moved content.
Expand Down
Expand Up @@ -56,7 +56,7 @@ public byte getLogFormatVersion()
@Override
public LogPositionMarker getCurrentPosition( LogPositionMarker positionMarker ) throws IOException
{
positionMarker.mark( channel.getVersion(), offset() );
positionMarker.mark( channel.getVersion(), position() );
return positionMarker;
}

Expand Down
Expand Up @@ -170,4 +170,52 @@ protected StoreChannel next( StoreChannel channel ) throws IOException
// outstanding
}
}

@Test
public void shouldReturnPositionWithinBufferedStream() throws Exception
{
// given
EphemeralFileSystemAbstraction fsa = new EphemeralFileSystemAbstraction();
File file = new File( "foo.txt" );

int readAheadSize = 512;
int fileSize = readAheadSize * 8;

createFile( fsa, file, fileSize );
ReadAheadChannel<StoreChannel> bufferedReader = new ReadAheadChannel<>( fsa.open( file, "r" ), readAheadSize );

// when
for ( int i = 0; i < fileSize / Long.BYTES; i++ )
{
assertEquals( Long.BYTES * i, bufferedReader.position() );
bufferedReader.getLong();
}

assertEquals( fileSize, bufferedReader.position() );

try
{
bufferedReader.getLong();
fail();
}
catch ( ReadPastEndException e )
{
// expected
}

assertEquals( fileSize, bufferedReader.position() );
}

private void createFile( EphemeralFileSystemAbstraction fsa, File name, int bufferSize ) throws IOException
{
StoreChannel storeChannel = fsa.open( name, "w" );
ByteBuffer buffer = ByteBuffer.allocate( bufferSize );
for ( int i = 0; i < bufferSize; i++ )
{
buffer.put( (byte) i );
}
buffer.flip();
storeChannel.writeAll( buffer );
storeChannel.close();
}
}

0 comments on commit a5112fc

Please sign in to comment.