Skip to content

Commit

Permalink
Removes instantiation of ByteBuf in Causal Clustering message encoders
Browse files Browse the repository at this point in the history
By extending MessageToByteEncoder instead of MessageToMessageEncouders,
 causal clustering message encoders now receive a buffer managed by
 netty, which removes the need for allocation and a hazard for
 memory leaks. Decoders are also changed to extend ByteToMessageDecoder,
 although that is for symmetry and makes no difference in terms of
 buffer management. Two exceptions exist for Decoders, because netty
 expects object addition to the out list to happen only through consumption
 of at least one byte from the incoming buffer when using a
 ByteToMessageDecoder, so in these cases MessageToMessageDecoder is
 used instead.
  • Loading branch information
digitalstain committed Oct 31, 2016
1 parent 1f093fa commit 92bc612
Show file tree
Hide file tree
Showing 27 changed files with 135 additions and 152 deletions.
Expand Up @@ -158,7 +158,8 @@ protected void initChannel( SocketChannel ch )
pipeline.addLast(
new TxPullRequestHandler( protocol, storeIdSupplier, dataSourceAvailabilitySupplier,
transactionIdStoreSupplier, logicalTransactionStoreSupplier, monitors,
logProvider ) ); pipeline.addLast( new ChunkedWriteHandler() );
logProvider ) );
pipeline.addLast( new ChunkedWriteHandler() );
pipeline.addLast( new GetStoreRequestHandler( protocol, dataSourceSupplier,
checkPointerSupplier ) );
pipeline.addLast( new GetStoreIdRequestHandler( protocol, storeIdSupplier ) );
Expand Down
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -48,6 +49,12 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
if ( delegate == null )
{
log.warn( "Unregistered handler for protocol %s", protocol );

/*
* Since we cannot process this message further we need to release the message as per netty doc
* see http://netty.io/wiki/reference-counted-objects.html#inbound-messages
*/
ReferenceCountUtil.release( msg );
return;
}
delegate.channelRead( ctx, msg );
Expand Down
Expand Up @@ -19,19 +19,15 @@
*/
package org.neo4j.causalclustering.catchup;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

public class RequestMessageTypeEncoder extends MessageToMessageEncoder<RequestMessageType>
public class RequestMessageTypeEncoder extends MessageToByteEncoder<RequestMessageType>
{
@Override
protected void encode( ChannelHandlerContext ctx, RequestMessageType request, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, RequestMessageType request, ByteBuf out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
encoded.writeByte( request.messageType() );
out.add( encoded );
out.writeByte( request.messageType() );
}
}
Expand Up @@ -19,19 +19,15 @@
*/
package org.neo4j.causalclustering.catchup;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

public class ResponseMessageTypeEncoder extends MessageToMessageEncoder<ResponseMessageType>
public class ResponseMessageTypeEncoder extends MessageToByteEncoder<ResponseMessageType>
{
@Override
protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, ResponseMessageType response, ByteBuf out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
encoded.writeByte( response.messageType() );
out.add( encoded );
out.writeByte( response.messageType() );
}
}
Expand Up @@ -28,6 +28,11 @@
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.function.Factory;

