From 2626f0369685bfea5c503283fcfc5f0a8fe4f22a Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Fri, 9 Feb 2018 21:06:32 +0100 Subject: [PATCH] Fix race with channel interceptor Remove the ChannelInterceptor concept since it potentially left a gap between connect() and adding the interceptor where messages could slip by without being intercepted. Instead the complete handshake pipeline is installed immediately from the ChannelInitializer and the NettyPipelineBuilder has been extended with functionality to cater for the message gating. The message gate is now a free-standing handler of its own and which does as little as possible to fulfil its role. Closing and flusing logic has been moved outside to the components which drive the handshake. --- .../RaftProtocolClientInstaller.java | 2 +- .../RaftProtocolServerInstaller.java | 4 +- .../handlers/PipelineWrapper.java | 2 + .../handlers/VoidPipelineWrapperFactory.java | 7 +- .../causalclustering/messaging/Channel.java | 25 +-- .../messaging/ChannelInterceptor.java | 40 ----- .../messaging/HandshakeGate.java | 86 ---------- .../messaging/MessageGate.java | 101 ++++++++++++ .../messaging/ReconnectingChannel.java | 91 ++++++---- .../messaging/SenderService.java | 24 ++- .../messaging/SimpleNettyChannel.java | 10 +- .../protocol/NettyPipelineBuilder.java | 130 ++++++++++++--- .../protocol/NettyPipelineBuilderFactory.java | 4 +- .../protocol/handshake/GateEvent.java | 66 ++++++++ .../handshake/HandshakeClientInitializer.java | 22 +-- .../handshake/HandshakeServerInitializer.java | 6 +- .../protocol/handshake/ServerMessage.java | 2 +- .../messaging/ChannelTest.java | 76 --------- .../messaging/HandshakeGateTest.java | 49 ------ .../messaging/MessageGateTest.java | 155 ++++++++++++++++++ .../messaging/ReconnectingChannelIT.java | 14 +- .../messaging/SimpleChannelInterceptor.java | 56 ------- .../messaging/SimpleNettyChannelTest.java | 6 +- .../protocol/NettyPipelineBuilderTest.java | 55 ++++++- 24 files changed, 605 insertions(+), 428 deletions(-) delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java delete mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java delete mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java delete mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java delete mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java index 679cd721efbd1..277604f9efe92 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java @@ -46,7 +46,7 @@ public void install( Channel channel ) throws Exception { clientPipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) + .add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) ) .install(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java index 3cb5aad56b21c..c083a5dd84b6f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java @@ -51,8 +51,8 @@ public void install( Channel channel ) throws Exception { pipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) - .add( raftMessageHandler ) + .add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) ) + .add( "raft_handler", raftMessageHandler ) .install(); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java index cfeda8e7f10bf..f03c1e64cf328 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java @@ -37,4 +37,6 @@ default List handlersFor( Channel channel ) throws Exception { return emptyList(); } + + String name(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java index 4454f70955d42..49d2a7998a71d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java @@ -22,7 +22,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import java.util.Collections; import java.util.List; import org.neo4j.kernel.configuration.Config; @@ -40,6 +39,12 @@ public List handlersFor( Channel channel ) { return emptyList(); } + + @Override + public String name() + { + return "void"; + } }; @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java index 55c7800625aa4..9572d56d0ce85 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java @@ -19,9 +19,7 @@ */ package org.neo4j.causalclustering.messaging; -import io.netty.util.concurrent.Future; - -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; public interface Channel { @@ -31,24 +29,7 @@ public interface Channel boolean isOpen(); - CompletableFuture write( Object msg ); - - CompletableFuture writeAndFlush( Object msg ); + Future write( Object msg ); - static CompletableFuture convertNettyFuture( Future nettyFuture ) - { - CompletableFuture promise = new CompletableFuture<>(); - nettyFuture.addListener( future -> - { - if ( future.isSuccess() ) - { - promise.complete( null ); - } - else - { - promise.completeExceptionally( future.cause() ); - } - } ); - return promise; - } + Future writeAndFlush( Object msg ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java deleted file mode 100644 index 8120a36fa9581..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; - -/** - * Allows intercepting the writing to a channel. - */ -public interface ChannelInterceptor -{ - void write( BiFunction> writer, io.netty.channel.Channel channel, Object msg, - CompletableFuture promise ); - - Optional installedProtocolStack(); -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java deleted file mode 100644 index 7d4e7612e4e84..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeException; -import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeFinishedEvent; -import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer; -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; -import org.neo4j.logging.Log; - -/** - * Gates messages written before the handshake has completed. The handshake is finalized - * by firing a ClientHandshakeFinishedEvent (as a netty user event) in {@link HandshakeClientInitializer}. - */ -public class HandshakeGate implements ChannelInterceptor -{ - public static final String HANDSHAKE_GATE = "HandshakeGate"; - - private final CompletableFuture handshakePromise = new CompletableFuture<>(); - - HandshakeGate( Channel channel, Log log ) - { - log.info( "Handshake gate added" ); - channel.pipeline().addFirst( HANDSHAKE_GATE, new ChannelInboundHandlerAdapter() - { - @Override - public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception - { - if ( evt instanceof ClientHandshakeFinishedEvent.Success ) - { - log.info( "Handshake gate success" ); - handshakePromise.complete( ((ClientHandshakeFinishedEvent.Success) evt).protocolStack() ); - } - else if ( evt instanceof ClientHandshakeFinishedEvent.Failure ) - { - log.warn( "Handshake gate failed" ); - handshakePromise.completeExceptionally( new ClientHandshakeException( "Handshake failed" ) ); - channel.close(); - } - else - { - super.userEventTriggered( ctx, evt ); - } - } - } ); - } - - @Override - public void write( BiFunction> writer, Channel channel, Object msg, CompletableFuture promise ) - { - handshakePromise.whenComplete( ( ignored, failure ) -> - writer.apply( channel, msg ).addListener( x -> promise.complete( null ) ) ); - } - - @Override - public Optional installedProtocolStack() - { - return Optional.ofNullable( handshakePromise.getNow( null ) ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java new file mode 100644 index 0000000000000..b9a72224315f4 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/MessageGate.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; + +import org.neo4j.causalclustering.protocol.handshake.GateEvent; + +/** + * Gates messages and keeps them on a queue until the gate is either + * opened successfully or closed forever. + */ +@ChannelHandler.Sharable +public class MessageGate extends ChannelDuplexHandler +{ + private final Predicate gated; + + private List pending = new ArrayList<>(); + + public MessageGate( Predicate gated ) + { + this.gated = gated; + } + + @Override + public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception + { + if ( evt instanceof GateEvent ) + { + if ( GateEvent.getSuccess().equals( evt ) ) + { + for ( GatedWrite write : pending ) + { + ctx.write( write.msg, write.promise ); + } + + ctx.channel().pipeline().remove( this ); + } + + pending.clear(); + pending = null; + } + else + { + super.userEventTriggered( ctx, evt ); + } + } + + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception + { + if ( !gated.test( msg ) ) + { + ctx.write( msg, promise ); + } + else if ( pending != null ) + { + pending.add( new GatedWrite( msg, promise ) ); + } + else + { + promise.setFailure( new RuntimeException( "Gate failed and has been permanently closed." ) ); + } + } + + static class GatedWrite + { + final Object msg; + final ChannelPromise promise; + + GatedWrite( Object msg, ChannelPromise promise ) + { + this.msg = msg; + this.promise = promise; + } + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java index d278fe0b510e9..a52f220c962d3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java @@ -22,13 +22,14 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOutboundInvoker; -import io.netty.util.concurrent.Future; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Promise; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; -import java.util.function.Function; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.TimeoutStrategy; @@ -40,10 +41,12 @@ public class ReconnectingChannel implements Channel { + public static final AttributeKey PROTOCOL_STACK_KEY = AttributeKey.valueOf( "PROTOCOL_STACK" ); + private final Log log; private final Bootstrap bootstrap; + private final EventLoop eventLoop; private final SocketAddress destination; - private final Function channelInterceptorFactory; private final TimeoutStrategy connectionBackoffStrategy; private volatile io.netty.channel.Channel channel; @@ -52,21 +55,19 @@ public class ReconnectingChannel implements Channel private volatile boolean disposed; private TimeoutStrategy.Timeout connectionBackoff; - private ChannelInterceptor channelInterceptor; - ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log, - Function channelInterceptorFactory ) + ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log ) { - this( bootstrap, destination, log, channelInterceptorFactory, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); + this( bootstrap, eventLoop, destination, log, new ExponentialBackoffStrategy( 100, 1600, MILLISECONDS ) ); } - private ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log, - Function channelInterceptorFactory, TimeoutStrategy connectionBackoffStrategy ) + private ReconnectingChannel( Bootstrap bootstrap, EventLoop eventLoop, SocketAddress destination, final Log log, + TimeoutStrategy connectionBackoffStrategy ) { this.bootstrap = bootstrap; + this.eventLoop = eventLoop; this.destination = destination; this.log = log; - this.channelInterceptorFactory = channelInterceptorFactory; this.connectionBackoffStrategy = connectionBackoffStrategy; this.connectionBackoff = connectionBackoffStrategy.newTimeout(); } @@ -89,7 +90,6 @@ else if ( fChannel != null && !fChannel.isDone() ) fChannel = bootstrap.connect( destination.socketAddress() ); channel = fChannel.channel(); - channelInterceptor = channelInterceptorFactory.apply( channel ); fChannel.addListener( ( ChannelFuture f ) -> { @@ -131,18 +131,18 @@ public boolean isOpen() } @Override - public CompletableFuture write( Object msg ) + public Future write( Object msg ) { - return doWrite( msg, ChannelOutboundInvoker::write ); + return write( msg, false ); } @Override - public CompletableFuture writeAndFlush( Object msg ) + public Future writeAndFlush( Object msg ) { - return doWrite( msg, ChannelOutboundInvoker::writeAndFlush ); + return write( msg, true ); } - private CompletableFuture doWrite( Object msg, BiFunction> writer ) + private Future write( Object msg, boolean flush ) { if ( disposed ) { @@ -151,32 +151,67 @@ private CompletableFuture doWrite( Object msg, BiFunction promise = new CompletableFuture<>(); - channelInterceptor.write( writer, channel, msg, promise ); - return promise; + if ( flush ) + { + return channel.writeAndFlush( msg ); + } + else + { + return channel.write( msg ); + } } else { - CompletableFuture promise = new CompletableFuture<>(); + Promise promise = eventLoop.newPromise(); + BiConsumer writer; + + if ( flush ) + { + writer = ( channel, message ) -> chain( channel.writeAndFlush( msg ), promise ); + } + else + { + writer = ( channel, mmessage ) -> chain( channel.write( msg ), promise ); + } + deferredWrite( msg, fChannel, promise, true, writer ); return promise; } } + /** + * Chains a channel future to a promise. Used when the returned promise + * was not allocated through the channel and cannot be used as the + * first-hand promise for the I/O operation. + */ + private static void chain( ChannelFuture when, Promise then ) + { + when.addListener( f -> { + if ( f.isSuccess() ) + { + then.setSuccess( when.get() ); + } + else + { + then.setFailure( when.cause() ); + } + } ); + } + /** * Will try to reconnect once before giving up on a send. The reconnection *must* happen * after write was scheduled. This is necessary to provide proper ordering when a message * is sent right after the non-blocking channel was setup and before the server is ready * to accept a connection. This happens frequently in tests. */ - private void deferredWrite( Object msg, ChannelFuture channelFuture, CompletableFuture promise, boolean firstAttempt, - BiFunction> writer ) + private void deferredWrite( Object msg, ChannelFuture channelFuture, Promise promise, boolean firstAttempt, + BiConsumer writer ) { channelFuture.addListener( (ChannelFutureListener) f -> { if ( f.isSuccess() ) { - channelInterceptor.write( writer, f.channel(), msg, promise ); + writer.accept( f.channel(), msg ); } else if ( firstAttempt ) { @@ -185,14 +220,14 @@ else if ( firstAttempt ) } else { - promise.completeExceptionally( f.cause() ); + promise.setFailure( f.cause() ); } } ); } public Optional installedProtocolStack() { - return channelInterceptor.installedProtocolStack(); + return Optional.ofNullable( channel.attr( PROTOCOL_STACK_KEY ).get() ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index bbf11a028062e..629cd637f13ab 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -20,7 +20,8 @@ package org.neo4j.causalclustering.messaging; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; @@ -64,7 +65,7 @@ public SenderService( ChannelInitializer channelInitializer, LogProvider logProv @Override public void send( AdvertisedSocketAddress to, Message message, boolean block ) { - CompletableFuture future; + Future future; serviceLock.readLock().lock(); try { @@ -73,9 +74,7 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block ) return; } - Channel channel = channel( to ); - - future = channel.writeAndFlush( message ); + future = channel( to ).writeAndFlush( message ); } finally { @@ -84,7 +83,18 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block ) if ( block ) { - future.join(); + try + { + future.get(); + } + catch ( ExecutionException e ) + { + log.error( "Exception while sending to: " + to, e ); + } + catch ( InterruptedException e ) + { + log.info( "Interrupted while sending", e ); + } } } @@ -94,7 +104,7 @@ private Channel channel( AdvertisedSocketAddress destination ) if ( channel == null ) { - channel = new ReconnectingChannel( bootstrap, destination, log, ch -> new HandshakeGate( ch, log ) ); + channel = new ReconnectingChannel( bootstrap, eventLoopGroup.next(), destination, log ); channel.start(); ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java index ef2c28f9f8517..b56e8cc973bc4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java @@ -19,7 +19,7 @@ */ package org.neo4j.causalclustering.messaging; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.neo4j.logging.Log; @@ -56,17 +56,17 @@ public boolean isOpen() } @Override - public CompletableFuture write( Object msg ) + public Future write( Object msg ) { checkDisposed(); - return Channel.convertNettyFuture( channel.write( msg ) ); + return channel.write( msg ); } @Override - public CompletableFuture writeAndFlush( Object msg ) + public Future writeAndFlush( Object msg ) { checkDisposed(); - return Channel.convertNettyFuture( channel.writeAndFlush( msg ) ); + return channel.writeAndFlush( msg ); } private void checkDisposed() diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java index 58f90f1046466..0209880b29e62 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java @@ -27,28 +27,37 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.MessageToByteEncoder; import java.util.ArrayList; import java.util.List; +import java.util.function.Predicate; -import org.neo4j.causalclustering.messaging.HandshakeGate; +import org.neo4j.causalclustering.messaging.MessageGate; import org.neo4j.logging.Log; import static java.util.Arrays.asList; /** * Builder and installer of pipelines. - * + *

* Makes sures to install sane last-resort error handling and * handles the construction of common patterns, like framing. + *

+ * Do not modify the names of handlers you install. */ public class NettyPipelineBuilder { + static final String MESSAGE_GATE_NAME = "message_gate"; + private final ChannelPipeline pipeline; private final Log log; - private final List handlers = new ArrayList<>(); + private final List handlerInfos = new ArrayList<>(); + + private Predicate gatePredicate; private NettyPipelineBuilder( ChannelPipeline pipeline, Log log ) { @@ -56,27 +65,62 @@ private NettyPipelineBuilder( ChannelPipeline pipeline, Log log ) this.log = log; } + /** + * Entry point for the builder. + * + * @param pipeline The pipeline to build for. + * @param log The log used for last-resort errors occurring in the pipeline. + * @return The builder. + */ public static NettyPipelineBuilder with( ChannelPipeline pipeline, Log log ) { return new NettyPipelineBuilder( pipeline, log ); } + /** + * Adds buffer framing to the pipeline. Useful for pipelines marshalling + * complete POJOs as an example using {@link MessageToByteEncoder} and + * {@link ByteToMessageDecoder}. + */ public NettyPipelineBuilder addFraming() { - add( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); - add( new LengthFieldPrepender( 4 ) ); + add( "framing_decoder", new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); + add( "framing_encoder", new LengthFieldPrepender( 4 ) ); return this; } - public NettyPipelineBuilder add( List newHandlers ) + /** + * Adds handlers to the pipeline. + *

+ * The pipeline builder controls the internal names of the handlers in the + * pipeline and external actors are forbidden from manipulating them. + * + * @param name The name of the handler, which must be unique. + * @param newHandlers The new handlers. + * @return The builder. + */ + public NettyPipelineBuilder add( String name, List newHandlers ) { - handlers.addAll( newHandlers ); + newHandlers.stream().map( handler -> new HandlerInfo( name, handler ) ).forEachOrdered( handlerInfos::add ); return this; } - public NettyPipelineBuilder add( ChannelHandler... newHandlers ) + /** + * @see #add(String, List) + */ + public NettyPipelineBuilder add( String name, ChannelHandler... newHandlers ) + { + return add( name, asList( newHandlers ) ); + } + + public NettyPipelineBuilder addGate( Predicate gatePredicate ) { - return add( asList( newHandlers ) ); + if ( this.gatePredicate != null ) + { + throw new IllegalStateException( "Cannot have more than one gate." ); + } + this.gatePredicate = gatePredicate; + return this; } /** @@ -84,32 +128,56 @@ public NettyPipelineBuilder add( ChannelHandler... newHandlers ) */ public void install() { + ChannelHandler oldGateHandler = removeOldGate(); clear(); - handlers.forEach( pipeline::addLast ); -// installErrorHandling(); -- temporarily disabled for debugging purposes + for ( HandlerInfo info : handlerInfos ) + { + pipeline.addLast( info.name, info.handler ); + } + installGate( oldGateHandler ); + installErrorHandling(); } - private void clear() + private ChannelHandler removeOldGate() + { + if ( pipeline.get( MESSAGE_GATE_NAME ) != null ) + { + return pipeline.remove( MESSAGE_GATE_NAME ); + } + return null; + } + + private void installGate( ChannelHandler oldGateHandler ) { - pipeline.names().stream() - .filter( this::isNotDefault ) - .filter( this::isNotUserEvent ) - .forEach( pipeline::remove ); + if ( oldGateHandler != null && gatePredicate != null ) + { + throw new IllegalStateException( "Cannot have more than one gate." ); + } + else if ( gatePredicate != null ) + { + pipeline.addLast( MESSAGE_GATE_NAME, new MessageGate( gatePredicate ) ); + } + else if ( oldGateHandler != null ) + { + pipeline.addLast( MESSAGE_GATE_NAME, oldGateHandler ); + } } - private boolean isNotUserEvent( String name ) + private void clear() { - return !HandshakeGate.HANDSHAKE_GATE.equals( name ); + pipeline.names().stream().filter( this::isNotDefault ).forEach( pipeline::remove ); } private boolean isNotDefault( String name ) { + // these are netty internal handlers for head and tail return pipeline.get( name ) != null; } private void installErrorHandling() { - pipeline.addLast( new ChannelDuplexHandler() + // inbound goes in the direction from first->last + pipeline.addLast( "error_handler_tail", new ChannelDuplexHandler() { @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) @@ -123,13 +191,16 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) log.error( "Unhandled inbound message: " + msg ); } + // this is the first handler for an outbound message, and attaches a listener to its promise if possible @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) { + // if the promise is a void-promise, then exceptions will instead propagate to the + // exceptionCaught handler on the outbound handler further below + if ( !promise.isVoid() ) { - promise.addListener( (ChannelFutureListener) future -> - { + promise.addListener( (ChannelFutureListener) future -> { if ( !future.isSuccess() ) { log.error( "Exception in outbound", future.cause() ); @@ -140,14 +211,17 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise } } ); - pipeline.addFirst( new ChannelOutboundHandlerAdapter() + pipeline.addFirst( "error_handler_head", new ChannelOutboundHandlerAdapter() { + // exceptions which did not get fulfilled on the promise of a write, etc. @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { log.error( "Exception in outbound", cause ); } + // netty can only handle bytes in the form of ByteBuf, so if you reach this then you are + // perhaps trying to send a POJO without having a suitable encoder @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) { @@ -162,4 +236,16 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise } } ); } + + private static class HandlerInfo + { + private final String name; + private final ChannelHandler handler; + + HandlerInfo( String name, ChannelHandler handler ) + { + this.name = name; + this.handler = handler; + } + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java index 8976413811be6..b15eacd1465c4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java @@ -39,9 +39,11 @@ public NettyPipelineBuilder create( Channel channel, Log log ) throws Exception { ChannelPipeline pipeline = channel.pipeline(); NettyPipelineBuilder builder = NettyPipelineBuilder.with( pipeline, log ); + int i = 0; for ( ChannelHandler handler : wrapper.handlersFor( channel ) ) { - builder.add( handler ); + builder.add( String.format( "%s_%d", wrapper.name(), i ), handler ); + i++; } return builder; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java new file mode 100644 index 0000000000000..a00feee2d01bb --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/GateEvent.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.protocol.handshake; + +import java.util.Objects; + +public class GateEvent +{ + private final boolean isSuccess; + + private GateEvent( boolean isSuccess ) + { + this.isSuccess = isSuccess; + } + + private static GateEvent success = new GateEvent( true ); + private static GateEvent failure = new GateEvent( false ); + + public static GateEvent getSuccess() + { + return success; + } + + public static GateEvent getFailure() + { + return failure; + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + GateEvent that = (GateEvent) o; + return isSuccess == that.isSuccess; + } + + @Override + public int hashCode() + { + return Objects.hash( isSuccess ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java index e675cf7740f12..dccc16d43ae7f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeClientInitializer.java @@ -30,6 +30,7 @@ import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy; import org.neo4j.causalclustering.helper.TimeoutStrategy; +import org.neo4j.causalclustering.messaging.ReconnectingChannel; import org.neo4j.causalclustering.messaging.SimpleNettyChannel; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.Protocol; @@ -68,16 +69,16 @@ private void installHandlers( Channel channel, HandshakeClient handshakeClient ) { pipelineBuilderFactory.create( channel, log ) .addFraming() - .add( new ClientMessageEncoder() ) - .add( new ClientMessageDecoder() ) - .add( new NettyHandshakeClient( handshakeClient ) ) + .add( "handshake_client_encoder", new ClientMessageEncoder() ) + .add( "handshake_client_decoder", new ClientMessageDecoder() ) + .add( "handshake_client", new NettyHandshakeClient( handshakeClient ) ) + .addGate( msg -> !(msg instanceof ServerMessage) ) .install(); } @Override protected void initChannel( SocketChannel channel ) throws Exception { - log.info( "Initiating channel: " + channel ); HandshakeClient handshakeClient = new HandshakeClient(); installHandlers( channel, handshakeClient ); @@ -90,7 +91,6 @@ protected void initChannel( SocketChannel channel ) throws Exception */ private void scheduleHandshake( SocketChannel ch, HandshakeClient handshakeClient, TimeoutStrategy.Timeout timeout ) { - log.info( String.format( "Scheduling handshake after: %d ms", timeout.getMillis() ) ); ch.eventLoop().schedule( () -> { if ( ch.isActive() ) @@ -104,7 +104,6 @@ else if ( ch.isOpen() ) } else { - log.warn( "Channel closed" ); handshakeClient.failIfNotDone( "Channel closed" ); } }, timeout.getMillis(), MILLISECONDS ); @@ -122,7 +121,6 @@ private void scheduleTimeout( SocketChannel ch, HandshakeClient handshakeClient private void initiateHandshake( Channel channel, HandshakeClient handshakeClient ) { - log.info( "Initiating handshake" ); SimpleNettyChannel channelWrapper = new SimpleNettyChannel( channel, log ); CompletableFuture handshake = handshakeClient.initiate( channelWrapper, protocolRepository, protocolName ); @@ -131,11 +129,11 @@ private void initiateHandshake( Channel channel, HandshakeClient handshakeClient private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, Throwable failure ) { - log.info( "Handshake completed" ); if ( failure != null ) { log.error( "Error when negotiating protocol stack", failure ); - channel.pipeline().fireUserEventTriggered( ClientHandshakeFinishedEvent.Failure.instance() ); + channel.pipeline().fireUserEventTriggered( GateEvent.getFailure() ); + channel.close(); } else { @@ -143,8 +141,10 @@ private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, { log.info( "Installing: " + protocolStack ); protocolInstaller.installerFor( protocolStack.applicationProtocol() ).install( channel ); - log.info( "Firing handshake success event to handshake gate" ); - channel.pipeline().fireUserEventTriggered( new ClientHandshakeFinishedEvent.Success( protocolStack ) ); + channel.attr( ReconnectingChannel.PROTOCOL_STACK_KEY ).set( protocolStack ); + + channel.pipeline().fireUserEventTriggered( GateEvent.getSuccess() ); + channel.flush(); } catch ( Exception e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java index e750562bc7e80..0596d70ecf64c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/HandshakeServerInitializer.java @@ -56,9 +56,9 @@ public void initChannel( SocketChannel ch ) throws Exception { pipelineBuilderFactory.create( ch, log ) .addFraming() - .add( new ServerMessageEncoder() ) - .add( new ServerMessageDecoder() ) - .add( createHandshakeServer( ch ) ) + .add( "handshake_server_encoder", new ServerMessageEncoder() ) + .add( "handshake_server_decoder", new ServerMessageDecoder() ) + .add( "handshake_server", createHandshakeServer( ch ) ) .install(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java index 66f3bccb98160..1bf3eb079c3f5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ServerMessage.java @@ -22,7 +22,7 @@ /** * Messages to the server, generally requests. */ -interface ServerMessage +public interface ServerMessage { void dispatch( ServerMessageHandler handler ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java deleted file mode 100644 index 1623e75ae74a3..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ChannelTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.util.concurrent.DefaultEventExecutor; -import io.netty.util.concurrent.Promise; -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; - -import static org.junit.Assert.fail; - -public class ChannelTest -{ - @Test - public void shouldConvertSuccessfulFuture() throws Throwable - { - // given - CompletableFuture completableFuture = Channel.convertNettyFuture( - createPromise( promise -> promise.setSuccess( "yay" ) ) - ); - - // when - completableFuture.get(); - - // then no throw - } - - @Test( expected = IllegalArgumentException.class ) - public void shouldConvertFailedFuture() throws Throwable - { - // given - CompletableFuture completableFuture = Channel.convertNettyFuture( - createPromise( promise -> promise.setFailure( new IllegalArgumentException() ) ) - ); - - // when - try - { - completableFuture.get(); - fail( "Expected a failed future" ); - } - // then throw - catch ( ExecutionException ex ) - { - throw ex.getCause(); - } - } - - private Promise createPromise( Consumer> mutator ) - { - DefaultEventExecutor eventExecutors = new DefaultEventExecutor(); - Promise promise = eventExecutors.newPromise(); - mutator.accept( promise ); - return promise; - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java deleted file mode 100644 index 2665ae1ba39f9..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/HandshakeGateTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import org.junit.Test; -import org.mockito.Answers; -import org.mockito.Mockito; - -import java.util.Optional; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; -import org.neo4j.logging.NullLog; - -import static co.unruly.matchers.OptionalMatchers.empty; -import static org.junit.Assert.assertThat; - -public class HandshakeGateTest -{ - @Test - public void shouldReturnNoInstalledProtocolIfHandshakeNotComplete() throws Throwable - { - // given - HandshakeGate handshakeGate = new HandshakeGate( Mockito.mock( Channel.class, Answers.RETURNS_MOCKS ), NullLog.getInstance() ); - - // when - Optional protocolStack = handshakeGate.installedProtocolStack(); - - // then - assertThat( protocolStack, empty() ); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java new file mode 100644 index 0000000000000..65253f1bdf73d --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/MessageGateTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.messaging; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import org.neo4j.causalclustering.protocol.handshake.GateEvent; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class MessageGateTest +{ + private final String ALLOWED_MSG = "allowed"; + private final MessageGate gate = new MessageGate( m -> m != ALLOWED_MSG ); + private final ChannelHandlerContext ctx = mock( ChannelHandlerContext.class ); + private final Channel channel = mock( Channel.class ); + private final ChannelPipeline pipeline = mock( ChannelPipeline.class ); + + @Before + public void setup() + { + when( channel.pipeline() ).thenReturn( pipeline ); + when( ctx.channel() ).thenReturn( channel ); + } + + @Test + public void shouldLetAllowedMessagesPass() throws Exception + { + // when + ChannelPromise promise = mock( ChannelPromise.class ); + gate.write( ctx, ALLOWED_MSG, promise ); + gate.write( ctx, ALLOWED_MSG, promise ); + gate.write( ctx, ALLOWED_MSG, promise ); + + // then + verify( ctx, times( 3 ) ).write( ALLOWED_MSG, promise ); + } + + @Test + public void shouldGateMessages() throws Exception + { + // when + ChannelPromise promise = mock( ChannelPromise.class ); + gate.write( ctx, "A", promise ); + gate.write( ctx, "B", promise ); + gate.write( ctx, "C", promise ); + + // then + verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldLetGatedMessagesPassOnSuccess() throws Exception + { + // given + ChannelPromise promiseA = mock( ChannelPromise.class ); + ChannelPromise promiseB = mock( ChannelPromise.class ); + ChannelPromise promiseC = mock( ChannelPromise.class ); + + gate.write( ctx, "A", promiseA ); + gate.write( ctx, "B", promiseB ); + gate.write( ctx, "C", promiseC ); + verify( ctx, never() ).write( any(), any() ); + + // when + gate.userEventTriggered( ctx, GateEvent.getSuccess() ); + + // then + InOrder inOrder = Mockito.inOrder( ctx ); + inOrder.verify( ctx ).write( "A", promiseA ); + inOrder.verify( ctx ).write( "B", promiseB ); + inOrder.verify( ctx ).write( "C", promiseC ); + inOrder.verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldRemoveGateOnSuccess() throws Exception + { + // when + gate.userEventTriggered( ctx, GateEvent.getSuccess() ); + + // then + verify( pipeline ).remove( gate ); + } + + @Test + public void shouldNotLetGatedMessagesPassAfterFailure() throws Exception + { + // given + ChannelPromise promise = mock( ChannelPromise.class ); + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // when + gate.write( ctx, "A", promise ); + gate.write( ctx, "B", promise ); + gate.write( ctx, "C", promise ); + + // then + verify( ctx, never() ).write( any(), any() ); + } + + @Test + public void shouldStillLetAllowedMessagePassAfterFailure() throws Exception + { + // given + ChannelPromise promise = mock( ChannelPromise.class ); + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // when + gate.write( ctx, ALLOWED_MSG, promise ); + + // then + verify( ctx ).write( ALLOWED_MSG, promise ); + } + + @Test + public void shouldLeaveGateOnFailure() throws Exception + { + // when + gate.userEventTriggered( ctx, GateEvent.getFailure() ); + + // then + verify( pipeline, never() ).remove( any( ChannelHandler.class ) ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java index 88d90d83d1663..5bca242cacb5f 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/ReconnectingChannelIT.java @@ -32,8 +32,8 @@ import org.junit.Before; import org.junit.Test; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.neo4j.helpers.SocketAddress; @@ -70,7 +70,7 @@ public void before() { elg = new NioEventLoopGroup( 0 ); Bootstrap bootstrap = new Bootstrap().channel( NioSocketChannel.class ).group( elg ).handler( VOID_HANDLER ); - channel = new ReconnectingChannel( bootstrap, serverAddress, log, SimpleChannelInterceptor.getInstance() ); + channel = new ReconnectingChannel( bootstrap, elg.next(), serverAddress, log ); } @After @@ -90,7 +90,7 @@ public void shouldBeAbleToSendMessage() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will be successfully completed fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -107,7 +107,7 @@ public void shouldAllowDeferredSend() throws Exception // this is benign in the sense that the test will pass in the condition where it was already connected as well // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will be successfully completed fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -120,7 +120,7 @@ public void shouldFailSendWhenNoServer() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will throw fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -134,7 +134,7 @@ public void shouldReconnectAfterServerComesBack() throws Exception channel.start(); // when - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); // then will not throw fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); @@ -170,7 +170,7 @@ public void shouldNotAllowSendingOnDisposedChannel() throws Exception channel.start(); // ensure we are connected - CompletableFuture fSend = channel.writeAndFlush( emptyBuffer() ); + Future fSend = channel.writeAndFlush( emptyBuffer() ); fSend.get( DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS ); assertEventually( "", server::childCount, equalTo( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java deleted file mode 100644 index 36570acdf5bd2..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleChannelInterceptor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.causalclustering.messaging; - -import io.netty.channel.Channel; -import io.netty.util.concurrent.Future; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; -import java.util.function.Function; - -import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; - -public class SimpleChannelInterceptor implements ChannelInterceptor -{ - private static SimpleChannelInterceptor instance = new SimpleChannelInterceptor(); - - private SimpleChannelInterceptor() - { - } - - public static Function getInstance() - { - return ignored -> instance; - } - - @Override - public void write( BiFunction> writer, Channel channel, Object msg, CompletableFuture promise ) - { - writer.apply( channel, msg ).addListener( x -> promise.complete( null ) ); - } - - @Override - public Optional installedProtocolStack() - { - return Optional.empty(); - } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java index 0accfa89c7816..15bc9604c3706 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SimpleNettyChannelTest.java @@ -22,7 +22,7 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.neo4j.logging.NullLog; @@ -43,7 +43,7 @@ public void shouldWriteOnNettyChannel() // when Object msg = new Object(); - CompletableFuture writeComplete = channel.write( msg ); + Future writeComplete = channel.write( msg ); // then assertNull( nettyChannel.readOutbound() ); @@ -65,7 +65,7 @@ public void shouldWriteAndFlushOnNettyChannel() // when Object msg = new Object(); - CompletableFuture writeComplete = channel.writeAndFlush( msg ); + Future writeComplete = channel.writeAndFlush( msg ); // then assertTrue( writeComplete.isDone() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java index ecf42879e4b6c..16a81744cdd82 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderTest.java @@ -19,33 +19,44 @@ */ package org.neo4j.causalclustering.protocol; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Ignore; import org.junit.Test; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.Log; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.neo4j.logging.AssertableLogProvider.inLog; -@Ignore public class NettyPipelineBuilderTest { private AssertableLogProvider logProvider = new AssertableLogProvider(); private Log log = logProvider.getLog( getClass() ); private EmbeddedChannel channel = new EmbeddedChannel(); + private ChannelHandlerAdapter EMPTY_HANDLER = new ChannelHandlerAdapter() + { + }; @Test public void shouldLogExceptionInbound() { // given RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelInboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "read_handler", new ChannelInboundHandlerAdapter() { @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) @@ -93,7 +104,7 @@ public void shouldLogUnhandledMessageOutbound() public void shouldLogExceptionOutbound() { RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelOutboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", new ChannelOutboundHandlerAdapter() { @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) @@ -113,7 +124,7 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise public void shouldLogExceptionOutboundWithVoidPromise() { RuntimeException ex = new RuntimeException(); - NettyPipelineBuilder.with( channel.pipeline(), log ).add( new ChannelOutboundHandlerAdapter() + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", new ChannelOutboundHandlerAdapter() { @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) @@ -142,7 +153,7 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) // handled } }; - NettyPipelineBuilder.with( channel.pipeline(), log ).add( handler ).install(); + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "read_handler", handler ).install(); // when channel.writeOneInbound( msg ); @@ -164,7 +175,7 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ctx.write( ctx.alloc().buffer() ); } }; - NettyPipelineBuilder.with( channel.pipeline(), log ).add( encoder ).install(); + NettyPipelineBuilder.with( channel.pipeline(), log ).add( "write_handler", encoder ).install(); // when channel.writeAndFlush( msg ); @@ -172,4 +183,34 @@ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise // then logProvider.assertNoLoggingOccurred(); } + + @Test + public void shouldReInstallWithPreviousGate() throws Exception + { + // given + Object gatedMessage = new Object(); + + NettyPipelineBuilder builderA = NettyPipelineBuilder.with( channel.pipeline(), log ); + builderA.addGate( p -> p == gatedMessage ); + builderA.install(); + + assertEquals( 3, getHandlers( channel.pipeline() ).size() ); // head/tail error handlers also counted + assertThat( channel.pipeline().names(), + hasItems( "error_handler_head", NettyPipelineBuilder.MESSAGE_GATE_NAME, "error_handler_tail" ) ); + + // when + NettyPipelineBuilder builderB = NettyPipelineBuilder.with( channel.pipeline(), log ); + builderB.add( "my_handler", EMPTY_HANDLER ); + builderB.install(); + + // then + assertEquals( 4, getHandlers( channel.pipeline() ).size() ); // head/tail error handlers also counted + assertThat( channel.pipeline().names(), + hasItems( "error_handler_head", "my_handler", NettyPipelineBuilder.MESSAGE_GATE_NAME, "error_handler_tail" ) ); + } + + private List getHandlers( ChannelPipeline pipeline ) + { + return pipeline.names().stream().map( pipeline::get ).filter( Objects::nonNull ).collect( Collectors.toList() ); + } }