diff --git a/community/common/src/main/java/org/neo4j/stream/Streams.java b/community/common/src/main/java/org/neo4j/stream/Streams.java
new file mode 100644
index 0000000000000..578af11dec586
--- /dev/null
+++ b/community/common/src/main/java/org/neo4j/stream/Streams.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.stream;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public class Streams
+{
+ @SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
+ public static Stream ofOptional( Optional opt )
+ {
+ return opt.map( Stream::of ).orElse( Stream.empty() );
+ }
+}
diff --git a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java
index 97c33569059ec..83bb33a95b586 100644
--- a/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java
+++ b/enterprise/backup/src/main/java/org/neo4j/backup/impl/BackupSupportingClassesFactory.java
@@ -28,9 +28,9 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.tx.TransactionLogCatchUpFactory;
import org.neo4j.causalclustering.catchup.tx.TxPullClient;
-import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
+import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
+import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
@@ -88,9 +88,9 @@ private BackupProtocolService haFromConfig( PageCache pageCache )
private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config config )
{
- PipelineHandlerAppender pipelineHandlerAppender = createPipelineHandlerAppender( config );
+ PipelineWrapper pipelineWrapper = createPipelineWrapper( config );
CatchUpClient catchUpClient = new CatchUpClient(
- logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, monitors, pipelineHandlerAppender );
+ logProvider, clock, INACTIVITY_TIMEOUT_MILLIS, monitors, pipelineWrapper );
TxPullClient txPullClient = new TxPullClient( catchUpClient, monitors );
StoreCopyClient storeCopyClient = new StoreCopyClient( catchUpClient, logProvider );
@@ -102,10 +102,10 @@ private BackupDelegator backupDelegatorFromConfig( PageCache pageCache, Config c
return backupDelegator( remoteStore, catchUpClient, storeCopyClient );
}
- protected PipelineHandlerAppender createPipelineHandlerAppender( Config config )
+ protected PipelineWrapper createPipelineWrapper( Config config )
{
- PipelineHandlerAppenderFactory pipelineHandlerAppenderFactory = new NoOpPipelineHandlerAppenderFactory();
- return pipelineHandlerAppenderFactory.create( config, null, logProvider );
+ DuplexPipelineWrapperFactory pipelineWrapperFactory = new VoidPipelineWrapperFactory();
+ return pipelineWrapperFactory.forServer( config, null, logProvider );
}
private static BackupDelegator backupDelegator(
diff --git a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandProviderTest.java b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandProviderTest.java
index 06a8d11a71770..663840f29ec34 100644
--- a/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandProviderTest.java
+++ b/enterprise/backup/src/test/java/org/neo4j/backup/impl/OnlineBackupCommandProviderTest.java
@@ -21,8 +21,8 @@
import org.junit.Test;
-import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppender;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
+import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
import org.neo4j.commandline.admin.OutsideWorld;
import org.neo4j.kernel.configuration.Config;
@@ -42,8 +42,8 @@ public void communityBackupSupportingFactory()
BackupSupportingClassesFactoryProvider provider = getProvidersByPriority().findFirst().get();
BackupSupportingClassesFactory factory = provider.getFactory( backupModule );
- assertEquals( NoOpPipelineHandlerAppender.class,
- factory.createPipelineHandlerAppender( Config.defaults() ).getClass() );
+ assertEquals( VoidPipelineWrapperFactory.VOID_WRAPPER,
+ factory.createPipelineWrapper( Config.defaults() ) );
}
/**
@@ -57,7 +57,7 @@ public BackupSupportingClassesFactory getFactory( BackupModule backupModule )
return new BackupSupportingClassesFactory( backupModule )
{
@Override
- protected PipelineHandlerAppender createPipelineHandlerAppender( Config config )
+ protected PipelineWrapper createPipelineWrapper( Config config )
{
throw new AssertionError( "This provider should never be loaded" );
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
index ee9beab4c9bdd..f37afe3a3a7e5 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClient.java
@@ -31,7 +31,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
@@ -50,21 +50,21 @@ public class CatchUpClient extends LifecycleAdapter
private final Log log;
private final Clock clock;
private final Monitors monitors;
- private final PipelineHandlerAppender pipelineAppender;
+ private final PipelineWrapper pipelineWrapper;
private final long inactivityTimeoutMillis;
private final CatchUpChannelPool pool = new CatchUpChannelPool<>( CatchUpChannel::new );
private NioEventLoopGroup eventLoopGroup;
public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors,
- PipelineHandlerAppender pipelineAppender )
+ PipelineWrapper pipelineWrapper )
{
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors;
- this.pipelineAppender = pipelineAppender;
+ this.pipelineWrapper = pipelineWrapper;
}
public T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback responseHandler )
@@ -110,7 +110,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
@Override
protected void initChannel( SocketChannel ch ) throws Exception
{
- CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineAppender );
+ CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineWrapper );
}
} );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClientChannelPipeline.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClientChannelPipeline.java
index fa6f50a22a0f3..ab7d53091e510 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClientChannelPipeline.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchUpClientChannelPipeline.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.causalclustering.catchup;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -47,7 +48,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
@@ -58,13 +59,16 @@ private CatchUpClientChannelPipeline()
}
static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider,
- Monitors monitors, PipelineHandlerAppender pipelineHandlerAppender ) throws Exception
+ Monitors monitors, PipelineWrapper pipelineWrapper ) throws Exception
{
CatchupClientProtocol protocol = new CatchupClientProtocol();
ChannelPipeline pipeline = ch.pipeline();
- pipelineHandlerAppender.addPipelineHandlerForClient( pipeline, ch );
+ for ( ChannelHandler wrappingHandler : pipelineWrapper.handlersFor( ch ) )
+ {
+ pipeline.addLast( wrappingHandler );
+ }
pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java
index 8b72cb3631e11..fd66a99a3a873 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/CatchupServer.java
@@ -26,6 +26,7 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -64,7 +65,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
@@ -95,7 +96,7 @@ public class CatchupServer extends LifecycleAdapter
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;
- private final PipelineHandlerAppender pipelineAppender;
+ private final PipelineWrapper pipelineWrapper;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
@@ -112,7 +113,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier checkPointerSupplier,
FileSystemAbstraction fs, PageCache pageCache,
- StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineHandlerAppender pipelineAppender )
+ StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineWrapper pipelineWrapper )
{
this.snapshotService = snapshotService;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
@@ -129,7 +130,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
- this.pipelineAppender = pipelineAppender;
+ this.pipelineWrapper = pipelineWrapper;
}
@Override
@@ -153,7 +154,10 @@ protected void initChannel( SocketChannel ch ) throws Exception
ChannelPipeline pipeline = ch.pipeline();
- pipelineAppender.addPipelineHandlerForServer( pipeline, ch );
+ for ( ChannelHandler handler : pipelineWrapper.handlersFor( ch ) )
+ {
+ pipeline.addLast( handler );
+ }
pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java
index a8e372b2c8f0f..0dd6d1b9bbddf 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java
@@ -424,6 +424,10 @@ public enum DiscoveryType
public static final Setting load_balancing_plugin =
setting( "causal_clustering.load_balancing.plugin", STRING, "server_policies" );
+ @Description( "Time out for protocol negotiation handshake" )
+ public static final Setting handshake_timeout =
+ setting( "causal_clustering.handshake_timeout", DURATION, "5000ms" );
+
static BaseSetting prefixSetting( final String name, final Function parser,
final String defaultValue )
{
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java
index 41cd38a87a8d8..dc097d211c011 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java
@@ -32,6 +32,7 @@
import org.neo4j.causalclustering.catchup.storecopy.StoreFiles;
import org.neo4j.causalclustering.core.consensus.ConsensusModule;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
+import org.neo4j.causalclustering.core.consensus.RaftProtocolClientInstaller;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.core.replication.ReplicationBenchmarkProcedure;
import org.neo4j.causalclustering.core.replication.Replicator;
@@ -45,9 +46,9 @@
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
-import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
+import org.neo4j.causalclustering.handlers.VoidPipelineWrapperFactory;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
+import org.neo4j.causalclustering.handlers.DuplexPipelineWrapperFactory;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
@@ -57,12 +58,16 @@
import org.neo4j.causalclustering.logging.BetterMessageLogger;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.logging.NullMessageLogger;
-import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.LoggingOutbound;
import org.neo4j.causalclustering.messaging.Outbound;
-import org.neo4j.causalclustering.messaging.RaftChannelInitializer;
import org.neo4j.causalclustering.messaging.RaftOutbound;
import org.neo4j.causalclustering.messaging.SenderService;
+import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
+import org.neo4j.causalclustering.protocol.ProtocolInstaller;
+import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
+import org.neo4j.causalclustering.protocol.Protocol;
+import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
+import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository;
import org.neo4j.com.storecopy.StoreUtil;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
@@ -102,11 +107,11 @@
import org.neo4j.kernel.internal.KernelData;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.LifecycleStatus;
-import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.udc.UsageData;
+import static java.util.Collections.singletonList;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path;
/**
@@ -170,7 +175,6 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
final FileSystemAbstraction fileSystem = platformModule.fileSystem;
final File storeDir = platformModule.storeDir;
final LifeSupport life = platformModule.life;
- final Monitors monitors = platformModule.monitors;
final File dataDir = config.get( GraphDatabaseSettings.data_directory );
final ClusterStateDirectory clusterStateDirectory = new ClusterStateDirectory( dataDir, storeDir, false );
@@ -210,16 +214,22 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
// We need to satisfy the dependency here to keep users of it, such as BoltKernelExtension, happy.
dependencies.satisfyDependency( SslPolicyLoader.create( config, logProvider ) );
- PipelineHandlerAppenderFactory appenderFactory = appenderFactory();
- PipelineHandlerAppender pipelineHandlerAppender = appenderFactory.create( config, dependencies, logProvider );
+ PipelineWrapper clientPipelineWrapper = pipelineWrapperFactory().forClient( config, dependencies, logProvider );
+ PipelineWrapper serverPipelineWrapper = pipelineWrapperFactory().forServer( config, dependencies, logProvider );
+
+ NettyPipelineBuilderFactory clientPipelineBuilderFactory = new NettyPipelineBuilderFactory( clientPipelineWrapper );
+ NettyPipelineBuilderFactory serverPipelineBuilderFactory = new NettyPipelineBuilderFactory( serverPipelineWrapper );
topologyService = clusteringModule.topologyService();
long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();
- final SenderService raftSender = new SenderService(
- new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, pipelineHandlerAppender ),
- logProvider, platformModule.monitors );
+ ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() );
+ ProtocolInstallerRepository protocolInstallerRepository =
+ new ProtocolInstallerRepository<>( singletonList( new RaftProtocolClientInstaller( logProvider, clientPipelineBuilderFactory ) ) );
+ HandshakeClientInitializer channelInitializer = new HandshakeClientInitializer( logProvider, protocolRepository, Protocol.Identifier.RAFT,
+ protocolInstallerRepository, config, clientPipelineBuilderFactory );
+ final SenderService raftSender = new SenderService( channelInitializer, logProvider, platformModule.monitors );
life.add( raftSender );
final MessageLogger messageLogger = createMessageLogger( config, life, identityModule.myself() );
@@ -255,10 +265,9 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
this.accessCapability = new LeaderCanWrite( consensusModule.raftMachine() );
CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, coreStateMachinesModule, clusteringModule,
- replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), pipelineHandlerAppender );
+ replicationModule, localDatabase, databaseHealthSupplier, clusterStateDirectory.get(), serverPipelineWrapper, clientPipelineWrapper );
- new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, pipelineHandlerAppender, monitors,
- messageLogger );
+ new RaftServerModule( platformModule, consensusModule, identityModule, coreServerModule, localDatabase, serverPipelineBuilderFactory, messageLogger );
editionInvariants( platformModule, dependencies, config, logging, life );
@@ -289,9 +298,9 @@ protected ClusteringModule getClusteringModule( PlatformModule platformModule,
platformModule, clusterStateDirectory.get() );
}
- protected PipelineHandlerAppenderFactory appenderFactory()
+ protected DuplexPipelineWrapperFactory pipelineWrapperFactory()
{
- return new NoOpPipelineHandlerAppenderFactory();
+ return new VoidPipelineWrapperFactory();
}
@Override
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java
index a6509841e68b4..167af1456bf70 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java
@@ -26,44 +26,49 @@
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.LeaderAvailabilityHandler;
import org.neo4j.causalclustering.core.consensus.RaftMessageMonitoringHandler;
+import org.neo4j.causalclustering.core.consensus.RaftMessageNettyHandler;
import org.neo4j.causalclustering.core.consensus.RaftMessages.ReceivedInstantClusterIdAwareMessage;
+import org.neo4j.causalclustering.core.consensus.RaftProtocolServerInstaller;
import org.neo4j.causalclustering.core.consensus.RaftServer;
import org.neo4j.causalclustering.core.server.CoreServerModule;
import org.neo4j.causalclustering.core.state.RaftMessageApplier;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.logging.MessageLogger;
import org.neo4j.causalclustering.messaging.ComposableMessageHandler;
-import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.LoggingInbound;
+import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
+import org.neo4j.causalclustering.protocol.Protocol;
+import org.neo4j.causalclustering.protocol.ProtocolInstaller;
+import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
+import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer;
+import org.neo4j.causalclustering.protocol.handshake.ProtocolRepository;
import org.neo4j.kernel.impl.factory.PlatformModule;
-import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;
-public class RaftServerModule
+import static java.util.Collections.singletonList;
+
+class RaftServerModule
{
private final PlatformModule platformModule;
private final ConsensusModule consensusModule;
private final IdentityModule identityModule;
private final LocalDatabase localDatabase;
- private final Monitors monitors;
private final MessageLogger messageLogger;
private final LogProvider logProvider;
- private final PipelineHandlerAppender pipelineHandlerAppender;
+ private final NettyPipelineBuilderFactory pipelineBuilderFactory;
RaftServerModule( PlatformModule platformModule, ConsensusModule consensusModule, IdentityModule identityModule, CoreServerModule coreServerModule,
- LocalDatabase localDatabase, PipelineHandlerAppender pipelineHandlerAppender, Monitors monitors, MessageLogger messageLogger )
+ LocalDatabase localDatabase, NettyPipelineBuilderFactory pipelineBuilderFactory, MessageLogger messageLogger )
{
this.platformModule = platformModule;
this.consensusModule = consensusModule;
this.identityModule = identityModule;
this.localDatabase = localDatabase;
- this.monitors = monitors;
this.messageLogger = messageLogger;
this.logProvider = platformModule.logging.getInternalLogProvider();
- this.pipelineHandlerAppender = pipelineHandlerAppender;
+ this.pipelineBuilderFactory = pipelineBuilderFactory;
LifecycleMessageHandler messageHandlerChain = createMessageHandlerChain( coreServerModule );
@@ -72,10 +77,19 @@ public class RaftServerModule
private void createRaftServer( CoreServerModule coreServerModule, LifecycleMessageHandler messageHandlerChain )
{
- RaftServer raftServer = new RaftServer( new CoreReplicatedContentMarshal(), pipelineHandlerAppender, platformModule.config, logProvider,
- platformModule.logging.getUserLogProvider(), monitors, platformModule.clock );
+ ProtocolRepository protocolRepository = new ProtocolRepository( Protocol.Protocols.values() );
+
+ RaftMessageNettyHandler nettyHandler = new RaftMessageNettyHandler( logProvider );
+ RaftProtocolServerInstaller raftProtocolServerInstaller = new RaftProtocolServerInstaller( nettyHandler, pipelineBuilderFactory, logProvider );
+ ProtocolInstallerRepository protocolInstallerRepository =
+ new ProtocolInstallerRepository<>( singletonList( raftProtocolServerInstaller ) );
+
+ HandshakeServerInitializer handshakeServerInitializer = new HandshakeServerInitializer( logProvider, protocolRepository, Protocol.Identifier.RAFT,
+ protocolInstallerRepository, pipelineBuilderFactory );
+ RaftServer raftServer = new RaftServer( handshakeServerInitializer, platformModule.config,
+ logProvider, platformModule.logging.getUserLogProvider() );
- LoggingInbound loggingRaftInbound = new LoggingInbound<>( raftServer,
+ LoggingInbound loggingRaftInbound = new LoggingInbound<>( nettyHandler,
messageLogger, identityModule.myself() );
loggingRaftInbound.registerHandler( messageHandlerChain );
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessageNettyHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessageNettyHandler.java
new file mode 100644
index 0000000000000..50b6cb8888082
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessageNettyHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.core.consensus;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import org.neo4j.causalclustering.messaging.Inbound;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+import static java.lang.String.format;
+
+@ChannelHandler.Sharable
+public class RaftMessageNettyHandler extends SimpleChannelInboundHandler
+ implements Inbound
+{
+ private Inbound.MessageHandler actual;
+ private Log log;
+
+ public RaftMessageNettyHandler( LogProvider logProvider )
+ {
+ this.log = logProvider.getLog( getClass() );
+ }
+
+ @Override
+ public void registerHandler( Inbound.MessageHandler actual )
+ {
+ this.actual = actual;
+ }
+
+ @Override
+ protected void channelRead0( ChannelHandlerContext channelHandlerContext, RaftMessages.ReceivedInstantClusterIdAwareMessage incomingMessage )
+ throws Exception
+ {
+ try
+ {
+ actual.handle( incomingMessage );
+ }
+ catch ( Exception e )
+ {
+ log.error( format( "Failed to process message %s", incomingMessage ), e );
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java
new file mode 100644
index 0000000000000..679cd721efbd1
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolClientInstaller.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.core.consensus;
+
+import io.netty.channel.Channel;
+
+import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
+import org.neo4j.causalclustering.messaging.marshalling.RaftMessageEncoder;
+import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
+import org.neo4j.causalclustering.protocol.Protocol;
+import org.neo4j.causalclustering.protocol.ProtocolInstaller;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+public class RaftProtocolClientInstaller extends ProtocolInstaller
+{
+ private final Log log;
+ private final NettyPipelineBuilderFactory clientPipelineBuilderFactory;
+
+ public RaftProtocolClientInstaller( LogProvider logProvider, NettyPipelineBuilderFactory clientPipelineBuilderFactory )
+ {
+ super( Protocol.Protocols.RAFT_1 );
+ this.log = logProvider.getLog( getClass() );
+ this.clientPipelineBuilderFactory = clientPipelineBuilderFactory;
+ }
+
+ @Override
+ public void install( Channel channel ) throws Exception
+ {
+ clientPipelineBuilderFactory.create( channel, log )
+ .addFraming()
+ .add( new RaftMessageEncoder( new CoreReplicatedContentMarshal() ) )
+ .install();
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java
new file mode 100644
index 0000000000000..3cb5aad56b21c
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftProtocolServerInstaller.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.core.consensus;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInboundHandler;
+
+import java.time.Clock;
+
+import org.neo4j.causalclustering.messaging.CoreReplicatedContentMarshal;
+import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder;
+import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
+import org.neo4j.causalclustering.protocol.Protocol;
+import org.neo4j.causalclustering.protocol.ProtocolInstaller;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+public class RaftProtocolServerInstaller extends ProtocolInstaller
+{
+ private final ChannelInboundHandler raftMessageHandler;
+ private final NettyPipelineBuilderFactory pipelineBuilderFactory;
+ private final Log log;
+
+ public RaftProtocolServerInstaller( ChannelInboundHandler raftMessageHandler, NettyPipelineBuilderFactory pipelineBuilderFactory, LogProvider logProvider )
+ {
+ super( Protocol.Protocols.RAFT_1 );
+ this.raftMessageHandler = raftMessageHandler;
+ this.pipelineBuilderFactory = pipelineBuilderFactory;
+ this.log = logProvider.getLog( getClass() );
+ }
+
+ @Override
+ public void install( Channel channel ) throws Exception
+ {
+ pipelineBuilderFactory.create( channel, log )
+ .addFraming()
+ .add( new RaftMessageDecoder( new CoreReplicatedContentMarshal(), Clock.systemUTC() ) )
+ .add( raftMessageHandler )
+ .install();
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftServer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftServer.java
index 590cec6d9ad56..c0ca586185f0e 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftServer.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftServer.java
@@ -19,74 +19,46 @@
*/
package org.neo4j.causalclustering.core.consensus;
-import java.net.BindException;
-import java.util.concurrent.TimeUnit;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-
-import java.time.Clock;
-
-import org.neo4j.causalclustering.VersionDecoder;
-import org.neo4j.causalclustering.VersionPrepender;
-import org.neo4j.causalclustering.core.replication.ReplicatedContent;
-import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
-import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
-import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
-import org.neo4j.causalclustering.messaging.Inbound;
-import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
-import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder;
+
+import java.net.BindException;
+import java.util.concurrent.TimeUnit;
+
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
-import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
-import static java.lang.String.format;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_listen_address;
-public class RaftServer extends LifecycleAdapter implements Inbound
+public class RaftServer extends LifecycleAdapter
{
- private final ChannelMarshal marshal;
- private final PipelineHandlerAppender pipelineAppender;
+ private final ChannelInitializer channelInitializer;
+
private final ListenSocketAddress listenAddress;
- private final LogProvider logProvider;
private final Log log;
private final Log userLog;
- private final Monitors monitors;
- private final Clock clock;
- private MessageHandler messageHandler;
private EventLoopGroup workerGroup;
private Channel channel;
private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );
- public RaftServer( ChannelMarshal marshal, PipelineHandlerAppender pipelineAppender, Config config, LogProvider logProvider,
- LogProvider userLogProvider, Monitors monitors, Clock clock )
+ public RaftServer( ChannelInitializer channelInitializer, Config config, LogProvider logProvider, LogProvider userLogProvider )
{
- this.marshal = marshal;
- this.pipelineAppender = pipelineAppender;
+ this.channelInitializer = channelInitializer;
this.listenAddress = config.get( raft_listen_address );
- this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.userLog = userLogProvider.getLog( getClass() );
- this.monitors = monitors;
- this.clock = clock;
}
@Override
@@ -126,30 +98,7 @@ private void startNettyServer()
.channel( NioServerSocketChannel.class )
.option( ChannelOption.SO_REUSEADDR, true )
.localAddress( listenAddress.socketAddress() )
- .childHandler( new ChannelInitializer()
- {
- @Override
- protected void initChannel( SocketChannel ch ) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
-
- pipelineAppender.addPipelineHandlerForServer( pipeline, ch );
-
- pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
- pipeline.addLast( new LengthFieldPrepender( 4 ) );
-
- pipeline.addLast( new VersionDecoder( logProvider ) );
- pipeline.addLast( new VersionPrepender() );
-
- pipeline.addLast( new RaftMessageDecoder( marshal, clock ) );
- pipeline.addLast( new RaftMessageHandler() );
-
- pipeline.addLast( new ExceptionLoggingHandler( log ) );
- pipeline.addLast( new ExceptionMonitoringHandler(
- monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class, RaftServer.class ) ) );
- pipeline.addLast( new ExceptionSwallowingHandler() );
- }
- } );
+ .childHandler( channelInitializer );
try
{
@@ -168,28 +117,4 @@ protected void initChannel( SocketChannel ch ) throws Exception
}
}
}
-
- @Override
- public void registerHandler( Inbound.MessageHandler handler )
- {
- this.messageHandler = handler;
- }
-
- private class RaftMessageHandler extends SimpleChannelInboundHandler
- {
- @Override
- protected void channelRead0( ChannelHandlerContext channelHandlerContext,
- RaftMessages.ReceivedInstantClusterIdAwareMessage incomingMessage ) throws Exception
- {
- try
- {
- messageHandler.handle( incomingMessage );
- }
- catch ( Exception e )
- {
- log.error( format( "Failed to process message %s", incomingMessage ), e );
- }
- }
- }
-
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
index 40e4d47f356c6..25a0f7a0f394d 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java
@@ -54,7 +54,7 @@
import org.neo4j.causalclustering.core.state.snapshot.CoreStateDownloaderService;
import org.neo4j.causalclustering.core.state.storage.DurableStateStorage;
import org.neo4j.causalclustering.core.state.storage.StateStorage;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.io.fs.FileSystemAbstraction;
@@ -94,12 +94,12 @@ public class CoreServerModule
private final JobScheduler jobScheduler;
private final LogProvider logProvider;
private final PlatformModule platformModule;
- private final PipelineHandlerAppender pipelineAppender;
+ private final PipelineWrapper clientPipelineWrapper;
public CoreServerModule( IdentityModule identityModule, final PlatformModule platformModule, ConsensusModule consensusModule,
CoreStateMachinesModule coreStateMachinesModule, ClusteringModule clusteringModule, ReplicationModule replicationModule,
LocalDatabase localDatabase, Supplier dbHealthSupplier,
- File clusterStateDirectory, PipelineHandlerAppender pipelineAppender )
+ File clusterStateDirectory, PipelineWrapper serverPipelineWrapper, PipelineWrapper clientPipelineWrapper )
{
this.identityModule = identityModule;
this.coreStateMachinesModule = coreStateMachinesModule;
@@ -108,7 +108,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
this.localDatabase = localDatabase;
this.dbHealthSupplier = dbHealthSupplier;
this.platformModule = platformModule;
- this.pipelineAppender = pipelineAppender;
+ this.clientPipelineWrapper = clientPipelineWrapper;
this.config = platformModule.config;
this.jobScheduler = platformModule.jobScheduler;
@@ -161,7 +161,7 @@ public CoreServerModule( IdentityModule identityModule, final PlatformModule pla
platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable,
snapshotService, config, platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem, platformModule.pageCache,
- platformModule.storeCopyCheckPointMutex, pipelineAppender );
+ platformModule.storeCopyCheckPointMutex, serverPipelineWrapper );
RaftLogPruner raftLogPruner = new RaftLogPruner( consensusModule.raftMachine(), commandApplicationProcess, platformModule.clock );
dependencies.satisfyDependency( raftLogPruner );
@@ -179,7 +179,7 @@ private CoreStateDownloader createCoreStateDownloader( LifeSupport servicesToSto
{
long inactivityTimeoutMillis = platformModule.config.get( CausalClusteringSettings.catch_up_client_inactivity_timeout ).toMillis();
CatchUpClient catchUpClient = platformModule.life.add(
- new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMillis, platformModule.monitors, pipelineAppender ) );
+ new CatchUpClient( logProvider, Clocks.systemClock(), inactivityTimeoutMillis, platformModule.monitors, clientPipelineWrapper ) );
RemoteStore remoteStore = new RemoteStore(
logProvider, platformModule.fileSystem, platformModule.pageCache, new StoreCopyClient( catchUpClient, logProvider ),
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppenderFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/DuplexPipelineWrapperFactory.java
similarity index 80%
rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppenderFactory.java
rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/DuplexPipelineWrapperFactory.java
index 0dd709a04b5bd..48055133ed84c 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppenderFactory.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/DuplexPipelineWrapperFactory.java
@@ -23,7 +23,9 @@
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.logging.LogProvider;
-public interface PipelineHandlerAppenderFactory
+public interface DuplexPipelineWrapperFactory
{
- PipelineHandlerAppender create( Config config, Dependencies dependencies, LogProvider logProvider );
+ PipelineWrapper forServer( Config config, Dependencies dependencies, LogProvider logProvider );
+
+ PipelineWrapper forClient( Config config, Dependencies dependencies, LogProvider logProvider );
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppender.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java
similarity index 69%
rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppender.java
rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java
index f684509b23329..adad4e9f152c6 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineHandlerAppender.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/PipelineWrapper.java
@@ -20,17 +20,20 @@
package org.neo4j.causalclustering.handlers;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelHandler;
-public interface PipelineHandlerAppender
-{
- default void addPipelineHandlerForServer( ChannelPipeline pipeline, Channel ch ) throws Exception
- {
+import java.util.List;
- }
+import static java.util.Collections.emptyList;
- default void addPipelineHandlerForClient( ChannelPipeline pipeline, Channel ch ) throws Exception
+/**
+ * Intended to provide handlers which can wrap an entire sub-pipeline in a neutral
+ * fashion, e.g compression, integrity checks, ...
+ */
+public interface PipelineWrapper
+{
+ default List handlersFor( Channel channel ) throws Exception
{
-
+ return emptyList();
}
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java
new file mode 100644
index 0000000000000..03ebaabc3544e
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/VoidPipelineWrapperFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.handlers;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.neo4j.kernel.configuration.Config;
+import org.neo4j.kernel.impl.util.Dependencies;
+import org.neo4j.logging.LogProvider;
+
+import static java.util.Collections.emptyList;
+
+public class VoidPipelineWrapperFactory implements DuplexPipelineWrapperFactory
+{
+ public static final PipelineWrapper VOID_WRAPPER = new PipelineWrapper()
+ {
+ @Override
+ public List handlersFor( Channel channel ) throws Exception
+ {
+ return emptyList();
+ }
+ };
+
+ @Override
+ public PipelineWrapper forServer( Config config, Dependencies dependencies, LogProvider logProvider )
+ {
+ return VOID_WRAPPER;
+ }
+
+ @Override
+ public PipelineWrapper forClient( Config config, Dependencies dependencies, LogProvider logProvider )
+ {
+ return VOID_WRAPPER;
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java
new file mode 100644
index 0000000000000..55c7800625aa4
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/Channel.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.messaging;
+
+import io.netty.util.concurrent.Future;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface Channel
+{
+ void dispose();
+
+ boolean isDisposed();
+
+ boolean isOpen();
+
+ CompletableFuture write( Object msg );
+
+ CompletableFuture writeAndFlush( Object msg );
+
+ static CompletableFuture convertNettyFuture( Future> nettyFuture )
+ {
+ CompletableFuture promise = new CompletableFuture<>();
+ nettyFuture.addListener( future ->
+ {
+ if ( future.isSuccess() )
+ {
+ promise.complete( null );
+ }
+ else
+ {
+ promise.completeExceptionally( future.cause() );
+ }
+ } );
+ return promise;
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java
new file mode 100644
index 0000000000000..60d9d6bf6fa2b
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ChannelInterceptor.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.messaging;
+
+import io.netty.channel.Channel;
+import io.netty.util.concurrent.Future;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * Allows intercepting the writing to a channel.
+ */
+public interface ChannelInterceptor
+{
+ void write( BiFunction> writer, io.netty.channel.Channel channel, Object msg,
+ CompletableFuture promise );
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java
new file mode 100644
index 0000000000000..93c8670431357
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/HandshakeGate.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.messaging;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Future;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+import org.neo4j.causalclustering.protocol.handshake.ClientHandshakeException;
+import org.neo4j.causalclustering.protocol.handshake.HandshakeClientInitializer;
+import org.neo4j.causalclustering.protocol.handshake.HandshakeFinishedEvent;
+
+/**
+ * Gates messages written before the handshake has completed. The handshake is finalized
+ * by firing a HandshakeFinishedEvent (as a netty user event) in {@link HandshakeClientInitializer}.
+ */
+public class HandshakeGate implements ChannelInterceptor
+{
+ public static final String HANDSHAKE_GATE = "HandshakeGate";
+
+ private final CompletableFuture handshakePromise = new CompletableFuture<>();
+
+ HandshakeGate( Channel channel )
+ {
+ channel.pipeline().addFirst( HANDSHAKE_GATE, new ChannelInboundHandlerAdapter()
+ {
+ @Override
+ public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception
+ {
+ if ( HandshakeFinishedEvent.getSuccess().equals( evt ) )
+ {
+ handshakePromise.complete( null );
+ }
+ else if ( HandshakeFinishedEvent.getFailure().equals( evt ) )
+ {
+ handshakePromise.completeExceptionally( new ClientHandshakeException( "Handshake failed" ) );
+ channel.close();
+ }
+ else
+ {
+ super.userEventTriggered( ctx, evt );
+ }
+ }
+ } );
+ }
+
+ @Override
+ public void write( BiFunction> writer, Channel channel, Object msg, CompletableFuture promise )
+ {
+ handshakePromise.whenComplete( ( ignored, failure ) ->
+ writer.apply( channel, msg ).addListener( x -> promise.complete( null ) ) );
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/IdleChannelReaperHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/IdleChannelReaperHandler.java
index 0623113818687..658002b108216 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/IdleChannelReaperHandler.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/IdleChannelReaperHandler.java
@@ -29,11 +29,11 @@
public class IdleChannelReaperHandler extends ChannelDuplexHandler
{
- private NonBlockingChannels nonBlockingChannels;
+ private ReconnectingChannels channels;
- public IdleChannelReaperHandler( NonBlockingChannels nonBlockingChannels )
+ public IdleChannelReaperHandler( ReconnectingChannels channels )
{
- this.nonBlockingChannels = nonBlockingChannels;
+ this.channels = channels;
}
@Override
@@ -45,7 +45,7 @@ public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws E
final AdvertisedSocketAddress address =
new AdvertisedSocketAddress( socketAddress.getHostName(), socketAddress.getPort() );
- nonBlockingChannels.remove( address );
+ channels.remove( address );
}
}
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftChannelInitializer.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftChannelInitializer.java
deleted file mode 100644
index 9bd811306a3a2..0000000000000
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/RaftChannelInitializer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2002-2018 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.causalclustering.messaging;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldPrepender;
-
-import org.neo4j.causalclustering.VersionPrepender;
-import org.neo4j.causalclustering.core.replication.ReplicatedContent;
-import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
-import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
-import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
-import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
-import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
-import org.neo4j.causalclustering.messaging.marshalling.RaftMessageEncoder;
-import org.neo4j.kernel.monitoring.Monitors;
-import org.neo4j.logging.Log;
-import org.neo4j.logging.LogProvider;
-
-public class RaftChannelInitializer extends ChannelInitializer
-{
- private final ChannelMarshal marshal;
- private final Log log;
- private final Monitors monitors;
- private final PipelineHandlerAppender pipelineAppender;
-
- public RaftChannelInitializer( ChannelMarshal marshal, LogProvider logProvider,
- Monitors monitors, PipelineHandlerAppender pipelineAppender )
- {
- this.marshal = marshal;
- this.log = logProvider.getLog( getClass() );
- this.monitors = monitors;
- this.pipelineAppender = pipelineAppender;
- }
-
- @Override
- protected void initChannel( SocketChannel ch ) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
-
- pipelineAppender.addPipelineHandlerForClient( pipeline, ch );
-
- pipeline.addLast( "frameEncoder", new LengthFieldPrepender( 4 ) );
- pipeline.addLast( new VersionPrepender() );
- pipeline.addLast( "raftMessageEncoder", new RaftMessageEncoder( marshal ) );
-
- pipeline.addLast( new ExceptionLoggingHandler( log ) );
- pipeline.addLast( new ExceptionMonitoringHandler(
- monitors.newMonitor( ExceptionMonitoringHandler.Monitor.class, SenderService.class ) ) );
- pipeline.addLast( new ExceptionSwallowingHandler() );
- }
-}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java
similarity index 52%
rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java
rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java
index 3b19d875c8135..243c18b291812 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannel.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannel.java
@@ -20,38 +20,52 @@
package org.neo4j.causalclustering.messaging;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
+import io.netty.channel.ChannelOutboundInvoker;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.Promise;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
+import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.logging.Log;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-class NonBlockingChannel
+public class ReconnectingChannel implements Channel
{
- private static final int CONNECT_BACKOFF_MS = 250;
-
private final Log log;
private final Bootstrap bootstrap;
- private final EventLoop eventLoop;
private final SocketAddress destination;
+ private final Function channelInterceptorFactory;
+ private final TimeoutStrategy connectionBackoffStrategy;
- private volatile Channel channel;
+ private volatile io.netty.channel.Channel channel;
private volatile ChannelFuture fChannel;
private volatile boolean disposed;
- NonBlockingChannel( Bootstrap bootstrap, EventLoop eventLoop, final SocketAddress destination, final Log log )
+ private TimeoutStrategy.Timeout connectionBackoff;
+ private ChannelInterceptor channelInterceptor;
+
+ ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log,
+ Function channelInterceptorFactory )
+ {
+ this( bootstrap, destination, log, channelInterceptorFactory, new ExponentialBackoffStrategy( 10, 2000, MILLISECONDS ) );
+ }
+
+ private ReconnectingChannel( Bootstrap bootstrap, final SocketAddress destination, final Log log,
+ Function channelInterceptorFactory, TimeoutStrategy connectionBackoffStrategy )
{
this.bootstrap = bootstrap;
- this.eventLoop = eventLoop;
this.destination = destination;
this.log = log;
+ this.channelInterceptorFactory = channelInterceptorFactory;
+ this.connectionBackoffStrategy = connectionBackoffStrategy;
}
void start()
@@ -70,14 +84,18 @@ else if ( fChannel != null && !fChannel.isDone() )
return;
}
+ connectionBackoff = connectionBackoffStrategy.newTimeout();
+
fChannel = bootstrap.connect( destination.socketAddress() );
channel = fChannel.channel();
+ channelInterceptor = channelInterceptorFactory.apply( channel );
fChannel.addListener( ( ChannelFuture f ) ->
{
if ( !f.isSuccess() )
{
- f.channel().eventLoop().schedule( this::tryConnect, CONNECT_BACKOFF_MS, MILLISECONDS );
+ f.channel().eventLoop().schedule( this::tryConnect, connectionBackoff.getMillis(), MILLISECONDS );
+ connectionBackoff.increment();
}
else
{
@@ -85,19 +103,45 @@ else if ( fChannel != null && !fChannel.isDone() )
f.channel().closeFuture().addListener( closed ->
{
log.warn( String.format( "Lost connection to: %s (%s)", destination, channel.remoteAddress() ) );
- f.channel().eventLoop().schedule( this::tryConnect, CONNECT_BACKOFF_MS, MILLISECONDS );
+ connectionBackoff = connectionBackoffStrategy.newTimeout();
+ f.channel().eventLoop().schedule( this::tryConnect, 0, MILLISECONDS );
} );
}
} );
}
+ @Override
public synchronized void dispose()
{
disposed = true;
channel.close();
}
- public Future send( Object msg )
+ @Override
+ public boolean isDisposed()
+ {
+ return disposed;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return channel.isOpen();
+ }
+
+ @Override
+ public CompletableFuture write( Object msg )
+ {
+ return doWrite( msg, ChannelOutboundInvoker::write );
+ }
+
+ @Override
+ public CompletableFuture writeAndFlush( Object msg )
+ {
+ return doWrite( msg, ChannelOutboundInvoker::writeAndFlush );
+ }
+
+ private CompletableFuture doWrite( Object msg, BiFunction> writer )
{
if ( disposed )
{
@@ -106,12 +150,14 @@ public Future send( Object msg )
if ( channel.isActive() )
{
- return channel.writeAndFlush( msg );
+ CompletableFuture promise = new CompletableFuture<>();
+ channelInterceptor.write( writer, channel, msg, promise );
+ return promise;
}
else
{
- Promise promise = eventLoop.newPromise();
- deferredWrite( msg, fChannel, promise, true );
+ CompletableFuture promise = new CompletableFuture<>();
+ deferredWrite( msg, fChannel, promise, true, writer );
return promise;
}
}
@@ -122,23 +168,30 @@ public Future send( Object msg )
* is sent right after the non-blocking channel was setup and before the server is ready
* to accept a connection. This happens frequently in tests.
*/
- private void deferredWrite( Object msg, ChannelFuture channelFuture, Promise> promise, boolean firstAttempt )
+ private void deferredWrite( Object msg, ChannelFuture channelFuture, CompletableFuture promise, boolean firstAttempt,
+ BiFunction> writer )
{
channelFuture.addListener( (ChannelFutureListener) f ->
{
if ( f.isSuccess() )
{
- f.channel().writeAndFlush( msg ).addListener( x -> promise.setSuccess( null ) );
+ channelInterceptor.write( writer, f.channel(), msg, promise );
}
else if ( firstAttempt )
{
tryConnect();
- deferredWrite( msg, fChannel, promise, false );
+ deferredWrite( msg, fChannel, promise, false, writer );
}
else
{
- promise.setFailure( f.cause() );
+ promise.completeExceptionally( f.cause() );
}
} );
}
+
+ @Override
+ public String toString()
+ {
+ return "ReconnectingChannel{" + "channel=" + channel + ", disposed=" + disposed + '}';
+ }
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannels.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannels.java
similarity index 79%
rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannels.java
rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannels.java
index 00d8b1e0fa6b5..aede3ede0c929 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/NonBlockingChannels.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/ReconnectingChannels.java
@@ -24,9 +24,9 @@
import org.neo4j.helpers.AdvertisedSocketAddress;
-public class NonBlockingChannels
+public class ReconnectingChannels
{
- private final ConcurrentHashMap lazyChannelMap =
+ private final ConcurrentHashMap lazyChannelMap =
new ConcurrentHashMap<>();
public int size()
@@ -34,17 +34,17 @@ public int size()
return lazyChannelMap.size();
}
- public NonBlockingChannel get( AdvertisedSocketAddress to )
+ public ReconnectingChannel get( AdvertisedSocketAddress to )
{
return lazyChannelMap.get( to );
}
- public NonBlockingChannel putIfAbsent( AdvertisedSocketAddress to, NonBlockingChannel timestampedLazyChannel )
+ public ReconnectingChannel putIfAbsent( AdvertisedSocketAddress to, ReconnectingChannel timestampedLazyChannel )
{
return lazyChannelMap.putIfAbsent( to, timestampedLazyChannel );
}
- public Collection values()
+ public Collection values()
{
return lazyChannelMap.values();
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java
index a51b58260eb23..62c6fee1ef78d 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SenderService.java
@@ -20,15 +20,15 @@
package org.neo4j.causalclustering.messaging;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor;
@@ -43,9 +43,9 @@
public class SenderService extends LifecycleAdapter implements Outbound
{
- private NonBlockingChannels nonBlockingChannels;
+ private ReconnectingChannels channels;
- private final ChannelInitializer channelInitializer;
+ private final ChannelInitializer channelInitializer;
private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
private final Log log;
private final Monitors monitors;
@@ -55,18 +55,18 @@ public class SenderService extends LifecycleAdapter implements Outbound channelInitializer, LogProvider logProvider, Monitors monitors )
+ public SenderService( ChannelInitializer channelInitializer, LogProvider logProvider, Monitors monitors )
{
this.channelInitializer = channelInitializer;
this.log = logProvider.getLog( getClass() );
this.monitors = monitors;
- this.nonBlockingChannels = new NonBlockingChannels();
+ this.channels = new ReconnectingChannels();
}
@Override
public void send( AdvertisedSocketAddress to, Message message, boolean block )
{
- Future future;
+ AtomicReference> future = new AtomicReference<>( new CompletableFuture<>() );
serviceLock.readLock().lock();
try
{
@@ -75,7 +75,9 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block )
return;
}
- future = channel( to ).send( message );
+ Channel channel = channel( to );
+
+ future.set( channel.writeAndFlush( message ) );
}
finally
{
@@ -84,34 +86,34 @@ public void send( AdvertisedSocketAddress to, Message message, boolean block )
if ( block )
{
- future.awaitUninterruptibly();
+ future.get().join();
}
}
- private NonBlockingChannel channel( AdvertisedSocketAddress to )
+ private Channel channel( AdvertisedSocketAddress destination )
{
- MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, NonBlockingChannel.class );
- NonBlockingChannel nonBlockingChannel = nonBlockingChannels.get( to );
+ MessageQueueMonitor monitor = monitors.newMonitor( MessageQueueMonitor.class, ReconnectingChannel.class );
+ ReconnectingChannel channel = channels.get( destination );
- if ( nonBlockingChannel == null )
+ if ( channel == null )
{
- nonBlockingChannel = new NonBlockingChannel( bootstrap, eventLoopGroup.next(), to, log );
- nonBlockingChannel.start();
- NonBlockingChannel existingNonBlockingChannel = nonBlockingChannels.putIfAbsent( to, nonBlockingChannel );
+ channel = new ReconnectingChannel( bootstrap, destination, log, HandshakeGate::new );
+ channel.start();
+ ReconnectingChannel existingNonBlockingChannel = channels.putIfAbsent( destination, channel );
if ( existingNonBlockingChannel != null )
{
- nonBlockingChannel.dispose();
- nonBlockingChannel = existingNonBlockingChannel;
+ channel.dispose();
+ channel = existingNonBlockingChannel;
}
else
{
- log.info( "Creating channel to: [%s] ", to );
+ log.info( "Creating channel to: [%s] ", destination );
}
}
- monitor.register( to );
- return nonBlockingChannel;
+ monitor.register( destination );
+ return channel;
}
@Override
@@ -148,10 +150,10 @@ public synchronized void stop()
jobHandle = null;
}
- Iterator itr = nonBlockingChannels.values().iterator();
+ Iterator itr = channels.values().iterator();
while ( itr.hasNext() )
{
- NonBlockingChannel timestampedChannel = itr.next();
+ Channel timestampedChannel = itr.next();
timestampedChannel.dispose();
itr.remove();
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java
new file mode 100644
index 0000000000000..ef2c28f9f8517
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/SimpleNettyChannel.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.messaging;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.neo4j.logging.Log;
+
+public class SimpleNettyChannel implements Channel
+{
+ private final Log log;
+ private final io.netty.channel.Channel channel;
+ private volatile boolean disposed;
+
+ public SimpleNettyChannel( io.netty.channel.Channel channel, Log log )
+ {
+ this.channel = channel;
+ this.log = log;
+ }
+
+ @Override
+ public boolean isDisposed()
+ {
+ return disposed;
+ }
+
+ @Override
+ public synchronized void dispose()
+ {
+ log.info( "Disposing channel: " + channel );
+ disposed = true;
+ channel.close();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return channel.isOpen();
+ }
+
+ @Override
+ public CompletableFuture write( Object msg )
+ {
+ checkDisposed();
+ return Channel.convertNettyFuture( channel.write( msg ) );
+ }
+
+ @Override
+ public CompletableFuture writeAndFlush( Object msg )
+ {
+ checkDisposed();
+ return Channel.convertNettyFuture( channel.writeAndFlush( msg ) );
+ }
+
+ private void checkDisposed()
+ {
+ if ( disposed )
+ {
+ throw new IllegalStateException( "sending on disposed channel" );
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java
new file mode 100644
index 0000000000000..f72802d969745
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilder.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.neo4j.causalclustering.messaging.HandshakeGate;
+import org.neo4j.logging.Log;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Builder and installer of pipelines.
+ *
+ * Makes sures to install sane last-resort error handling and
+ * handles the construction of common patterns, like framing.
+ */
+public class NettyPipelineBuilder
+{
+ private final ChannelPipeline pipeline;
+ private final Log log;
+ private final List handlers = new ArrayList<>();
+
+ private NettyPipelineBuilder( ChannelPipeline pipeline, Log log )
+ {
+ this.pipeline = pipeline;
+ this.log = log;
+ }
+
+ public static NettyPipelineBuilder with( ChannelPipeline pipeline, Log log )
+ {
+ return new NettyPipelineBuilder( pipeline, log );
+ }
+
+ public NettyPipelineBuilder addFraming()
+ {
+ add( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
+ add( new LengthFieldPrepender( 4 ) );
+ return this;
+ }
+
+ public NettyPipelineBuilder add( List newHandlers )
+ {
+ handlers.addAll( newHandlers );
+ return this;
+ }
+
+ public NettyPipelineBuilder add( ChannelHandler... newHandlers )
+ {
+ return add( asList( newHandlers ) );
+ }
+
+ /**
+ * Installs the built pipeline and removes any old pipeline.
+ */
+ public void install()
+ {
+ clear();
+ handlers.forEach( pipeline::addLast );
+ installErrorHandling();
+ }
+
+ private void clear()
+ {
+ pipeline.names().stream()
+ .filter( this::isNotDefault )
+ .filter( this::isNotUserEvent )
+ .forEach( pipeline::remove );
+ }
+
+ private boolean isNotUserEvent( String name )
+ {
+ return !HandshakeGate.HANDSHAKE_GATE.equals( name );
+ }
+
+ private boolean isNotDefault( String name )
+ {
+ return pipeline.get( name ) != null;
+ }
+
+ private void installErrorHandling()
+ {
+ pipeline.addLast( new ChannelDuplexHandler()
+ {
+ @Override
+ public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
+ {
+ log.error( "Exception in inbound", cause );
+ }
+
+ @Override
+ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
+ {
+ log.error( "Unhandled inbound message: " + msg );
+ }
+
+ @Override
+ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception
+ {
+ if ( !promise.isVoid() )
+ {
+ promise.addListener( (ChannelFutureListener) future ->
+ {
+ if ( !future.isSuccess() )
+ {
+ log.error( "Exception in outbound", future.cause() );
+ }
+ } );
+ }
+ ctx.write( msg, promise );
+ }
+ } );
+
+ pipeline.addFirst( new ChannelOutboundHandlerAdapter()
+ {
+ @Override
+ public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
+ {
+ log.error( "Exception in outbound", cause );
+ }
+
+ @Override
+ public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception
+ {
+ if ( !(msg instanceof ByteBuf) )
+ {
+ log.error( "Unhandled outbound message: " + msg );
+ }
+ else
+ {
+ ctx.write( msg );
+ }
+ }
+ } );
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java
new file mode 100644
index 0000000000000..8976413811be6
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/NettyPipelineBuilderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+
+import org.neo4j.causalclustering.handlers.PipelineWrapper;
+import org.neo4j.logging.Log;
+
+public class NettyPipelineBuilderFactory
+{
+ private final PipelineWrapper wrapper;
+
+ public NettyPipelineBuilderFactory( PipelineWrapper wrapper )
+ {
+ this.wrapper = wrapper;
+ }
+
+ public NettyPipelineBuilder create( Channel channel, Log log ) throws Exception
+ {
+ ChannelPipeline pipeline = channel.pipeline();
+ NettyPipelineBuilder builder = NettyPipelineBuilder.with( pipeline, log );
+ for ( ChannelHandler handler : wrapper.handlersFor( channel ) )
+ {
+ builder.add( handler );
+ }
+ return builder;
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/Protocol.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/Protocol.java
new file mode 100644
index 0000000000000..652d8b4e6d075
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/Protocol.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol;
+
+public interface Protocol
+{
+ String identifier();
+
+ int version();
+
+ enum Identifier
+ {
+ RAFT,
+ CATCHUP;
+
+ public String canonicalName()
+ {
+ return name().toLowerCase();
+ }
+ }
+
+ enum Protocols implements Protocol
+ {
+ RAFT_1( Identifier.RAFT, 1 ),
+ CATCHUP_1( Identifier.CATCHUP, 1 );
+
+ private final int version;
+ private final Identifier identifier;
+
+ Protocols( Identifier identifier, int version )
+ {
+ this.identifier = identifier;
+ this.version = version;
+ }
+
+ @Override
+ public String identifier()
+ {
+ return this.identifier.canonicalName();
+ }
+
+ @Override
+ public int version()
+ {
+ return version;
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstaller.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstaller.java
new file mode 100644
index 0000000000000..effd7917987c4
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstaller.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol;
+
+import io.netty.channel.Channel;
+
+public abstract class ProtocolInstaller
+{
+ private final Protocol protocol;
+
+ protected ProtocolInstaller( Protocol protocol )
+ {
+ this.protocol = protocol;
+ }
+
+ public abstract void install( Channel channel ) throws Exception;
+
+ public final Protocol protocol()
+ {
+ return protocol;
+ }
+
+ public interface Orientation
+ {
+ interface Server extends Orientation
+ {
+ }
+
+ interface Client extends Orientation
+ {
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstallerRepository.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstallerRepository.java
new file mode 100644
index 0000000000000..fc7bb07fad719
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/ProtocolInstallerRepository.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtocolInstallerRepository
+{
+ private final Map> installers;
+
+ public ProtocolInstallerRepository( Collection> installers )
+ {
+ Map> tempInstallers = new HashMap<>();
+
+ installers.forEach( installer -> addTo( tempInstallers, installer ) );
+
+ this.installers = Collections.unmodifiableMap( tempInstallers );
+ }
+
+ private void addTo( Map> tempServerMap, ProtocolInstaller installer )
+ {
+ Protocol protocol = installer.protocol();
+ ProtocolInstaller old = tempServerMap.put( protocol, installer );
+ if ( old != null )
+ {
+ throw new IllegalArgumentException(
+ String.format( "Duplicate protocol installers for protocol %s", protocol )
+ );
+ }
+ }
+
+ public ProtocolInstaller installerFor( Protocol protocol )
+ {
+ ProtocolInstaller protocolInstaller = installers.get( protocol );
+ if ( protocolInstaller == null )
+ {
+ throw new IllegalStateException( String.format( "Installer for requested protocol %s does not exist", protocol ) );
+ }
+ else
+ {
+ return protocolInstaller;
+ }
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolRequest.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolRequest.java
new file mode 100644
index 0000000000000..e4f2caa4ffa15
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol.handshake;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class ApplicationProtocolRequest implements ServerMessage
+{
+ private final String protocolName;
+ private final Set versions;
+
+ ApplicationProtocolRequest( String protocolName, Set versions )
+ {
+ this.protocolName = protocolName;
+ this.versions = versions;
+ }
+
+ @Override
+ public void dispatch( ServerMessageHandler handler )
+ {
+ handler.handle( this );
+ }
+
+ String protocolName()
+ {
+ return protocolName;
+ }
+
+ public Set versions()
+ {
+ return versions;
+ }
+
+ @Override
+ public boolean equals( Object o )
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+ ApplicationProtocolRequest that = (ApplicationProtocolRequest) o;
+ return Objects.equals( protocolName, that.protocolName ) && Objects.equals( versions, that.versions );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash( protocolName, versions );
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolResponse.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolResponse.java
new file mode 100644
index 0000000000000..40197ddbcff37
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ApplicationProtocolResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol.handshake;
+
+import java.util.Objects;
+
+import static org.neo4j.causalclustering.protocol.handshake.StatusCode.FAILURE;
+
+public class ApplicationProtocolResponse implements ClientMessage
+{
+ public static final ApplicationProtocolResponse NO_PROTOCOL = new ApplicationProtocolResponse( FAILURE, "", 0 );
+ private final StatusCode statusCode;
+ private final String protocolName;
+ private final int version;
+
+ ApplicationProtocolResponse( StatusCode statusCode, String protocolName, int version )
+ {
+ this.statusCode = statusCode;
+ this.protocolName = protocolName;
+ this.version = version;
+ }
+
+ @Override
+ public void dispatch( ClientMessageHandler handler )
+ {
+ handler.handle( this );
+ }
+
+ @Override
+ public boolean equals( Object o )
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+ ApplicationProtocolResponse that = (ApplicationProtocolResponse) o;
+ return version == that.version && Objects.equals( protocolName, that.protocolName );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash( protocolName, version );
+ }
+
+ public StatusCode statusCode()
+ {
+ return statusCode;
+ }
+
+ public String protocolName()
+ {
+ return protocolName;
+ }
+
+ public int version()
+ {
+ return version;
+ }
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/NoOpPipelineHandlerAppender.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientHandshakeException.java
similarity index 74%
rename from enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/NoOpPipelineHandlerAppender.java
rename to enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientHandshakeException.java
index 99b457a40560c..c7d6e2e9d6a14 100644
--- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/handlers/NoOpPipelineHandlerAppender.java
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientHandshakeException.java
@@ -17,14 +17,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-package org.neo4j.causalclustering.handlers;
+package org.neo4j.causalclustering.protocol.handshake;
-import org.neo4j.kernel.configuration.Config;
-import org.neo4j.logging.LogProvider;
-
-public class NoOpPipelineHandlerAppender implements PipelineHandlerAppender
+public class ClientHandshakeException extends Exception
{
- public NoOpPipelineHandlerAppender( Config config, LogProvider logProvider )
+ public ClientHandshakeException( String message )
{
+ super( message );
}
}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessage.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessage.java
new file mode 100644
index 0000000000000..c327187459605
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol.handshake;
+
+/**
+ * Messages to the client, generally responses.
+ */
+public interface ClientMessage
+{
+ void dispatch( ClientMessageHandler handler );
+}
diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessageDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessageDecoder.java
new file mode 100644
index 0000000000000..1f4da5e32f14e
--- /dev/null
+++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/protocol/handshake/ClientMessageDecoder.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.causalclustering.protocol.handshake;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.neo4j.causalclustering.messaging.marshalling.StringMarshal;
+
+/**
+ * Decodes messages received by the client.
+ */
+public class ClientMessageDecoder extends ByteToMessageDecoder
+{
+ @Override
+ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List