From ad022d66104174dc45f5d487ab84d09ff1818848 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Tue, 5 Jun 2018 12:08:36 +0200 Subject: [PATCH] Assume chunk is last and re-write otherwise --- .../marshalling/ChunkedReplicatedContent.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java index e6d81c2545fc0..5c20c4bc154e4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/ChunkedReplicatedContent.java @@ -39,7 +39,7 @@ public class ChunkedReplicatedContent implements Marshal, ChunkedInput private final byte contentType; private final ByteBufAwareMarshal byteBufAwareMarshal; private final int chunkSize; - private boolean lastByteWasWritten; + private boolean endOfInput; private int progress; public ChunkedReplicatedContent( byte contentType, ByteBufAwareMarshal byteBufAwareMarshal, int chunkSize ) @@ -68,7 +68,7 @@ public void marshal( WritableChannel channel ) throws IOException @Override public boolean isEndOfInput() { - return lastByteWasWritten; + return endOfInput; } @Override @@ -86,15 +86,16 @@ public ByteBuf readChunk( ChannelHandlerContext ctx ) throws IOException @Override public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException { - boolean endOfInput = isEndOfInput(); - if ( endOfInput ) + if ( isEndOfInput() ) { return null; } + // assume this is the last chunk + boolean lastChunk = true; ByteBuf buffer = allocator.buffer( chunkSize ); try { - buffer.writeBoolean( endOfInput ); + buffer.writeBoolean( lastChunk ); if ( progress() == 0 ) { // extra metadata on first chunk @@ -103,10 +104,11 @@ public ByteBuf readChunk( ByteBufAllocator allocator ) throws IOException } if ( !byteBufAwareMarshal.encode( buffer ) ) { - lastByteWasWritten = true; + this.endOfInput = true; } - if ( isEndOfInput() != endOfInput ) + if ( isEndOfInput() != lastChunk ) { + // status changed after writing to buffer. buffer.setBoolean( 0, isEndOfInput() ); } progress += buffer.readableBytes();