Skip to content

Commit

Permalink
Add more logging around channel handshake progress
Browse files Browse the repository at this point in the history
  • Loading branch information
martinfurmanski committed Feb 8, 2018
1 parent 2d15824 commit af42abc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeException; import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeException;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer; import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.handshake.HandshakeFinishedEvent; import org.neo4j.causalclustering.protocol.handshake.HandshakeFinishedEvent;
import org.neo4j.logging.Log;


/** /**
* Gates messages written before the handshake has completed. The handshake is finalized * Gates messages written before the handshake has completed. The handshake is finalized
Expand All @@ -41,19 +42,22 @@ public class HandshakeGate implements ChannelInterceptor


private final CompletableFuture<Void> handshakePromise = new CompletableFuture<>(); private final CompletableFuture<Void> handshakePromise = new CompletableFuture<>();


HandshakeGate( Channel channel ) HandshakeGate( Channel channel, Log log )
{ {
log.info( "Handshake gate added" );
channel.pipeline().addFirst( HANDSHAKE_GATE, new ChannelInboundHandlerAdapter() channel.pipeline().addFirst( HANDSHAKE_GATE, new ChannelInboundHandlerAdapter()
{ {
@Override @Override
public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception
{ {
if ( HandshakeFinishedEvent.getSuccess().equals( evt ) ) if ( HandshakeFinishedEvent.getSuccess().equals( evt ) )
{ {
log.info( "Handshake gate success" );
handshakePromise.complete( null ); handshakePromise.complete( null );
} }
else if ( HandshakeFinishedEvent.getFailure().equals( evt ) ) else if ( HandshakeFinishedEvent.getFailure().equals( evt ) )
{ {
log.warn( "Handshake gate failed" );
handshakePromise.completeExceptionally( new ClientHandshakeException( "Handshake failed" ) ); handshakePromise.completeExceptionally( new ClientHandshakeException( "Handshake failed" ) );
channel.close(); channel.close();
} }
Expand Down
Expand Up @@ -91,7 +91,7 @@ private Channel channel( AdvertisedSocketAddress destination )


if ( channel == null ) if ( channel == null )
{ {
channel = new ReconnectingChannel( bootstrap, destination, log, HandshakeGate::new ); channel = new ReconnectingChannel( bootstrap, destination, log, ch -> new HandshakeGate( ch, log ) );
channel.start(); channel.start();
ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel ); ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel );


Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.causalclustering.protocol.handshake; package org.neo4j.causalclustering.protocol.handshake;


import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand Down Expand Up @@ -129,12 +128,14 @@ public void handle( SwitchOverResponse response )
future.complete( protocolStack ); future.complete( protocolStack );
} }


void checkTimeout( Duration timeout ) boolean failIfNotDone( String message )
{ {
if ( !future.isDone() ) if ( !future.isDone() )
{ {
decline( "Timed out after " + timeout ); decline( message );
return true;
} }
return false;
} }


private void decline( String message ) private void decline( String message )
Expand Down
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.messaging.SimpleNettyChannel; import org.neo4j.causalclustering.messaging.SimpleNettyChannel;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory; import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.Protocol; import org.neo4j.causalclustering.protocol.Protocol;
Expand All @@ -47,6 +49,7 @@ public class HandshakeClientInitializer extends ChannelInitializer<SocketChannel
private final Duration timeout; private final Duration timeout;
private final ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstaller; private final ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstaller;
private final NettyPipelineBuilderFactory pipelineBuilderFactory; private final NettyPipelineBuilderFactory pipelineBuilderFactory;
private final TimeoutStrategy timeoutStrategy;


public HandshakeClientInitializer( LogProvider logProvider, ProtocolRepository protocolRepository, Protocol.Identifier protocolName, public HandshakeClientInitializer( LogProvider logProvider, ProtocolRepository protocolRepository, Protocol.Identifier protocolName,
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository, Config config, ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository, Config config,
Expand All @@ -58,6 +61,7 @@ public HandshakeClientInitializer( LogProvider logProvider, ProtocolRepository p
this.timeout = config.get( CausalClusteringSettings.handshake_timeout ); this.timeout = config.get( CausalClusteringSettings.handshake_timeout );
this.protocolInstaller = protocolInstallerRepository; this.protocolInstaller = protocolInstallerRepository;
this.pipelineBuilderFactory = pipelineBuilderFactory; this.pipelineBuilderFactory = pipelineBuilderFactory;
this.timeoutStrategy = new ExponentialBackoffStrategy( 1, 2000, MILLISECONDS );
} }


private void installHandlers( Channel channel, HandshakeClient handshakeClient ) throws Exception private void installHandlers( Channel channel, HandshakeClient handshakeClient ) throws Exception
Expand All @@ -73,18 +77,20 @@ private void installHandlers( Channel channel, HandshakeClient handshakeClient )
@Override @Override
protected void initChannel( SocketChannel channel ) throws Exception protected void initChannel( SocketChannel channel ) throws Exception
{ {
log.info( "Initiating channel: " + channel );
HandshakeClient handshakeClient = new HandshakeClient(); HandshakeClient handshakeClient = new HandshakeClient();
installHandlers( channel, handshakeClient ); installHandlers( channel, handshakeClient );


scheduleHandshake( channel, handshakeClient, 0 ); scheduleHandshake( channel, handshakeClient, timeoutStrategy.newTimeout() );
scheduleTimeout( channel, handshakeClient ); scheduleTimeout( channel, handshakeClient );
} }


/** /**
* schedules the handshake initiation after the connection attempt * schedules the handshake initiation after the connection attempt
*/ */
private void scheduleHandshake( SocketChannel ch, HandshakeClient handshakeClient, long delay ) private void scheduleHandshake( SocketChannel ch, HandshakeClient handshakeClient, TimeoutStrategy.Timeout timeout )
{ {
log.info( String.format( "Scheduling handshake after: %d ms", timeout.getMillis() ) );
ch.eventLoop().schedule( () -> ch.eventLoop().schedule( () ->
{ {
if ( ch.isActive() ) if ( ch.isActive() )
Expand All @@ -93,26 +99,39 @@ private void scheduleHandshake( SocketChannel ch, HandshakeClient handshakeClien
} }
else if ( ch.isOpen() ) else if ( ch.isOpen() )
{ {
scheduleHandshake( ch, handshakeClient, delay + 1 ); timeout.increment();
scheduleHandshake( ch, handshakeClient, timeout );
} }
}, delay, MILLISECONDS ); else
{
log.warn( "Channel closed" );
handshakeClient.failIfNotDone( "Channel closed" );
}
}, timeout.getMillis(), MILLISECONDS );
} }


