Skip to content

Commit

Permalink
Introduce readAll method in StoreChannel that will read as much data as
Browse files Browse the repository at this point in the history
its possible to place to provided buffer and wil throw IllegalStateException
in case if end of stream reached before buffer was filled.
  • Loading branch information
MishaDemianenko committed Nov 29, 2017
1 parent c502bf4 commit 126324b
Show file tree
Hide file tree
Showing 21 changed files with 95 additions and 57 deletions.
Expand Up @@ -49,6 +49,12 @@ public int read( ByteBuffer dst, long position ) throws IOException
throw new UnsupportedOperationException();
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
throw new UnsupportedOperationException();
}

@Override
public void force( boolean metaData ) throws IOException
{
Expand Down
6 changes: 6 additions & 0 deletions community/io/src/main/java/org/neo4j/io/fs/OffsetChannel.java
Expand Up @@ -72,6 +72,12 @@ public int read( ByteBuffer dst, long position ) throws IOException
return delegate.read( dst, offset( position ) );
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
delegate.readAll( dst );
}

@Override
public void force( boolean metaData ) throws IOException
{
Expand Down
12 changes: 12 additions & 0 deletions community/io/src/main/java/org/neo4j/io/fs/StoreChannel.java
Expand Up @@ -25,6 +25,7 @@
import java.nio.channels.FileLock;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.InterruptibleChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SeekableByteChannel;

Expand Down Expand Up @@ -56,6 +57,17 @@ public interface StoreChannel
*/
int read( ByteBuffer dst, long position ) throws IOException;

/**
* Try to Read a sequence of bytes from channel into the given buffer, till the buffer will be full.
* In case if end of channel will be reached during reading {@link IllegalStateException} will be thrown.
*
* @param dst destination buffer.
* @throws IOException if an I/O exception occurs.
* @throws IllegalStateException if end of stream reached during reading.
* @see ReadableByteChannel#read(ByteBuffer)
*/
void readAll( ByteBuffer dst ) throws IOException;

void force( boolean metaData ) throws IOException;

@Override
Expand Down
13 changes: 13 additions & 0 deletions community/io/src/main/java/org/neo4j/io/fs/StoreFileChannel.java
Expand Up @@ -99,6 +99,19 @@ public int read( ByteBuffer dst, long position ) throws IOException
return channel.read( dst, position );
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
while ( dst.hasRemaining() )
{
int bytesRead = channel.read( dst );
if ( bytesRead < 0 )
{
throw new IllegalStateException( "Channel has reached end-of-stream." );
}
}
}

@Override
public void force( boolean metaData ) throws IOException
{
Expand Down
Expand Up @@ -119,6 +119,12 @@ public int read( ByteBuffer dst, long position ) throws IOException
return delegate.read( dst, position );
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
delegate.readAll( dst );
}

