Skip to content

Commit

Permalink
Negotiate, install modifiers to protocols during handshake
Browse files Browse the repository at this point in the history
InstalledProtocolsProcedure includes modifier protocols

Split modifier protocols and NettyPipelineBuilder into server/client.
  • Loading branch information
andrewkerr9000 committed Mar 6, 2018
1 parent fee2492 commit 5de1537
Show file tree
Hide file tree
Showing 51 changed files with 2,491 additions and 564 deletions.
Expand Up @@ -64,11 +64,12 @@
import org.neo4j.causalclustering.messaging.Outbound; import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.RaftOutbound; import org.neo4j.causalclustering.messaging.RaftOutbound;
import org.neo4j.causalclustering.messaging.SenderService; import org.neo4j.causalclustering.messaging.SenderService;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller; import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer; import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository; import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.com.storecopy.StoreUtil; import org.neo4j.com.storecopy.StoreUtil;
Expand Down Expand Up @@ -120,6 +121,7 @@


import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path; import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path;
import static org.neo4j.helpers.collection.Iterators.asSet;


/** /**
* This implementation of {@link org.neo4j.kernel.impl.factory.EditionModule} creates the implementations of services * This implementation of {@link org.neo4j.kernel.impl.factory.EditionModule} creates the implementations of services
Expand Down Expand Up @@ -234,11 +236,15 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,


long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis(); long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();


ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() ); ProtocolRepository<Protocol.ApplicationProtocol> applicationProtocolRepository = new ProtocolRepository<>( Protocol.ApplicationProtocols.values() );
ProtocolRepository<Protocol.ModifierProtocol> modifierProtocolRepository = new ProtocolRepository<>( Protocol.ModifierProtocols.values() );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( new RaftProtocolClientInstaller( logProvider, clientPipelineBuilderFactory ) ) ); new ProtocolInstallerRepository<>(
HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( logProvider, protocolRepository, Protocol.Identifier.RAFT, singletonList( new RaftProtocolClientInstaller.Factory( clientPipelineBuilderFactory, logProvider ) ),
protocolInstallerRepository, config, clientPipelineBuilderFactory ); ModifierProtocolInstaller.allClientInstallers );
HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository,
Protocol.ApplicationProtocolIdentifier.RAFT, modifierProtocolRepository, asSet( Protocol.ModifierProtocolIdentifier.COMPRESSION ),
protocolInstallerRepository, clientPipelineBuilderFactory, config, logProvider );
final SenderService raftSender = new SenderService( channelInitializer, logProvider ); final SenderService raftSender = new SenderService( channelInitializer, logProvider );
life.add( raftSender ); life.add( raftSender );
this.clientInstalledProtocols = raftSender::installedProtocols; this.clientInstalledProtocols = raftSender::installedProtocols;
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.neo4j.causalclustering.messaging.ComposableMessageHandler; import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler; import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.LoggingInbound; import org.neo4j.causalclustering.messaging.LoggingInbound;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller; import org.neo4j.causalclustering.protocol.ProtocolInstaller;
Expand Down Expand Up @@ -84,15 +85,18 @@ class RaftServerModule
private RaftServer createRaftServer( CoreServerModule coreServerModule, private RaftServer createRaftServer( CoreServerModule coreServerModule,
LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain ) LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> messageHandlerChain )
{ {
ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() ); ProtocolRepository<Protocol.ApplicationProtocol> applicationProtocolRepository = new ProtocolRepository<>( Protocol.ApplicationProtocols.values() );
ProtocolRepository<Protocol.ModifierProtocol> modifierProtocolRepository = new ProtocolRepository<>( Protocol.ModifierProtocols.values() );


RaftMessageNettyHandler nettyHandler = new RaftMessageNettyHandler( logProvider ); RaftMessageNettyHandler nettyHandler = new RaftMessageNettyHandler( logProvider );
RaftProtocolServerInstaller raftProtocolServerInstaller = new RaftProtocolServerInstaller( nettyHandler, pipelineBuilderFactory, logProvider ); RaftProtocolServerInstaller.Factory raftProtocolServerInstaller =
new RaftProtocolServerInstaller.Factory( nettyHandler, pipelineBuilderFactory, logProvider );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository = ProtocolInstallerRepository<ProtocolInstaller.Orientation.Server> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ) ); new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ), ModifierProtocolInstaller.allServerInstallers );


HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( logProvider, protocolRepository, Protocol.Identifier.RAFT, HandshakeServerInitializer handshakeServerInitializer =
protocolInstallerRepository, pipelineBuilderFactory ); new HandshakeServerInitializer( logProvider, applicationProtocolRepository, modifierProtocolRepository,
Protocol.ApplicationProtocolIdentifier.RAFT, protocolInstallerRepository, pipelineBuilderFactory );
RaftServer raftServer = new RaftServer( handshakeServerInitializer, platformModule.config, RaftServer raftServer = new RaftServer( handshakeServerInitializer, platformModule.config,
logProvider, platformModule.logging.getUserLogProvider() ); logProvider, platformModule.logging.getUserLogProvider() );


Expand Down
Expand Up @@ -21,32 +21,63 @@


import io.netty.channel.Channel; import io.netty.channel.Channel;


import java.util.List;
import java.util.stream.Collectors;

import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.RaftMessageEncoder; import org.neo4j.causalclustering.messaging.marshalling.RaftMessageEncoder;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller; import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstaller.Orientation;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


public class RaftProtocolClientInstaller extends ProtocolInstaller<ProtocolInstaller.Orientation.Client> public class RaftProtocolClientInstaller implements ProtocolInstaller<Orientation.Client>
{ {
private static final Protocol.ApplicationProtocols APPLICATION_PROTOCOL = Protocol.ApplicationProtocols.RAFT_1;

public static class Factory extends ProtocolInstaller.Factory<Orientation.Client, RaftProtocolClientInstaller>
{
public Factory( NettyPipelineBuilderFactory clientPipelineBuilderFactory, LogProvider logProvider )
{
super( APPLICATION_PROTOCOL,
modifiers -> new RaftProtocolClientInstaller( clientPipelineBuilderFactory, modifiers, logProvider ) );
}
}

private final List<ModifierProtocolInstaller<Orientation.Client>> modifiers;
private final Log log; private final Log log;
private final NettyPipelineBuilderFactory clientPipelineBuilderFactory; private final NettyPipelineBuilderFactory clientPipelineBuilderFactory;


public RaftProtocolClientInstaller( LogProvider logProvider, NettyPipelineBuilderFactory clientPipelineBuilderFactory ) public RaftProtocolClientInstaller( NettyPipelineBuilderFactory clientPipelineBuilderFactory, List<ModifierProtocolInstaller<Orientation.Client>> modifiers,
LogProvider logProvider )
{ {
super( Protocol.Protocols.RAFT_1 ); this.modifiers = modifiers;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.clientPipelineBuilderFactory = clientPipelineBuilderFactory; this.clientPipelineBuilderFactory = clientPipelineBuilderFactory;
} }


@Override @Override
public void install( Channel channel ) throws Exception public void install( Channel channel ) throws Exception
{ {
clientPipelineBuilderFactory.create( channel, log ) clientPipelineBuilderFactory.client( channel, log )
.modify( modifiers )
.addFraming() .addFraming()
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) .add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
.install(); .install();
} }

@Override
public Protocol.ApplicationProtocol applicationProtocol()
{
return APPLICATION_PROTOCOL;
}

@Override
public List<Protocol.ModifierProtocol> modifiers()
{
return modifiers.stream().map( ModifierProtocolInstaller::protocol ).collect( Collectors.toList() );
}
} }
Expand Up @@ -23,36 +23,66 @@
import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandler;


import java.time.Clock; import java.time.Clock;
import java.util.List;
import java.util.stream.Collectors;


import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal; import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder; import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder;
import org.neo4j.causalclustering.protocol.ModifierProtocolInstaller;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller; import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstaller.Orientation;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


public class RaftProtocolServerInstaller extends ProtocolInstaller<ProtocolInstaller.Orientation.Server> public class RaftProtocolServerInstaller implements ProtocolInstaller<Orientation.Server>
{ {
private static final Protocol.ApplicationProtocols APPLICATION_PROTOCOL = Protocol.ApplicationProtocols.RAFT_1;

public static class Factory extends ProtocolInstaller.Factory<Orientation.Server, RaftProtocolServerInstaller>
{
public Factory( ChannelInboundHandler raftMessageHandler, NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider )
{
super( APPLICATION_PROTOCOL,
modifiers -> new RaftProtocolServerInstaller( raftMessageHandler, pipelineBuilderFactory, modifiers, logProvider ) );
}
}

private final ChannelInboundHandler raftMessageHandler; private final ChannelInboundHandler raftMessageHandler;
private final NettyPipelineBuilderFactory pipelineBuilderFactory; private final NettyPipelineBuilderFactory pipelineBuilderFactory;
private final List<ModifierProtocolInstaller<Orientation.Server>> modifiers;
private final Log log; private final Log log;


public RaftProtocolServerInstaller( ChannelInboundHandler raftMessageHandler, NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider ) public RaftProtocolServerInstaller( ChannelInboundHandler raftMessageHandler, NettyPipelineBuilderFactory pipelineBuilderFactory,
List<ModifierProtocolInstaller<Orientation.Server>> modifiers, LogProvider logProvider )
{ {
super( Protocol.Protocols.RAFT_1 );
this.raftMessageHandler = raftMessageHandler; this.raftMessageHandler = raftMessageHandler;
this.pipelineBuilderFactory = pipelineBuilderFactory; this.pipelineBuilderFactory = pipelineBuilderFactory;
this.modifiers = modifiers;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
} }


@Override @Override
public void install( Channel channel ) throws Exception public void install( Channel channel ) throws Exception
{ {
pipelineBuilderFactory.create( channel, log ) pipelineBuilderFactory.server( channel, log )
.modify( modifiers )
.addFraming() .addFraming()
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) .add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
.add( "raft_handler", raftMessageHandler ) .add( "raft_handler", raftMessageHandler )
.install(); .install();
} }

@Override
public Protocol.ApplicationProtocol applicationProtocol()
{
return APPLICATION_PROTOCOL;
}

@Override
public List<Protocol.ModifierProtocol> modifiers()
{
return modifiers.stream().map( ModifierProtocolInstaller::protocol ).collect( Collectors.toList() );
}
} }
Expand Up @@ -149,7 +149,7 @@ public Outcome handle( RaftMessages.PreVote.Response response ) throws IOExcepti
} }


@Override @Override
public Outcome handle( RaftMessages.PruneRequest pruneRequest ) throws IOException public Outcome handle( RaftMessages.PruneRequest pruneRequest )
{ {
Pruning.handlePruneRequest( outcome, pruneRequest ); Pruning.handlePruneRequest( outcome, pruneRequest );
return outcome; return outcome;
Expand All @@ -174,7 +174,7 @@ public Outcome handle( RaftMessages.Timeout.Election election ) throws IOExcepti
} }


@Override @Override
public Outcome handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws IOException public Outcome handle( RaftMessages.Timeout.Heartbeat heartbeat )
{ {
return outcome; return outcome;
} }
Expand Down Expand Up @@ -242,7 +242,7 @@ public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome,
private static class PreVoteUnsupportedRefusesToLead implements ElectionTimeoutHandler private static class PreVoteUnsupportedRefusesToLead implements ElectionTimeoutHandler
{ {
@Override @Override
public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome, ReadableRaftState ctx, Log log )
{ {
log.info( "Election timeout triggered but refusing to be leader" ); log.info( "Election timeout triggered but refusing to be leader" );
return outcome; return outcome;
Expand All @@ -254,7 +254,7 @@ public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome,
private static class PreVoteSupportedRefusesToLeadHandler implements ElectionTimeoutHandler private static class PreVoteSupportedRefusesToLeadHandler implements ElectionTimeoutHandler
{ {
@Override @Override
public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException public Outcome handle( RaftMessages.Timeout.Election election, Outcome outcome, ReadableRaftState ctx, Log log )
{ {
log.info( "Election timeout triggered but refusing to be leader" ); log.info( "Election timeout triggered but refusing to be leader" );
Set<MemberId> memberIds = ctx.votingMembers(); Set<MemberId> memberIds = ctx.votingMembers();
Expand Down Expand Up @@ -295,7 +295,7 @@ public Outcome handle( RaftMessages.PreVote.Request request, Outcome outcome, Re
private static class PreVoteRequestNoOpHandler implements PreVoteRequestHandler private static class PreVoteRequestNoOpHandler implements PreVoteRequestHandler
{ {
@Override @Override
public Outcome handle( RaftMessages.PreVote.Request request, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException public Outcome handle( RaftMessages.PreVote.Request request, Outcome outcome, ReadableRaftState ctx, Log log )
{ {
return outcome; return outcome;
} }
Expand Down Expand Up @@ -343,7 +343,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )
private static class PreVoteResponseNoOpHandler implements PreVoteResponseHandler private static class PreVoteResponseNoOpHandler implements PreVoteResponseHandler
{ {
@Override @Override
public Outcome handle( RaftMessages.PreVote.Response response, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException public Outcome handle( RaftMessages.PreVote.Response response, Outcome outcome, ReadableRaftState ctx, Log log )
{ {
return outcome; return outcome;
} }
Expand Down
Expand Up @@ -21,8 +21,10 @@


import java.util.Comparator; import java.util.Comparator;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller; import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.collection.RawIterator; import org.neo4j.collection.RawIterator;
Expand Down Expand Up @@ -54,7 +56,8 @@ public InstalledProtocolsProcedure( Supplier<Stream<Pair<AdvertisedSocketAddress
.out( "orientation", Neo4jTypes.NTString ) .out( "orientation", Neo4jTypes.NTString )
.out( "remoteAddress", Neo4jTypes.NTString ) .out( "remoteAddress", Neo4jTypes.NTString )
.out( "applicationProtocol", Neo4jTypes.NTString ) .out( "applicationProtocol", Neo4jTypes.NTString )
.out( "version", Neo4jTypes.NTInteger ) .out( "applicationProtocolVersion", Neo4jTypes.NTInteger )
.out( "modifierProtocols", Neo4jTypes.NTString )
.description( "Overview of installed protocols" ) .description( "Overview of installed protocols" )
.build() ); .build() );
this.clientInstalledProtocols = clientInstalledProtocols; this.clientInstalledProtocols = clientInstalledProtocols;
Expand All @@ -63,7 +66,7 @@ public InstalledProtocolsProcedure( Supplier<Stream<Pair<AdvertisedSocketAddress


@Override @Override
public RawIterator<Object[],ProcedureException> apply( public RawIterator<Object[],ProcedureException> apply(
Context ctx, Object[] input, ResourceTracker resourceTracker ) throws ProcedureException Context ctx, Object[] input, ResourceTracker resourceTracker )
{ {
Stream<Object[]> outbound = toOutputRows( clientInstalledProtocols, ProtocolInstaller.Orientation.Client.OUTBOUND ); Stream<Object[]> outbound = toOutputRows( clientInstalledProtocols, ProtocolInstaller.Orientation.Client.OUTBOUND );


Expand All @@ -74,14 +77,34 @@ public RawIterator<Object[],ProcedureException> apply(


private <T extends SocketAddress> Stream<Object[]> toOutputRows( Supplier<Stream<Pair<T,ProtocolStack>>> installedProtocols, String orientation ) private <T extends SocketAddress> Stream<Object[]> toOutputRows( Supplier<Stream<Pair<T,ProtocolStack>>> installedProtocols, String orientation )
{ {
Comparator<Pair<T,ProtocolStack>> connectionInfoComparator = Comparator.comparing( ( Pair<T,ProtocolStack> entry ) -> entry.first().getHostname() )
.thenComparing( entry -> entry.first().getPort() );

return installedProtocols.get() return installedProtocols.get()
.sorted( Comparator.comparing( entry -> entry.first().toString() ) ) .sorted( connectionInfoComparator )
.map( entry -> new Object[] .map( entry -> buildRow( entry, orientation ) );
{ }
orientation,
entry.first().toString(), private <T extends SocketAddress> Object[] buildRow( Pair<T,ProtocolStack> connectionInfo, String orientation )
entry.other().applicationProtocol().identifier(), {
(long) entry.other().applicationProtocol().version() T socketAddress = connectionInfo.first();
} ); ProtocolStack protocolStack = connectionInfo.other();
return new Object[]
{
orientation,
socketAddress.toString(),
protocolStack.applicationProtocol().identifier(),
(long) protocolStack.applicationProtocol().version(),
modifierString( protocolStack )
};
}

private String modifierString( ProtocolStack protocolStack )
{
return protocolStack
.modifierProtocols()
.stream()
.map( Protocol.ModifierProtocol::friendlyName )
.collect( Collectors.joining( ",", "[", "]") );
} }
} }
Expand Up @@ -71,7 +71,7 @@ public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws E
} }


@Override @Override
public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise )
{ {
if ( !gated.test( msg ) ) if ( !gated.test( msg ) )
{ {
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.protocol;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldPrepender;

import org.neo4j.logging.Log;

public class ClientNettyPipelineBuilder extends NettyPipelineBuilder<ProtocolInstaller.Orientation.Client, ClientNettyPipelineBuilder>
{
ClientNettyPipelineBuilder( ChannelPipeline pipeline, Log log )
{
super( pipeline, log );
}

@Override
public ClientNettyPipelineBuilder addFraming()
{
add( "frame_encoder", new LengthFieldPrepender( 4 ) );
return this;
}
}

0 comments on commit 5de1537

Please sign in to comment.