Skip to content

Commit

Permalink
Make Bolt server not a kernel extension
Browse files Browse the repository at this point in the history
It is now a regular component that lives in the platform lifecycle.
Part of its dependencies is statically defined and retrieved from the
platform module. Other edition-specific dependencies are resolved
dynamically using the dependency resolver. This is because some
dependencies, like `AuthManager`, are not exposed in the edition module.
  • Loading branch information
lutovich committed Jun 18, 2018
1 parent 074eb23 commit a13b8a6
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 124 deletions.
235 changes: 115 additions & 120 deletions community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java
Expand Up @@ -24,8 +24,6 @@

import java.time.Clock;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.bolt.logging.BoltMessageLogging;
import org.neo4j.bolt.runtime.BoltConnectionFactory;
Expand All @@ -47,10 +45,10 @@
import org.neo4j.bolt.v1.runtime.BoltFactoryImpl;
import org.neo4j.configuration.Description;
import org.neo4j.configuration.LoadableConfig;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.bolt.BoltConnectionTracker;
Expand All @@ -60,196 +58,193 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.configuration.ConnectorPortRegister;
import org.neo4j.kernel.configuration.ssl.SslPolicyLoader;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.udc.UsageData;

import static java.lang.String.format;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.neo4j.kernel.configuration.Settings.STRING;
import static org.neo4j.kernel.configuration.Settings.setting;
import static org.neo4j.kernel.configuration.ssl.LegacySslPolicyConfig.LEGACY_POLICY_NAME;
import static org.neo4j.scheduler.JobScheduler.Groups.boltNetworkIO;

/**
* Wraps Bolt and exposes it as a Kernel Extension.
*/
@Service.Implementation( KernelExtensionFactory.class )
public class BoltKernelExtension extends KernelExtensionFactory<BoltKernelExtension.Dependencies>
public class BoltKernelExtension extends LifecycleAdapter
{
// platform dependencies
private final GraphDatabaseAPI db;
private final FileSystemAbstraction fs;
private final JobScheduler jobScheduler;
private final AvailabilityGuard availabilityGuard;
private final ConnectorPortRegister connectorPortRegister;
private final UsageData usageData;
private final Config config;
private final Clock clock;
private final Monitors monitors;
private final LogService logService;

// edition specific dependencies are resolved dynamically
private final DependencyResolver dependencyResolver;

private final LifeSupport life = new LifeSupport();

public static class Settings implements LoadableConfig
{
@Description( "Specify the SSL policy to use" )
public static final Setting<String> ssl_policy = setting( "bolt.ssl_policy", STRING, LEGACY_POLICY_NAME );
}

public interface Dependencies
{
LogService logService();

Config config();

GraphDatabaseAPI db();

JobScheduler scheduler();

UsageData usageData();

Monitors monitors();

AvailabilityGuard availabilityGuard();

BoltConnectionTracker sessionTracker();

ConnectorPortRegister connectionRegister();

Clock clock();

AuthManager authManager();

UserManagerSupplier userManagerSupplier();

SslPolicyLoader sslPolicyFactory();

FileSystemAbstraction fileSystem();
}

public BoltKernelExtension()
public BoltKernelExtension( GraphDatabaseAPI db, FileSystemAbstraction fs, JobScheduler jobScheduler, AvailabilityGuard availabilityGuard,
ConnectorPortRegister connectorPortRegister, UsageData usageData, Config config, Clock clock, Monitors monitors,
LogService logService, DependencyResolver dependencyResolver )
{
super( "bolt-server" );
this.db = db;
this.fs = fs;
this.jobScheduler = jobScheduler;
this.availabilityGuard = availabilityGuard;
this.connectorPortRegister = connectorPortRegister;
this.usageData = usageData;
this.config = config;
this.clock = clock;
this.monitors = monitors;
this.logService = logService;
this.dependencyResolver = dependencyResolver;
}

@Override
public Lifecycle newInstance( KernelContext context, Dependencies dependencies )
public void start() throws Throwable
{
Config config = dependencies.config();
LogService logService = dependencies.logService();
Clock clock = dependencies.clock();
SslPolicyLoader sslPolicyFactory = dependencies.sslPolicyFactory();
Log log = logService.getInternalLog( BoltKernelExtension.class );
Log userLog = logService.getUserLog( BoltKernelExtension.class );

LifeSupport life = new LifeSupport();

JobScheduler scheduler = dependencies.scheduler();

InternalLoggerFactory.setDefaultFactory( new Netty4LoggerFactory( logService.getInternalLogProvider() ) );
BoltMessageLogging boltLogging = BoltMessageLogging.create( dependencies.fileSystem(), scheduler, config, log );
BoltMessageLogging boltLogging = BoltMessageLogging.create( fs, jobScheduler, config, log );
life.add( boltLogging );

Authentication authentication = authentication( dependencies.authManager(), dependencies.userManagerSupplier() );
Authentication authentication = createAuthentication();

TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock );

BoltFactory boltFactory = new BoltFactoryImpl( dependencies.db(), dependencies.usageData(), dependencies.availabilityGuard(),
authentication, dependencies.sessionTracker(), config, logService );
BoltFactory boltFactory = createBoltFactory( authentication );
BoltSchedulerProvider boltSchedulerProvider =
life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), scheduler, logService ) );
life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), jobScheduler, logService ) );
BoltConnectionFactory boltConnectionFactory =
createConnectionFactory( config, boltFactory, boltSchedulerProvider, throttleGroup, dependencies, logService, clock );
ConnectorPortRegister connectionRegister = dependencies.connectionRegister();
createConnectionFactory( config, boltFactory, boltSchedulerProvider, throttleGroup, logService, clock );

