Skip to content

Commit

Permalink
Support using native transport in CausalClusering
Browse files Browse the repository at this point in the history
If use_native_transport config is true (default = true) then use epoll
(linux) or Kqueue (macos) if possible.
  • Loading branch information
RagnarW committed Sep 28, 2018
1 parent 7863a1b commit 04b69ad
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private CatchUpClient catchUpClient( Config config )
NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( createPipelineWrapper( config ) );
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout,
logProvider, logProvider, clock ).build();
logProvider, logProvider, clock ).useNativeTransport( false ).build();
}

private static BackupDelegator backupDelegator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.ConnectException;
import java.time.Clock;
Expand All @@ -39,13 +38,16 @@
import java.util.function.Function;

import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.net.BootstrapConfiguration;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig;
import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig;

public class CatchUpClient extends LifecycleAdapter
{
Expand All @@ -54,14 +56,17 @@ public class CatchUpClient extends LifecycleAdapter
private final Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer;

private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );
private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;

private NioEventLoopGroup eventLoopGroup;
private EventLoopGroup eventLoopGroup;

public CatchUpClient( LogProvider logProvider, Clock clock, Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer )
public CatchUpClient( LogProvider logProvider, Clock clock, Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer,
boolean useNativeTransport )
{
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.channelInitializer = channelInitializer;
this.bootstrapConfiguration = useNativeTransport ? preferNativeClientConfig() : nioClientConfig();
}

public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
Expand Down Expand Up @@ -124,7 +129,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock );
bootstrap = new Bootstrap()
.group( eventLoopGroup )
.channel( NioSocketChannel.class )
.channel( bootstrapConfiguration.channelClass() )
.handler( channelInitializer.apply( handler ) );
}

Expand Down Expand Up @@ -181,7 +186,7 @@ public void close()
@Override
public void start()
{
eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) );
eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "catch-up-client" ) );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class CatchupClientBuilder
private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() );
private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList();
private Clock clock = systemClock();
private boolean useNativeTransport = true;

public CatchupClientBuilder()
{
Expand Down Expand Up @@ -120,6 +121,12 @@ public CatchupClientBuilder clock( Clock clock )
return this;
}

public CatchupClientBuilder useNativeTransport( boolean useNativeTransport )
{
this.useNativeTransport = useNativeTransport;
return this;
}

public CatchUpClient build()
{
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
Expand All @@ -136,6 +143,6 @@ public CatchUpClient build()
handshakeTimeout, debugLogProvider, userLogProvider );
};

return new CatchUpClient( debugLogProvider, clock, channelInitializer );
return new CatchUpClient( debugLogProvider, clock, channelInitializer, useNativeTransport );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class CatchupServerBuilder
private ChannelInboundHandler parentHandler;
private ListenSocketAddress listenAddress;
private String serverName = "catchup-server";
private boolean useNativeTransport = true;

public CatchupServerBuilder( CatchupServerHandler catchupServerHandler )
{
Expand Down Expand Up @@ -112,6 +113,12 @@ public CatchupServerBuilder serverName( String serverName )
return this;
}

public CatchupServerBuilder useNativeTransport( boolean useNativeTransport )
{
this.useNativeTransport = useNativeTransport;
return this;
}

public Server build()
{
ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
Expand All @@ -126,6 +133,6 @@ public Server build()
HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, pipelineBuilder, debugLogProvider );

return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName );
return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName, useNativeTransport );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
NO_DEFAULT );

@Description( "Use native transport if available for Linux or MacOS. If false, always use Nio" )
public static final Setting<Boolean> use_native_transport = setting( "causal_clustering.use_native_transport", BOOLEAN, TRUE );

@Description( "Type of in-flight cache." )
public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type =
setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
ModifierProtocolInstaller.allClientInstallers );

Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );
HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository,
protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() );
final SenderService raftSender = new SenderService( channelInitializer, logProvider );
final SenderService raftSender = new SenderService( channelInitializer, logProvider, useNativeTransport );
life.add( raftSender );
this.clientInstalledProtocols = raftSender::installedProtocols;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa
protocolInstallerRepository, pipelineBuilderFactory, logProvider );

ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address );

