Skip to content

Commit

Permalink
Extract encryption specific code away from the causal clustering code
Browse files Browse the repository at this point in the history
Encryption is functionality that is not fundamental to the guarantees
 causal clustering provides. As such, it doesn't belong in this
 project. Methods to provide encrypted cluster communication will
 be available through separate offerings.
  • Loading branch information
digitalstain committed Sep 27, 2017
1 parent 27be54a commit 0455e21
Show file tree
Hide file tree
Showing 26 changed files with 966 additions and 262 deletions.
681 changes: 681 additions & 0 deletions enterprise/causal-clustering/LICENSE.txt

Large diffs are not rendered by default.

Expand Up @@ -19,6 +19,10 @@
*/ */
package org.neo4j.causalclustering.catchup; package org.neo4j.causalclustering.catchup;


import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
Expand All @@ -27,18 +31,14 @@
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;


import java.time.Clock; import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.messaging.CatchUpRequest; import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress; import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;


import static java.lang.String.format; import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MICROSECONDS;
Expand All @@ -50,21 +50,21 @@ public class CatchUpClient extends LifecycleAdapter
private final Log log; private final Log log;
private final Clock clock; private final Clock clock;
private final Monitors monitors; private final Monitors monitors;
private final SslPolicy sslPolicy; private final PipelineHandlerAppender pipelineAppender;
private final long inactivityTimeoutMillis; private final long inactivityTimeoutMillis;
private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new ); private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );


private NioEventLoopGroup eventLoopGroup; private NioEventLoopGroup eventLoopGroup;


public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors, public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors,
SslPolicy sslPolicy ) PipelineHandlerAppender pipelineAppender )
{ {
this.logProvider = logProvider; this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.clock = clock; this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis; this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors; this.monitors = monitors;
this.sslPolicy = sslPolicy; this.pipelineAppender = pipelineAppender;
} }


public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler ) public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
Expand Down Expand Up @@ -110,7 +110,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
@Override @Override
protected void initChannel( SocketChannel ch ) throws Exception protected void initChannel( SocketChannel ch ) throws Exception
{ {
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, sslPolicy ); CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineAppender );
} }
} ); } );


Expand Down
Expand Up @@ -47,9 +47,9 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler; import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler; import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler; import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;


class CatchUpClientChannelPipeline class CatchUpClientChannelPipeline
{ {
Expand All @@ -58,16 +58,13 @@ private CatchUpClientChannelPipeline()
} }


static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider, static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider,
Monitors monitors, SslPolicy sslPolicy ) throws Exception Monitors monitors, PipelineHandlerAppender pipelineHandlerAppender ) throws Exception
{ {
CatchupClientProtocol protocol = new CatchupClientProtocol(); CatchupClientProtocol protocol = new CatchupClientProtocol();


ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();


if ( sslPolicy != null ) pipelineHandlerAppender.addPipelineHandlerForClient( pipeline, ch );
{
pipeline.addLast( sslPolicy.nettyClientHandler( ch ) );
}


pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) ); pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Expand Up @@ -19,6 +19,11 @@
*/ */
package org.neo4j.causalclustering.catchup; package org.neo4j.causalclustering.catchup;


import java.net.BindException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandler;
Expand All @@ -33,11 +38,6 @@
import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;


import java.net.BindException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import org.neo4j.causalclustering.VersionDecoder; import org.neo4j.causalclustering.VersionDecoder;
import org.neo4j.causalclustering.VersionPrepender; import org.neo4j.causalclustering.VersionPrepender;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol.State; import org.neo4j.causalclustering.catchup.CatchupServerProtocol.State;
Expand All @@ -61,6 +61,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler; import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler; import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler; import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -76,7 +77,6 @@
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;


