diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java index 81d6d00441c3d..00da10e106dfe 100644 --- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java +++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java @@ -138,7 +138,7 @@ private CatchUpClient catchUpClient( Config config ) Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis(); return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout, - logProvider, logProvider, clock ).useNativeTransport( false ).inactivityTimeoutMillis( inactivityTimeoutMillis ).build(); + logProvider, logProvider, clock ).inactivityTimeoutMillis( inactivityTimeoutMillis ).build(); } private static BackupDelegator backupDelegator( diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java index f4acc0f0fc24f..decc72ec3242e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java @@ -27,8 +27,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import java.net.ConnectException; import java.time.Clock; @@ -38,7 +39,6 @@ import java.util.function.Function; import org.neo4j.causalclustering.messaging.CatchUpRequest; -import org.neo4j.causalclustering.net.BootstrapConfiguration; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -48,7 +48,6 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.neo4j.causalclustering.catchup.TimeoutLoop.waitForCompletion; -import static org.neo4j.causalclustering.net.BootstrapConfiguration.clientConfig; public class CatchUpClient extends LifecycleAdapter { @@ -58,19 +57,16 @@ public class CatchUpClient extends LifecycleAdapter private final Function> channelInitializer; private final CatchUpChannelPool pool = new CatchUpChannelPool<>( CatchUpChannel::new ); - private final BootstrapConfiguration bootstrapConfiguration; - private EventLoopGroup eventLoopGroup; + private NioEventLoopGroup eventLoopGroup; public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, - Function> channelInitializer, - boolean useNativeTransport ) + Function> channelInitializer ) { this.log = logProvider.getLog( getClass() ); this.clock = clock; this.inactivityTimeoutMillis = inactivityTimeoutMillis; this.channelInitializer = channelInitializer; - this.bootstrapConfiguration = clientConfig( useNativeTransport ); } public T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback responseHandler ) @@ -134,7 +130,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock ); bootstrap = new Bootstrap() .group( eventLoopGroup ) - .channel( bootstrapConfiguration.channelClass() ) + .channel( NioSocketChannel.class ) .handler( channelInitializer.apply( handler ) ); } @@ -191,7 +187,7 @@ public void close() @Override public void start() { - eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "catch-up-client" ) ); + eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java index 506846e2d99dd..e7cc9c28fc13a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupClientBuilder.java @@ -63,7 +63,6 @@ public class CatchupClientBuilder private Collection modifierProtocols = emptyList(); private Clock clock = systemClock(); private long inactivityTimeoutMillis = TimeUnit.SECONDS.toMillis( 10 ); - private boolean useNativeTransport = true; public CatchupClientBuilder() { @@ -129,12 +128,6 @@ public CatchupClientBuilder clock( Clock clock ) return this; } - public CatchupClientBuilder useNativeTransport( boolean useNativeTransport ) - { - this.useNativeTransport = useNativeTransport; - return this; - } - public CatchUpClient build() { ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); @@ -151,6 +144,6 @@ public CatchUpClient build() handshakeTimeout, debugLogProvider, userLogProvider ); }; - return new CatchUpClient( debugLogProvider, clock, inactivityTimeoutMillis, channelInitializer, useNativeTransport ); + return new CatchUpClient( debugLogProvider, clock, inactivityTimeoutMillis, channelInitializer ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java index f241478fbb636..390e74c93ed8a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServerBuilder.java @@ -58,7 +58,6 @@ public class CatchupServerBuilder private ChannelInboundHandler parentHandler; private ListenSocketAddress listenAddress; private String serverName = "catchup-server"; - private boolean useNativeTransport = true; public CatchupServerBuilder( CatchupServerHandler catchupServerHandler ) { @@ -113,12 +112,6 @@ public CatchupServerBuilder serverName( String serverName ) return this; } - public CatchupServerBuilder useNativeTransport( boolean useNativeTransport ) - { - this.useNativeTransport = useNativeTransport; - return this; - } - public Server build() { ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); @@ -133,6 +126,6 @@ public Server build() HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, pipelineBuilder, debugLogProvider ); - return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName, useNativeTransport ); + return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 368a8970360d9..846942adda403 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -184,10 +184,6 @@ public class CausalClusteringSettings implements LoadableConfig setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ), NO_DEFAULT ); - @Description( "Use native transport if available. Epoll for Linux or Kqueue for MacOS. If this setting is set to false, or if native transport is not " + - "available, nio transport will be used." ) - public static final Setting use_native_transport = setting( "causal_clustering.use_native_transport", BOOLEAN, FALSE ); - @Description( "Type of in-flight cache." ) public static final Setting in_flight_cache_type = setting( "causal_clustering.in_flight_cache.type", optionsIgnoreCase( InFlightCacheFactory.Type.class ), diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 1483be65c6a8a..e82d1d80e79f5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -279,10 +279,9 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule, ModifierProtocolInstaller.allClientInstallers ); Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); - boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository, protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() ); - final SenderService raftSender = new SenderService( channelInitializer, logProvider, useNativeTransport ); + final SenderService raftSender = new SenderService( channelInitializer, logProvider ); life.add( raftSender ); this.clientInstalledProtocols = raftSender::installedProtocols; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java index ca7f029d62677..62ebc23769f64 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java @@ -135,11 +135,8 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa protocolInstallerRepository, pipelineBuilderFactory, logProvider ); ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address ); - - boolean useNativeTransport = platformModule.config.get( CausalClusteringSettings.use_native_transport ); - Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(), - raftListenAddress, "raft-server", useNativeTransport ); + raftListenAddress, "raft-server" ); LoggingInbound> loggingRaftInbound = new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java index 3c22a0fedf24b..60b5c7c303621 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java @@ -67,7 +67,6 @@ public Optional resolveIfBackupEnabled( Config config ) if ( config.get( OnlineBackupSettings.online_backup_enabled ) ) { ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server ); - boolean nativeTransport = config.get( CausalClusteringSettings.use_native_transport ); logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress ); return Optional.of( new CatchupServerBuilder( catchupServerHandler ) .serverHandler( parentHandler ) @@ -78,7 +77,6 @@ public Optional resolveIfBackupEnabled( Config config ) .debugLogProvider( logProvider ) .listenAddress( backupAddress ) .serverName( "backup-server" ) - .useNativeTransport( nativeTransport ) .build()); } else diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index a91fcbf52f66d..4b0f607afc7c2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -169,9 +169,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() ); - boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); - - CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory, useNativeTransport ); + CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory ); CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient ); this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader, @@ -200,7 +198,6 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S .debugLogProvider( logProvider ) .listenAddress( config.get( transaction_listen_address ) ) .serverName( "catchup-server" ) - .useNativeTransport( useNativeTransport ) .build(); TransactionBackupServiceProvider transactionBackupServiceProvider = @@ -224,7 +221,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S backupServer.ifPresent( servicesToStopOnStoreCopy::add ); } - private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory, boolean useNativeTransport ) + private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory ) { SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider ); ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol(); @@ -232,7 +229,7 @@ private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPip Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout ); CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, - handshakeTimeout, logProvider, userLogProvider, systemClock() ).useNativeTransport( useNativeTransport ).build(); + handshakeTimeout, logProvider, userLogProvider, systemClock() ).build(); platformModule.life.add( catchUpClient ); return catchUpClient; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java index 85fa65244bf11..1d979a0bf2772 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java @@ -22,11 +22,6 @@ */ package org.neo4j.causalclustering.messaging; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; - import java.util.Iterator; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -34,22 +29,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; -import org.neo4j.causalclustering.net.BootstrapConfiguration; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + import org.neo4j.causalclustering.protocol.handshake.ProtocolStack; import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.collection.Pair; +import org.neo4j.scheduler.JobHandle; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; -import org.neo4j.scheduler.JobHandle; import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static org.neo4j.causalclustering.net.BootstrapConfiguration.clientConfig; public class SenderService extends LifecycleAdapter implements Outbound { - private final BootstrapConfiguration bootstrapConfiguration; private ReconnectingChannels channels; private final ChannelInitializer channelInitializer; @@ -59,14 +56,13 @@ public class SenderService extends LifecycleAdapter implements Outbound -{ - static BootstrapConfiguration serverConfig( boolean preferNative ) - { - if ( preferNative ) - { - if ( Epoll.isAvailable() ) - { - return EpollBootstrapConfig.epollServerConfig(); - } - else if ( KQueue.isAvailable() ) - { - return KQueueBootstrapConfig.kQueueServerConfig(); - } - } - return NioBootstrapConfig.nioServerConfig(); - } - - static BootstrapConfiguration clientConfig( boolean preferNative ) - { - if ( preferNative ) - { - if ( Epoll.isAvailable() ) - { - return EpollBootstrapConfig.epollClientConfig(); - } - else if ( KQueue.isAvailable() ) - { - return KQueueBootstrapConfig.kQueueClientConfig(); - } - } - return NioBootstrapConfig.nioClientConfig(); - } - - EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ); - - Class channelClass(); -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java deleted file mode 100644 index 005efb2c03b86..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/EpollBootstrapConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.net; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.epoll.EpollSocketChannel; - -import java.util.concurrent.ThreadFactory; - -public abstract class EpollBootstrapConfig implements BootstrapConfiguration -{ - public static EpollBootstrapConfig epollServerConfig() - { - return new EpollBootstrapConfig() - { - @Override - public Class channelClass() - { - return EpollServerSocketChannel.class; - } - }; - } - - public static EpollBootstrapConfig epollClientConfig() - { - return new EpollBootstrapConfig() - { - @Override - public Class channelClass() - { - return EpollSocketChannel.class; - } - }; - } - - @Override - public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) - { - return new EpollEventLoopGroup( 0, threadFactory ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootstrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootstrapConfig.java deleted file mode 100644 index 4dd3b11062733..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/KQueueBootstrapConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.net; - -import io.netty.channel.Channel; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.kqueue.KQueueSocketChannel; - -import java.util.concurrent.ThreadFactory; - -public abstract class KQueueBootstrapConfig implements BootstrapConfiguration -{ - public static KQueueBootstrapConfig kQueueServerConfig() - { - return new KQueueBootstrapConfig() - { - @Override - public Class channelClass() - { - return KQueueServerSocketChannel.class; - } - }; - } - - public static KQueueBootstrapConfig kQueueClientConfig() - { - return new KQueueBootstrapConfig() - { - @Override - public Class channelClass() - { - return KQueueSocketChannel.class; - } - }; - } - - @Override - public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) - { - return new KQueueEventLoopGroup( 0, threadFactory ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java deleted file mode 100644 index 094f731fdf4e9..0000000000000 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/NioBootstrapConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.net; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.AbstractNioChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; - -import java.util.concurrent.ThreadFactory; - -public abstract class NioBootstrapConfig implements BootstrapConfiguration -{ - public static NioBootstrapConfig nioServerConfig() - { - return new NioBootstrapConfig() - { - @Override - public Class channelClass() - { - return NioServerSocketChannel.class; - } - }; - } - - public static NioBootstrapConfig nioClientConfig() - { - return new NioBootstrapConfig() - { - @Override - public Class channelClass() - { - return NioSocketChannel.class; - } - }; - } - - @Override - public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory ) - { - return new NioEventLoopGroup( 0, threadFactory ); - } -} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java index 8ecdf8cf6219f..e3583655c648b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/net/Server.java @@ -27,8 +27,10 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import java.net.BindException; import java.util.concurrent.TimeUnit; import org.neo4j.causalclustering.helper.SuspendableLifeCycle; @@ -39,7 +41,6 @@ import org.neo4j.logging.NullLogProvider; import static java.lang.String.format; -import static org.neo4j.causalclustering.net.BootstrapConfiguration.serverConfig; public class Server extends SuspendableLifeCycle { @@ -48,7 +49,6 @@ public class Server extends SuspendableLifeCycle private final String serverName; private final NamedThreadFactory threadFactory; - private final BootstrapConfiguration bootstrapConfiguration; private final ChildInitializer childInitializer; private final ChannelInboundHandler parentHandler; private final ListenSocketAddress listenAddress; @@ -59,11 +59,11 @@ public class Server extends SuspendableLifeCycle public Server( ChildInitializer childInitializer, LogProvider debugLogProvider, LogProvider userLogProvider, ListenSocketAddress listenAddress, String serverName ) { - this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName, false ); + this( childInitializer, null, debugLogProvider, userLogProvider, listenAddress, serverName ); } public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHandler, LogProvider debugLogProvider, LogProvider userLogProvider, - ListenSocketAddress listenAddress, String serverName, boolean useNativeTransport ) + ListenSocketAddress listenAddress, String serverName ) { super( debugLogProvider.getLog( Server.class ) ); this.childInitializer = childInitializer; @@ -73,12 +73,11 @@ public Server( ChildInitializer childInitializer, ChannelInboundHandler parentHa this.userLog = userLogProvider.getLog( getClass() ); this.serverName = serverName; this.threadFactory = new NamedThreadFactory( serverName ); - this.bootstrapConfiguration = serverConfig( useNativeTransport ); } public Server( ChildInitializer childInitializer, ListenSocketAddress listenAddress, String serverName ) { - this( childInitializer, null, NullLogProvider.getInstance(), NullLogProvider.getInstance(), listenAddress, serverName, false ); + this( childInitializer, null, NullLogProvider.getInstance(), NullLogProvider.getInstance(), listenAddress, serverName ); } @Override @@ -95,11 +94,11 @@ protected void start0() return; } - workerGroup = bootstrapConfiguration.eventLoopGroup( threadFactory ); + workerGroup = new NioEventLoopGroup( 0, threadFactory ); ServerBootstrap bootstrap = new ServerBootstrap() .group( workerGroup ) - .channel( bootstrapConfiguration.channelClass() ) + .channel( NioServerSocketChannel.class ) .option( ChannelOption.SO_REUSEADDR, Boolean.TRUE ) .localAddress( listenAddress.socketAddress() ) .childHandler( childInitializer.asChannelInitializer() ); @@ -112,14 +111,17 @@ protected void start0() try { channel = bootstrap.bind().syncUninterruptibly().channel(); - debugLog.info( "%s: bound to '%s' with transport '%s'", serverName, listenAddress, bootstrapConfiguration.channelClass().getSimpleName() ); + debugLog.info( serverName + ": bound to " + listenAddress ); } catch ( Exception e ) { - String message = - format( "%s: cannot bind to '%s' with transport '%s'.", serverName, listenAddress, bootstrapConfiguration.channelClass().getSimpleName() ); - userLog.error( message + " Message: " + e.getMessage() ); - debugLog.error( message, e ); + //noinspection ConstantConditions netty sneaky throw + if ( e instanceof BindException ) + { + String message = serverName + ": address is already bound: " + listenAddress; + userLog.error( message ); + debugLog.error( message, e ); + } throw e; } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index da03da7f8965c..845159c91250c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -256,11 +256,9 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, clientPipelineBuilderFactory, handshakeTimeout, logProvider, userLogProvider ); }; - boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport ); long inactivityTimeoutMs = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis(); - CatchUpClient catchUpClient = - life.add( new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMs, channelInitializer, useNativeTransport ) ); + CatchUpClient catchUpClient = life.add( new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMs, channelInitializer ) ); final Supplier databaseHealthSupplier = () -> platformModule.dataSourceManager.getDataSource().getDependencyResolver().resolveDependency( DatabaseHealth.class ); @@ -342,7 +340,6 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, .debugLogProvider( logProvider ) .listenAddress( config.get( transaction_listen_address ) ) .serverName( "catchup-server" ) - .useNativeTransport( useNativeTransport ) .build(); TransactionBackupServiceProvider transactionBackupServiceProvider = diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java index d56efd15dc90f..adae394c8b803 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/CatchUpClientIT.java @@ -218,7 +218,7 @@ protected void initChannel( SocketChannel ch ) { ch.pipeline().addLast( channelHandlers ); } - }, true ); + } ); } private Server catchupServer( ListenSocketAddress listenSocketAddress, ChannelHandler... channelHandlers ) diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java index da8c48877b5d0..a1922055b0319 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.java @@ -327,7 +327,7 @@ public AdvertisedSocketAddress secondary() catch ( StoreCopyFailedException e ) { assertableLogProvider.assertContainsExactlyOneMessageMatching( - both( containsString( "Connection refused:" ) ).and( containsString( "localhost/127.0.0.1:" + port ) ) ); + both( startsWith( "Connection refused:" ) ).and( containsString( "localhost/127.0.0.1:" + port ) ) ); } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java index 7ba274ddd8564..b53a8b5650972 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/messaging/SenderServiceIT.java @@ -155,7 +155,7 @@ private Server raftServer( ChannelInboundHandler nettyHandler, int port ) installer, pipelineFactory, logProvider ); ListenSocketAddress listenAddress = new ListenSocketAddress( "localhost", port ); - return new Server( channelInitializer, null, logProvider, logProvider, listenAddress, "raft-server", true ); + return new Server( channelInitializer, null, logProvider, logProvider, listenAddress, "raft-server" ); } private SenderService raftSender() @@ -176,7 +176,7 @@ private SenderService raftSender() logProvider, logProvider ); - return new SenderService( channelInitializer, logProvider, true ); + return new SenderService( channelInitializer, logProvider ); } private ApplicationProtocolRepository clientRepository() diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/net/BootstrapConfigurationTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/net/BootstrapConfigurationTest.java deleted file mode 100644 index 884932246b30d..0000000000000 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/net/BootstrapConfigurationTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j Enterprise Edition. The included source - * code can be redistributed and/or modified under the terms of the - * GNU AFFERO GENERAL PUBLIC LICENSE Version 3 - * (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the - * Commons Clause, as found in the associated LICENSE.txt file. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * Neo4j object code can be licensed independently from the source - * under separate terms from the AGPL. Inquiries can be directed to: - * licensing@neo4j.com - * - * More information is also available at: - * https://neo4j.com/licensing/ - */ -package org.neo4j.causalclustering.net; - -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollServerSocketChannel; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueServerSocketChannel; -import io.netty.channel.kqueue.KQueueSocketChannel; -import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.ExtensionContext; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -class BootstrapConfigurationTest -{ - @Test - @AssumeEpoll - void shouldChooseEpollIfAvailable() - { - BootstrapConfiguration cConfig = BootstrapConfiguration.clientConfig( true ); - BootstrapConfiguration sConfig = BootstrapConfiguration.serverConfig( true ); - - assertEquals( EpollSocketChannel.class, cConfig.channelClass() ); - assertEquals( EpollServerSocketChannel.class, sConfig.channelClass() ); - } - - @Test - @AssumeKQueue - void shouldChooseKqueueIfAvailable() - { - BootstrapConfiguration cConfig = BootstrapConfiguration.clientConfig( true ); - BootstrapConfiguration sConfig = BootstrapConfiguration.serverConfig( true ); - - assertEquals( KQueueSocketChannel.class, cConfig.channelClass() ); - assertEquals( KQueueServerSocketChannel.class, sConfig.channelClass() ); - } - - @Test - @AssumeNoEpollOrKQueue - void shouldChooseNioIfNoNativeAvailable() - { - BootstrapConfiguration cConfig = BootstrapConfiguration.clientConfig( true ); - BootstrapConfiguration sConfig = BootstrapConfiguration.serverConfig( true ); - - assertEquals( NioSocketChannel.class, cConfig.channelClass() ); - assertEquals( NioServerSocketChannel.class, sConfig.channelClass() ); - } - - @Test - void shouldChooseNioIfNativeIsNotPrefered() - { - BootstrapConfiguration cConfig = BootstrapConfiguration.clientConfig( false ); - BootstrapConfiguration sConfig = BootstrapConfiguration.serverConfig( false ); - - assertEquals( NioSocketChannel.class, cConfig.channelClass() ); - assertEquals( NioServerSocketChannel.class, sConfig.channelClass() ); - } - - private static class KQueueExecutionCondition implements ExecutionCondition - { - @Override - public ConditionEvaluationResult evaluateExecutionCondition( ExtensionContext context ) - { - return KQueue.isAvailable() ? ConditionEvaluationResult.enabled( "KQueue is available" ) - : ConditionEvaluationResult.disabled( "KQueue is not available" ); - } - } - - private static class EpollExecutionCondition implements ExecutionCondition - { - @Override - public ConditionEvaluationResult evaluateExecutionCondition( ExtensionContext context ) - { - return Epoll.isAvailable() ? ConditionEvaluationResult.enabled( "Epoll is available" ) - : ConditionEvaluationResult.disabled( "Epoll is not available" ); - } - } - - private static class NoEpollOrKqueueCondition implements ExecutionCondition - { - @Override - public ConditionEvaluationResult evaluateExecutionCondition( ExtensionContext context ) - { - return Epoll.isAvailable() || KQueue.isAvailable() ? ConditionEvaluationResult.disabled( "Epoll or Kqueue is available" ) - : ConditionEvaluationResult.enabled( "Epoll and KQueue is not available" ); - } - } - - @Retention( RetentionPolicy.RUNTIME ) - @ExtendWith( KQueueExecutionCondition.class ) - private @interface AssumeKQueue - { } - - @Retention( RetentionPolicy.RUNTIME ) - @ExtendWith( EpollExecutionCondition.class ) - private @interface AssumeEpoll - { } - - @Retention( RetentionPolicy.RUNTIME ) - @ExtendWith( NoEpollOrKqueueCondition.class ) - private @interface AssumeNoEpollOrKQueue - { } -} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java index 91bb8e2c2e98d..3b54135a97aa2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/ConnectionInfoIT.java @@ -75,9 +75,8 @@ public void testAddressAlreadyBoundMessage() throws Throwable { //expected. } - String expectedPartOfMessage = String.format( "server-name: cannot bind to '%s' with transport ", listenSocketAddress ); - logProvider.assertContainsMessageContaining( expectedPartOfMessage ); - userLogProvider.assertContainsMessageContaining( expectedPartOfMessage ); + logProvider.assertContainsMessageContaining( "server-name: address is already bound: " ); + userLogProvider.assertContainsMessageContaining( "server-name: address is already bound: " ); } @SuppressWarnings( "SameParameterValue" )