Skip to content

Commit

Permalink
Protocol handshake
Browse files Browse the repository at this point in the history
Currently handshaking the Raft protocol.
  • Loading branch information
martinfurmanski committed Jan 29, 2018
1 parent 7468047 commit adce450
Show file tree
Hide file tree
Showing 76 changed files with 4,506 additions and 345 deletions.
32 changes: 32 additions & 0 deletions community/common/src/main/java/org/neo4j/stream/Streams.java
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.stream;

import java.util.Optional;
import java.util.stream.Stream;

public class Streams
{
@SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
public static <T> Stream<T> ofOptional( Optional<T> opt )
{
return opt.map( Stream::of ).orElse( Stream.empty() );
}
}
Expand Up @@ -28,9 +28,9 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
Expand Down Expand Up @@ -88,9 +88,9 @@ private BackupProtocolService haFromConfig( PageCache pageCache )

private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config config )
{
PipelineHandlerAppender pipelineHandlerAppender = createPipelineHandlerAppender( config );
PipelineWrapper pipelineWrapper = createPipelineWrapper( config );
CatchUpClient catchUpClient = new CatchUpClient(
logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, monitors, pipelineHandlerAppender );
logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, monitors, pipelineWrapper );
TxPullClient txPullClient = new TxPullClient( catchUpClient, monitors );
StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider );

Expand All @@ -102,10 +102,10 @@ private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config c
return backupDelegator( remoteStore, catchUpClient, storeCopyClient );
}

protected PipelineHandlerAppender createPipelineHandlerAppender( Config config )
protected PipelineWrapper createPipelineWrapper( Config config )
{
PipelineHandlerAppenderFactory pipelineHandlerAppenderFactory = new NoOpPipelineHandlerAppenderFactory();
return pipelineHandlerAppenderFactory.create( config, null, logProvider );
DuplexPipelineWrapperFactory pipelineWrapperFactory = new VoidPipelineWrapperFactory();
return pipelineWrapperFactory.forServer( config, null, logProvider );
}

private static BackupDelegator backupDelegator(
Expand Down
Expand Up @@ -21,8 +21,8 @@

import org.junit.Test;

import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.commandline.admin.OutsideWorld;
import org.neo4j.kernel.configuration.Config;

Expand All @@ -42,8 +42,8 @@ public void communityBackupSupportingFactory()

BackupSupportingClassesFactoryProvider provider = getProvidersByPriority().findFirst().get();
BackupSupportingClassesFactory factory = provider.getFactory( backupModule );
assertEquals( NoOpPipelineHandlerAppender.class,
factory.createPipelineHandlerAppender( Config.defaults() ).getClass() );
assertEquals( VoidPipelineWrapperFactory.VOID_WRAPPER,
factory.createPipelineWrapper( Config.defaults() ) );
}