public class CatchupServer extends LifecycleAdapter public class CatchupServer extends LifecycleAdapter
{ {
Expand All @@ -92,7 +92,7 @@ public class CatchupServer extends LifecycleAdapter
private final BooleanSupplier dataSourceAvailabilitySupplier; private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
private final PageCache pageCache; private final PageCache pageCache;
private final SslPolicy sslPolicy; private final PipelineHandlerAppender pipelineAppender;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;


private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" ); private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
Expand All @@ -104,12 +104,12 @@ public class CatchupServer extends LifecycleAdapter
private final Supplier<CheckPointer> checkPointerSupplier; private final Supplier<CheckPointer> checkPointerSupplier;


public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier, public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier, CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier,
FileSystemAbstraction fs, PageCache pageCache, FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, SslPolicy sslPolicy ) StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineHandlerAppender pipelineAppender )
{ {
this.snapshotService = snapshotService; this.snapshotService = snapshotService;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex; this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
Expand All @@ -126,7 +126,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
this.checkPointerSupplier = checkPointerSupplier; this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs; this.fs = fs;
this.pageCache = pageCache; this.pageCache = pageCache;
this.sslPolicy = sslPolicy; this.pipelineAppender = pipelineAppender;
} }


@Override @Override
Expand All @@ -150,10 +150,7 @@ protected void initChannel( SocketChannel ch ) throws Exception


ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();


if ( sslPolicy != null ) pipelineAppender.addPipelineHandlerForServer( pipeline, ch );
{
pipeline.addLast( sslPolicy.nettyServerHandler( ch ) );
}


pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) ); pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Expand Up @@ -35,6 +35,10 @@


public class CoreGraphDatabase extends GraphDatabaseFacade public class CoreGraphDatabase extends GraphDatabaseFacade
{ {
protected CoreGraphDatabase()
{
}

public CoreGraphDatabase( File storeDir, Config config, public CoreGraphDatabase( File storeDir, Config config,
GraphDatabaseFacadeFactory.Dependencies dependencies ) GraphDatabaseFacadeFactory.Dependencies dependencies )
{ {
Expand Down
Expand Up @@ -44,6 +44,9 @@
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory; import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure; import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure; import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader; import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor; import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
Expand All @@ -70,7 +73,6 @@
import org.neo4j.kernel.api.bolt.BoltConnectionTracker; import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.exceptions.KernelException; import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.ssl.SslPolicyLoader;
import org.neo4j.kernel.enterprise.builtinprocs.EnterpriseBuiltInDbmsProcedures; import org.neo4j.kernel.enterprise.builtinprocs.EnterpriseBuiltInDbmsProcedures;
import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation; import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
Expand Down Expand Up @@ -99,7 +101,6 @@
import org.neo4j.kernel.lifecycle.LifecycleStatus; import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;
import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageData;


import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path; import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path;
Expand All @@ -113,8 +114,8 @@ public class EnterpriseCoreEditionModule extends EditionModule
private final ConsensusModule consensusModule; private final ConsensusModule consensusModule;
private final ReplicationModule replicationModule; private final ReplicationModule replicationModule;
private final CoreTopologyService topologyService; private final CoreTopologyService topologyService;
private final LogProvider logProvider; protected final LogProvider logProvider;
private final Config config; protected final Config config;
private CoreStateMachinesModule coreStateMachinesModule; private CoreStateMachinesModule coreStateMachinesModule;


public enum RaftLogImplementation public enum RaftLogImplementation
Expand Down Expand Up @@ -198,17 +199,18 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke


IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() ); IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() );


SslPolicyLoader sslPolicyFactory = dependencies.satisfyDependency( SslPolicyLoader.create( config, logProvider ) ); ClusteringModule clusteringModule = getClusteringModule( platformModule, discoveryServiceFactory,
SslPolicy clusterSslPolicy = sslPolicyFactory.getPolicy( config.get( CausalClusteringSettings.ssl_policy ) ); clusterStateDirectory, identityModule, dependencies );

PipelineHandlerAppenderFactory appenderFactory = appenderFactory();
PipelineHandlerAppender pipelineHandlerAppender = appenderFactory.create( config, dependencies, logProvider );


ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(),
platformModule, clusterStateDirectory.get(), clusterSslPolicy );
topologyService = clusteringModule.topologyService(); topologyService = clusteringModule.topologyService();


long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis(); long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();


final SenderService raftSender = new SenderService( final SenderService raftSender = new SenderService(
new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, clusterSslPolicy ), new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, pipelineHandlerAppender ),
logProvider, platformModule.monitors ); logProvider, platformModule.monitors );
life.add( raftSender ); life.add( raftSender );


