Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.3' into 3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
pontusmelke committed Sep 28, 2017
2 parents d4a0c59 + e9556cc commit e8b3a5c
Show file tree
Hide file tree
Showing 45 changed files with 285 additions and 4,395 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
*/
package org.neo4j.causalclustering.catchup;

import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -27,18 +31,14 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
Expand All @@ -50,21 +50,21 @@ public class CatchUpClient extends LifecycleAdapter
private final Log log;
private final Clock clock;
private final Monitors monitors;
private final SslPolicy sslPolicy;
private final PipelineHandlerAppender pipelineAppender;
private final long inactivityTimeoutMillis;
private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>( CatchUpChannel::new );

private NioEventLoopGroup eventLoopGroup;

public CatchUpClient( LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors,
SslPolicy sslPolicy )
PipelineHandlerAppender pipelineAppender )
{
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
this.clock = clock;
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
this.monitors = monitors;
this.sslPolicy = sslPolicy;
this.pipelineAppender = pipelineAppender;
}

public <T> T makeBlockingRequest( AdvertisedSocketAddress upstream, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler )
Expand Down Expand Up @@ -110,7 +110,7 @@ private class CatchUpChannel implements CatchUpChannelPool.Channel
@Override
protected void initChannel( SocketChannel ch ) throws Exception
{
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, sslPolicy );
CatchUpClientChannelPipeline.initChannel( ch, handler, logProvider, monitors, pipelineAppender );
}
} );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

class CatchUpClientChannelPipeline
{
Expand All @@ -58,16 +58,13 @@ private CatchUpClientChannelPipeline()
}

static void initChannel( SocketChannel ch, CatchUpResponseHandler handler, LogProvider logProvider,
Monitors monitors, SslPolicy sslPolicy ) throws Exception
Monitors monitors, PipelineHandlerAppender pipelineHandlerAppender ) throws Exception
{
CatchupClientProtocol protocol = new CatchupClientProtocol();

ChannelPipeline pipeline = ch.pipeline();

if ( sslPolicy != null )
{
pipeline.addLast( sslPolicy.nettyClientHandler( ch ) );
}
pipelineHandlerAppender.addPipelineHandlerForClient( pipeline, ch );

pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
*/
package org.neo4j.causalclustering.catchup;

import java.net.BindException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandler;
Expand All @@ -33,11 +38,6 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.net.BindException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import org.neo4j.causalclustering.VersionDecoder;
import org.neo4j.causalclustering.VersionPrepender;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol.State;
Expand All @@ -61,6 +61,7 @@
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
Expand All @@ -76,7 +77,6 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

public class CatchupServer extends LifecycleAdapter
{
Expand All @@ -92,7 +92,7 @@ public class CatchupServer extends LifecycleAdapter
private final BooleanSupplier dataSourceAvailabilitySupplier;
private final FileSystemAbstraction fs;
private final PageCache pageCache;
private final SslPolicy sslPolicy;
private final PipelineHandlerAppender pipelineAppender;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "catchup-server" );
Expand All @@ -104,12 +104,12 @@ public class CatchupServer extends LifecycleAdapter
private final Supplier<CheckPointer> checkPointerSupplier;

public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> storeIdSupplier,
Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier,
FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, SslPolicy sslPolicy )
Supplier<TransactionIdStore> transactionIdStoreSupplier,
Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier,
Supplier<NeoStoreDataSource> dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier,
CoreSnapshotService snapshotService, Config config, Monitors monitors, Supplier<CheckPointer> checkPointerSupplier,
FileSystemAbstraction fs, PageCache pageCache,
StoreCopyCheckPointMutex storeCopyCheckPointMutex, PipelineHandlerAppender pipelineAppender )
{
this.snapshotService = snapshotService;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
Expand All @@ -126,7 +126,7 @@ public CatchupServer( LogProvider logProvider, LogProvider userLogProvider, Supp
this.checkPointerSupplier = checkPointerSupplier;
this.fs = fs;
this.pageCache = pageCache;
this.sslPolicy = sslPolicy;
this.pipelineAppender = pipelineAppender;
}

@Override
Expand All @@ -150,10 +150,7 @@ protected void initChannel( SocketChannel ch ) throws Exception

ChannelPipeline pipeline = ch.pipeline();

if ( sslPolicy != null )
{
pipeline.addLast( sslPolicy.nettyServerHandler( ch ) );
}
pipelineAppender.addPipelineHandlerForServer( pipeline, ch );

pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@

public class CoreGraphDatabase extends GraphDatabaseFacade
{
protected CoreGraphDatabase()
{
}

public CoreGraphDatabase( File storeDir, Config config,
GraphDatabaseFacadeFactory.Dependencies dependencies )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.procedures.ClusterOverviewProcedure;
import org.neo4j.causalclustering.discovery.procedures.CoreRoleProcedure;
import org.neo4j.causalclustering.handlers.NoOpPipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppenderFactory;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.load_balancing.LoadBalancingPluginLoader;
import org.neo4j.causalclustering.load_balancing.LoadBalancingProcessor;
Expand All @@ -70,7 +73,6 @@
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.ssl.SslPolicyLoader;
import org.neo4j.kernel.enterprise.builtinprocs.EnterpriseBuiltInDbmsProcedures;
import org.neo4j.kernel.impl.api.SchemaWriteGuard;
import org.neo4j.kernel.impl.api.TransactionHeaderInformation;
Expand Down Expand Up @@ -99,7 +101,6 @@
import org.neo4j.kernel.lifecycle.LifecycleStatus;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;
import org.neo4j.udc.UsageData;

import static org.neo4j.causalclustering.core.CausalClusteringSettings.raft_messages_log_path;
Expand All @@ -113,8 +114,8 @@ public class EnterpriseCoreEditionModule extends EditionModule
private final ConsensusModule consensusModule;
private final ReplicationModule replicationModule;
private final CoreTopologyService topologyService;
private final LogProvider logProvider;
private final Config config;
protected final LogProvider logProvider;
protected final Config config;
private CoreStateMachinesModule coreStateMachinesModule;

public enum RaftLogImplementation
Expand Down Expand Up @@ -198,17 +199,18 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke

IdentityModule identityModule = new IdentityModule( platformModule, clusterStateDirectory.get() );

SslPolicyLoader sslPolicyFactory = dependencies.satisfyDependency( SslPolicyLoader.create( config, logProvider ) );
SslPolicy clusterSslPolicy = sslPolicyFactory.getPolicy( config.get( CausalClusteringSettings.ssl_policy ) );
ClusteringModule clusteringModule = getClusteringModule( platformModule, discoveryServiceFactory,
clusterStateDirectory, identityModule, dependencies );

PipelineHandlerAppenderFactory appenderFactory = appenderFactory();
PipelineHandlerAppender pipelineHandlerAppender = appenderFactory.create( config, dependencies, logProvider );

ClusteringModule clusteringModule = new ClusteringModule( discoveryServiceFactory, identityModule.myself(),
platformModule, clusterStateDirectory.get(), clusterSslPolicy );
topologyService = clusteringModule.topologyService();

long logThresholdMillis = config.get( CausalClusteringSettings.unknown_address_logging_throttle ).toMillis();

final SenderService raftSender = new SenderService(
new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, clusterSslPolicy ),
new RaftChannelInitializer( new CoreReplicatedContentMarshal(), logProvider, monitors, pipelineHandlerAppender ),
logProvider, platformModule.monitors );
life.add( raftSender );

Expand Down Expand Up @@ -246,7 +248,7 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke

CoreServerModule coreServerModule = new CoreServerModule( identityModule, platformModule, consensusModule,
coreStateMachinesModule, replicationModule, clusterStateDirectory.get(), clusteringModule, localDatabase,
messageLogger, databaseHealthSupplier, clusterSslPolicy );
messageLogger, databaseHealthSupplier, pipelineHandlerAppender );

