Skip to content

Commit

Permalink
Support using native transport in CausalClusering
Browse files Browse the repository at this point in the history
If use_native_transport config is true (default = true) then use epoll
(linux) or Kqueue (macos) if possible.
  • Loading branch information
RagnarW committed Sep 28, 2018
1 parent 24dce65 commit f28a269
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 32 deletions.
Expand Up @@ -137,7 +137,7 @@ private CatchUpClient catchUpClient( Config config )
NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( createPipelineWrapper( config ) ); NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( createPipelineWrapper( config ) );
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout, return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout,
logProvider, logProvider, clock ).build(); logProvider, logProvider, clock ).useNativeTransport( false ).build();
} }


private static BackupDelegator backupDelegator( private static BackupDelegator backupDelegator(
Expand Down
Expand Up @@ -27,9 +27,8 @@
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


import java.net.ConnectException; import java.net.ConnectException;
import java.time.Clock; import java.time.Clock;
Expand All @@ -39,13 +38,16 @@
import java.util.function.Function; import java.util.function.Function;


import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.net.BootstrapConfiguration;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig;
import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig;


public class CatchUpClient extends LifecycleAdapter public class CatchUpClient extends LifecycleAdapter
{ {
Expand All @@ -54,14 +56,17 @@ public class CatchUpClient extends LifecycleAdapter
private final Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer; private final Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer;


private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new ); private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );
private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;


private NioEventLoopGroup eventLoopGroup; private EventLoopGroup eventLoopGroup;


public CatchUpClient( LogProvider logProvider, Clock clock, Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer ) public CatchUpClient( LogProvider logProvider, Clock clock, Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer,
boolean useNativeTransport )
{ {
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.clock = clock; this.clock = clock;
this.channelInitializer = channelInitializer; this.channelInitializer = channelInitializer;
this.bootstrapConfiguration = useNativeTransport ? preferNativeClientConfig() : nioClientConfig();
} }


public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler ) public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
Expand Down Expand Up @@ -124,7 +129,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock ); handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock );
bootstrap = new Bootstrap() bootstrap = new Bootstrap()
.group( eventLoopGroup ) .group( eventLoopGroup )
.channel( NioSocketChannel.class ) .channel( bootstrapConfiguration.channelClass() )
.handler( channelInitializer.apply( handler ) ); .handler( channelInitializer.apply( handler ) );
} }


Expand Down Expand Up @@ -181,7 +186,7 @@ public void close()
@Override @Override
public void start() public void start()
{ {
eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) ); eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "catch-up-client" ) );
} }


@Override @Override
Expand Down
Expand Up @@ -61,6 +61,7 @@ public class CatchupClientBuilder
private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() ); private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() );
private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList(); private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList();
private Clock clock = systemClock(); private Clock clock = systemClock();
private boolean useNativeTransport = true;


public CatchupClientBuilder() public CatchupClientBuilder()
{ {
Expand Down Expand Up @@ -120,6 +121,12 @@ public CatchupClientBuilder clock( Clock clock )
return this; return this;
} }


public CatchupClientBuilder useNativeTransport( boolean useNativeTransport )
{
this.useNativeTransport = useNativeTransport;
return this;
}

public CatchUpClient build() public CatchUpClient build()
{ {
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
Expand All @@ -136,6 +143,6 @@ public CatchUpClient build()
handshakeTimeout, debugLogProvider, userLogProvider ); handshakeTimeout, debugLogProvider, userLogProvider );
}; };


return new CatchUpClient( debugLogProvider, clock, channelInitializer ); return new CatchUpClient( debugLogProvider, clock, channelInitializer, useNativeTransport );
} }
} }
Expand Up @@ -58,6 +58,7 @@ public class CatchupServerBuilder
private ChannelInboundHandler parentHandler; private ChannelInboundHandler parentHandler;
private ListenSocketAddress listenAddress; private ListenSocketAddress listenAddress;
private String serverName = "catchup-server"; private String serverName = "catchup-server";
private boolean useNativeTransport = true;


public CatchupServerBuilder( CatchupServerHandler catchupServerHandler ) public CatchupServerBuilder( CatchupServerHandler catchupServerHandler )
{ {
Expand Down Expand Up @@ -112,6 +113,12 @@ public CatchupServerBuilder serverName( String serverName )
return this; return this;
} }


public CatchupServerBuilder useNativeTransport( boolean useNativeTransport )
{
this.useNativeTransport = useNativeTransport;
return this;
}

public Server build() public Server build()
{ {
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
Expand All @@ -126,6 +133,6 @@ public Server build()
HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, pipelineBuilder, debugLogProvider ); protocolInstallerRepository, pipelineBuilder, debugLogProvider );


return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName ); return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName, useNativeTransport );
} }
} }
Expand Up @@ -191,6 +191,9 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
NO_DEFAULT ); NO_DEFAULT );


@Description( "Use native transport if available for Linux or MacOS. If false, always use Nio" )
public static final Setting<Boolean> use_native_transport = setting( "causal_clustering.use_native_transport", BOOLEAN, TRUE );

@Description( "Type of in-flight cache." ) @Description( "Type of in-flight cache." )
public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type = public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type =
setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ), setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ),
Expand Down
Expand Up @@ -279,9 +279,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ModifierProtocolInstaller.allClientInstallers ); ModifierProtocolInstaller.allClientInstallers );


Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );
HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() ); protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() );
final SenderService raftSender = new SenderService( channelInitializer, logProvider ); final SenderService raftSender = new SenderService( channelInitializer, logProvider, useNativeTransport );
life.add( raftSender ); life.add( raftSender );
this.clientInstalledProtocols = raftSender::installedProtocols; this.clientInstalledProtocols = raftSender::installedProtocols;