Expand Down Expand Up @@ -246,7 +248,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke


CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule, CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory.get(), clusteringModule, localDatabase, coreStateMachinesModule, replicationModule, clusterStateDirectory.get(), clusteringModule, localDatabase,
messageLogger, databaseHealthSupplier, clusterSslPolicy ); messageLogger, databaseHealthSupplier, pipelineHandlerAppender );


editionInvariants( platformModule, dependencies, config, logging, life ); editionInvariants( platformModule, dependencies, config, logging, life );


Expand All @@ -256,6 +258,20 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
life.add( coreServerModule.membershipWaiterLifecycle ); life.add( coreServerModule.membershipWaiterLifecycle );
} }


protected ClusteringModule getClusteringModule( PlatformModule platformModule,
DiscoveryServiceFactory discoveryServiceFactory,
ClusterStateDirectory clusterStateDirectory,
IdentityModule identityModule, Dependencies dependencies )
{
return new ClusteringModule( discoveryServiceFactory, identityModule.myself(),
platformModule, clusterStateDirectory.get() );
}

protected PipelineHandlerAppenderFactory appenderFactory()
{
return new NoOpPipelineHandlerAppenderFactory();
}

@Override @Override
protected void createIdComponents( PlatformModule platformModule, Dependencies dependencies, protected void createIdComponents( PlatformModule platformModule, Dependencies dependencies,
IdGeneratorFactory editionIdGeneratorFactory ) IdGeneratorFactory editionIdGeneratorFactory )
Expand Down
Expand Up @@ -19,6 +19,9 @@
*/ */
package org.neo4j.causalclustering.core.consensus; package org.neo4j.causalclustering.core.consensus;


import java.net.BindException;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
Expand All @@ -33,16 +36,14 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LengthFieldPrepender;


import java.net.BindException;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.VersionDecoder; import org.neo4j.causalclustering.VersionDecoder;
import org.neo4j.causalclustering.VersionPrepender; import org.neo4j.causalclustering.VersionPrepender;
import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.replication.ReplicatedContent; import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler; import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler; import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler; import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.messaging.Inbound; import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder; import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder;
Expand All @@ -54,15 +55,14 @@
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;


import static java.lang.String.format; import static java.lang.String.format;


public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.ClusterIdAwareMessage> public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.ClusterIdAwareMessage>
{ {
private static final Setting<ListenSocketAddress> setting = CausalClusteringSettings.raft_listen_address; private static final Setting<ListenSocketAddress> setting = CausalClusteringSettings.raft_listen_address;
private final ChannelMarshal<ReplicatedContent> marshal; private final ChannelMarshal<ReplicatedContent> marshal;
private final SslPolicy sslPolicy; private final PipelineHandlerAppender pipelineAppender;
private final ListenSocketAddress listenAddress; private final ListenSocketAddress listenAddress;
private final LogProvider logProvider; private final LogProvider logProvider;
private final Log log; private final Log log;
Expand All @@ -75,11 +75,11 @@ public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages


private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" ); private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );


public RaftServer( ChannelMarshal<ReplicatedContent> marshal, SslPolicy sslPolicy, Config config, public RaftServer( ChannelMarshal<ReplicatedContent> marshal, PipelineHandlerAppender pipelineAppender, Config config,
LogProvider logProvider, LogProvider userLogProvider, Monitors monitors ) LogProvider logProvider, LogProvider userLogProvider, Monitors monitors )
{ {
this.marshal = marshal; this.marshal = marshal;
this.sslPolicy = sslPolicy; this.pipelineAppender = pipelineAppender;
this.listenAddress = config.get( setting ); this.listenAddress = config.get( setting );
this.logProvider = logProvider; this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -131,10 +131,7 @@ protected void initChannel( SocketChannel ch ) throws Exception
{ {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();


if ( sslPolicy != null ) pipelineAppender.addPipelineHandlerForServer( pipeline, ch );
{
pipeline.addLast( sslPolicy.nettyServerHandler( ch ) );
}


pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) ); pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) ); pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down

0 comments on commit 0455e21

Please sign in to comment.