Improve chunk content decoder

Simplifies some of the decoding code and reduces the risk of leaks by avoiding retains.

* Removed the ChunkHandler; a custom Cumulator is set on the decoder instead.

* Made the content type dispatcher a ChannelInboundHandlerAdapter. The previous
implementation had odd side effects and relied on retain(), which increases the
risk of a leak.
RagnarW authored and martinfurmanski committed Sep 10, 2018
1 parent d0e98ce commit ed46113
Showing 9 changed files with 37 additions and 282 deletions.
@@ -31,7 +31,6 @@
 import java.util.stream.Collectors;

 import org.neo4j.causalclustering.messaging.marshalling.v2.ContentTypeProtocol;
-import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ChunkHandler;
 import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ContentTypeDispatcher;
 import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.DecodingDispatcher;
 import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.RaftMessageComposer;
@@ -77,13 +76,11 @@ public void install( Channel channel ) throws Exception
     {
         ContentTypeProtocol contentTypeProtocol = new ContentTypeProtocol();
-        ChunkHandler chunkHandler = new ChunkHandler();
-        DecodingDispatcher decodingDispatcher = new DecodingDispatcher( contentTypeProtocol, logProvider, chunkHandler );
+        DecodingDispatcher decodingDispatcher = new DecodingDispatcher( contentTypeProtocol, logProvider );
         pipelineBuilderFactory
                 .server( channel, log )
                 .modify( modifiers )
                 .addFraming()
-                .onClose( chunkHandler::close )
                 .add( "raft_content_type_dispatcher", new ContentTypeDispatcher( contentTypeProtocol ) )
                 .add( "raft_component_decoder", decodingDispatcher )
                 .add( "raft_content_decoder", new ReplicatedContentDecoder( contentTypeProtocol ) )
ChunkedReplicatedContent.java
@@ -34,7 +34,7 @@
 public class ChunkedReplicatedContent implements Marshal, ChunkedInput<ByteBuf>
 {
-    private static final int METADATA_SIZE = Integer.BYTES + 1;
+    private static final int METADATA_SIZE = 1;

     private final byte contentType;
     private final ByteBufChunkHandler byteBufChunkHandler;
@@ -117,11 +117,9 @@ private int metadataSize( boolean isFirstChunk )
     private ByteBuf writeMetadata( boolean isFirstChunk, ByteBufAllocator allocator, ByteBuf data )
     {
-        int length = data.writerIndex();
         int capacity = metadataSize( isFirstChunk );
         ByteBuf metaData = allocator.buffer( capacity, capacity );
         metaData.writeBoolean( byteBufAwareMarshal.isEndOfInput() );
-        metaData.writeInt( length );
         if ( isFirstChunk )
         {
             metaData.writeByte( contentType );
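For orientation, the layout implied by this change is: every chunk starts with a one-byte is-last flag, only the first chunk additionally carries the content-type byte, and the four-byte length prefix is gone (frame lengths are already handled by the framing added in the installer above). A rough sketch of that layout; the class and method names here are made up and this is not the repository's code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

// Illustrative only: per-chunk layout [isLast: 1 byte][contentType: 1 byte, first chunk only][payload].
public final class ChunkLayoutSketch
{
    public static ByteBuf encodeChunk( ByteBufAllocator allocator, boolean isFirstChunk, boolean isLast,
                                       byte contentType, ByteBuf payload )
    {
        int capacity = isFirstChunk ? 2 : 1;     // METADATA_SIZE is now 1; +1 for the content type on the first chunk
        ByteBuf metadata = allocator.buffer( capacity, capacity );
        metadata.writeBoolean( isLast );         // every chunk carries the "is last" flag
        if ( isFirstChunk )
        {
            metadata.writeByte( contentType );   // only the first chunk names the content type
        }
        // no 4-byte length prefix any more; framing is handled elsewhere in the pipeline
        return Unpooled.wrappedBuffer( metadata, payload );
    }

    private ChunkLayoutSketch()
    {
    }
}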
@@ -22,6 +22,8 @@
  */
 package org.neo4j.causalclustering.messaging.marshalling;

+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -43,7 +45,6 @@
 import org.neo4j.causalclustering.core.state.storage.SafeChannelMarshal;
 import org.neo4j.causalclustering.messaging.EndOfStreamException;
 import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
-import org.neo4j.causalclustering.messaging.marshalling.v2.decoding.ChunkHandler;
 import org.neo4j.storageengine.api.ReadableChannel;
 import org.neo4j.storageengine.api.WritableChannel;
@@ -110,30 +111,16 @@ else if ( content instanceof DummyRequest )
         }
     }

-    public ContentBuilder<ReplicatedContent> decode( ChunkHandler.ComposedChunks composedChunks ) throws IOException, EndOfStreamException
+    public ContentBuilder<ReplicatedContent> decode( byte contentType, ByteBuf buffer ) throws IOException, EndOfStreamException
     {
-        switch ( composedChunks.contentType() )
+        switch ( contentType )
         {
         case TX_CONTENT_TYPE:
         {
-            try
-            {
-                return ContentBuilder.finished( ReplicatedTransactionSerializer.decode( composedChunks.content() ) );
-            }
-            finally
-            {
-                composedChunks.release();
-            }
+            return ContentBuilder.finished( ReplicatedTransactionSerializer.decode( buffer ) );
         }
         default:
-            try
-            {
-                return read( composedChunks.contentType(), new NetworkReadableClosableChannelNetty4( composedChunks.content() ) );
-            }
-            finally
-            {
-                composedChunks.release();
-            }
+            return read( contentType, new NetworkReadableClosableChannelNetty4( buffer ) );
         }
     }

This file was deleted (the ChunkHandler removed by this commit).

ContentTypeDispatcher.java
@@ -24,14 +24,13 @@

 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;

-import java.util.List;

 import org.neo4j.causalclustering.catchup.Protocol;
 import org.neo4j.causalclustering.messaging.marshalling.v2.ContentType;

-public class ContentTypeDispatcher extends ByteToMessageDecoder
+public class ContentTypeDispatcher extends ChannelInboundHandlerAdapter
 {
     private final Protocol<ContentType> contentTypeProtocol;
@@ -41,17 +40,18 @@ public ContentTypeDispatcher( Protocol<ContentType> contentTypeProtocol )
     }

     @Override
-    protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
+    public void channelRead( ChannelHandlerContext ctx, Object msg )
     {
         if ( contentTypeProtocol.isExpecting( ContentType.ContentType ) )
         {
-            byte messageCode = in.readByte();
+            byte messageCode = ((ByteBuf) msg).readByte();
             ContentType contentType = getContentType( messageCode );
             contentTypeProtocol.expect( contentType );
+            ReferenceCountUtil.release( msg );
         }
         else
         {
-            ctx.fireChannelRead( in.retain() );
+            ctx.fireChannelRead( msg );
         }
     }
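The retain()/release() change follows Netty's usual reference-counting contract: a plain ChannelInboundHandlerAdapter receives the message as-is, so a handler that consumes a ByteBuf must release it, while a handler that merely forwards it must leave the reference count alone; ByteToMessageDecoder, by contrast, releases its cumulated input itself, which is why the old code needed the extra retain() before fireChannelRead. A minimal, hypothetical handler (not part of this commit) illustrating the rule:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// Hypothetical example: either consume a message (and release it) or forward it untouched (no retain).
public class ConsumeOrForwardHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead( ChannelHandlerContext ctx, Object msg )
    {
        if ( msg instanceof ByteBuf && ((ByteBuf) msg).readableBytes() == 1 )
        {
            byte header = ((ByteBuf) msg).readByte(); // the handler consumes the message here...
            handleHeader( header );
            ReferenceCountUtil.release( msg );        // ...so it is responsible for releasing it
        }
        else
        {
            ctx.fireChannelRead( msg );               // forwarded unchanged: no retain, no release
        }
    }

    private void handleHeader( byte header )
    {
        // placeholder for acting on the consumed byte
    }
}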
DecodingDispatcher.java
@@ -35,13 +35,9 @@

 public class DecodingDispatcher extends RequestDecoderDispatcher<ContentType>
 {
-    private final ChunkHandler chunkHandler;
-
-    public DecodingDispatcher( Protocol<ContentType> protocol, LogProvider logProvider, ChunkHandler chunkHandler )
+    public DecodingDispatcher( Protocol<ContentType> protocol, LogProvider logProvider )
     {
         super( protocol, logProvider );
-        this.chunkHandler = chunkHandler;
-
         register( ContentType.ContentType, new ByteToMessageDecoder()
         {
             @Override
@@ -54,14 +50,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
             }
         } );
         register( ContentType.RaftLogEntryTerms, new RaftLogEntryTermsDecoder( protocol ) );
-        register( ContentType.ReplicatedContent, new ReplicatedContentChunkDecoder( chunkHandler ) );
+        register( ContentType.ReplicatedContent, new ReplicatedContentChunkDecoder() );
         register( ContentType.Message, new RaftMessageDecoder( protocol ) );
     }
-
-    @Override
-    public void channelInactive( ChannelHandlerContext ctx )
-    {
-        chunkHandler.close();
-        ctx.fireChannelInactive();
-    }
 }
ReplicatedContentChunkDecoder.java
@@ -23,6 +23,7 @@
 package org.neo4j.causalclustering.messaging.marshalling.v2.decoding;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
@@ -33,35 +34,35 @@
 public class ReplicatedContentChunkDecoder extends ByteToMessageDecoder
 {
     private final CoreReplicatedContentMarshal contentMarshal = new CoreReplicatedContentMarshal();
-    private final ChunkHandler chunkHandler;

-    ReplicatedContentChunkDecoder( ChunkHandler chunkHandler )
+    ReplicatedContentChunkDecoder()
     {
-        this.chunkHandler = chunkHandler;
+        setCumulator( new ContentChunkCumulator() );
     }

     @Override
     protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception
     {
-        try
-        {
-            ChunkHandler.ComposedChunks complete = chunkHandler.handle( in );
-            if ( complete != null )
-            {
-                out.add( contentMarshal.decode( complete ) );
-            }
-        }
-        catch ( Throwable e )
-        {
-            try
-            {
-                chunkHandler.close();
-            }
-            catch ( Throwable t1 )
-            {
-                e.addSuppressed( t1 );
-            }
-            throw e;
-        }
+        in.markReaderIndex();
+        boolean isLast = in.readBoolean();
+        if ( isLast )
+        {
+            out.add( contentMarshal.decode( in.readByte(), in ) );
+        }
+        else
+        {
+            in.resetReaderIndex();
+        }
+    }
+
+    private static class ContentChunkCumulator implements Cumulator
+    {
+        @Override
+        public ByteBuf cumulate( ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in )
+        {
+            boolean isLast = in.readBoolean();
+            cumulation.setBoolean( 0, isLast );
+            return COMPOSITE_CUMULATOR.cumulate( alloc, cumulation, in.slice() );
+        }
     }
 }
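The work the deleted ChunkHandler used to do, re-assembling chunks before decoding, is now expressed through ByteToMessageDecoder's own cumulation: setCumulator(..) installs a custom Cumulator, and decode() simply waits until the cumulated buffer holds a complete payload. A self-contained sketch of the mechanism, using a hypothetical newline-delimited format rather than the chunk format above:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical demo of ByteToMessageDecoder cumulation: payloads are newline-delimited,
// arrive in arbitrary pieces, and are only emitted once a complete line has been cumulated.
public class LineChunkDecoder extends ByteToMessageDecoder
{
    public LineChunkDecoder()
    {
        // A custom Cumulator is invoked as inbound buffers are appended to the cumulation;
        // this is the hook the decoder above uses to inspect and strip per-chunk headers.
        // Here it simply delegates to the copy-free COMPOSITE_CUMULATOR.
        setCumulator( ( alloc, cumulation, in ) -> COMPOSITE_CUMULATOR.cumulate( alloc, cumulation, in ) );
    }

    @Override
    protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
    {
        int newline = in.indexOf( in.readerIndex(), in.writerIndex(), (byte) '\n' );
        if ( newline >= 0 )
        {
            byte[] line = new byte[newline - in.readerIndex()];
            in.readBytes( line );
            in.skipBytes( 1 ); // drop the '\n'
            out.add( new String( line, StandardCharsets.UTF_8 ) );
        }
        // otherwise: leave the bytes in the cumulation and wait for the next chunk
    }

    public static void main( String[] args )
    {
        EmbeddedChannel channel = new EmbeddedChannel( new LineChunkDecoder() );
        channel.writeInbound( Unpooled.copiedBuffer( "hel", StandardCharsets.UTF_8 ) );
        channel.writeInbound( Unpooled.copiedBuffer( "lo\n", StandardCharsets.UTF_8 ) );
        String line = channel.readInbound();
        System.out.println( line ); // prints "hello"
    }
}

The cumulator in the diff above additionally strips the per-chunk is-last flag and mirrors it into byte 0 of the cumulation, so decode() can peek the flag without consuming payload bytes.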
@@ -50,24 +50,18 @@ public void shouldProvideExpectedMetaData() throws IOException

         // is not last
         assertFalse( byteBuf.readBoolean() );
-        // first chunk unknown length
-        assertEquals( 8, byteBuf.readInt() );
         // first chunk has content
         assertEquals( (byte) 1, byteBuf.readByte() );
         byteBuf.release();

         byteBuf = replicatedContent.readChunk( allocator );
         // is not last
         assertFalse( byteBuf.readBoolean() );
-        // second chunk has length -1
-        assertEquals( 8, byteBuf.readInt() );
         byteBuf.release();

         byteBuf = replicatedContent.readChunk( allocator );
         // is last
         assertTrue( byteBuf.readBoolean() );
-        // third chunk has a length
-        assertEquals( 8, byteBuf.readInt() );
         byteBuf.release();

         assertNull( replicatedContent.readChunk( allocator ) );
