diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java index eec98119d5819..0c358a93fa8cf 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java @@ -158,7 +158,8 @@ protected void initChannel( SocketChannel ch ) pipeline.addLast( new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors, - logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() ); + logProvider ) ); + pipeline.addLast( new ChunkedWriteHandler() ); pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier, checkPointerSupplier ) ); pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestDecoderDispatcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestDecoderDispatcher.java index 84d0e07e3c366..8c4339d5c9006 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestDecoderDispatcher.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestDecoderDispatcher.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; import java.util.HashMap; import java.util.Map; @@ -48,6 +49,12 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio if ( delegate == null ) { log.warn( "Unregistered handler for protocol %s", protocol ); + + /* + * Since we cannot process this message further we need to release the message as per netty doc + * see http://netty.io/wiki/reference-counted-objects.html#inbound-messages + */ + ReferenceCountUtil.release( msg ); return; } delegate.channelRead( ctx, msg ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestMessageTypeEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestMessageTypeEncoder.java index 2a17e4a3a5105..6dc610ebb583c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestMessageTypeEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RequestMessageTypeEncoder.java @@ -19,19 +19,15 @@ */ package org.neo4j.causalclustering.catchup; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class RequestMessageTypeEncoder extends MessageToMessageEncoder +public class RequestMessageTypeEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, RequestMessageType request, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, RequestMessageType request, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - encoded.writeByte( request.messageType() ); - out.add( encoded ); + out.writeByte( request.messageType() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/ResponseMessageTypeEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/ResponseMessageTypeEncoder.java index d176d096ca595..182aa37747844 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/ResponseMessageTypeEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/ResponseMessageTypeEncoder.java @@ -19,19 +19,15 @@ */ package org.neo4j.causalclustering.catchup; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class ResponseMessageTypeEncoder extends MessageToMessageEncoder +public class ResponseMessageTypeEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - encoded.writeByte( response.messageType() ); - out.add( encoded ); + out.writeByte( response.messageType() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/SimpleRequestDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/SimpleRequestDecoder.java index 5c67162e7059e..08479c72bec17 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/SimpleRequestDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/SimpleRequestDecoder.java @@ -28,6 +28,11 @@ import org.neo4j.causalclustering.messaging.Message; import org.neo4j.function.Factory; +/** + * This class extends {@link MessageToMessageDecoder} because if it extended + * {@link io.netty.handler.codec.ByteToMessageDecoder} instead the decode method would fail as no + * bytes are consumed from the ByteBuf but an object is added in the out list. + */ class SimpleRequestDecoder extends MessageToMessageDecoder { private Factory factory; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileContentDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileContentDecoder.java index fb5c7b60c4ad9..dfaca28ef530c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileContentDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileContentDecoder.java @@ -25,6 +25,12 @@ import java.util.List; +/** + * This class does not consume bytes during the decode method. Instead, it puts a {@link FileContent} object with + * a reference to the buffer, to be consumed later. This is the reason it does not extend + * {@link io.netty.handler.codec.ByteToMessageDecoder}, since that class fails if an object is added in the out + * list but no bytes have been consumed. + */ public class FileContentDecoder extends MessageToMessageDecoder { @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java index dd4eba7e862c5..5ef8f2178e0a3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderDecoder.java @@ -21,11 +21,11 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; -public class FileHeaderDecoder extends MessageToMessageDecoder +public class FileHeaderDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java index c4778d2e5db97..b65578b0eacae 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/FileHeaderEncoder.java @@ -19,25 +19,19 @@ */ package org.neo4j.causalclustering.catchup.storecopy; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class FileHeaderEncoder extends MessageToMessageEncoder +public class FileHeaderEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, FileHeader msg, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, FileHeader msg, ByteBuf out ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - String name = msg.fileName(); - buffer.writeInt( name.length() ); - buffer.writeBytes( name.getBytes() ); - buffer.writeLong( msg.fileLength() ); - - out.add( buffer ); + out.writeInt( name.length() ); + out.writeBytes( name.getBytes() ); + out.writeLong( msg.fileLength() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdRequestEncoder.java index 6e76db1514150..eab374379136d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdRequestEncoder.java @@ -21,17 +21,13 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -import java.util.List; - -public class GetStoreIdRequestEncoder extends MessageToMessageEncoder +public class GetStoreIdRequestEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, GetStoreIdRequest msg, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, GetStoreIdRequest msg, ByteBuf out ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - buffer.writeByte( 0 ); - out.add( buffer ); + out.writeByte( 0 ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseDecoder.java index e00ce93b70acc..1b5a399980377 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreIdResponseDecoder.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; -public class GetStoreIdResponseDecoder extends MessageToMessageDecoder +public class GetStoreIdResponseDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestDecoder.java index 0f856dcb7a1d7..673cc6b58bfbc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestDecoder.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; -public class GetStoreRequestDecoder extends MessageToMessageDecoder +public class GetStoreRequestDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestEncoder.java index 74ce86cbfccbe..f102f90c8df9c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetStoreRequestEncoder.java @@ -21,20 +21,16 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; - -import java.util.List; +import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; -public class GetStoreRequestEncoder extends MessageToMessageEncoder +public class GetStoreRequestEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, GetStoreRequest msg, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, GetStoreRequest msg, ByteBuf out ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - StoreIdMarshal.INSTANCE.marshal( msg.expectedStoreId(), new NetworkFlushableChannelNetty4( buffer ) ); - out.add( buffer ); + StoreIdMarshal.INSTANCE.marshal( msg.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseDecoder.java index 942f861eab2f4..4c28ef0660f54 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseDecoder.java @@ -21,13 +21,13 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status; -public class StoreCopyFinishedResponseDecoder extends MessageToMessageDecoder +public class StoreCopyFinishedResponseDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseEncoder.java index d42070fb4d5ac..3874fd500185c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyFinishedResponseEncoder.java @@ -19,20 +19,16 @@ */ package org.neo4j.causalclustering.catchup.storecopy; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class StoreCopyFinishedResponseEncoder extends MessageToMessageEncoder +public class StoreCopyFinishedResponseEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, StoreCopyFinishedResponse msg, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, StoreCopyFinishedResponse msg, ByteBuf out ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - buffer.writeInt( msg.status().ordinal() ); - buffer.writeLong( msg.lastCommittedTxBeforeStoreCopy() ); - out.add( buffer ); - } + out.writeInt( msg.status().ordinal() ); + out.writeLong( msg.lastCommittedTxBeforeStoreCopy() ); + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestDecoder.java index e1150e09b1f62..84644f4476133 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestDecoder.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; @@ -29,7 +29,7 @@ import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; -public class TxPullRequestDecoder extends MessageToMessageDecoder +public class TxPullRequestDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestEncoder.java index e694181922a12..a63b9efc04d76 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestEncoder.java @@ -19,23 +19,19 @@ */ package org.neo4j.causalclustering.catchup.tx; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; -public class TxPullRequestEncoder extends MessageToMessageEncoder +public class TxPullRequestEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, TxPullRequest request, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - encoded.writeLong( request.previousTxId() ); - StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( encoded ) ); - out.add( encoded ); + out.writeLong( request.previousTxId() ); + StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseDecoder.java index 2b7a7f8d852b8..1a3128606f524 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseDecoder.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; @@ -34,7 +34,7 @@ import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader; -public class TxPullResponseDecoder extends MessageToMessageDecoder +public class TxPullResponseDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java index cf354d23a3440..3b2b3a35292a0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullResponseEncoder.java @@ -19,26 +19,21 @@ */ package org.neo4j.causalclustering.catchup.tx; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -import org.neo4j.com.CommittedTransactionSerializer; import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal; +import org.neo4j.com.CommittedTransactionSerializer; -public class TxPullResponseEncoder extends MessageToMessageEncoder +public class TxPullResponseEncoder extends MessageToByteEncoder { - @Override - protected void encode( ChannelHandlerContext ctx, TxPullResponse response, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( encoded ); + NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel ); new CommittedTransactionSerializer( channel ).visit( response.tx() ); - out.add( encoded ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseDecoder.java index 730799e98ed48..2fe7bc402d5c7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseDecoder.java @@ -21,13 +21,13 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import org.neo4j.causalclustering.catchup.CatchupResult; -public class TxStreamFinishedResponseDecoder extends MessageToMessageDecoder +public class TxStreamFinishedResponseDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseEncoder.java index 59023cb4c7b4a..e6bdf8ca76cc6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxStreamFinishedResponseEncoder.java @@ -19,20 +19,16 @@ */ package org.neo4j.causalclustering.catchup.tx; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class TxStreamFinishedResponseEncoder extends MessageToMessageEncoder +public class TxStreamFinishedResponseEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, TxStreamFinishedResponse response, List out ) throws + protected void encode( ChannelHandlerContext ctx, TxStreamFinishedResponse response, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - encoded.writeInt( response.status().ordinal() ); - out.add( encoded ); + out.writeInt( response.status().ordinal() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotDecoder.java index d518077e1ff69..430880c27e474 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotDecoder.java @@ -21,13 +21,13 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4; -public class CoreSnapshotDecoder extends MessageToMessageDecoder +public class CoreSnapshotDecoder extends ByteToMessageDecoder { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List out ) throws Exception diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java index 1ca4cf858a30b..23f997fdb74e2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotEncoder.java @@ -19,21 +19,17 @@ */ package org.neo4j.causalclustering.core.state.snapshot; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; -public class CoreSnapshotEncoder extends MessageToMessageEncoder +public class CoreSnapshotEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, CoreSnapshot coreSnapshot, ByteBuf out ) throws Exception { - ByteBuf encoded = ctx.alloc().buffer(); - new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( encoded ) ); - out.add( encoded ); + new CoreSnapshot.Marshal().marshal( coreSnapshot, new NetworkFlushableByteBuf( out ) ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotRequestEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotRequestEncoder.java index 34d693fb72ae5..ab9b95080e066 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotRequestEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/snapshot/CoreSnapshotRequestEncoder.java @@ -19,19 +19,15 @@ */ package org.neo4j.causalclustering.core.state.snapshot; -import java.util.List; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class CoreSnapshotRequestEncoder extends MessageToMessageEncoder +public class CoreSnapshotRequestEncoder extends MessageToByteEncoder { @Override - protected void encode( ChannelHandlerContext ctx, CoreSnapshotRequest msg, List out ) throws Exception + protected void encode( ChannelHandlerContext ctx, CoreSnapshotRequest msg, ByteBuf out ) throws Exception { - ByteBuf buffer = ctx.alloc().buffer(); - buffer.writeByte( 0 ); - out.add( buffer ); + out.writeByte( 0 ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java index 0bfbc7c3fff6a..aea4af60fedc2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java @@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; import java.io.IOException; import java.util.List; @@ -44,7 +44,7 @@ import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.VOTE_REQUEST; import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.VOTE_RESPONSE; -public class RaftMessageDecoder extends MessageToMessageDecoder +public class RaftMessageDecoder extends ByteToMessageDecoder { private final ChannelMarshal marshal; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java index cd346b800d1b3..15e119f1ded55 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java @@ -19,10 +19,9 @@ */ package org.neo4j.causalclustering.messaging.marshalling; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageEncoder; - -import java.util.List; +import io.netty.handler.codec.MessageToByteEncoder; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; @@ -31,7 +30,7 @@ import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf; -public class RaftMessageEncoder extends MessageToMessageEncoder +public class RaftMessageEncoder extends MessageToByteEncoder { private final ChannelMarshal marshal; @@ -43,13 +42,13 @@ public RaftMessageEncoder( ChannelMarshal marshal ) @Override protected synchronized void encode( ChannelHandlerContext ctx, RaftMessages.ClusterIdAwareMessage decoratedMessage, - List list ) throws Exception + ByteBuf out ) throws Exception { RaftMessages.RaftMessage message = decoratedMessage.message(); ClusterId clusterId = decoratedMessage.clusterId(); MemberId.Marshal memberMarshal = new MemberId.Marshal(); - NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( ctx.alloc().buffer() ); + NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out ); ClusterId.Marshal.INSTANCE.marshal( clusterId, channel ); channel.putInt( message.type().ordinal() ); memberMarshal.marshal( message.from(), channel ); @@ -123,7 +122,5 @@ else if ( message instanceof RaftMessages.LogCompactionInfo ) { throw new IllegalArgumentException( "Unknown message type: " + message ); } - - list.add( channel.buffer() ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/storeid/StoreIdMarshal.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/storeid/StoreIdMarshal.java index dedc127d9355a..73218a1a47960 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/storeid/StoreIdMarshal.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/storeid/StoreIdMarshal.java @@ -51,7 +51,6 @@ public void marshal( StoreId storeId, WritableChannel channel ) throws IOExcepti protected StoreId unmarshal0( ReadableChannel channel ) throws IOException { - byte exists = channel.get(); if ( exists == 0 ) { @@ -59,7 +58,7 @@ protected StoreId unmarshal0( ReadableChannel channel ) throws IOException } else if ( exists != 1 ) { - throw new DecoderException( "Unexpected value" ); + throw new DecoderException( "Unexpected value: " + exists ); } long creationTime = channel.getLong(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncodingDecodingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncodingDecodingTest.java index 608002cdceb15..c2d6eed89f4ba 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncodingDecodingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncodingDecodingTest.java @@ -26,16 +26,15 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.UUID; -import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesRequestBuilder; -import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesResponseBuilder; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.ReplicatedInteger; +import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; +import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesRequestBuilder; +import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesResponseBuilder; import org.neo4j.causalclustering.core.consensus.vote.VoteRequestBuilder; import org.neo4j.causalclustering.core.consensus.vote.VoteResponseBuilder; -import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry; import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal; import org.neo4j.causalclustering.identity.ClusterId; @@ -96,8 +95,6 @@ public void shouldSerializeHeartbeats() throws Exception RaftMessageEncoder encoder = new RaftMessageEncoder( marshal ); RaftMessageDecoder decoder = new RaftMessageDecoder( marshal ); - // Netty puts buffers with serialized content in this list - LinkedList resultingBuffers = new LinkedList<>(); // Deserialization adds read objects in this list ArrayList thingsRead = new ArrayList<>( 1 ); @@ -105,17 +102,27 @@ public void shouldSerializeHeartbeats() throws Exception MemberId sender = new MemberId( UUID.randomUUID() ); RaftMessages.ClusterIdAwareMessage message = new RaftMessages.ClusterIdAwareMessage( clusterId, new RaftMessages.Heartbeat( sender, 1, 2, 3 ) ); - encoder.encode( setupContext(), message, resultingBuffers ); - - // Then - assertEquals( 1, resultingBuffers.size() ); + ChannelHandlerContext ctx = setupContext(); + ByteBuf buffer = null; + try + { + buffer = ctx.alloc().buffer(); + encoder.encode( ctx, message, buffer ); - // When - decoder.decode( null, (ByteBuf) resultingBuffers.get( 0 ), thingsRead ); + // When + decoder.decode( null, buffer, thingsRead ); - // Then - assertEquals( 1, thingsRead.size() ); - assertEquals( message, thingsRead.get( 0 ) ); + // Then + assertEquals( 1, thingsRead.size() ); + assertEquals( message, thingsRead.get( 0 ) ); + } + finally + { + if ( buffer != null ) + { + buffer.release(); + } + } } @Test @@ -150,25 +157,33 @@ private void serializeReadBackAndVerifyMessage( RaftMessages.RaftMessage message RaftMessageEncoder encoder = new RaftMessageEncoder( marshal ); RaftMessageDecoder decoder = new RaftMessageDecoder( marshal ); - // Netty puts buffers with serialized content in this list - LinkedList resultingBuffers = new LinkedList<>(); // Deserialization adds read objects in this list ArrayList thingsRead = new ArrayList<>( 1 ); // When RaftMessages.ClusterIdAwareMessage decoratedMessage = new RaftMessages.ClusterIdAwareMessage( clusterId, message ); - encoder.encode( setupContext(), decoratedMessage, resultingBuffers ); - - // Then - assertEquals( 1, resultingBuffers.size() ); + ChannelHandlerContext ctx = setupContext(); + ByteBuf buffer = null; + try + { + buffer = ctx.alloc().buffer(); + encoder.encode( ctx, decoratedMessage, buffer ); - // When - decoder.decode( null, (ByteBuf) resultingBuffers.get( 0 ), thingsRead ); + // When + decoder.decode( null, buffer, thingsRead ); - // Then - assertEquals( 1, thingsRead.size() ); - assertEquals( decoratedMessage, thingsRead.get( 0 ) ); + // Then + assertEquals( 1, thingsRead.size() ); + assertEquals( decoratedMessage, thingsRead.get( 0 ) ); + } + finally + { + if ( buffer != null ) + { + buffer.release(); + } + } } private static ChannelHandlerContext setupContext()