BoltProtocolPipelineInstallerFactory handlerFactory = createHandlerFactory( boltConnectionFactory, throttleGroup, logService );
BoltProtocolPipelineInstallerFactory handlerFactory = createHandlerFactory( boltConnectionFactory, throttleGroup );

if ( !config.enabledBoltConnectors().isEmpty() && !config.get( GraphDatabaseSettings.disconnected ) )
{
NettyServer server = new NettyServer( scheduler.threadFactory( boltNetworkIO ),
createConnectors( config, sslPolicyFactory, logService, log, boltLogging, throttleGroup, handlerFactory ), connectionRegister,
userLog );
NettyServer server = new NettyServer( jobScheduler.threadFactory( boltNetworkIO ),
createConnectors( handlerFactory, throttleGroup, boltLogging, log ), connectorPortRegister, userLog );
life.add( server );
log.info( "Bolt Server extension loaded." );
log.info( "Bolt server loaded" );
}

return life;
life.start(); // init and start the nested lifecycle
}

@Override
public void stop() throws Throwable
{
life.shutdown(); // stop and shutdown the nested lifecycle
}

private BoltConnectionFactory createConnectionFactory( Config config, BoltFactory boltFactory, BoltSchedulerProvider schedulerProvider,
TransportThrottleGroup throttleGroup,
Dependencies dependencies, LogService logService, Clock clock )
TransportThrottleGroup throttleGroup, LogService logService, Clock clock )
{
return new DefaultBoltConnectionFactory( boltFactory, schedulerProvider, throttleGroup, logService, clock,
new BoltConnectionReadLimiter( logService.getInternalLog( BoltConnectionReadLimiter.class ),
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_low_water_mark ),
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), dependencies.monitors() );
config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), monitors );
}