editionInvariants( platformModule, dependencies, config, logging, life );

Expand All @@ -256,6 +258,20 @@ public void registerEditionSpecificProcedures( Procedures procedures ) throws Ke
life.add( coreServerModule.membershipWaiterLifecycle );
}

protected ClusteringModule getClusteringModule( PlatformModule platformModule,
DiscoveryServiceFactory discoveryServiceFactory,
ClusterStateDirectory clusterStateDirectory,
IdentityModule identityModule, Dependencies dependencies )
{
return new ClusteringModule( discoveryServiceFactory, identityModule.myself(),
platformModule, clusterStateDirectory.get() );
}

protected PipelineHandlerAppenderFactory appenderFactory()
{
return new NoOpPipelineHandlerAppenderFactory();
}

@Override
protected void createIdComponents( PlatformModule platformModule, Dependencies dependencies,
IdGeneratorFactory editionIdGeneratorFactory )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
*/
package org.neo4j.causalclustering.core.consensus;

import java.net.BindException;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -33,16 +36,14 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.net.BindException;
import java.util.concurrent.TimeUnit;

import org.neo4j.causalclustering.VersionDecoder;
import org.neo4j.causalclustering.VersionPrepender;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.handlers.ExceptionLoggingHandler;
import org.neo4j.causalclustering.handlers.ExceptionMonitoringHandler;
import org.neo4j.causalclustering.handlers.ExceptionSwallowingHandler;
import org.neo4j.causalclustering.handlers.PipelineHandlerAppender;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal;
import org.neo4j.causalclustering.messaging.marshalling.RaftMessageDecoder;
Expand All @@ -54,15 +55,14 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.ssl.SslPolicy;

import static java.lang.String.format;

public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages.ClusterIdAwareMessage>
{
private static final Setting<ListenSocketAddress> setting = CausalClusteringSettings.raft_listen_address;
private final ChannelMarshal<ReplicatedContent> marshal;
private final SslPolicy sslPolicy;
private final PipelineHandlerAppender pipelineAppender;
private final ListenSocketAddress listenAddress;
private final LogProvider logProvider;
private final Log log;
Expand All @@ -75,11 +75,11 @@ public class RaftServer extends LifecycleAdapter implements Inbound<RaftMessages

private final NamedThreadFactory threadFactory = new NamedThreadFactory( "raft-server" );

public RaftServer( ChannelMarshal<ReplicatedContent> marshal, SslPolicy sslPolicy, Config config,
LogProvider logProvider, LogProvider userLogProvider, Monitors monitors )
public RaftServer( ChannelMarshal<ReplicatedContent> marshal, PipelineHandlerAppender pipelineAppender, Config config,
LogProvider logProvider, LogProvider userLogProvider, Monitors monitors )
{
this.marshal = marshal;
this.sslPolicy = sslPolicy;
this.pipelineAppender = pipelineAppender;
this.listenAddress = config.get( setting );
this.logProvider = logProvider;
this.log = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -131,10 +131,7 @@ protected void initChannel( SocketChannel ch ) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();

if ( sslPolicy != null )
{
pipeline.addLast( sslPolicy.nettyServerHandler( ch ) );
}
pipelineAppender.addPipelineHandlerForServer( pipeline, ch );

pipeline.addLast( new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, 0, 4 ) );
pipeline.addLast( new LengthFieldPrepender( 4 ) );
Expand Down

0 comments on commit e8b3a5c

Please sign in to comment.