diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java index 679cd721efbd1..277604f9efe92 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java @@ -46,7 +46,7 @@ public void install( Channel channel ) throws Exception { clientPipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) + .add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) .install(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java index 3cb5aad56b21c..c083a5dd84b6f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java @@ -51,8 +51,8 @@ public void install( Channel channel ) throws Exception { pipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) - .add( raftMessageHandler ) + .add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) + .add( "raft_handler", raftMessageHandler ) .install(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java index cfeda8e7f10bf..f03c1e64cf328 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java @@ -37,4 +37,6 @@ default List handlersFor( Channel channel ) throws Exception { return emptyList(); } + + String name(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java index 4454f70955d42..49d2a7998a71d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import java.util.Collections; import java.util.List; import org.neo4j.kernel.configuration.Config; @@ -40,6 +39,12 @@ public List handlersFor( Channel channel ) { return emptyList(); } + + @Override + public String name() + { + return "void"; + } }; @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java index 55c7800625aa4..9572d56d0ce85 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java @@ -19,9 +19,7 @@ */ package org.neo4j.causalclustering.messaging; -import io.netty.util.concurrent.Future; - -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; public interface Channel { @@ -31,24 +29,7 @@ public interface Channel boolean isOpen(); - CompletableFuture write( Object msg ); - - CompletableFuture writeAndFlush( Object msg ); + Future write( Object msg ); - static CompletableFuture convertNettyFuture( Future nettyFuture ) - { - CompletableFuture promise = new CompletableFuture<>(); - nettyFuture.addListener( future -> - { - if ( future.isSuccess() ) - { - promise.complete( null ); - } - else - { - promise.completeExceptionally( future.cause() ); - } - } ); - return promise; - } + Future writeAndFlush( Object msg ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java deleted file mode 100644 index 8120a36fa9581..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; - -/** - * Allows intercepting the writing to a channel. - */ -public interface ChannelInterceptor -{ - void write( BiFunction> writer, io.netty.channel.Channel channel, Object msg, - CompletableFuture promise ); - - Optional installedProtocolStack(); -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java deleted file mode 100644 index 7d4e7612e4e84..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeException; -import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeFinishedEvent; -import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer; -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; -import org.neo4j.logging.Log; - -/** - * Gates messages written before the handshake has completed. The handshake is finalized - * by firing a ClientHandshakeFinishedEvent (as a netty user event) in {@link HandshakeClientInitializer}. - */ -public class HandshakeGate implements ChannelInterceptor -{ - public static final String HANDSHAKE_GATE = "HandshakeGate"; - - private final CompletableFuture handshakePromise = new CompletableFuture<>(); - - HandshakeGate( Channel channel, Log log ) - { - log.info( "Handshake gate added" ); - channel.pipeline().addFirst( HANDSHAKE_GATE, new ChannelInboundHandlerAdapter() - { - @Override - public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception - { - if ( evt instanceof ClientHandshakeFinishedEvent.Success ) - { - log.info( "Handshake gate success" ); - handshakePromise.complete( ((ClientHandshakeFinishedEvent.Success) evt).protocolStack() ); - } - else if ( evt instanceof ClientHandshakeFinishedEvent.Failure ) - { - log.warn( "Handshake gate failed" ); - handshakePromise.completeExceptionally( new ClientHandshakeException( "Handshake failed" ) ); - channel.close(); - } - else - { - super.userEventTriggered( ctx, evt ); - } - } - } ); - } - - @Override - public void write( BiFunction> writer, Channel channel, Object msg, CompletableFuture promise ) - { - handshakePromise.whenComplete( ( ignored, failure ) -> - writer.apply( channel, msg ).addListener( x -> promise.complete( null ) ) ); - } - - @Override - public Optional installedProtocolStack() - { - return Optional.ofNullable( handshakePromise.getNow( null ) ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java new file mode 100644 index 0000000000000..b9a72224315f4 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.neo4j.causalclustering.protocol.handshake.GateEvent; + +/** + * Gates messages and keeps them on a queue until the gate is either + * opened successfully or closed forever. + */ +@ChannelHandler.Sharable +public class MessageGate extends ChannelDuplexHandler +{ + private final Predicate gated; + + private List pending = new ArrayList<>(); + + public MessageGate( Predicate gated ) + { + this.gated = gated; + } + + @Override + public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception + { + if ( evt instanceof GateEvent ) + { + if ( GateEvent.getSuccess().equals( evt ) ) + { + for ( GatedWrite write : pending ) + { + ctx.write( write.msg, write.promise ); + } + + ctx.channel().pipeline().remove( this ); + } + + pending.clear(); + pending = null; + } + else + { + super.userEventTriggered( ctx, evt ); + } + } + + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception + { + if ( !gated.test( msg ) ) + { + ctx.write( msg, promise ); + } + else if ( pending != null ) + { + pending.add( new GatedWrite( msg, promise ) ); + } + else + { + promise.setFailure( new RuntimeException( "Gate failed and has been permanently closed." ) ); + } + } + + static class GatedWrite + { + final Object msg; + final ChannelPromise promise; + + GatedWrite( Object msg, ChannelPromise promise ) + { + this.msg = msg; + this.promise = promise; + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java index d278fe0b510e9..a52f220c962d3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java @@ -22,13 +22,14 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOutboundInvoker; -import io.netty.util.concurrent.Future; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Promise; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; -import java.util.function.Function; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.TimeoutStrategy; @@ -40,10 +41,12 @@ public class ReconnectingChannel implements Channel { + public static final AttributeKey PROTOCOL_STACK_KEY = AttributeKey.valueOf( "PROTOCOL_STACK" ); + private final Log log; private final Bootstrap bootstrap; + private final EventLoop eventLoop; private final SocketAddress destination; - private final Function channelInterceptorFactory; private final TimeoutStrategy connectionBackoffStrategy; private volatile io.netty.channel.Channel channel; @@ -52,21 +55,19 @@ public class ReconnectingChannel implements Channel private volatile boolean disposed; private TimeoutStrategy.Timeout connectionBackoff; - private ChannelInterceptor channelInterceptor; - ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log, - Function channelInterceptorFactory ) + ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log ) { - this( bootstrap, destination, log, channelInterceptorFactory, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); + this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); } - private ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log, - Function channelInterceptorFactory, TimeoutStrategy connectionBackoffStrategy ) + private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log, + TimeoutStrategy connectionBackoffStrategy ) { this.bootstrap = bootstrap; + this.eventLoop = eventLoop; this.destination = destination; this.log = log; - this.channelInterceptorFactory = channelInterceptorFactory; this.connectionBackoffStrategy = connectionBackoffStrategy; this.connectionBackoff = connectionBackoffStrategy.newTimeout(); } @@ -89,7 +90,6 @@ else if ( fChannel != null && !fChannel.isDone() ) fChannel = bootstrap.connect( destination.socketAddress() ); channel = fChannel.channel(); - channelInterceptor = channelInterceptorFactory.apply( channel ); fChannel.addListener( ( ChannelFuture f ) -> { @@ -131,18 +131,18 @@ public boolean isOpen() } @Override - public CompletableFuture write( Object msg ) + public Future write( Object msg ) { - return doWrite( msg, ChannelOutboundInvoker::write ); + return write( msg, false ); } @Override - public CompletableFuture writeAndFlush( Object msg ) + public Future writeAndFlush( Object msg ) { - return doWrite( msg, ChannelOutboundInvoker::writeAndFlush ); + return write( msg, true ); } - private CompletableFuture doWrite( Object msg, BiFunction> writer ) + private Future write( Object msg, boolean flush ) { if ( disposed ) { @@ -151,32 +151,67 @@ private CompletableFuture doWrite( Object msg, BiFunction promise = new CompletableFuture<>(); - channelInterceptor.write( writer, channel, msg, promise ); - return promise; + if ( flush ) + { + return channel.writeAndFlush( msg ); + } + else + { + return channel.write( msg ); + } } else { - CompletableFuture promise = new CompletableFuture<>(); + Promise promise = eventLoop.newPromise(); + BiConsumer writer; + + if ( flush ) + { + writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise ); + } + else + { + writer = ( channel, mmessage ) -> chain( channel.write( msg ), promise ); + } + deferredWrite( msg, fChannel, promise, true, writer ); return promise; } } + /** + * Chains a channel future to a promise. Used when the returned promise + * was not allocated through the channel and cannot be used as the + * first-hand promise for the I/O operation. + */ + private static void chain( ChannelFuture when, Promise then ) + { + when.addListener( f -> { + if ( f.isSuccess() ) + { + then.setSuccess( when.get() ); + } + else + { + then.setFailure( when.cause() ); + } + } ); + } + /** * Will try to reconnect once before giving up on a send. The reconnection *must* happen * after write was scheduled. This is necessary to provide proper ordering when a message * is sent right after the non-blocking channel was setup and before the server is ready * to accept a connection. This happens frequently in tests. */ - private void deferredWrite( Object msg, ChannelFuture channelFuture, CompletableFuture promise, boolean firstAttempt, - BiFunction> writer ) + private void deferredWrite( Object msg, ChannelFuture channelFuture, Promise promise, boolean firstAttempt, + BiConsumer writer ) { channelFuture.addListener( (ChannelFutureListener) f -> { if ( f.isSuccess() ) { - channelInterceptor.write( writer, f.channel(), msg, promise ); + writer.accept( f.channel(), msg ); } else if ( firstAttempt ) { @@ -185,14 +220,14 @@ else if ( firstAttempt ) } else { - promise.completeExceptionally( f.cause() ); + promise.setFailure( f.cause() ); } } ); } public Optional installedProtocolStack() { - return channelInterceptor.installedProtocolStack(); + return Optional.ofNullable( channel.attr( PROTOCOL_STACK_KEY ).get() ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index bbf11a028062e..629cd637f13ab 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -20,7 +20,8 @@ package org.neo4j.causalclustering.messaging; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; @@ -64,7 +65,7 @@ public SenderService( ChannelInitializer channelInitializer, LogProvider logProv @Override public void send( AdvertisedSocketAddress to, Message message, boolean block ) { - CompletableFuture future; + Future future; serviceLock.readLock().lock(); try { @@ -73,9 +74,7 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block ) return; } - Channel channel = channel( to ); - - future = channel.writeAndFlush( message ); + future = channel( to ).writeAndFlush( message ); } finally { @@ -84,7 +83,18 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block ) if ( block ) { - future.join(); + try + { + future.get(); + } + catch ( ExecutionException e ) + { + log.error( "Exception while sending to: " + to, e ); + } + catch ( InterruptedException e ) + { + log.info( "Interrupted while sending", e ); + } } } @@ -94,7 +104,7 @@ private Channel channel( AdvertisedSocketAddress destination ) if ( channel == null ) { - channel = new ReconnectingChannel( bootstrap, destination, log, ch -> new HandshakeGate( ch, log ) ); + channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log ); channel.start(); ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java index ef2c28f9f8517..b56e8cc973bc4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java @@ -19,7 +19,7 @@ */ package org.neo4j.causalclustering.messaging; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.neo4j.logging.Log; @@ -56,17 +56,17 @@ public boolean isOpen() } @Override - public CompletableFuture write( Object msg ) + public Future write( Object msg ) { checkDisposed(); - return Channel.convertNettyFuture( channel.write( msg ) ); + return channel.write( msg ); } @Override - public CompletableFuture writeAndFlush( Object msg ) + public Future writeAndFlush( Object msg ) { checkDisposed(); - return Channel.convertNettyFuture( channel.writeAndFlush( msg ) ); + return channel.writeAndFlush( msg ); } private void checkDisposed() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java index 58f90f1046466..0209880b29e62 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java @@ -27,28 +27,37 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.MessageToByteEncoder; import java.util.ArrayList; import java.util.List; +import java.util.function.Predicate; -import org.neo4j.causalclustering.messaging.HandshakeGate; +import org.neo4j.causalclustering.messaging.MessageGate; import org.neo4j.logging.Log; import static java.util.Arrays.asList; /** * Builder and installer of pipelines. - * + *

* Makes sures to install sane last-resort error handling and * handles the construction of common patterns, like framing. + *

+ * Do not modify the names of handlers you install. */ public class NettyPipelineBuilder { + static final String MESSAGE_GATE_NAME = "message_gate"; + private final ChannelPipeline pipeline; private final Log log; - private final List handlers = new ArrayList<>(); + private final List handlerInfos = new ArrayList<>(); + + private Predicate gatePredicate; private NettyPipelineBuilder( ChannelPipeline pipeline, Log log ) { @@ -56,27 +65,62 @@ private NettyPipelineBuilder( ChannelPipeline pipeline, Log log ) this.log = log; } + /** + * Entry point for the builder. + * + * @param pipeline The pipeline to build for. + * @param log The log used for last-resort errors occurring in the pipeline. + * @return The builder. + */ public static NettyPipelineBuilder with( ChannelPipeline pipeline, Log log ) { return new NettyPipelineBuilder( pipeline, log ); } + /** + * Adds buffer framing to the pipeline. Useful for pipelines marshalling + * complete POJOs as an example using {@link MessageToByteEncoder} and + * {@link ByteToMessageDecoder}. + */ public NettyPipelineBuilder addFraming() { - add( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); - add( new LengthFieldPrepender( 4 ) ); + add( "framing_decoder", new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); + add( "framing_encoder", new LengthFieldPrepender( 4 ) ); return this; } - public NettyPipelineBuilder add( List newHandlers ) + /** + * Adds handlers to the pipeline. + *

+ * The pipeline builder controls the internal names of the handlers in the + * pipeline and external actors are forbidden from manipulating them. + * + * @param name The name of the handler, which must be unique. + * @param newHandlers The new handlers. + * @return The builder. + */ + public NettyPipelineBuilder add( String name, List newHandlers ) { - handlers.addAll( newHandlers ); + newHandlers.stream().map( handler -> new HandlerInfo( name, handler ) ).forEachOrdered( handlerInfos::add ); return this; } - public NettyPipelineBuilder add( ChannelHandler... newHandlers ) + /** + * @see #add(String, List) + */ + public NettyPipelineBuilder add( String name, ChannelHandler... newHandlers ) + { + return add( name, asList( newHandlers ) ); + } + + public NettyPipelineBuilder addGate( Predicate gatePredicate ) { - return add( asList( newHandlers ) ); + if ( this.gatePredicate != null ) + { + throw new IllegalStateException( "Cannot have more than one gate." ); + } + this.gatePredicate = gatePredicate; + return this; } /** @@ -84,32 +128,56 @@ public NettyPipelineBuilder add( ChannelHandler... newHandlers ) */ public void install() { + ChannelHandler oldGateHandler = removeOldGate(); clear(); - handlers.forEach( pipeline::addLast ); -// installErrorHandling(); -- temporarily disabled for debugging purposes + for ( HandlerInfo info : handlerInfos ) + { + pipeline.addLast( info.name, info.handler ); + } + installGate( oldGateHandler ); + installErrorHandling(); } - private void clear() + private ChannelHandler removeOldGate() + { + if ( pipeline.get( MESSAGE_GATE_NAME ) != null ) + { + return pipeline.remove( MESSAGE_GATE_NAME ); + } + return null; + } + + private void installGate( ChannelHandler oldGateHandler ) { - pipeline.names().stream() - .filter( this::isNotDefault ) - .filter( this::isNotUserEvent ) - .forEach( pipeline::remove ); + if ( oldGateHandler != null && gatePredicate != null ) + { + throw new IllegalStateException( "Cannot have more than one gate." ); + } + else if ( gatePredicate != null ) + { + pipeline.addLast( MESSAGE_GATE_NAME, new MessageGate( gatePredicate ) ); + } + else if ( oldGateHandler != null ) + { + pipeline.addLast( MESSAGE_GATE_NAME, oldGateHandler ); + } } - private boolean isNotUserEvent( String name ) + private void clear() { - return !HandshakeGate.HANDSHAKE_GATE.equals( name ); + pipeline.names().stream().filter( this::isNotDefault ).forEach( pipeline::remove ); } private boolean isNotDefault( String name ) { + // these are netty internal handlers for head and tail return pipeline.get( name ) != null; } private void installErrorHandling() { - pipeline.addLast( new ChannelDuplexHandler() + // inbound goes in the direction from first->last + pipeline.addLast( "error_handler_tail", new ChannelDuplexHandler() { @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) @@ -123,13 +191,16 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) log.error( "Unhandled inbound message: " + msg ); } + // this is the first handler for an outbound message, and attaches a listener to its promise if possible @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) { + // if the promise is a void-promise, then exceptions will instead propagate to the + // exceptionCaught handler on the outbound handler further below + if ( !promise.isVoid() ) { - promise.addListener( (ChannelFutureListener) future -> - { + promise.addListener( (ChannelFutureListener) future -> { if ( !future.isSuccess() ) { log.error( "Exception in outbound", future.cause() ); @@ -140,14 +211,17 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise } } ); - pipeline.addFirst( new ChannelOutboundHandlerAdapter() + pipeline.addFirst( "error_handler_head", new ChannelOutboundHandlerAdapter() { + // exceptions which did not get fulfilled on the promise of a write, etc. @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { log.error( "Exception in outbound", cause ); } + // netty can only handle bytes in the form of ByteBuf, so if you reach this then you are + // perhaps trying to send a POJO without having a suitable encoder @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) { @@ -162,4 +236,16 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise } } ); } + + private static class HandlerInfo + { + private final String name; + private final ChannelHandler handler; + + HandlerInfo( String name, ChannelHandler handler ) + { + this.name = name; + this.handler = handler; + } + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java index 8976413811be6..b15eacd1465c4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java @@ -39,9 +39,11 @@ public NettyPipelineBuilder create( Channel channel, Log log ) throws Exception { ChannelPipeline pipeline = channel.pipeline(); NettyPipelineBuilder builder = NettyPipelineBuilder.with( pipeline, log ); + int i = 0; for ( ChannelHandler handler : wrapper.handlersFor( channel ) ) { - builder.add( handler ); + builder.add( String.format( "%s_%d", wrapper.name(), i ), handler ); + i++; } return builder; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java new file mode 100644 index 0000000000000..a00feee2d01bb --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.protocol.handshake; + +import java.util.Objects; + +public class GateEvent +{ + private final boolean isSuccess; + + private GateEvent( boolean isSuccess ) + { + this.isSuccess = isSuccess; + } + + private static GateEvent success = new GateEvent( true ); + private static GateEvent failure = new GateEvent( false ); + + public static GateEvent getSuccess() + { + return success; + } + + public static GateEvent getFailure() + { + return failure; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + GateEvent that = (GateEvent) o; + return isSuccess == that.isSuccess; + } + + @Override + public int hashCode() + { + return Objects.hash( isSuccess ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java index e675cf7740f12..dccc16d43ae7f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java @@ -30,6 +30,7 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.TimeoutStrategy; +import org.neo4j.causalclustering.messaging.ReconnectingChannel; import org.neo4j.causalclustering.messaging.SimpleNettyChannel; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -68,16 +69,16 @@ private void installHandlers( Channel channel, HandshakeClient handshakeClient ) { pipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new ClientMessageEncoder() ) - .add( new ClientMessageDecoder() ) - .add( new NettyHandshakeClient( handshakeClient ) ) + .add( "handshake_client_encoder", new ClientMessageEncoder() ) + .add( "handshake_client_decoder", new ClientMessageDecoder() ) + .add( "handshake_client", new NettyHandshakeClient( handshakeClient ) ) + .addGate( msg -> !(msg instanceof ServerMessage) ) .install(); } @Override protected void initChannel( SocketChannel channel ) throws Exception { - log.info( "Initiating channel: " + channel ); HandshakeClient handshakeClient = new HandshakeClient(); installHandlers( channel, handshakeClient ); @@ -90,7 +91,6 @@ protected void initChannel( SocketChannel channel ) throws Exception */ private void scheduleHandshake( SocketChannel ch, HandshakeClient handshakeClient, TimeoutStrategy.Timeout timeout ) { - log.info( String.format( "Scheduling handshake after: %d ms", timeout.getMillis() ) ); ch.eventLoop().schedule( () -> { if ( ch.isActive() ) @@ -104,7 +104,6 @@ else if ( ch.isOpen() ) } else { - log.warn( "Channel closed" ); handshakeClient.failIfNotDone( "Channel closed" ); } }, timeout.getMillis(), MILLISECONDS ); @@ -122,7 +121,6 @@ private void scheduleTimeout( SocketChannel ch, HandshakeClient handshakeClient private void initiateHandshake( Channel channel, HandshakeClient handshakeClient ) { - log.info( "Initiating handshake" ); SimpleNettyChannel channelWrapper = new SimpleNettyChannel( channel, log ); CompletableFuture handshake = handshakeClient.initiate( channelWrapper, protocolRepository, protocolName ); @@ -131,11 +129,11 @@ private void initiateHandshake( Channel channel, HandshakeClient handshakeClient private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, Throwable failure ) { - log.info( "Handshake completed" ); if ( failure != null ) { log.error( "Error when negotiating protocol stack", failure ); - channel.pipeline().fireUserEventTriggered( ClientHandshakeFinishedEvent.Failure.instance() ); + channel.pipeline().fireUserEventTriggered( GateEvent.getFailure() ); + channel.close(); } else { @@ -143,8 +141,10 @@ private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, { log.info( "Installing: " + protocolStack ); protocolInstaller.installerFor( protocolStack.applicationProtocol() ).install( channel ); - log.info( "Firing handshake success event to handshake gate" ); - channel.pipeline().fireUserEventTriggered( new ClientHandshakeFinishedEvent.Success( protocolStack ) ); + channel.attr( ReconnectingChannel.PROTOCOL_STACK_KEY ).set( protocolStack ); + + channel.pipeline().fireUserEventTriggered( GateEvent.getSuccess() ); + channel.flush(); } catch ( Exception e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java index e750562bc7e80..0596d70ecf64c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java @@ -56,9 +56,9 @@ public void initChannel( SocketChannel ch ) throws Exception { pipelineBuilderFactory.create( ch, log ) .addFraming() - .add( new ServerMessageEncoder() ) - .add( new ServerMessageDecoder() ) - .add( createHandshakeServer( ch ) ) + .add( "handshake_server_encoder", new ServerMessageEncoder() ) + .add( "handshake_server_decoder", new ServerMessageDecoder() ) + .add( "handshake_server", createHandshakeServer( ch ) ) .install(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java index 66f3bccb98160..1bf3eb079c3f5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java @@ -22,7 +22,7 @@ /** * Messages to the server, generally requests. */ -interface ServerMessage +public interface ServerMessage { void dispatch( ServerMessageHandler handler ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java deleted file mode 100644 index 1623e75ae74a3..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.util.concurrent.DefaultEventExecutor; -import io.netty.util.concurrent.Promise; -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; - -import static org.junit.Assert.fail; - -public class ChannelTest -{ - @Test - public void shouldConvertSuccessfulFuture() throws Throwable - { - // given - CompletableFuture completableFuture = Channel.convertNettyFuture( - createPromise( promise -> promise.setSuccess( "yay" ) ) - ); - - // when - completableFuture.get(); - - // then no throw - } - - @Test( expected = IllegalArgumentException.class ) - public void shouldConvertFailedFuture() throws Throwable - { - // given - CompletableFuture completableFuture = Channel.convertNettyFuture( - createPromise( promise -> promise.setFailure( new IllegalArgumentException() ) ) - ); - - // when - try - { - completableFuture.get(); - fail( "Expected a failed future" ); - } - // then throw - catch ( ExecutionException ex ) - { - throw ex.getCause(); - } - } - - private Promise createPromise( Consumer> mutator ) - { - DefaultEventExecutor eventExecutors = new DefaultEventExecutor(); - Promise promise = eventExecutors.newPromise(); - mutator.accept( promise ); - return promise; - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java deleted file mode 100644 index 2665ae1ba39f9..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import org.junit.Test; -import org.mockito.Answers; -import org.mockito.Mockito; - -import java.util.Optional; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; -import org.neo4j.logging.NullLog; - -import static co.unruly.matchers.OptionalMatchers.empty; -import static org.junit.Assert.assertThat; - -public class HandshakeGateTest -{ - @Test - public void shouldReturnNoInstalledProtocolIfHandshakeNotComplete() throws Throwable - { - // given - HandshakeGate handshakeGate = new HandshakeGate( Mockito.mock( Channel.class, Answers.RETURNS_MOCKS ), NullLog.getInstance() ); - - // when - Optional protocolStack = handshakeGate.installedProtocolStack(); - - // then - assertThat( protocolStack, empty() ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java new file mode 100644 index 0000000000000..65253f1bdf73d --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import org.neo4j.causalclustering.protocol.handshake.GateEvent; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MessageGateTest +{ + private final String ALLOWED_MSG = "allowed"; + private final MessageGate gate = new MessageGate( m -> m != ALLOWED_MSG ); + private final ChannelHandlerContext ctx = mock( ChannelHandlerContext.class ); + private final Channel channel = mock( Channel.class ); + private final ChannelPipeline pipeline = mock( ChannelPipeline.class ); + + @Before + public void setup() + { + when( channel.pipeline() ).thenReturn( pipeline ); + when( ctx.channel() ).thenReturn( channel ); + } + + @Test + public void shouldLetAllowedMessagesPass() throws Exception + { + // when + ChannelPromise promise = mock( ChannelPromise.class ); + gate.write( ctx, ALLOWED_MSG, promise ); + gate.write( ctx, ALLOWED_MSG, promise ); + gate.write( ctx, ALLOWED_MSG, promise ); + + // then + verify( ctx, times( 3 ) ).write( ALLOWED_MSG, promise ); + } + + @Test + public void shouldGateMessages() throws Exception + { + // when + ChannelPromise promise = mock( ChannelPromise.class ); + gate.write( ctx, "A", promise ); + gate.write( ctx, "B", promise ); + gate.write( ctx, "C", promise ); + + // then + verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldLetGatedMessagesPassOnSuccess() throws Exception + { + // given + ChannelPromise promiseA = mock( ChannelPromise.class ); + ChannelPromise promiseB = mock( ChannelPromise.class ); + ChannelPromise promiseC = mock( ChannelPromise.class ); + + gate.write( ctx, "A", promiseA ); + gate.write( ctx, "B", promiseB ); + gate.write( ctx, "C", promiseC ); + verify( ctx, never() ).write( any(), any() ); + + // when + gate.userEventTriggered( ctx, GateEvent.getSuccess() ); + + // then + InOrder inOrder = Mockito.inOrder( ctx ); + inOrder.verify( ctx ).write( "A", promiseA ); + inOrder.verify( ctx ).write( "B", promiseB ); + inOrder.verify( ctx ).write( "C", promiseC ); + inOrder.verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldRemoveGateOnSuccess() throws Exception + { + // when + gate.userEventTriggered( ctx, GateEvent.getSuccess() ); + + // then + verify( pipeline ).remove( gate ); + } + + @Test + public void shouldNotLetGatedMessagesPassAfterFailure() throws Exception + { + // given + ChannelPromise promise = mock( ChannelPromise.class ); + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // when + gate.write( ctx, "A", promise ); + gate.write( ctx, "B", promise ); + gate.write( ctx, "C", promise ); + + // then + verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldStillLetAllowedMessagePassAfterFailure() throws Exception + { + // given + ChannelPromise promise = mock( ChannelPromise.class ); + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // when + gate.write( ctx, ALLOWED_MSG, promise ); + + // then + verify( ctx ).write( ALLOWED_MSG, promise ); + } + + @Test + public void shouldLeaveGateOnFailure() throws Exception + { + // when + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // then + verify( pipeline, never() ).remove( any( ChannelHandler.class ) ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java index 88d90d83d1663..5bca242cacb5f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java @@ -32,8 +32,8 @@ import org.junit.Before; import org.junit.Test; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.neo4j.helpers.SocketAddress; @@ -70,7 +70,7 @@ public void before() { elg = new NioEventLoopGroup( 0 ); Bootstrap bootstrap = new Bootstrap().channel( NioSocketChannel.class ).group( elg ).handler( VOID_HANDLER ); - channel = new ReconnectingChannel( bootstrap, serverAddress, log, SimpleChannelInterceptor.getInstance() ); + channel = new ReconnectingChannel( bootstrap, elg.next(), serverAddress, log ); } @After @@ -90,7 +90,7 @@ public void shouldBeAbleToSendMessage() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will be successfully completed fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -107,7 +107,7 @@ public void shouldAllowDeferredSend() throws Exception // this is benign in the sense that the test will pass in the condition where it was already connected as well // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will be successfully completed fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -120,7 +120,7 @@ public void shouldFailSendWhenNoServer() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will throw fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -134,7 +134,7 @@ public void shouldReconnectAfterServerComesBack() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will not throw fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -170,7 +170,7 @@ public void shouldNotAllowSendingOnDisposedChannel() throws Exception channel.start(); // ensure we are connected - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); assertEventually( "", server::childCount, equalTo( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java deleted file mode 100644 index 36570acdf5bd2..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; -import java.util.function.Function; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; - -public class SimpleChannelInterceptor implements ChannelInterceptor -{ - private static SimpleChannelInterceptor instance = new SimpleChannelInterceptor(); - - private SimpleChannelInterceptor() - { - } - - public static Function getInstance() - { - return ignored -> instance; - } - - @Override - public void write( BiFunction> writer, Channel channel, Object msg, CompletableFuture promise ) - { - writer.apply( channel, msg ).addListener( x -> promise.complete( null ) ); - } - - @Override - public Optional installedProtocolStack() - { - return Optional.empty(); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java index 0accfa89c7816..15bc9604c3706 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java @@ -22,7 +22,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.neo4j.logging.NullLog; @@ -43,7 +43,7 @@ public void shouldWriteOnNettyChannel() // when Object msg = new Object(); - CompletableFuture writeComplete = channel.write( msg ); + Future writeComplete = channel.write( msg ); // then assertNull( nettyChannel.readOutbound() ); @@ -65,7 +65,7 @@ public void shouldWriteAndFlushOnNettyChannel() // when Object msg = new Object(); - CompletableFuture writeComplete = channel.writeAndFlush( msg ); + Future writeComplete = channel.writeAndFlush( msg ); // then assertTrue( writeComplete.isDone() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java index ecf42879e4b6c..16a81744cdd82 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java @@ -19,33 +19,44 @@ */ package org.neo4j.causalclustering.protocol; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Ignore; import org.junit.Test; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.Log; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.neo4j.logging.AssertableLogProvider.inLog; -@Ignore public class NettyPipelineBuilderTest { private AssertableLogProvider logProvider = new AssertableLogProvider(); private Log log = logProvider.getLog( getClass() ); private EmbeddedChannel channel = new EmbeddedChannel(); + private ChannelHandlerAdapter EMPTY_HANDLER = new ChannelHandlerAdapter() + { + }; @Test public void shouldLogExceptionInbound() { // given RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelInboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "read_handler", new ChannelInboundHandlerAdapter() { @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) @@ -93,7 +104,7 @@ public void shouldLogUnhandledMessageOutbound() public void shouldLogExceptionOutbound() { RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelOutboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", new ChannelOutboundHandlerAdapter() { @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) @@ -113,7 +124,7 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise public void shouldLogExceptionOutboundWithVoidPromise() { RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelOutboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", new ChannelOutboundHandlerAdapter() { @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) @@ -142,7 +153,7 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) // handled } }; - NettyPipelineBuilder.with( channel.pipeline(), log ).add( handler ).install(); + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "read_handler", handler ).install(); // when channel.writeOneInbound( msg ); @@ -164,7 +175,7 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ctx.write( ctx.alloc().buffer() ); } }; - NettyPipelineBuilder.with( channel.pipeline(), log ).add( encoder ).install(); + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", encoder ).install(); // when channel.writeAndFlush( msg ); @@ -172,4 +183,34 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise // then logProvider.assertNoLoggingOccurred(); } + + @Test + public void shouldReInstallWithPreviousGate() throws Exception + { + // given + Object gatedMessage = new Object(); + + NettyPipelineBuilder builderA = NettyPipelineBuilder.with( channel.pipeline(), log ); + builderA.addGate( p -> p == gatedMessage ); + builderA.install(); + + assertEquals( 3, getHandlers( channel.pipeline() ).size() ); // head/tail error handlers also counted + assertThat( channel.pipeline().names(), + hasItems( "error_handler_head", NettyPipelineBuilder.MESSAGE_GATE_NAME, "error_handler_tail" ) ); + + // when + NettyPipelineBuilder builderB = NettyPipelineBuilder.with( channel.pipeline(), log ); + builderB.add( "my_handler", EMPTY_HANDLER ); + builderB.install(); + + // then + assertEquals( 4, getHandlers( channel.pipeline() ).size() ); // head/tail error handlers also counted + assertThat( channel.pipeline().names(), + hasItems( "error_handler_head", "my_handler", NettyPipelineBuilder.MESSAGE_GATE_NAME, "error_handler_tail" ) ); + } + + private List getHandlers( ChannelPipeline pipeline ) + { + return pipeline.names().stream().map( pipeline::get ).filter( Objects::nonNull ).collect( Collectors.toList() ); + } }