diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java index 592ad6b71c733..6454673c86aa1 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java @@ -137,7 +137,7 @@ private CatchUpClient catchUpClient( Config config ) NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( createPipelineWrapper( config ) ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout, - logProvider, logProvider, clock ).build(); + logProvider, logProvider, clock ).useNativeTransport( false ).build(); } private static BackupDelegator backupDelegator( diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index 5929c9ad322dc..d78996e50dba8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -27,9 +27,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import java.net.ConnectException; import java.time.Clock; @@ -39,6 +38,7 @@ import java.util.function.Function; import org.neo4j.causalclustering.messaging.CatchUpRequest; +import org.neo4j.causalclustering.net.BootstrapConfiguration; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -46,6 +46,8 @@ import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig; +import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig; public class CatchUpClient extends LifecycleAdapter { @@ -54,14 +56,17 @@ public class CatchUpClient extends LifecycleAdapter private final Function> channelInitializer; private final CatchUpChannelPool pool = new CatchUpChannelPool<>( CatchUpChannel::new ); + private final BootstrapConfiguration bootstrapConfiguration; - private NioEventLoopGroup eventLoopGroup; + private EventLoopGroup eventLoopGroup; - public CatchUpClient( LogProvider logProvider, Clock clock, Function> channelInitializer ) + public CatchUpClient( LogProvider logProvider, Clock clock, Function> channelInitializer, + boolean useNativeTransport ) { this.log = logProvider.getLog( getClass() ); this.clock = clock; this.channelInitializer = channelInitializer; + this.bootstrapConfiguration = useNativeTransport ? preferNativeClientConfig() : nioClientConfig(); } public T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback responseHandler ) @@ -124,7 +129,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock ); bootstrap = new Bootstrap() .group( eventLoopGroup ) - .channel( NioSocketChannel.class ) + .channel( bootstrapConfiguration.channelClass() ) .handler( channelInitializer.apply( handler ) ); } @@ -181,7 +186,7 @@ public void close() @Override public void start() { - eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) ); + eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "catch-up-client" ) ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java index 0c415b8cfa4ec..c18de1b84c9c0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java @@ -61,6 +61,7 @@ public class CatchupClientBuilder private ApplicationSupportedProtocols catchupProtocols = new ApplicationSupportedProtocols( CATCHUP, emptyList() ); private Collection modifierProtocols = emptyList(); private Clock clock = systemClock(); + private boolean useNativeTransport = true; public CatchupClientBuilder() { @@ -120,6 +121,12 @@ public CatchupClientBuilder clock( Clock clock ) return this; } + public CatchupClientBuilder useNativeTransport( boolean useNativeTransport ) + { + this.useNativeTransport = useNativeTransport; + return this; + } + public CatchUpClient build() { ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); @@ -136,6 +143,6 @@ public CatchUpClient build() handshakeTimeout, debugLogProvider, userLogProvider ); }; - return new CatchUpClient( debugLogProvider, clock, channelInitializer ); + return new CatchUpClient( debugLogProvider, clock, channelInitializer, useNativeTransport ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java index 390e74c93ed8a..f241478fbb636 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java @@ -58,6 +58,7 @@ public class CatchupServerBuilder private ChannelInboundHandler parentHandler; private ListenSocketAddress listenAddress; private String serverName = "catchup-server"; + private boolean useNativeTransport = true; public CatchupServerBuilder( CatchupServerHandler catchupServerHandler ) { @@ -112,6 +113,12 @@ public CatchupServerBuilder serverName( String serverName ) return this; } + public CatchupServerBuilder useNativeTransport( boolean useNativeTransport ) + { + this.useNativeTransport = useNativeTransport; + return this; + } + public Server build() { ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); @@ -126,6 +133,6 @@ public Server build() HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, pipelineBuilder, debugLogProvider ); - return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName ); + return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName, useNativeTransport ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 713373b01dc2d..5334e740f47a8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -191,6 +191,9 @@ public class CausalClusteringSettings implements LoadableConfig setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), NO_DEFAULT ); + @Description( "Use native transport if available for Linux or MacOS. If false, always use Nio" ) + public static final Setting use_native_transport = setting( "causal_clustering.use_native_transport", BOOLEAN, TRUE ); + @Description( "Type of in-flight cache." ) public static final Setting in_flight_cache_type = setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ), diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index ff0f1b9ea0c9a..bcd101d7f509b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -279,9 +279,10 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, ModifierProtocolInstaller.allClientInstallers ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); + boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() ); - final SenderService raftSender = new SenderService( channelInitializer, logProvider ); + final SenderService raftSender = new SenderService( channelInitializer, logProvider, useNativeTransport ); life.add( raftSender ); this.clientInstalledProtocols = raftSender::installedProtocols; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java index b66efdc11786a..e58cf08648ac5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java @@ -134,8 +134,11 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa protocolInstallerRepository, pipelineBuilderFactory, logProvider ); ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address ); + + boolean useNativeTransport = platformModule.config.get( CausalClusteringSettings.use_native_transport ); + Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(), - raftListenAddress, "raft-server" ); + raftListenAddress, "raft-server", useNativeTransport ); LoggingInbound> loggingRaftInbound = new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java index 60b5c7c303621..3c22a0fedf24b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java @@ -67,6 +67,7 @@ public Optional resolveIfBackupEnabled( Config config ) if ( config.get( OnlineBackupSettings.online_backup_enabled ) ) { ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server ); + boolean nativeTransport = config.get( CausalClusteringSettings.use_native_transport ); logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress ); return Optional.of( new CatchupServerBuilder( catchupServerHandler ) .serverHandler( parentHandler ) @@ -77,6 +78,7 @@ public Optional resolveIfBackupEnabled( Config config ) .debugLogProvider( logProvider ) .listenAddress( backupAddress ) .serverName( "backup-server" ) + .useNativeTransport( nativeTransport ) .build()); } else diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 4b0f607afc7c2..a91fcbf52f66d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -169,7 +169,9 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() ); - CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory ); + boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); + + CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory, useNativeTransport ); CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient ); this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader, @@ -198,6 +200,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S .debugLogProvider( logProvider ) .listenAddress( config.get( transaction_listen_address ) ) .serverName( "catchup-server" ) + .useNativeTransport( useNativeTransport ) .build(); TransactionBackupServiceProvider transactionBackupServiceProvider = @@ -221,7 +224,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S backupServer.ifPresent( servicesToStopOnStoreCopy::add ); } - private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory ) + private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory, boolean useNativeTransport ) { SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider ); ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol(); @@ -229,7 +232,7 @@ private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPip Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, - handshakeTimeout, logProvider, userLogProvider, systemClock() ).build(); + handshakeTimeout, logProvider, userLogProvider, systemClock() ).useNativeTransport( useNativeTransport ).build(); platformModule.life.add( catchUpClient ); return catchUpClient; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index 1d979a0bf2772..9bc4e4b81aff7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -31,9 +31,10 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import org.neo4j.causalclustering.net.BootstrapConfiguration; import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; @@ -44,9 +45,12 @@ import org.neo4j.logging.LogProvider; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeClientConfig; +import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioClientConfig; public class SenderService extends LifecycleAdapter implements Outbound { + private final BootstrapConfiguration bootstrapConfiguration; private ReconnectingChannels channels; private final ChannelInitializer channelInitializer; @@ -56,13 +60,14 @@ public class SenderService extends LifecycleAdapter implements Outbound +{ + static BootstrapConfiguration preferNativeServerConfig() + { + if ( Epoll.isAvailable() ) + { + return EpollBootstrapConfig.epollServerConfig(); + } + else if ( KQueue.isAvailable() ) + { + return KQueueBootsrapConfig.kQueueServerConfig(); + } + else + { + return NioBootstrapConfig.nioServerConfig(); + } + } + + static BootstrapConfiguration preferNativeClientConfig() + { + if ( Epoll.isAvailable() ) + { + return EpollBootstrapConfig.epollClientConfig(); + } + else if ( KQueue.isAvailable() ) + { + return KQueueBootsrapConfig.kQueueClientConfig(); + } + else + { + return NioBootstrapConfig.nioClientConfig(); + } + } + + EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ); + + Class channelClass(); +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java new file mode 100644 index 0000000000000..005efb2c03b86 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.net; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; + +import java.util.concurrent.ThreadFactory; + +public abstract class EpollBootstrapConfig implements BootstrapConfiguration +{ + public static EpollBootstrapConfig epollServerConfig() + { + return new EpollBootstrapConfig() + { + @Override + public Class channelClass() + { + return EpollServerSocketChannel.class; + } + }; + } + + public static EpollBootstrapConfig epollClientConfig() + { + return new EpollBootstrapConfig() + { + @Override + public Class channelClass() + { + return EpollSocketChannel.class; + } + }; + } + + @Override + public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) + { + return new EpollEventLoopGroup( 0, threadFactory ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootsrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootsrapConfig.java new file mode 100644 index 0000000000000..d6249dcbef8af --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootsrapConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.net; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerSocketChannel; +import io.netty.channel.kqueue.KQueueSocketChannel; + +import java.util.concurrent.ThreadFactory; + +public abstract class KQueueBootsrapConfig implements BootstrapConfiguration +{ + public static KQueueBootsrapConfig kQueueServerConfig() + { + return new KQueueBootsrapConfig() + { + @Override + public Class channelClass() + { + return KQueueServerSocketChannel.class; + } + }; + } + + public static KQueueBootsrapConfig kQueueClientConfig() + { + return new KQueueBootsrapConfig() + { + @Override + public Class channelClass() + { + return KQueueSocketChannel.class; + } + }; + } + + @Override + public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) + { + return new KQueueEventLoopGroup( 0, threadFactory ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java new file mode 100644 index 0000000000000..094f731fdf4e9 --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j Enterprise Edition. The included source + * code can be redistributed and/or modified under the terms of the + * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 + * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the + * Commons Clause, as found in the associated LICENSE.txt file. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * Neo4j object code can be licensed independently from the source + * under separate terms from the AGPL. Inquiries can be directed to: + * licensing@neo4j.com + * + * More information is also available at: + * https://neo4j.com/licensing/ + */ +package org.neo4j.causalclustering.net; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.AbstractNioChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +import java.util.concurrent.ThreadFactory; + +public abstract class NioBootstrapConfig implements BootstrapConfiguration +{ + public static NioBootstrapConfig nioServerConfig() + { + return new NioBootstrapConfig() + { + @Override + public Class channelClass() + { + return NioServerSocketChannel.class; + } + }; + } + + public static NioBootstrapConfig nioClientConfig() + { + return new NioBootstrapConfig() + { + @Override + public Class channelClass() + { + return NioSocketChannel.class; + } + }; + } + + @Override + public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) + { + return new NioEventLoopGroup( 0, threadFactory ); + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java index e3583655c648b..d6b8b2628e8c0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java @@ -27,8 +27,7 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.ServerSocketChannel; import java.net.BindException; import java.util.concurrent.TimeUnit; @@ -41,6 +40,8 @@ import org.neo4j.logging.NullLogProvider; import static java.lang.String.format; +import static org.neo4j.causalclustering.net.BootstrapConfiguration.preferNativeServerConfig; +import static org.neo4j.causalclustering.net.NioBootstrapConfig.nioServerConfig; public class Server extends SuspendableLifeCycle { @@ -49,6 +50,7 @@ public class Server extends SuspendableLifeCycle private final String serverName; private final NamedThreadFactory threadFactory; + private final boolean useNativeTransport; private final ChildInitializer childInitializer; private final ChannelInboundHandler parentHandler; private final ListenSocketAddress listenAddress; @@ -59,11 +61,11 @@ public class Server extends SuspendableLifeCycle public Server( ChildInitializer childInitializer, LogProvider debugLogProvider, LogProvider userLogProvider, ListenSocketAddress listenAddress, String serverName ) { - this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName ); + this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName, true ); } public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHandler, LogProvider debugLogProvider, LogProvider userLogProvider, - ListenSocketAddress listenAddress, String serverName ) + ListenSocketAddress listenAddress, String serverName, boolean useNativeTransport ) { super( debugLogProvider.getLog( Server.class ) ); this.childInitializer = childInitializer; @@ -73,11 +75,12 @@ public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHa this.userLog = userLogProvider.getLog( getClass() ); this.serverName = serverName; this.threadFactory = new NamedThreadFactory( serverName ); + this.useNativeTransport = useNativeTransport; } public Server( ChildInitializer childInitializer, ListenSocketAddress listenAddress, String serverName ) { - this( childInitializer, null, NullLogProvider.getInstance(), NullLogProvider.getInstance(), listenAddress, serverName ); + this( childInitializer, null, NullLogProvider.getInstance(), NullLogProvider.getInstance(), listenAddress, serverName, true ); } @Override @@ -93,12 +96,13 @@ protected void start0() { return; } + BootstrapConfiguration bootstrapConfiguration = useNativeTransport ? preferNativeServerConfig() : nioServerConfig(); - workerGroup = new NioEventLoopGroup( 0, threadFactory ); + workerGroup = bootstrapConfiguration.eventLoopGroup( threadFactory ); ServerBootstrap bootstrap = new ServerBootstrap() .group( workerGroup ) - .channel( NioServerSocketChannel.class ) + .channel( bootstrapConfiguration.channelClass() ) .option( ChannelOption.SO_REUSEADDR, Boolean.TRUE ) .localAddress( listenAddress.socketAddress() ) .childHandler( childInitializer.asChannelInitializer() ); @@ -111,7 +115,7 @@ protected void start0() try { channel = bootstrap.bind().syncUninterruptibly().channel(); - debugLog.info( serverName + ": bound to " + listenAddress ); + debugLog.info( "%s: bound to '%s' with transport '%s'", serverName, listenAddress, bootstrapConfiguration.channelClass() ); } catch ( Exception e ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 5c46fb7aa6fd7..9cfcb196507c3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -256,7 +256,9 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, clientPipelineBuilderFactory, handshakeTimeout, logProvider, userLogProvider ); }; - CatchUpClient catchUpClient = life.add( new CatchUpClient( logProvider, Clocks.systemClock(), channelInitializer ) ); + boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); + + CatchUpClient catchUpClient = life.add( new CatchUpClient( logProvider, Clocks.systemClock(), channelInitializer, useNativeTransport ) ); final Supplier databaseHealthSupplier = () -> platformModule.dataSourceManager.getDataSource().getDependencyResolver().resolveDependency( DatabaseHealth.class ); @@ -338,6 +340,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, .debugLogProvider( logProvider ) .listenAddress( config.get( transaction_listen_address ) ) .serverName( "catchup-server" ) + .useNativeTransport( useNativeTransport ) .build(); TransactionBackupServiceProvider transactionBackupServiceProvider = diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java index 5f9f8bf26c9b2..6d9e812c3935a 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java @@ -186,7 +186,7 @@ protected void initChannel( SocketChannel ch ) { ch.pipeline().addLast( channelHandlers ); } - } ); + }, true ); } private Server catchupServer( ListenSocketAddress listenSocketAddress, ChannelHandler... channelHandlers ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java index b53a8b5650972..7ba274ddd8564 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java @@ -155,7 +155,7 @@ private Server raftServer( ChannelInboundHandler nettyHandler, int port ) installer, pipelineFactory, logProvider ); ListenSocketAddress listenAddress = new ListenSocketAddress( "localhost", port ); - return new Server( channelInitializer, null, logProvider, logProvider, listenAddress, "raft-server" ); + return new Server( channelInitializer, null, logProvider, logProvider, listenAddress, "raft-server", true ); } private SenderService raftSender() @@ -176,7 +176,7 @@ private SenderService raftSender() logProvider, logProvider ); - return new SenderService( channelInitializer, logProvider ); + return new SenderService( channelInitializer, logProvider, true ); } private ApplicationProtocolRepository clientRepository()