@Override
public long position() throws IOException
{
Expand Down
Expand Up @@ -40,7 +40,6 @@
import org.neo4j.io.fs.StoreChannel;

import static java.nio.ByteBuffer.allocate;
import static java.nio.ByteBuffer.allocateDirect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -263,8 +262,8 @@ private void verifyFileIsFullOfLongIntegerOnes( StoreChannel channel )
try
{
long claimedSize = channel.size();
ByteBuffer buffer = allocateDirect( (int) claimedSize );
channel.read( buffer, 0 );
ByteBuffer buffer = allocate( (int) claimedSize );
channel.readAll( buffer );
buffer.flip();

for ( int position = 0; position < claimedSize; position += 8 )
Expand All @@ -284,7 +283,7 @@ private void verifyFileIsEitherEmptyOrContainsLongIntegerValueOne( StoreChannel
try
{
long claimedSize = channel.size();
ByteBuffer buffer = allocateDirect( 8 );
ByteBuffer buffer = allocate( 8 );
channel.read( buffer, 0 );
buffer.flip();

Expand Down Expand Up @@ -313,15 +312,15 @@ private void verifyFileIsEitherEmptyOrContainsLongIntegerValueOne( StoreChannel

private ByteBuffer readLong( StoreChannel readChannel ) throws IOException
{
ByteBuffer readBuffer = allocateDirect( 8 );
readChannel.read( readBuffer );
ByteBuffer readBuffer = allocate( 8 );
readChannel.readAll( readBuffer );
readBuffer.flip();
return readBuffer;
}

private void writeLong( StoreChannel channel, long value ) throws IOException
{
ByteBuffer buffer = allocateDirect( 8 );
ByteBuffer buffer = allocate( 8 );
buffer.putLong( value );
buffer.flip();
channel.write( buffer );
Expand Down
Expand Up @@ -111,6 +111,12 @@ public int read( ByteBuffer byteBuffer, long l ) throws IOException
return inner.read( byteBuffer, l );
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
inner.readAll( dst );
}

@Override
public FileLock tryLock() throws IOException
{
Expand Down
Expand Up @@ -1768,7 +1768,7 @@ public void writesOfDifferentUnitsMustHaveCorrectEndianess() throws Exception
ByteBuffer buf = ByteBuffer.allocate( 20 );
try ( StoreChannel channel = fs.open( file( "a" ), OpenMode.READ ) )
{
channel.read( buf );
channel.readAll( buf );
}
buf.flip();

Expand Down Expand Up @@ -2120,7 +2120,7 @@ public void evictionMustFlushPagesToTheRightFiles() throws IOException
for ( int i = 0; i < recordCount; i++ )
{
bufA.clear();
channel.read( bufA );
channel.readAll( bufA );
bufA.flip();
bufB.clear();
generateRecordForId( i, bufB );
Expand Down
Expand Up @@ -360,7 +360,7 @@ private ByteBuffer readIntoBuffer( String fileName ) throws IOException
ByteBuffer buffer = ByteBuffer.allocate( 16 );
try ( StoreChannel channel = fs.open( file( fileName ), OpenMode.READ ) )
{
channel.read( buffer );
channel.readAll( buffer );
}
buffer.flip();
return buffer;
Expand Down
Expand Up @@ -183,7 +183,7 @@ private boolean readIdBatch0() throws IOException
ByteBuffer readBuffer = ByteBuffer.allocate( bytesToRead );

channel.position( startPosition );
readAll( bytesToRead, readBuffer );
channel.readAll( readBuffer );
stackPosition = startPosition;

readBuffer.flip();
Expand All @@ -200,21 +200,6 @@ private boolean readIdBatch0() throws IOException
return true;
}

private void readAll( int bytesToRead, ByteBuffer readBuffer ) throws IOException
{
int totalRead = 0;
do
{
int bytesRead = channel.read( readBuffer );
if ( bytesRead <= 0 )
{
throw new IllegalStateException( "Unexpected value returned: " + bytesRead );
}
totalRead += bytesRead;
}
while ( totalRead < bytesToRead );
}

/**
* Flushes the currently collected in-memory freed IDs to the storage.
*/
Expand Down
Expand Up @@ -138,12 +138,7 @@ private long readAndValidateHeader() throws IOException
private static long readAndValidate( StoreChannel channel, File fileName ) throws IOException
{
ByteBuffer buffer = ByteBuffer.allocate( HEADER_SIZE );
int totalBytesRead = readBuffer( channel, buffer );
if ( totalBytesRead != HEADER_SIZE )
{
throw new InvalidIdGeneratorException(
"Unable to read header, bytes read: " + totalBytesRead );
}
readHeader( channel, buffer );
buffer.flip();
byte storageStatus = buffer.get();
if ( storageStatus != CLEAN_GENERATOR )
Expand Down Expand Up @@ -315,20 +310,18 @@ public static void createEmptyIdFile( FileSystemAbstraction fs, File file, long
}
}

private static int readBuffer( StoreChannel channel, ByteBuffer buffer ) throws IOException
private static void readHeader( StoreChannel channel, ByteBuffer buffer ) throws IOException
{
int totalBytesRead = 0;
int currentBytesRead;
do
try
{
currentBytesRead = channel.read( buffer );
if ( currentBytesRead > 0 )
{
totalBytesRead += currentBytesRead;
}
channel.readAll( buffer );
}
catch ( IllegalStateException e )
{
ByteBuffer exceptionBuffer = buffer.duplicate();
exceptionBuffer.flip();
throw new InvalidIdGeneratorException( "Unable to read header, bytes read: " + Arrays.toString( getBufferBytes( exceptionBuffer ) ) );
}
while ( buffer.hasRemaining() && currentBytesRead >= 0 );
return totalBytesRead;
}

@Override
Expand All @@ -338,4 +331,11 @@ public String toString()
freeIdKeeper.getCount() + ", grabSize=" + grabSize + ", aggressiveReuse=" +
aggressiveReuse + ", closed=" + closed + '}';
}

private static byte[] getBufferBytes( ByteBuffer buffer )
{
byte[] bytes = new byte[buffer.position()];
buffer.get( bytes );
return bytes;
}
}
Expand Up @@ -227,11 +227,11 @@ private KeyValueWriter newWriter( FileSystemAbstraction fs, File path, ReadableB
private KeyValueStoreFile open( FileSystemAbstraction fs, File path, PageCache pages ) throws IOException
{
ByteBuffer buffer = ByteBuffer.wrap( new byte[maxSize * 4] );
try ( StoreChannel file = fs.open( path, OpenMode.READ ) )
try ( StoreChannel channel = fs.open( path, OpenMode.READ ) )
{
while ( buffer.hasRemaining() )
{
int bytes = file.read( buffer );
int bytes = channel.read( buffer );
if ( bytes == -1 )
{
break;
Expand Down
Expand Up @@ -66,6 +66,12 @@ public int read( ByteBuffer dst, long position ) throws IOException
throw new UnsupportedOperationException( "Not needed" );
}

@Override
public void readAll( ByteBuffer dst ) throws IOException
{
throw new UnsupportedOperationException( "Not needed" );
}

@Override
public void force( boolean metaData ) throws IOException
{
Expand Down
Expand Up @@ -152,7 +152,7 @@ public class JumpingFileChannel extends StoreFileChannel
{
private final int recordSize;

public JumpingFileChannel( StoreFileChannel actual, int recordSize )
JumpingFileChannel( StoreFileChannel actual, int recordSize )
{
super( actual );
this.recordSize = recordSize;
Expand Down
Expand Up @@ -110,7 +110,7 @@ public void createShouldClearExistingFile() throws Exception
try ( StoreChannel r = fs.open( indexFile, OpenMode.READ ) )
{
byte[] firstBytes = new byte[someBytes.length];
r.read( ByteBuffer.wrap( firstBytes ) );
r.readAll( ByteBuffer.wrap( firstBytes ) );
assertNotEquals( "Expected previous file content to have been cleared but was still there",
someBytes, firstBytes );
}
Expand Down
Expand Up @@ -140,7 +140,7 @@ public void createIdGeneratorMustRefuseOverwritingExistingFile() throws IOExcept
// verify that id generator is ok
StoreChannel fileChannel = fs.open( idGeneratorFile(), OpenMode.READ_WRITE );
ByteBuffer buffer = ByteBuffer.allocate( 9 );
assertEquals( 9, fileChannel.read( buffer ) );
fileChannel.readAll( buffer );
buffer.flip();
assertEquals( (byte) 0, buffer.get() );
assertEquals( 0L, buffer.getLong() );
Expand Down
Expand Up @@ -276,7 +276,7 @@ public StoreChannel open( File fileName, OpenMode openMode ) throws IOException
return new DelegatingStoreChannel( super.open( fileName, openMode ) )
{
@Override
public int read( ByteBuffer dst ) throws IOException
public void readAll( ByteBuffer dst ) throws IOException
{
fired.setValue( true );
throw new IOException( "Proving a point here" );
Expand Down
Expand Up @@ -262,7 +262,7 @@ private ByteBuffer readFile( File file ) throws IOException
try ( StoreChannel channel = fileSystemRule.get().open( file, OpenMode.READ ) )
{
ByteBuffer buffer = ByteBuffer.allocate( (int) channel.size() );
channel.read( buffer );
channel.readAll( buffer );
buffer.flip();
return buffer;
}
Expand Down
Expand Up @@ -119,10 +119,9 @@ public void absoluteVersusRelative() throws Exception
// WHEN
channel = fs.open( new File( file.getAbsolutePath() ), OpenMode.READ );
byte[] readBytes = new byte[bytes.length];
int nrOfReadBytes = channel.read( ByteBuffer.wrap( readBytes ) );
channel.readAll( ByteBuffer.wrap( readBytes ) );

// THEN
assertEquals( bytes.length, nrOfReadBytes );
assertTrue( Arrays.equals( bytes, readBytes ) );
fs.close();
}
Expand Down
Expand Up @@ -117,7 +117,7 @@ public synchronized void storeIndexFailure( String failure ) throws IOException
try ( StoreChannel channel = fs.open( failureFile, OpenMode.READ_WRITE ) )
{
byte[] existingData = new byte[(int) channel.size()];
channel.read( ByteBuffer.wrap( existingData ) );
channel.readAll( ByteBuffer.wrap( existingData ) );
channel.position( lengthOf( existingData ) );

byte[] data = UTF8.encode( failure );
Expand All @@ -139,8 +139,8 @@ private String readFailure( File failureFile ) throws IOException
try ( StoreChannel channel = fs.open( failureFile, OpenMode.READ ) )
{
byte[] data = new byte[(int) channel.size()];
int readData = channel.read( ByteBuffer.wrap( data ) );
return readData <= 0 ? "" : UTF8.decode( withoutZeros( data ) );
channel.read( ByteBuffer.wrap( data ) );
return UTF8.decode( withoutZeros( data ) );
}
}

Expand Down Expand Up @@ -168,7 +168,7 @@ private boolean isFailed( File failureFile ) throws IOException
try ( StoreChannel channel = fs.open( failureFile, OpenMode.READ ) )
{
byte[] data = new byte[(int) channel.size()];
channel.read( ByteBuffer.wrap( data ) );
channel.readAll( ByteBuffer.wrap( data ) );
channel.close();
return !allZero( data );
}
Expand Down
Expand Up @@ -138,7 +138,7 @@ private byte[] prefetch() throws IOException
break;
}
}
while ( byteBuffer.remaining() > 0 );
while ( byteBuffer.hasRemaining() );

if ( byteBuffer.position() > 0 )
{
Expand Down

0 comments on commit 126324b

Please sign in to comment.