Skip to content

Commit

Permalink
Fix race with channel interceptor
Browse files Browse the repository at this point in the history
Remove the ChannelInterceptor concept since it potentially left a gap between connect()
and adding the interceptor where messages could slip by without being intercepted. Instead
the complete handshake pipeline is installed immediately from the ChannelInitializer and
the NettyPipelineBuilder has been extended with functionality to cater for the message gating.

The message gate is now a free-standing handler of its own and which does as little as
possible to fulfil its role. Closing and flusing logic has been moved outside to the
components which drive the handshake.
  • Loading branch information
martinfurmanski committed Feb 14, 2018
1 parent f35ebb8 commit 2626f03
Show file tree
Hide file tree
Showing 24 changed files with 605 additions and 428 deletions.
Expand Up @@ -46,7 +46,7 @@ public void install( Channel channel ) throws Exception
{
clientPipelineBuilderFactory.create( channel, log )
.addFraming()
.add( new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
.add( "raft_encoder", new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
.install();
}
}
Expand Up @@ -51,8 +51,8 @@ public void install( Channel channel ) throws Exception
{
pipelineBuilderFactory.create( channel, log )
.addFraming()
.add( new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
.add( raftMessageHandler )
.add( "raft_decoder", new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
.add( "raft_handler", raftMessageHandler )
.install();
}
}
Expand Up @@ -37,4 +37,6 @@ default List<ChannelHandler> handlersFor( Channel channel ) throws Exception
{
return emptyList();
}

String name();
}
Expand Up @@ -22,7 +22,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;

import java.util.Collections;
import java.util.List;

import org.neo4j.kernel.configuration.Config;
Expand All @@ -40,6 +39,12 @@ public List<ChannelHandler> handlersFor( Channel channel )
{
return emptyList();
}

@Override
public String name()
{
return "void";
}
};

@Override
Expand Down
Expand Up @@ -19,9 +19,7 @@
*/
package org.neo4j.causalclustering.messaging;

import io.netty.util.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public interface Channel
{
Expand All @@ -31,24 +29,7 @@ public interface Channel

boolean isOpen();

CompletableFuture<Void> write( Object msg );

CompletableFuture<Void> writeAndFlush( Object msg );
Future<Void> write( Object msg );

static CompletableFuture<Void> convertNettyFuture( Future<?> nettyFuture )
{
CompletableFuture<Void> promise = new CompletableFuture<>();
nettyFuture.addListener( future ->
{
if ( future.isSuccess() )
{
promise.complete( null );
}
else
{
promise.completeExceptionally( future.cause() );
}
} );
return promise;
}
Future<Void> writeAndFlush( Object msg );
}

This file was deleted.

This file was deleted.

@@ -0,0 +1,101 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.messaging;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import org.neo4j.causalclustering.protocol.handshake.GateEvent;

/**
* Gates messages and keeps them on a queue until the gate is either
* opened successfully or closed forever.
*/
@ChannelHandler.Sharable
public class MessageGate extends ChannelDuplexHandler
{
private final Predicate<Object> gated;

private List<GatedWrite> pending = new ArrayList<>();

public MessageGate( Predicate<Object> gated )
{
this.gated = gated;
}

@Override
public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception
{
if ( evt instanceof GateEvent )
{
if ( GateEvent.getSuccess().equals( evt ) )
{
for ( GatedWrite write : pending )
{
ctx.write( write.msg, write.promise );
}

ctx.channel().pipeline().remove( this );
}

pending.clear();
pending = null;
}
else
{
super.userEventTriggered( ctx, evt );
}
}

@Override
public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception
{
if ( !gated.test( msg ) )
{
ctx.write( msg, promise );
}
else if ( pending != null )
{
pending.add( new GatedWrite( msg, promise ) );
}
else
{
promise.setFailure( new RuntimeException( "Gate failed and has been permanently closed." ) );
}
}

static class GatedWrite
{
final Object msg;
final ChannelPromise promise;

GatedWrite( Object msg, ChannelPromise promise )
{
this.msg = msg;
this.promise = promise;
}
}
}

0 comments on commit 2626f03

Please sign in to comment.