diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstallerV2.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstallerV2.java index 0035f25a2fe13..d765f8759c415 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstallerV2.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/protocol/v2/RaftProtocolServerInstallerV2.java @@ -81,7 +81,6 @@ public void install( Channel channel ) throws Exception .server( channel, log ) .modify( modifiers ) .addFraming() - .onClose( decodingDispatcher::close ) .add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) ) .add( "raft_component_decoder", decodingDispatcher ) .add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java index 080507c76b8d7..6ec82b0ca3195 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/DecodingDispatcher.java @@ -33,13 +33,15 @@ import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; import org.neo4j.logging.LogProvider; -public class DecodingDispatcher extends RequestDecoderDispatcher implements AutoCloseable +public class DecodingDispatcher extends RequestDecoderDispatcher { private final ReplicatedContentChunkDecoder decoder; public DecodingDispatcher( Protocol protocol, LogProvider logProvider ) { super( protocol, logProvider ); + decoder = new ReplicatedContentChunkDecoder(); + register( ContentType.ContentType, new ByteToMessageDecoder() { @Override @@ -52,13 +54,12 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) } } ); register( ContentType.RaftLogEntryTerms, new RaftLogEntryTermsDecoder( protocol ) ); - decoder = new ReplicatedContentChunkDecoder(); register( ContentType.ReplicatedContent, decoder ); register( ContentType.Message, new RaftMessageDecoder( protocol ) ); } @Override - public void close() + public void channelInactive( ChannelHandlerContext ctx ) { decoder.close(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java index 86b960eeb1ae7..66c147866a5aa 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/v2/decoding/ReplicatedContentChunkDecoder.java @@ -34,20 +34,15 @@ public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder implements AutoCloseable { + private final CoreReplicatedContentMarshal contentMarshal = new CoreReplicatedContentMarshal(); + private UnfinishedChunk unfinishedChunk; - private final CoreReplicatedContentMarshal coreReplicatedContentMarshal = new CoreReplicatedContentMarshal(); - private boolean closed; @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) throws Exception { - if ( closed ) - { - return; - } try { - boolean isLast = in.readBoolean(); if ( unfinishedChunk == null ) { @@ -55,7 +50,8 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) int allocationSize = in.readInt(); if ( isLast ) { - out.add( coreReplicatedContentMarshal.read( contentType, + + out.add( contentMarshal.read( contentType, new NetworkReadableClosableChannelNetty4( in.readSlice( in.readableBytes() ) ) ) ); } else @@ -79,14 +75,14 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List out ) if ( isLast ) { - out.add( coreReplicatedContentMarshal.read( unfinishedChunk.contentType, + out.add( contentMarshal.read( unfinishedChunk.contentType, new NetworkReadableClosableChannelNetty4( unfinishedChunk.content() ) ) ); unfinishedChunk.release(); unfinishedChunk = null; } } } - catch ( Exception e ) + catch ( Throwable e ) { release(); throw e; @@ -98,12 +94,13 @@ private void release() if ( unfinishedChunk != null ) { unfinishedChunk.release(); + unfinishedChunk = null; } } + @Override public void close() { - closed = true; release(); }