boolean useNativeTransport = platformModule.config.get( CausalClusteringSettings.use_native_transport );

Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(),
raftListenAddress, "raft-server" );
raftListenAddress, "raft-server", useNativeTransport );

LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound =
new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
{
ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server );
boolean nativeTransport = config.get( CausalClusteringSettings.use_native_transport );
logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress );
return Optional.of( new CatchupServerBuilder( catchupServerHandler )
.serverHandler( parentHandler )
Expand All @@ -77,6 +78,7 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
.debugLogProvider( logProvider )
.listenAddress( backupAddress )
.serverName( "backup-server" )
.useNativeTransport( nativeTransport )
.build());
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla

this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() );

CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory );
boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );

CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory, useNativeTransport );
CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient );

this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader,
Expand Down Expand Up @@ -198,6 +200,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
.debugLogProvider( logProvider )
.listenAddress( config.get( transaction_listen_address ) )
.serverName( "catchup-server" )
.useNativeTransport( useNativeTransport )
.build();

TransactionBackupServiceProvider transactionBackupServiceProvider =
Expand All @@ -221,15 +224,15 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
backupServer.ifPresent( servicesToStopOnStoreCopy::add );
}

private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory )
private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory, boolean useNativeTransport )
{
SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider );
ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol();
Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();
Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );

CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory,
handshakeTimeout, logProvider, userLogProvider, systemClock() ).build();
handshakeTimeout, logProvider, userLogProvider, systemClock() ).useNativeTransport( useNativeTransport ).build();
platformModule.life.add( catchUpClient );
return catchUpClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;

import org.neo4j.causalclustering.net.BootstrapConfiguration;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -44,9 +45,12 @@
import org.neo4j.logging.LogProvider;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig;
import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig;

public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress,Message>
{
private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;
private ReconnectingChannels channels;

private final ChannelInitializer channelInitializer;
Expand All @@ -56,13 +60,14 @@ public class SenderService extends LifecycleAdapter implements Outbound<Advertis
private JobHandle jobHandle;
private boolean senderServiceRunning;
private Bootstrap bootstrap;
private NioEventLoopGroup eventLoopGroup;
private EventLoopGroup eventLoopGroup;

public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider )
public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider, boolean useNativeTransport )
{
this.channelInitializer = channelInitializer;
this.log = logProvider.getLog( getClass() );
this.channels = new ReconnectingChannels();
this.bootstrapConfiguration = useNativeTransport ? preferNativeClientConfig() : nioClientConfig();
}

@Override
Expand Down Expand Up @@ -131,10 +136,10 @@ public synchronized void start()
serviceLock.writeLock().lock();
try
{
eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "sender-service" ) );
eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "sender-service" ) );
bootstrap = new Bootstrap()
.group( eventLoopGroup )
.channel( NioSocketChannel.class )
.channel( bootstrapConfiguration.channelClass() )
.handler( channelInitializer );

senderServiceRunning = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.causalclustering.net;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

import java.util.concurrent.ThreadFactory;

public interface BootstrapConfiguration<TYPE extends Channel>
{
static BootstrapConfiguration<? extends ServerSocketChannel> preferNativeServerConfig()
{
if ( Epoll.isAvailable() )
{
return EpollBootstrapConfig.epollServerConfig();
}
else if ( KQueue.isAvailable() )
{
return KQueueBootsrapConfig.kQueueServerConfig();
}
else
{
return NioBootstrapConfig.nioServerConfig();
}
}

static BootstrapConfiguration<? extends SocketChannel> preferNativeClientConfig()
{
if ( Epoll.isAvailable() )
{
return EpollBootstrapConfig.epollClientConfig();
}
else if ( KQueue.isAvailable() )
{
return KQueueBootsrapConfig.kQueueClientConfig();
}
else
{
return NioBootstrapConfig.nioClientConfig();
}
}

EventLoopGroup eventLoopGroup( ThreadFactory threadFactory );

Class<TYPE> channelClass();
}

0 comments on commit 04b69ad

Please sign in to comment.