Skip to content

Commit

Permalink
Have BoundedNetworkChannel implement WritableChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent 86f79ac commit 4836ed1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 35 deletions.
Expand Up @@ -156,10 +156,9 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception
{
txWriter.write( channel );
}
// nothing more to write, flush latest chunk and close channel
// nothing more to write, close the channel to get the potential last buffer
if ( output.isEmpty() )
{
channel.prepareForFlush().flush();
channel.close();
}
return output.poll();
Expand Down
Expand Up @@ -24,22 +24,19 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;

import java.io.Flushable;
import java.io.IOException;
import java.util.Objects;
import java.util.Queue;

import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.storageengine.api.WritableChannel;

import static java.lang.Integer.min;

/**
* Uses provied allocator to create {@link ByteBuf}. The byte bufs will be split if maxChunkSize is reached. The full buffer is then added
* to the provided output and a new buffer is allocated.
* Uses provided allocator to create {@link ByteBuf}. The buffers will be split if maximum size is reached. The full buffer is then added
* to the provided output and a new buffer is allocated. If the output queue is bounded then writing to this channel may block!
*/
public class BoundedNetworkChannel implements FlushableChannel
public class BoundedNetworkChannel implements WritableChannel, AutoCloseable
{
private static final int DEFAULT_INIT_CHUNK_SIZE = 512;
private final ByteBufAllocator allocator;
Expand All @@ -52,7 +49,7 @@ public class BoundedNetworkChannel implements FlushableChannel
/**
* @param allocator used to allocated {@link ByteBuf}
* @param maxChunkSize when reached the current buffer will be moved to the @param outputQueue and a new {@link ByteBuf} is allocated
* @param outputQueue full or flushed buffers are added here.
* @param outputQueue full or flushed buffers are added here. If this queue is bounded then writing to this channel may block!
*/
public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue<ByteBuf> outputQueue )
{
Expand All @@ -68,19 +65,8 @@ public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queu
this.byteBufs = outputQueue;
}

/**
* @return When called will move the current buffer to the queue.
* This should always be called when finished writing to the buffer to ensure
* that the last buffer is moved to the output.
*/
@Override
public Flushable prepareForFlush()
{
return this::storeCurrent;
}

@Override
public FlushableChannel put( byte value )
public WritableChannel put( byte value )
{
checkState();
prepareWrite( 1 );
Expand All @@ -89,7 +75,7 @@ public FlushableChannel put( byte value )
}

@Override
public FlushableChannel putShort( short value )
public WritableChannel putShort( short value )
{
checkState();
prepareWrite( Short.BYTES );
Expand All @@ -98,7 +84,7 @@ public FlushableChannel putShort( short value )
}

@Override
public FlushableChannel putInt( int value )
public WritableChannel putInt( int value )
{
checkState();
prepareWrite( Integer.BYTES );
Expand All @@ -107,7 +93,7 @@ public FlushableChannel putInt( int value )
}

@Override
public FlushableChannel putLong( long value )
public WritableChannel putLong( long value )
{
checkState();
prepareWrite( Long.BYTES );
Expand All @@ -116,7 +102,7 @@ public FlushableChannel putLong( long value )
}

@Override
public FlushableChannel putFloat( float value )
public WritableChannel putFloat( float value )
{
checkState();
prepareWrite( Float.BYTES );
Expand All @@ -125,7 +111,7 @@ public FlushableChannel putFloat( float value )
}

@Override
public FlushableChannel putDouble( double value )
public WritableChannel putDouble( double value )
{
checkState();
prepareWrite( Double.BYTES );
Expand All @@ -134,7 +120,7 @@ public FlushableChannel putDouble( double value )
}

@Override
public FlushableChannel put( byte[] value, int length )
public WritableChannel put( byte[] value, int length )
{
checkState();
int writeIndex = 0;
Expand All @@ -150,6 +136,15 @@ public FlushableChannel put( byte[] value, int length )
return this;
}

/**
* Move the current buffer to the output.
*/
public WritableChannel flush()
{
storeCurrent();
return this;
}

private int prepareGently( int size )
{
if ( getOrCreateCurrent().writerIndex() == maxChunkSize )
Expand Down Expand Up @@ -219,12 +214,17 @@ private void checkState()
}
}

/**
* Flushes and closes the channel
*
* @see #flush()
*/
@Override
public void close() throws IOException
public void close()
{
try
{
prepareForFlush().flush();
flush();
}
finally
{
Expand Down
Expand Up @@ -42,7 +42,7 @@ public class BoundedNetworkChannelTest
public final Buffers buffers = new Buffers();

@Test
public void shouldSerializeIntoChunksOfGivenSize() throws IOException
public void shouldSerializeIntoChunksOfGivenSize()
{
// given
int chunkSize = 8;
Expand All @@ -58,7 +58,7 @@ public void shouldSerializeIntoChunksOfGivenSize() throws IOException
channel.putShort( (short) 1 );
channel.putLong( 1 );
channel.put( array, array.length );
channel.prepareForFlush().flush();
channel.flush();

// when
ByteBuf combinedByteBuf = buffers.buffer();
Expand All @@ -83,7 +83,7 @@ public void shouldSerializeIntoChunksOfGivenSize() throws IOException
}

@Test
public void shouldReturnNullIfQueueIsEmpty() throws IOException
public void shouldReturnNullIfQueueIsEmpty()
{
// given
int chunkSize = 8;
Expand All @@ -107,14 +107,14 @@ public void shouldReturnNullIfQueueIsEmpty() throws IOException
assertNull( byteBufs.poll() );

// when
channel.prepareForFlush().flush();
channel.flush();

// then
assertNotNull( byteBufs.poll() );
}

@Test( expected = IllegalStateException.class )
public void shouldThrowIllegalStatAfterClosed() throws IOException
public void shouldThrowIllegalStatAfterClosed()
{
int chunkSize = 8;
BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, new LinkedList<>() );
Expand Down

0 comments on commit 4836ed1

Please sign in to comment.