From 4836ed151ad13cb753eacd0af359eacb954ff673 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Wed, 5 Sep 2018 12:02:39 +0200 Subject: [PATCH] Have BoundedNetworkChannel implement WritableChannel --- ...onRepresentationReplicatedTransaction.java | 3 +- .../messaging/BoundedNetworkChannel.java | 56 +++++++++---------- .../messaging/BoundedNetworkChannelTest.java | 10 ++-- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java index b743bcd8b27f9..b07d5bec328c2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/TransactionRepresentationReplicatedTransaction.java @@ -156,10 +156,9 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws Exception { txWriter.write( channel ); } - // nothing more to write, flush latest chunk and close channel + // nothing more to write, close the channel to get the potential last buffer if ( output.isEmpty() ) { - channel.prepareForFlush().flush(); channel.close(); } return output.poll(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java index 5c2962c529f7d..84180217312b7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannel.java @@ -24,22 +24,19 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import java.io.Flushable; -import java.io.IOException; import java.util.Objects; import java.util.Queue; -import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.storageengine.api.WritableChannel; import static java.lang.Integer.min; /** - * Uses provied allocator to create {@link ByteBuf}. The byte bufs will be split if maxChunkSize is reached. The full buffer is then added - * to the provided output and a new buffer is allocated. + * Uses provided allocator to create {@link ByteBuf}. The buffers will be split if maximum size is reached. The full buffer is then added + * to the provided output and a new buffer is allocated. If the output queue is bounded then writing to this channel may block! */ -public class BoundedNetworkChannel implements FlushableChannel +public class BoundedNetworkChannel implements WritableChannel, AutoCloseable { private static final int DEFAULT_INIT_CHUNK_SIZE = 512; private final ByteBufAllocator allocator; @@ -52,7 +49,7 @@ public class BoundedNetworkChannel implements FlushableChannel /** * @param allocator used to allocated {@link ByteBuf} * @param maxChunkSize when reached the current buffer will be moved to the @param outputQueue and a new {@link ByteBuf} is allocated - * @param outputQueue full or flushed buffers are added here. + * @param outputQueue full or flushed buffers are added here. If this queue is bounded then writing to this channel may block! */ public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queue outputQueue ) { @@ -68,19 +65,8 @@ public BoundedNetworkChannel( ByteBufAllocator allocator, int maxChunkSize, Queu this.byteBufs = outputQueue; } - /** - * @return When called will move the current buffer to the queue. - * This should always be called when finished writing to the buffer to ensure - * that the last buffer is moved to the output. - */ @Override - public Flushable prepareForFlush() - { - return this::storeCurrent; - } - - @Override - public FlushableChannel put( byte value ) + public WritableChannel put( byte value ) { checkState(); prepareWrite( 1 ); @@ -89,7 +75,7 @@ public FlushableChannel put( byte value ) } @Override - public FlushableChannel putShort( short value ) + public WritableChannel putShort( short value ) { checkState(); prepareWrite( Short.BYTES ); @@ -98,7 +84,7 @@ public FlushableChannel putShort( short value ) } @Override - public FlushableChannel putInt( int value ) + public WritableChannel putInt( int value ) { checkState(); prepareWrite( Integer.BYTES ); @@ -107,7 +93,7 @@ public FlushableChannel putInt( int value ) } @Override - public FlushableChannel putLong( long value ) + public WritableChannel putLong( long value ) { checkState(); prepareWrite( Long.BYTES ); @@ -116,7 +102,7 @@ public FlushableChannel putLong( long value ) } @Override - public FlushableChannel putFloat( float value ) + public WritableChannel putFloat( float value ) { checkState(); prepareWrite( Float.BYTES ); @@ -125,7 +111,7 @@ public FlushableChannel putFloat( float value ) } @Override - public FlushableChannel putDouble( double value ) + public WritableChannel putDouble( double value ) { checkState(); prepareWrite( Double.BYTES ); @@ -134,7 +120,7 @@ public FlushableChannel putDouble( double value ) } @Override - public FlushableChannel put( byte[] value, int length ) + public WritableChannel put( byte[] value, int length ) { checkState(); int writeIndex = 0; @@ -150,6 +136,15 @@ public FlushableChannel put( byte[] value, int length ) return this; } + /** + * Move the current buffer to the output. + */ + public WritableChannel flush() + { + storeCurrent(); + return this; + } + private int prepareGently( int size ) { if ( getOrCreateCurrent().writerIndex() == maxChunkSize ) @@ -219,12 +214,17 @@ private void checkState() } } + /** + * Flushes and closes the channel + * + * @see #flush() + */ @Override - public void close() throws IOException + public void close() { try { - prepareForFlush().flush(); + flush(); } finally { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java index a6a4750abba38..fa25c8c3eceb5 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/BoundedNetworkChannelTest.java @@ -42,7 +42,7 @@ public class BoundedNetworkChannelTest public final Buffers buffers = new Buffers(); @Test - public void shouldSerializeIntoChunksOfGivenSize() throws IOException + public void shouldSerializeIntoChunksOfGivenSize() { // given int chunkSize = 8; @@ -58,7 +58,7 @@ public void shouldSerializeIntoChunksOfGivenSize() throws IOException channel.putShort( (short) 1 ); channel.putLong( 1 ); channel.put( array, array.length ); - channel.prepareForFlush().flush(); + channel.flush(); // when ByteBuf combinedByteBuf = buffers.buffer(); @@ -83,7 +83,7 @@ public void shouldSerializeIntoChunksOfGivenSize() throws IOException } @Test - public void shouldReturnNullIfQueueIsEmpty() throws IOException + public void shouldReturnNullIfQueueIsEmpty() { // given int chunkSize = 8; @@ -107,14 +107,14 @@ public void shouldReturnNullIfQueueIsEmpty() throws IOException assertNull( byteBufs.poll() ); // when - channel.prepareForFlush().flush(); + channel.flush(); // then assertNotNull( byteBufs.poll() ); } @Test( expected = IllegalStateException.class ) - public void shouldThrowIllegalStatAfterClosed() throws IOException + public void shouldThrowIllegalStatAfterClosed() { int chunkSize = 8; BoundedNetworkChannel channel = new BoundedNetworkChannel( buffers, chunkSize, new LinkedList<>() );