Skip to content

Commit

Permalink
Improve error and close handling in decoders
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Jun 11, 2018
1 parent 44c2587 commit 39b7412
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
Expand Up @@ -81,7 +81,6 @@ public void install( Channel channel ) throws Exception
.server( channel, log ) .server( channel, log )
.modify( modifiers ) .modify( modifiers )
.addFraming() .addFraming()
.onClose( decodingDispatcher::close )
.add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) ) .add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) )
.add( "raft_component_decoder", decodingDispatcher ) .add( "raft_component_decoder", decodingDispatcher )
.add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) ) .add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) )
Expand Down
Expand Up @@ -33,13 +33,15 @@
import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType; import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


public class DecodingDispatcher extends RequestDecoderDispatcher<ContentType> implements AutoCloseable public class DecodingDispatcher extends RequestDecoderDispatcher<ContentType>
{ {
private final ReplicatedContentChunkDecoder decoder; private final ReplicatedContentChunkDecoder decoder;


public DecodingDispatcher( Protocol<ContentType> protocol, LogProvider logProvider ) public DecodingDispatcher( Protocol<ContentType> protocol, LogProvider logProvider )
{ {
super( protocol, logProvider ); super( protocol, logProvider );
decoder = new ReplicatedContentChunkDecoder();

register( ContentType.ContentType, new ByteToMessageDecoder() register( ContentType.ContentType, new ByteToMessageDecoder()
{ {
@Override @Override
Expand All @@ -52,13 +54,12 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
} }
} ); } );
register( ContentType.RaftLogEntryTerms, new RaftLogEntryTermsDecoder( protocol ) ); register( ContentType.RaftLogEntryTerms, new RaftLogEntryTermsDecoder( protocol ) );
decoder = new ReplicatedContentChunkDecoder();
register( ContentType.ReplicatedContent, decoder ); register( ContentType.ReplicatedContent, decoder );
register( ContentType.Message, new RaftMessageDecoder( protocol ) ); register( ContentType.Message, new RaftMessageDecoder( protocol ) );
} }


@Override @Override
public void close() public void channelInactive( ChannelHandlerContext ctx )
{ {
decoder.close(); decoder.close();
} }
Expand Down
Expand Up @@ -34,28 +34,24 @@


public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder implements AutoCloseable public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder implements AutoCloseable
{ {
private final CoreReplicatedContentMarshal contentMarshal = new CoreReplicatedContentMarshal();

private UnfinishedChunk unfinishedChunk; private UnfinishedChunk unfinishedChunk;
private final CoreReplicatedContentMarshal coreReplicatedContentMarshal = new CoreReplicatedContentMarshal();
private boolean closed;


@Override @Override
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception
{ {
if ( closed )
{
return;
}
try try
{ {

boolean isLast = in.readBoolean(); boolean isLast = in.readBoolean();
if ( unfinishedChunk == null ) if ( unfinishedChunk == null )
{ {
byte contentType = in.readByte(); byte contentType = in.readByte();
int allocationSize = in.readInt(); int allocationSize = in.readInt();
if ( isLast ) if ( isLast )
{ {
out.add( coreReplicatedContentMarshal.read( contentType,
out.add( contentMarshal.read( contentType,
new NetworkReadableClosableChannelNetty4( in.readSlice( in.readableBytes() ) ) ) ); new NetworkReadableClosableChannelNetty4( in.readSlice( in.readableBytes() ) ) ) );
} }
else else
Expand All @@ -79,14 +75,14 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )


if ( isLast ) if ( isLast )
{ {
out.add( coreReplicatedContentMarshal.read( unfinishedChunk.contentType, out.add( contentMarshal.read( unfinishedChunk.contentType,
new NetworkReadableClosableChannelNetty4( unfinishedChunk.content() ) ) ); new NetworkReadableClosableChannelNetty4( unfinishedChunk.content() ) ) );
unfinishedChunk.release(); unfinishedChunk.release();
unfinishedChunk = null; unfinishedChunk = null;
} }
} }
} }
catch ( Exception e ) catch ( Throwable e )
{ {
release(); release();
throw e; throw e;
Expand All @@ -98,12 +94,13 @@ private void release()
if ( unfinishedChunk != null ) if ( unfinishedChunk != null )
{ {
unfinishedChunk.release(); unfinishedChunk.release();
unfinishedChunk = null;
} }
} }


@Override
public void close() public void close()
{ {
closed = true;
release(); release();
} }


Expand Down

0 comments on commit 39b7412

Please sign in to comment.