private Map<BoltConnector,ProtocolInitializer> createConnectors( Config config, SslPolicyLoader sslPolicyFactory, LogService logService, Log log,
BoltMessageLogging boltLogging, TransportThrottleGroup throttleGroup, BoltProtocolPipelineInstallerFactory handlerFactory )
private Map<BoltConnector,ProtocolInitializer> createConnectors( BoltProtocolPipelineInstallerFactory handlerFactory,
TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log )
{
Map<BoltConnector,ProtocolInitializer> connectors =
config.enabledBoltConnectors().stream().collect( Collectors.toMap( Function.identity(), connConfig ->
{
ListenSocketAddress listenAddress = config.get( connConfig.listen_address );
SslContext sslCtx;
boolean requireEncryption;
final BoltConnector.EncryptionLevel encryptionLevel = config.get( connConfig.encryption_level );
switch ( encryptionLevel )
{
case REQUIRED:
// Encrypted connections are mandatory, a self-signed certificate may be generated.
requireEncryption = true;
sslCtx = createSslContext( sslPolicyFactory, config );
break;
case OPTIONAL:
// Encrypted connections are optional, a self-signed certificate may be generated.
requireEncryption = false;
sslCtx = createSslContext( sslPolicyFactory, config );
break;
case DISABLED:
// Encryption is turned off, no self-signed certificate will be generated.
requireEncryption = false;
sslCtx = null;
break;
default:
// In the unlikely event that we happen to fall through to the default option here,
// there is a mismatch between the BoltConnector.EncryptionLevel enum and the options
// handled in this switch statement. In this case, we'll log a warning and default to
// disabling encryption, since this mirrors the functionality introduced in 3.0.
log.warn( format( "Unhandled encryption level %s - assuming DISABLED.", encryptionLevel.name() ) );
requireEncryption = false;
sslCtx = null;
break;
}

return new SocketTransport( connConfig.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging,
throttleGroup, handlerFactory );
} ) );

return connectors;
return config.enabledBoltConnectors()
.stream()
.collect( toMap( identity(), connector -> createProtocolInitializer( connector, handlerFactory, throttleGroup, boltLogging, log ) ) );
}

private SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config )
private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolPipelineInstallerFactory handlerFactory,
TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log )
{
SslContext sslCtx;
boolean requireEncryption;
BoltConnector.EncryptionLevel encryptionLevel = config.get( connector.encryption_level );
SslPolicyLoader sslPolicyLoader = dependencyResolver.resolveDependency( SslPolicyLoader.class );

switch ( encryptionLevel )
{
case REQUIRED:
// Encrypted connections are mandatory, a self-signed certificate may be generated.
requireEncryption = true;
sslCtx = createSslContext( sslPolicyLoader, config );
break;
case OPTIONAL:
// Encrypted connections are optional, a self-signed certificate may be generated.
requireEncryption = false;
sslCtx = createSslContext( sslPolicyLoader, config );
break;
case DISABLED:
// Encryption is turned off, no self-signed certificate will be generated.
requireEncryption = false;
sslCtx = null;
break;
default:
// In the unlikely event that we happen to fall through to the default option here,
// there is a mismatch between the BoltConnector.EncryptionLevel enum and the options
// handled in this switch statement. In this case, we'll log a warning and default to
// disabling encryption, since this mirrors the functionality introduced in 3.0.
log.warn( "Unhandled encryption level %s - assuming DISABLED.", encryptionLevel.name() );
requireEncryption = false;
sslCtx = null;
break;
}

ListenSocketAddress listenAddress = config.get( connector.listen_address );
return new SocketTransport( connector.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging,
throttleGroup, handlerFactory );
}

private static SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config )
{
try
{
String policyName = config.get( Settings.ssl_policy );
if ( policyName == null )
{
throw new IllegalArgumentException( "No SSL policy has been configured for bolt" );
throw new IllegalArgumentException( "No SSL policy has been configured for Bolt server" );
}
return sslPolicyFactory.getPolicy( policyName ).nettyServerContext();
}
catch ( Exception e )
{
throw new RuntimeException( "Failed to initialize SSL encryption support, which is required to start " +
"this connector. Error was: " + e.getMessage(), e );
throw new RuntimeException( "Failed to initialize SSL encryption support, which is required to start this connector. " +
"Error was: " + e.getMessage(), e );
}
}