/**
* This class extends {@link MessageToMessageDecoder} because if it extended
* {@link io.netty.handler.codec.ByteToMessageDecoder} instead the decode method would fail as no
* bytes are consumed from the ByteBuf but an object is added in the out list.
*/
class SimpleRequestDecoder extends MessageToMessageDecoder<ByteBuf>
{
private Factory<? extends Message> factory;
Expand Down
Expand Up @@ -25,6 +25,12 @@

import java.util.List;

/**
* This class does not consume bytes during the decode method. Instead, it puts a {@link FileContent} object with
* a reference to the buffer, to be consumed later. This is the reason it does not extend
* {@link io.netty.handler.codec.ByteToMessageDecoder}, since that class fails if an object is added in the out
* list but no bytes have been consumed.
*/
public class FileContentDecoder extends MessageToMessageDecoder<ByteBuf>
{
@Override
Expand Down
Expand Up @@ -21,11 +21,11 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class FileHeaderDecoder extends MessageToMessageDecoder<ByteBuf>
public class FileHeaderDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -19,25 +19,19 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

public class FileHeaderEncoder extends MessageToMessageEncoder<FileHeader>
public class FileHeaderEncoder extends MessageToByteEncoder<FileHeader>
{
@Override
protected void encode( ChannelHandlerContext ctx, FileHeader msg, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, FileHeader msg, ByteBuf out ) throws Exception
{
ByteBuf buffer = ctx.alloc().buffer();

String name = msg.fileName();

buffer.writeInt( name.length() );
buffer.writeBytes( name.getBytes() );
buffer.writeLong( msg.fileLength() );

out.add( buffer );
out.writeInt( name.length() );
out.writeBytes( name.getBytes() );
out.writeLong( msg.fileLength() );
}
}
Expand Up @@ -21,17 +21,13 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

public class GetStoreIdRequestEncoder extends MessageToMessageEncoder<GetStoreIdRequest>
public class GetStoreIdRequestEncoder extends MessageToByteEncoder<GetStoreIdRequest>
{
@Override
protected void encode( ChannelHandlerContext ctx, GetStoreIdRequest msg, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, GetStoreIdRequest msg, ByteBuf out ) throws Exception
{
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeByte( 0 );
out.add( buffer );
out.writeByte( 0 );
}
}
Expand Up @@ -21,15 +21,15 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;

public class GetStoreIdResponseDecoder extends MessageToMessageDecoder<ByteBuf>
public class GetStoreIdResponseDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -21,15 +21,15 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;

public class GetStoreRequestDecoder extends MessageToMessageDecoder<ByteBuf>
public class GetStoreRequestDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -21,20 +21,16 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;
import io.netty.handler.codec.MessageToByteEncoder;

import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;

public class GetStoreRequestEncoder extends MessageToMessageEncoder<GetStoreRequest>
public class GetStoreRequestEncoder extends MessageToByteEncoder<GetStoreRequest>
{
@Override
protected void encode( ChannelHandlerContext ctx, GetStoreRequest msg, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, GetStoreRequest msg, ByteBuf out ) throws Exception
{
ByteBuf buffer = ctx.alloc().buffer();
StoreIdMarshal.INSTANCE.marshal( msg.expectedStoreId(), new NetworkFlushableChannelNetty4( buffer ) );
out.add( buffer );
StoreIdMarshal.INSTANCE.marshal( msg.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) );
}
}
Expand Up @@ -21,13 +21,13 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse.Status;

public class StoreCopyFinishedResponseDecoder extends MessageToMessageDecoder<ByteBuf>
public class StoreCopyFinishedResponseDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -19,20 +19,16 @@
*/
package org.neo4j.causalclustering.catchup.storecopy;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

public class StoreCopyFinishedResponseEncoder extends MessageToMessageEncoder<StoreCopyFinishedResponse>
public class StoreCopyFinishedResponseEncoder extends MessageToByteEncoder<StoreCopyFinishedResponse>
{
@Override
protected void encode( ChannelHandlerContext ctx, StoreCopyFinishedResponse msg, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, StoreCopyFinishedResponse msg, ByteBuf out ) throws Exception
{
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeInt( msg.status().ordinal() );
buffer.writeLong( msg.lastCommittedTxBeforeStoreCopy() );
out.add( buffer );
}
out.writeInt( msg.status().ordinal() );
out.writeLong( msg.lastCommittedTxBeforeStoreCopy() );
}
}
Expand Up @@ -21,15 +21,15 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.NetworkReadableClosableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;

public class TxPullRequestDecoder extends MessageToMessageDecoder<ByteBuf>
public class TxPullRequestDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -19,23 +19,19 @@
*/
package org.neo4j.causalclustering.catchup.tx;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

import org.neo4j.causalclustering.messaging.NetworkFlushableChannelNetty4;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;

public class TxPullRequestEncoder extends MessageToMessageEncoder<TxPullRequest>
public class TxPullRequestEncoder extends MessageToByteEncoder<TxPullRequest>
{
@Override
protected void encode( ChannelHandlerContext ctx, TxPullRequest request, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, TxPullRequest request, ByteBuf out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
encoded.writeLong( request.previousTxId() );
StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( encoded ) );
out.add( encoded );
out.writeLong( request.previousTxId() );
StoreIdMarshal.INSTANCE.marshal( request.expectedStoreId(), new NetworkFlushableChannelNetty4( out ) );
}
}
Expand Up @@ -21,7 +21,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

Expand All @@ -34,7 +34,7 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.transaction.log.entry.VersionAwareLogEntryReader;

public class TxPullResponseDecoder extends MessageToMessageDecoder<ByteBuf>
public class TxPullResponseDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down
Expand Up @@ -19,26 +19,21 @@
*/
package org.neo4j.causalclustering.catchup.tx;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.MessageToByteEncoder;

import org.neo4j.com.CommittedTransactionSerializer;
import org.neo4j.causalclustering.messaging.NetworkFlushableByteBuf;
import org.neo4j.causalclustering.messaging.marshalling.storeid.StoreIdMarshal;
import org.neo4j.com.CommittedTransactionSerializer;

public class TxPullResponseEncoder extends MessageToMessageEncoder<TxPullResponse>
public class TxPullResponseEncoder extends MessageToByteEncoder<TxPullResponse>
{

@Override
protected void encode( ChannelHandlerContext ctx, TxPullResponse response, List<Object> out ) throws Exception
protected void encode( ChannelHandlerContext ctx, TxPullResponse response, ByteBuf out ) throws Exception
{
ByteBuf encoded = ctx.alloc().buffer();
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( encoded );
NetworkFlushableByteBuf channel = new NetworkFlushableByteBuf( out );
StoreIdMarshal.INSTANCE.marshal( response.storeId(), channel );
new CommittedTransactionSerializer( channel ).visit( response.tx() );
out.add( encoded );
}
}
Expand Up @@ -21,13 +21,13 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import org.neo4j.causalclustering.catchup.CatchupResult;

public class TxStreamFinishedResponseDecoder extends MessageToMessageDecoder<ByteBuf>
public class TxStreamFinishedResponseDecoder extends ByteToMessageDecoder
{
@Override
protected void decode( ChannelHandlerContext ctx, ByteBuf msg, List<Object> out ) throws Exception
Expand Down

0 comments on commit 92bc612

Please sign in to comment.