Expand Down
Expand Up @@ -134,8 +134,11 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa
protocolInstallerRepository, pipelineBuilderFactory, logProvider ); protocolInstallerRepository, pipelineBuilderFactory, logProvider );


ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address ); ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address );

boolean useNativeTransport = platformModule.config.get( CausalClusteringSettings.use_native_transport );

Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(), Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(),
raftListenAddress, "raft-server" ); raftListenAddress, "raft-server", useNativeTransport );


LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound = LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound =
new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() ); new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() );
Expand Down
Expand Up @@ -67,6 +67,7 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
if ( config.get( OnlineBackupSettings.online_backup_enabled ) ) if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
{ {
ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server ); ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server );
boolean nativeTransport = config.get( CausalClusteringSettings.use_native_transport );
logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress ); logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress );
return Optional.of( new CatchupServerBuilder( catchupServerHandler ) return Optional.of( new CatchupServerBuilder( catchupServerHandler )
.serverHandler( parentHandler ) .serverHandler( parentHandler )
Expand All @@ -77,6 +78,7 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
.debugLogProvider( logProvider ) .debugLogProvider( logProvider )
.listenAddress( backupAddress ) .listenAddress( backupAddress )
.serverName( "backup-server" ) .serverName( "backup-server" )
.useNativeTransport( nativeTransport )
.build()); .build());
} }
else else
Expand Down
Expand Up @@ -169,7 +169,9 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla


this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() ); this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() );


CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory ); boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );

CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory, useNativeTransport );
CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient ); CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient );


this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader, this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader,
Expand Down Expand Up @@ -198,6 +200,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
.debugLogProvider( logProvider ) .debugLogProvider( logProvider )
.listenAddress( config.get( transaction_listen_address ) ) .listenAddress( config.get( transaction_listen_address ) )
.serverName( "catchup-server" ) .serverName( "catchup-server" )
.useNativeTransport( useNativeTransport )
.build(); .build();


TransactionBackupServiceProvider transactionBackupServiceProvider = TransactionBackupServiceProvider transactionBackupServiceProvider =
Expand All @@ -221,15 +224,15 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
backupServer.ifPresent( servicesToStopOnStoreCopy::add ); backupServer.ifPresent( servicesToStopOnStoreCopy::add );
} }


private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory ) private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory, boolean useNativeTransport )
{ {
SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider ); SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider );
ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol(); ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol();
Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols(); Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );


CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory,
handshakeTimeout, logProvider, userLogProvider, systemClock() ).build(); handshakeTimeout, logProvider, userLogProvider, systemClock() ).useNativeTransport( useNativeTransport ).build();
platformModule.life.add( catchUpClient ); platformModule.life.add( catchUpClient );
return catchUpClient; return catchUpClient;
} }
Expand Down
Expand Up @@ -31,9 +31,10 @@


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.SocketChannel;


import org.neo4j.causalclustering.net.BootstrapConfiguration;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -44,9 +45,12 @@
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig;
import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig;


public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress,Message> public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress,Message>
{ {
private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;
private ReconnectingChannels channels; private ReconnectingChannels channels;


private final ChannelInitializer channelInitializer; private final ChannelInitializer channelInitializer;
Expand All @@ -56,13 +60,14 @@ public class SenderService extends LifecycleAdapter implements Outbound<Advertis
private JobHandle jobHandle; private JobHandle jobHandle;
private boolean senderServiceRunning; private boolean senderServiceRunning;
private Bootstrap bootstrap; private Bootstrap bootstrap;
private NioEventLoopGroup eventLoopGroup; private EventLoopGroup eventLoopGroup;


public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider ) public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider, boolean useNativeTransport )
{ {
this.channelInitializer = channelInitializer; this.channelInitializer = channelInitializer;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.channels = new ReconnectingChannels(); this.channels = new ReconnectingChannels();
this.bootstrapConfiguration = useNativeTransport ? preferNativeClientConfig() : nioClientConfig();
} }


@Override @Override
Expand Down Expand Up @@ -131,10 +136,10 @@ public synchronized void start()
serviceLock.writeLock().lock(); serviceLock.writeLock().lock();
try try
{ {
eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "sender-service" ) ); eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "sender-service" ) );
bootstrap = new Bootstrap() bootstrap = new Bootstrap()
.group( eventLoopGroup ) .group( eventLoopGroup )
.channel( NioSocketChannel.class ) .channel( bootstrapConfiguration.channelClass() )
.handler( channelInitializer ); .handler( channelInitializer );


senderServiceRunning = true; senderServiceRunning = true;
Expand Down
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.net;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

import java.util.concurrent.ThreadFactory;

public interface BootstrapConfiguration<TYPE extends Channel>
{
static BootstrapConfiguration<? extends ServerSocketChannel> preferNativeServerConfig()
{
if ( Epoll.isAvailable() )
{
return EpollBootstrapConfig.epollServerConfig();
}
else if ( KQueue.isAvailable() )
{
return KQueueBootsrapConfig.kQueueServerConfig();
}
else
{
return NioBootstrapConfig.nioServerConfig();
}
}

static BootstrapConfiguration<? extends SocketChannel> preferNativeClientConfig()
{
if ( Epoll.isAvailable() )
{
return EpollBootstrapConfig.epollClientConfig();
}
else if ( KQueue.isAvailable() )
{
return KQueueBootsrapConfig.kQueueClientConfig();
}
else
{
return NioBootstrapConfig.nioClientConfig();
}
}

EventLoopGroup eventLoopGroup( ThreadFactory threadFactory );

Class<TYPE> channelClass();
}

0 comments on commit f28a269

Please sign in to comment.