private Authentication authentication( AuthManager authManager, UserManagerSupplier userManagerSupplier )
private Authentication createAuthentication()
{
return new BasicAuthentication( authManager, userManagerSupplier );
return new BasicAuthentication( dependencyResolver.resolveDependency( AuthManager.class ),
dependencyResolver.resolveDependency( UserManagerSupplier.class ) );
}

private static BoltProtocolPipelineInstallerFactory createHandlerFactory( BoltConnectionFactory connectionFactory,
TransportThrottleGroup throttleGroup, LogService logService )
private BoltProtocolPipelineInstallerFactory createHandlerFactory( BoltConnectionFactory connectionFactory, TransportThrottleGroup throttleGroup )
{
return new DefaultBoltProtocolPipelineInstallerFactory( connectionFactory, throttleGroup, logService );
}

private BoltFactory createBoltFactory( Authentication authentication )
{
BoltConnectionTracker connectionTracker = dependencyResolver.resolveDependency( BoltConnectionTracker.class );
return new BoltFactoryImpl( db, usageData, availabilityGuard, authentication, connectionTracker, config, logService );
}
}

This file was deleted.

Expand Up @@ -25,11 +25,12 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.graphdb.facade.spi.ClassicCoreSPI;
import org.neo4j.bolt.BoltKernelExtension;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Path;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.facade.spi.ClassicCoreSPI;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.factory.module.DataSourceModule;
import org.neo4j.graphdb.factory.module.EditionModule;
Expand Down Expand Up @@ -177,6 +178,7 @@ public GraphDatabaseFacade initFacade( File storeDir, Config config, final Depen

final DataSourceModule dataSource = createDataSource( platform, edition, queryEngine::get, procedures );

platform.life.add( createBoltServer( platform ) );
platform.life.add( new VmPauseMonitorComponent( config, platform.logging.getInternalLog( VmPauseMonitorComponent.class ), platform.jobScheduler ) );
platform.life.add( new PublishPageCacheTracerMetricsAfterStart( platform.tracers.pageCursorTracerSupplier ) );
platform.life.add( new DatabaseAvailability( platform.availabilityGuard, platform.transactionMonitor,
Expand Down Expand Up @@ -346,4 +348,11 @@ private static Procedures setupProcedures( PlatformModule platform, EditionModul

return procedures;
}

private static BoltKernelExtension createBoltServer( PlatformModule platform )
{
return new BoltKernelExtension( platform.graphDatabaseFacade, platform.fileSystem, platform.jobScheduler, platform.availabilityGuard,
platform.connectorPortRegister, platform.usageData, platform.config, platform.clock, platform.monitors, platform.logging,
platform.dependencies );
}
}
Expand Up @@ -129,6 +129,10 @@ public class PlatformModule

public final CollectionsFactorySupplier collectionsFactorySupplier;

public final UsageData usageData;

public final ConnectorPortRegister connectorPortRegister;

public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databaseInfo,
GraphDatabaseFacadeFactory.Dependencies externalDependencies, GraphDatabaseFacade graphDatabaseFacade )
{
Expand Down Expand Up @@ -163,7 +167,8 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
dependencies.satisfyDependency( recoveryCleanupWorkCollector );

// Database system information, used by UDC
dependencies.satisfyDependency( life.add( new UsageData( jobScheduler ) ) );
usageData = new UsageData( jobScheduler );
dependencies.satisfyDependency( life.add( usageData ) );

// If no logging was passed in from the outside then create logging and register
// with this life
Expand Down Expand Up @@ -212,7 +217,8 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
storeCopyCheckPointMutex = new StoreCopyCheckPointMutex();
dependencies.satisfyDependency( storeCopyCheckPointMutex );

dependencies.satisfyDependency( new ConnectorPortRegister() );
connectorPortRegister = new ConnectorPortRegister();
dependencies.satisfyDependency( connectorPortRegister );

eventHandlers = new KernelEventHandlers( logging.getInternalLog( KernelEventHandlers.class ) );

Expand Down

0 comments on commit a13b8a6

Please sign in to comment.