private void scheduleTimeout( SocketChannel ch, HandshakeClient handshakeClient ) private void scheduleTimeout( SocketChannel ch, HandshakeClient handshakeClient )
{ {
ch.eventLoop().schedule( () -> handshakeClient.checkTimeout( timeout ), timeout.toMillis(), TimeUnit.MILLISECONDS ); ch.eventLoop().schedule( () -> {
if ( handshakeClient.failIfNotDone( "Timed out after " + timeout ) )
{
log.warn( "Failed handshake after timeout" );
}
}, timeout.toMillis(), TimeUnit.MILLISECONDS );
} }


private void initiateHandshake( Channel ch, HandshakeClient handshakeClient ) private void initiateHandshake( Channel channel, HandshakeClient handshakeClient )
{ {
SimpleNettyChannel channelWrapper = new SimpleNettyChannel( ch, log ); log.info( "Initiating handshake" );
SimpleNettyChannel channelWrapper = new SimpleNettyChannel( channel, log );
CompletableFuture<ProtocolStack> handshake = handshakeClient.initiate( channelWrapper, protocolRepository, protocolName ); CompletableFuture<ProtocolStack> handshake = handshakeClient.initiate( channelWrapper, protocolRepository, protocolName );


handshake.whenComplete( ( protocolStack, failure ) -> onHandshakeComplete( protocolStack, ch, failure ) ); handshake.whenComplete( ( protocolStack, failure ) -> onHandshakeComplete( protocolStack, channel, failure ) );
} }


private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, Throwable failure ) private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel, Throwable failure )
{ {
log.info( "Handshake completed" );
if ( failure != null ) if ( failure != null )
{ {
log.error( "Error when negotiating protocol stack", failure ); log.error( "Error when negotiating protocol stack", failure );
Expand All @@ -122,12 +141,14 @@ private void onHandshakeComplete( ProtocolStack protocolStack, Channel channel,
{ {
try try
{ {
log.info( "Installing: " + protocolStack );
protocolInstaller.installerFor( protocolStack.applicationProtocol() ).install( channel ); protocolInstaller.installerFor( protocolStack.applicationProtocol() ).install( channel );
log.info( "Firing handshake success event to handshake gate" );
channel.pipeline().fireUserEventTriggered( HandshakeFinishedEvent.getSuccess() ); channel.pipeline().fireUserEventTriggered( HandshakeFinishedEvent.getSuccess() );
} }
catch ( Exception e ) catch ( Exception e )
{ {
// TODO: handle better? log.error( "Error installing pipeline", e );
channel.close(); channel.close();
} }
} }
Expand Down

0 comments on commit af42abc

Please sign in to comment.