Remove native transport for causal clustering
Revert "Disable native transports in tests"

This reverts commit 787b119

Revert "Native transports disabled by default in clustering"

This reverts commit 1aa86ff

Revert "Added conditional tests for transport configuration"

This reverts commit d9051f7

Revert "Add more context to config"

This reverts commit 088884f

Revert "Pass boolean to initiator method"

This reverts commit 185185d

Revert "Remove sneaky netty catch"

This reverts commit d2fb491

Revert "Support using native transport in CausalClusering"

This reverts commit 04b69ad
RagnarW committed Oct 12, 2018
1 parent 1f20e0f commit d41dcda
Showing 21 changed files with 47 additions and 482 deletions.
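
In short: the revert removes the causal_clustering.use_native_transport setting and the useNativeTransport(...) builder methods, so the catchup client and server, the Raft server and the SenderService always use Netty's NIO transport again. As a rough before/after sketch (variable and builder names are taken from the hunks below; the surrounding Neo4j fields and configuration plumbing are assumed and omitted, so this is illustrative glue rather than a standalone compilable file):

// Before the revert: callers could opt in to native transport (the setting defaulted to FALSE).
boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );
CatchUpClient before = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory,
        handshakeTimeout, logProvider, userLogProvider, systemClock() )
        .useNativeTransport( useNativeTransport )
        .inactivityTimeoutMillis( inactivityTimeoutMillis )
        .build();

// After the revert: no transport choice; the builder always bootstraps with NioEventLoopGroup / NioSocketChannel.
CatchUpClient after = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory,
        handshakeTimeout, logProvider, userLogProvider, systemClock() )
        .inactivityTimeoutMillis( inactivityTimeoutMillis )
        .build();
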
@@ -138,7 +138,7 @@ private CatchUpClient catchUpClient( Config config )
         Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
         long inactivityTimeoutMillis = config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis();
         return new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory, handshakeTimeout,
-                logProvider, logProvider, clock ).useNativeTransport( false ).inactivityTimeoutMillis( inactivityTimeoutMillis ).build();
+                logProvider, logProvider, clock ).inactivityTimeoutMillis( inactivityTimeoutMillis ).build();
     }

     private static BackupDelegator backupDelegator(

@@ -27,8 +27,9 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;

 import java.net.ConnectException;
 import java.time.Clock;
@@ -38,7 +39,6 @@
 import java.util.function.Function;

 import org.neo4j.causalclustering.messaging.CatchUpRequest;
-import org.neo4j.causalclustering.net.BootstrapConfiguration;
 import org.neo4j.helpers.AdvertisedSocketAddress;
 import org.neo4j.helpers.NamedThreadFactory;
 import org.neo4j.kernel.lifecycle.LifecycleAdapter;
@@ -48,7 +48,6 @@
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static org.neo4j.causalclustering.catchup.TimeoutLoop.waitForCompletion;
-import static org.neo4j.causalclustering.net.BootstrapConfiguration.clientConfig;

 public class CatchUpClient extends LifecycleAdapter
 {
@@ -58,19 +57,16 @@ public class CatchUpClient extends LifecycleAdapter
     private final Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer;

     private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );
-    private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;

-    private EventLoopGroup eventLoopGroup;
+    private NioEventLoopGroup eventLoopGroup;

     public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis,
-            Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer,
-            boolean useNativeTransport )
+            Function<CatchUpResponseHandler,ChannelInitializer<SocketChannel>> channelInitializer )
     {
         this.log = logProvider.getLog( getClass() );
         this.clock = clock;
         this.inactivityTimeoutMillis = inactivityTimeoutMillis;
         this.channelInitializer = channelInitializer;
-        this.bootstrapConfiguration = clientConfig( useNativeTransport );
     }

     public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
@@ -134,7 +130,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
             handler = new TrackingResponseHandler( new CatchUpResponseAdaptor(), clock );
             bootstrap = new Bootstrap()
                     .group( eventLoopGroup )
-                    .channel( bootstrapConfiguration.channelClass() )
+                    .channel( NioSocketChannel.class )
                     .handler( channelInitializer.apply( handler ) );
         }

@@ -191,7 +187,7 @@ public void close()
     @Override
     public void start()
     {
-        eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "catch-up-client" ) );
+        eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "catch-up-client" ) );
     }

     @Override

@@ -63,7 +63,6 @@ public class CatchupClientBuilder
     private Collection<ModifierSupportedProtocols> modifierProtocols = emptyList();
     private Clock clock = systemClock();
     private long inactivityTimeoutMillis = TimeUnit.SECONDS.toMillis( 10 );
-    private boolean useNativeTransport = true;

     public CatchupClientBuilder()
     {
@@ -129,12 +128,6 @@ public CatchupClientBuilder clock( Clock clock )
         return this;
     }

-    public CatchupClientBuilder useNativeTransport( boolean useNativeTransport )
-    {
-        this.useNativeTransport = useNativeTransport;
-        return this;
-    }
-
     public CatchUpClient build()
     {
         ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
@@ -151,6 +144,6 @@ public CatchUpClient build()
             handshakeTimeout, debugLogProvider, userLogProvider );
         };

-        return new CatchUpClient( debugLogProvider, clock, inactivityTimeoutMillis, channelInitializer, useNativeTransport );
+        return new CatchUpClient( debugLogProvider, clock, inactivityTimeoutMillis, channelInitializer );
     }
 }

@@ -58,7 +58,6 @@ public class CatchupServerBuilder
     private ChannelInboundHandler parentHandler;
     private ListenSocketAddress listenAddress;
     private String serverName = "catchup-server";
-    private boolean useNativeTransport = true;

     public CatchupServerBuilder( CatchupServerHandler catchupServerHandler )
     {
@@ -113,12 +112,6 @@ public CatchupServerBuilder serverName( String serverName )
         return this;
     }

-    public CatchupServerBuilder useNativeTransport( boolean useNativeTransport )
-    {
-        this.useNativeTransport = useNativeTransport;
-        return this;
-    }
-
     public Server build()
     {
         ApplicationProtocolRepository applicationProtocolRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols );
@@ -133,6 +126,6 @@ public Server build()
         HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( applicationProtocolRepository, modifierProtocolRepository,
                 protocolInstallerRepository, pipelineBuilder, debugLogProvider );

-        return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName, useNativeTransport );
+        return new Server( handshakeServerInitializer, parentHandler, debugLogProvider, userLogProvider, listenAddress, serverName );
     }
 }

@@ -184,10 +184,6 @@ public class CausalClusteringSettings implements LoadableConfig
             setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
                     NO_DEFAULT );

-    @Description( "Use native transport if available. Epoll for Linux or Kqueue for MacOS. If this setting is set to false, or if native transport is not " +
-            "available, nio transport will be used." )
-    public static final Setting<Boolean> use_native_transport = setting( "causal_clustering.use_native_transport", BOOLEAN, FALSE );
-
     @Description( "Type of in-flight cache." )
     public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type =
             setting( "causal_clustering.in_flight_cache.type", optionsIgnoreCase( InFlightCacheFactory.Type.class ),

@@ -279,10 +279,9 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
                 ModifierProtocolInstaller.allClientInstallers );

         Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );
-        boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );
         HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( applicationProtocolRepository, modifierProtocolRepository,
                 protocolInstallerRepository, clientPipelineBuilderFactory, handshakeTimeout, logProvider, platformModule.logging.getUserLogProvider() );
-        final SenderService raftSender = new SenderService( channelInitializer, logProvider, useNativeTransport );
+        final SenderService raftSender = new SenderService( channelInitializer, logProvider );
         life.add( raftSender );
         this.clientInstalledProtocols = raftSender::installedProtocols;

@@ -135,11 +135,8 @@ private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessa
                 protocolInstallerRepository, pipelineBuilderFactory, logProvider );

         ListenSocketAddress raftListenAddress = platformModule.config.get( CausalClusteringSettings.raft_listen_address );
-
-        boolean useNativeTransport = platformModule.config.get( CausalClusteringSettings.use_native_transport );
-
         Server raftServer = new Server( handshakeServerInitializer, installedProtocolsHandler, logProvider, platformModule.logging.getUserLogProvider(),
-                raftListenAddress, "raft-server", useNativeTransport );
+                raftListenAddress, "raft-server" );

         LoggingInbound<ReceivedInstantClusterIdAwareMessage<?>> loggingRaftInbound =
                 new LoggingInbound<>( nettyHandler, messageLogger, identityModule.myself() );

@@ -67,7 +67,6 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
         if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
         {
             ListenSocketAddress backupAddress = HostnamePortAsListenAddress.resolve( config, OnlineBackupSettings.online_backup_server );
-            boolean nativeTransport = config.get( CausalClusteringSettings.use_native_transport );
             logProvider.getLog( TransactionBackupServiceProvider.class ).info( "Binding backup service on address %s", backupAddress );
             return Optional.of( new CatchupServerBuilder( catchupServerHandler )
                     .serverHandler( parentHandler )
@@ -78,7 +77,6 @@ public Optional<Server> resolveIfBackupEnabled( Config config )
                     .debugLogProvider( logProvider )
                     .listenAddress( backupAddress )
                     .serverName( "backup-server" )
-                    .useNativeTransport( nativeTransport )
                     .build());
         }
         else

@@ -169,9 +169,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla

         this.snapshotService = new CoreSnapshotService( commandApplicationProcess, coreState, consensusModule.raftLog(), consensusModule.raftMachine() );

-        boolean useNativeTransport = config.get( CausalClusteringSettings.use_native_transport );
-
-        CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory, useNativeTransport );
+        CatchUpClient catchUpClient = createCatchupClient( clientPipelineBuilderFactory );
         CoreStateDownloader downloader = createCoreStateDownloader( servicesToStopOnStoreCopy, catchUpClient );

         this.downloadService = new CoreStateDownloaderService( platformModule.jobScheduler, downloader,
@@ -200,7 +198,6 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
                 .debugLogProvider( logProvider )
                 .listenAddress( config.get( transaction_listen_address ) )
                 .serverName( "catchup-server" )
-                .useNativeTransport( useNativeTransport )
                 .build();

         TransactionBackupServiceProvider transactionBackupServiceProvider =
@@ -224,15 +221,15 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S
         backupServer.ifPresent( servicesToStopOnStoreCopy::add );
     }

-    private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory, boolean useNativeTransport )
+    private CatchUpClient createCatchupClient( NettyPipelineBuilderFactory clientPipelineBuilderFactory )
     {
         SupportedProtocolCreator supportedProtocolCreator = new SupportedProtocolCreator( config, logProvider );
         ApplicationSupportedProtocols supportedCatchupProtocols = supportedProtocolCreator.createSupportedCatchupProtocol();
         Collection<ModifierSupportedProtocols> supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols();
         Duration handshakeTimeout = config.get( CausalClusteringSettings.handshake_timeout );

         CatchUpClient catchUpClient = new CatchupClientBuilder( supportedCatchupProtocols, supportedModifierProtocols, clientPipelineBuilderFactory,
-                handshakeTimeout, logProvider, userLogProvider, systemClock() ).useNativeTransport( useNativeTransport ).build();
+                handshakeTimeout, logProvider, userLogProvider, systemClock() ).build();
         platformModule.life.add( catchUpClient );
         return catchUpClient;
     }

@@ -22,34 +22,31 @@
  */
 package org.neo4j.causalclustering.messaging;

-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
 import java.util.Iterator;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Stream;

-import org.neo4j.causalclustering.net.BootstrapConfiguration;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
 import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
 import org.neo4j.helpers.AdvertisedSocketAddress;
 import org.neo4j.helpers.NamedThreadFactory;
 import org.neo4j.helpers.collection.Pair;
-import org.neo4j.scheduler.JobHandle;
 import org.neo4j.kernel.lifecycle.LifecycleAdapter;
 import org.neo4j.logging.Log;
 import org.neo4j.logging.LogProvider;
+import org.neo4j.scheduler.JobHandle;

 import static java.util.concurrent.TimeUnit.MICROSECONDS;
-import static org.neo4j.causalclustering.net.BootstrapConfiguration.clientConfig;

 public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress,Message>
 {
-    private final BootstrapConfiguration<? extends SocketChannel> bootstrapConfiguration;
     private ReconnectingChannels channels;

     private final ChannelInitializer channelInitializer;
@@ -59,14 +56,13 @@ public class SenderService extends LifecycleAdapter implements Outbound<Advertis
     private JobHandle jobHandle;
     private boolean senderServiceRunning;
     private Bootstrap bootstrap;
-    private EventLoopGroup eventLoopGroup;
+    private NioEventLoopGroup eventLoopGroup;

-    public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider, boolean useNativeTransport )
+    public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider )
     {
         this.channelInitializer = channelInitializer;
         this.log = logProvider.getLog( getClass() );
         this.channels = new ReconnectingChannels();
-        this.bootstrapConfiguration = clientConfig( useNativeTransport );
     }

     @Override
@@ -135,10 +131,10 @@ public synchronized void start()
         serviceLock.writeLock().lock();
         try
         {
-            eventLoopGroup = bootstrapConfiguration.eventLoopGroup( new NamedThreadFactory( "sender-service" ) );
+            eventLoopGroup = new NioEventLoopGroup( 0, new NamedThreadFactory( "sender-service" ) );
             bootstrap = new Bootstrap()
                     .group( eventLoopGroup )
-                    .channel( bootstrapConfiguration.channelClass() )
+                    .channel( NioSocketChannel.class )
                     .handler( channelInitializer );

             senderServiceRunning = true;

This file was deleted.
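
The deleted file's contents are not shown on this page. Judging from the call sites removed above — clientConfig( useNativeTransport ), eventLoopGroup( ThreadFactory ) and channelClass() — and the removed setting description (Epoll on Linux, Kqueue on macOS, NIO otherwise), the removed BootstrapConfiguration abstraction probably looked roughly like the sketch below. This is a reconstruction for context, not the actual deleted source; the Kqueue variant would follow the same pattern as the Epoll one.

import java.util.concurrent.ThreadFactory;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;                    // from netty-transport-native-epoll
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

interface BootstrapConfiguration<CHANNEL extends SocketChannel>
{
    // Fall back to NIO when native transport is not requested or not available on this platform.
    static BootstrapConfiguration<? extends SocketChannel> clientConfig( boolean preferNative )
    {
        if ( preferNative && Epoll.isAvailable() )
        {
            return new EpollClientConfig();
        }
        return new NioClientConfig();
    }

    EventLoopGroup eventLoopGroup( ThreadFactory threadFactory );

    Class<CHANNEL> channelClass();
}

class EpollClientConfig implements BootstrapConfiguration<EpollSocketChannel>
{
    @Override
    public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory )
    {
        return new EpollEventLoopGroup( 0, threadFactory );
    }

    @Override
    public Class<EpollSocketChannel> channelClass()
    {
        return EpollSocketChannel.class;
    }
}

class NioClientConfig implements BootstrapConfiguration<NioSocketChannel>
{
    @Override
    public EventLoopGroup eventLoopGroup( ThreadFactory threadFactory )
    {
        return new NioEventLoopGroup( 0, threadFactory );
    }

    @Override
    public Class<NioSocketChannel> channelClass()
    {
        return NioSocketChannel.class;
    }
}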