/**
Expand All @@ -57,7 +57,7 @@ public BackupSupportingClassesFactory getFactory( BackupModule backupModule )
return new BackupSupportingClassesFactory( backupModule )
{
@Override
protected PipelineHandlerAppender createPipelineHandlerAppender( Config config )
protected PipelineWrapper createPipelineWrapper( Config config )
{
throw new AssertionError( "This provider should never be loaded" );
}
Expand Down
Expand Up @@ -31,7 +31,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -50,21 +50,21 @@ public class CatchUpClient extends LifecycleAdapter
private final Log log;
private final Clock clock;
private final Monitors monitors;
private final PipelineHandlerAppender pipelineAppender;
private final PipelineWrapper pipelineWrapper;
private final long inactivityTimeoutMillis;
private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors,
PipelineHandlerAppender pipelineAppender )
PipelineWrapper pipelineWrapper )
{
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors;
this.pipelineAppender = pipelineAppender;
this.pipelineWrapper = pipelineWrapper;
}

public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
Expand Down Expand Up @@ -110,7 +110,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
@Override
protected void initChannel( SocketChannel ch ) throws Exception
{
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineAppender );
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineWrapper );
}
} );

Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.catchup;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -47,7 +48,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

Expand All @@ -58,13 +59,16 @@ private CatchUpClientChannelPipeline()
}

static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider,
Monitors monitors, PipelineHandlerAppender pipelineHandlerAppender ) throws Exception
Monitors monitors, PipelineWrapper pipelineWrapper ) throws Exception
{
CatchupClientProtocol protocol = new CatchupClientProtocol();

ChannelPipeline pipeline = ch.pipeline();

pipelineHandlerAppender.addPipelineHandlerForClient( pipeline, ch );
for ( ChannelHandler wrappingHandler : pipelineWrapper.handlersFor( ch ) )
{
pipeline.addLast( wrappingHandler );
}

pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Expand Up @@ -26,6 +26,7 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -64,7 +65,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
Expand Down Expand Up @@ -95,7 +96,7 @@ public class CatchupServer extends LifecycleAdapter
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final PipelineHandlerAppender pipelineAppender;
private final PipelineWrapper pipelineWrapper;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
Expand All @@ -112,7 +113,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier,
FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineHandlerAppender pipelineAppender )
StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineWrapper pipelineWrapper )
{
this.snapshotService = snapshotService;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
Expand All @@ -129,7 +130,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
this.pipelineAppender = pipelineAppender;
this.pipelineWrapper = pipelineWrapper;
}

@Override
Expand All @@ -153,7 +154,10 @@ protected void initChannel( SocketChannel ch ) throws Exception

ChannelPipeline pipeline = ch.pipeline();

pipelineAppender.addPipelineHandlerForServer( pipeline, ch );
for ( ChannelHandler handler : pipelineWrapper.handlersFor( ch ) )
{
pipeline.addLast( handler );
}

pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Expand Up @@ -424,6 +424,10 @@ public enum DiscoveryType
public static final Setting<String> load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );

@Description( "Time out for protocol negotiation handshake" )
public static final Setting<Duration> handshake_timeout =
setting( "causal_clustering.handshake_timeout", DURATION, "5000ms" );

static BaseSetting<String> prefixSetting( final String name, final Function<String, String> parser,
final String defaultValue )
{
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.core.consensus.ConsensusModule;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.RaftProtocolClientInstaller;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.core.replication.ReplicationBenchmarkProcedure;
import org.neo4j.causalclustering.core.replication.Replicator;
Expand All @@ -45,9 +46,9 @@
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
Expand All @@ -57,12 +58,16 @@
import org.neo4j.causalclustering.logging.BetterMessageLogger;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.logging.NullMessageLogger;
import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.LoggingOutbound;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.messaging.RaftChannelInitializer;
import org.neo4j.causalclustering.messaging.RaftOutbound;
import org.neo4j.causalclustering.messaging.SenderService;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository;
import org.neo4j.com.storecopy.StoreUtil;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
Expand Down Expand Up @@ -102,11 +107,11 @@
import org.neo4j.kernel.internal.KernelData;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.udc.UsageData;

import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path;

/**
Expand Down Expand Up @@ -170,7 +175,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
final FileSystemAbstraction fileSystem = platformModule.fileSystem;
final File storeDir = platformModule.storeDir;
final LifeSupport life = platformModule.life;
final Monitors monitors = platformModule.monitors;

final File dataDir = config.get( GraphDatabaseSettings.data_directory );
final ClusterStateDirectory clusterStateDirectory = new ClusterStateDirectory( dataDir, storeDir, false );
Expand Down Expand Up @@ -210,16 +214,22 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
// We need to satisfy the dependency here to keep users of it, such as BoltKernelExtension, happy.
dependencies.satisfyDependency( SslPolicyLoader.create( config, logProvider ) );

PipelineHandlerAppenderFactory appenderFactory = appenderFactory();
PipelineHandlerAppender pipelineHandlerAppender = appenderFactory.create( config, dependencies, logProvider );
PipelineWrapper clientPipelineWrapper = pipelineWrapperFactory().forClient( config, dependencies, logProvider );
PipelineWrapper serverPipelineWrapper = pipelineWrapperFactory().forServer( config, dependencies, logProvider );

NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( clientPipelineWrapper );
NettyPipelineBuilderFactory serverPipelineBuilderFactory = new NettyPipelineBuilderFactory( serverPipelineWrapper );

topologyService = clusteringModule.topologyService();

long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();

final SenderService raftSender = new SenderService(
new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, pipelineHandlerAppender ),
logProvider, platformModule.monitors );
ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() );
ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository =
new ProtocolInstallerRepository<>( singletonList( new RaftProtocolClientInstaller( logProvider, clientPipelineBuilderFactory ) ) );
HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( logProvider, protocolRepository, Protocol.Identifier.RAFT,
protocolInstallerRepository, config, clientPipelineBuilderFactory );
final SenderService raftSender = new SenderService( channelInitializer, logProvider, platformModule.monitors );
life.add( raftSender );

final MessageLogger<MemberId> messageLogger = createMessageLogger( config, life, identityModule.myself() );
Expand Down Expand Up @@ -255,10 +265,9 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() );

CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, coreStateMachinesModule, clusteringModule,
replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), pipelineHandlerAppender );
replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), serverPipelineWrapper, clientPipelineWrapper );

new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineHandlerAppender, monitors,
messageLogger );
new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory, messageLogger );

editionInvariants( platformModule, dependencies, config, logging, life );

Expand Down Expand Up @@ -289,9 +298,9 @@ protected ClusteringModule getClusteringModule( PlatformModule platformModule,
platformModule, clusterStateDirectory.get() );
}

protected PipelineHandlerAppenderFactory appenderFactory()
protected DuplexPipelineWrapperFactory pipelineWrapperFactory()
{
return new NoOpPipelineHandlerAppenderFactory();
return new VoidPipelineWrapperFactory();
}

@Override
Expand Down

0 comments on commit adce450

Please sign in to comment.