diff --git a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java index 7af8ac7d6074c..9ee6c9ca52a45 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java @@ -24,8 +24,6 @@ import java.time.Clock; import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; import org.neo4j.bolt.logging.BoltMessageLogging; import org.neo4j.bolt.runtime.BoltConnectionFactory; @@ -47,10 +45,10 @@ import org.neo4j.bolt.v1.runtime.BoltFactoryImpl; import org.neo4j.configuration.Description; import org.neo4j.configuration.LoadableConfig; +import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.config.Setting; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.helpers.ListenSocketAddress; -import org.neo4j.helpers.Service; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.api.bolt.BoltConnectionTracker; @@ -60,196 +58,193 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.ConnectorPortRegister; import org.neo4j.kernel.configuration.ssl.SslPolicyLoader; -import org.neo4j.kernel.extension.KernelExtensionFactory; import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.kernel.impl.spi.KernelContext; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.lifecycle.LifeSupport; -import org.neo4j.kernel.lifecycle.Lifecycle; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.Log; import org.neo4j.scheduler.JobScheduler; import org.neo4j.udc.UsageData; -import static java.lang.String.format; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; import static org.neo4j.kernel.configuration.Settings.STRING; import static org.neo4j.kernel.configuration.Settings.setting; import static org.neo4j.kernel.configuration.ssl.LegacySslPolicyConfig.LEGACY_POLICY_NAME; import static org.neo4j.scheduler.JobScheduler.Groups.boltNetworkIO; -/** - * Wraps Bolt and exposes it as a Kernel Extension. - */ -@Service.Implementation( KernelExtensionFactory.class ) -public class BoltKernelExtension extends KernelExtensionFactory +public class BoltKernelExtension extends LifecycleAdapter { + // platform dependencies + private final GraphDatabaseAPI db; + private final FileSystemAbstraction fs; + private final JobScheduler jobScheduler; + private final AvailabilityGuard availabilityGuard; + private final ConnectorPortRegister connectorPortRegister; + private final UsageData usageData; + private final Config config; + private final Clock clock; + private final Monitors monitors; + private final LogService logService; + + // edition specific dependencies are resolved dynamically + private final DependencyResolver dependencyResolver; + + private final LifeSupport life = new LifeSupport(); + public static class Settings implements LoadableConfig { @Description( "Specify the SSL policy to use" ) public static final Setting ssl_policy = setting( "bolt.ssl_policy", STRING, LEGACY_POLICY_NAME ); } - public interface Dependencies - { - LogService logService(); - - Config config(); - - GraphDatabaseAPI db(); - - JobScheduler scheduler(); - - UsageData usageData(); - - Monitors monitors(); - - AvailabilityGuard availabilityGuard(); - - BoltConnectionTracker sessionTracker(); - - ConnectorPortRegister connectionRegister(); - - Clock clock(); - - AuthManager authManager(); - - UserManagerSupplier userManagerSupplier(); - - SslPolicyLoader sslPolicyFactory(); - - FileSystemAbstraction fileSystem(); - } - - public BoltKernelExtension() + public BoltKernelExtension( GraphDatabaseAPI db, FileSystemAbstraction fs, JobScheduler jobScheduler, AvailabilityGuard availabilityGuard, + ConnectorPortRegister connectorPortRegister, UsageData usageData, Config config, Clock clock, Monitors monitors, + LogService logService, DependencyResolver dependencyResolver ) { - super( "bolt-server" ); + this.db = db; + this.fs = fs; + this.jobScheduler = jobScheduler; + this.availabilityGuard = availabilityGuard; + this.connectorPortRegister = connectorPortRegister; + this.usageData = usageData; + this.config = config; + this.clock = clock; + this.monitors = monitors; + this.logService = logService; + this.dependencyResolver = dependencyResolver; } @Override - public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) + public void start() throws Throwable { - Config config = dependencies.config(); - LogService logService = dependencies.logService(); - Clock clock = dependencies.clock(); - SslPolicyLoader sslPolicyFactory = dependencies.sslPolicyFactory(); Log log = logService.getInternalLog( BoltKernelExtension.class ); Log userLog = logService.getUserLog( BoltKernelExtension.class ); - LifeSupport life = new LifeSupport(); - - JobScheduler scheduler = dependencies.scheduler(); - InternalLoggerFactory.setDefaultFactory( new Netty4LoggerFactory( logService.getInternalLogProvider() ) ); - BoltMessageLogging boltLogging = BoltMessageLogging.create( dependencies.fileSystem(), scheduler, config, log ); + BoltMessageLogging boltLogging = BoltMessageLogging.create( fs, jobScheduler, config, log ); life.add( boltLogging ); - Authentication authentication = authentication( dependencies.authManager(), dependencies.userManagerSupplier() ); + Authentication authentication = createAuthentication(); TransportThrottleGroup throttleGroup = new TransportThrottleGroup( config, clock ); - BoltFactory boltFactory = new BoltFactoryImpl( dependencies.db(), dependencies.usageData(), dependencies.availabilityGuard(), - authentication, dependencies.sessionTracker(), config, logService ); + BoltFactory boltFactory = createBoltFactory( authentication ); BoltSchedulerProvider boltSchedulerProvider = - life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), scheduler, logService ) ); + life.add( new ExecutorBoltSchedulerProvider( config, new CachedThreadPoolExecutorFactory( log ), jobScheduler, logService ) ); BoltConnectionFactory boltConnectionFactory = - createConnectionFactory( config, boltFactory, boltSchedulerProvider, throttleGroup, dependencies, logService, clock ); - ConnectorPortRegister connectionRegister = dependencies.connectionRegister(); + createConnectionFactory( config, boltFactory, boltSchedulerProvider, throttleGroup, logService, clock ); - BoltProtocolPipelineInstallerFactory handlerFactory = createHandlerFactory( boltConnectionFactory, throttleGroup, logService ); + BoltProtocolPipelineInstallerFactory handlerFactory = createHandlerFactory( boltConnectionFactory, throttleGroup ); if ( !config.enabledBoltConnectors().isEmpty() && !config.get( GraphDatabaseSettings.disconnected ) ) { - NettyServer server = new NettyServer( scheduler.threadFactory( boltNetworkIO ), - createConnectors( config, sslPolicyFactory, logService, log, boltLogging, throttleGroup, handlerFactory ), connectionRegister, - userLog ); + NettyServer server = new NettyServer( jobScheduler.threadFactory( boltNetworkIO ), + createConnectors( handlerFactory, throttleGroup, boltLogging, log ), connectorPortRegister, userLog ); life.add( server ); - log.info( "Bolt Server extension loaded." ); + log.info( "Bolt server loaded" ); } - return life; + life.start(); // init and start the nested lifecycle + } + + @Override + public void stop() throws Throwable + { + life.shutdown(); // stop and shutdown the nested lifecycle } private BoltConnectionFactory createConnectionFactory( Config config, BoltFactory boltFactory, BoltSchedulerProvider schedulerProvider, - TransportThrottleGroup throttleGroup, - Dependencies dependencies, LogService logService, Clock clock ) + TransportThrottleGroup throttleGroup, LogService logService, Clock clock ) { return new DefaultBoltConnectionFactory( boltFactory, schedulerProvider, throttleGroup, logService, clock, new BoltConnectionReadLimiter( logService.getInternalLog( BoltConnectionReadLimiter.class ), config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_low_water_mark ), - config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), dependencies.monitors() ); + config.get( GraphDatabaseSettings.bolt_inbound_message_throttle_high_water_mark ) ), monitors ); } - private Map createConnectors( Config config, SslPolicyLoader sslPolicyFactory, LogService logService, Log log, - BoltMessageLogging boltLogging, TransportThrottleGroup throttleGroup, BoltProtocolPipelineInstallerFactory handlerFactory ) + private Map createConnectors( BoltProtocolPipelineInstallerFactory handlerFactory, + TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log ) { - Map connectors = - config.enabledBoltConnectors().stream().collect( Collectors.toMap( Function.identity(), connConfig -> - { - ListenSocketAddress listenAddress = config.get( connConfig.listen_address ); - SslContext sslCtx; - boolean requireEncryption; - final BoltConnector.EncryptionLevel encryptionLevel = config.get( connConfig.encryption_level ); - switch ( encryptionLevel ) - { - case REQUIRED: - // Encrypted connections are mandatory, a self-signed certificate may be generated. - requireEncryption = true; - sslCtx = createSslContext( sslPolicyFactory, config ); - break; - case OPTIONAL: - // Encrypted connections are optional, a self-signed certificate may be generated. - requireEncryption = false; - sslCtx = createSslContext( sslPolicyFactory, config ); - break; - case DISABLED: - // Encryption is turned off, no self-signed certificate will be generated. - requireEncryption = false; - sslCtx = null; - break; - default: - // In the unlikely event that we happen to fall through to the default option here, - // there is a mismatch between the BoltConnector.EncryptionLevel enum and the options - // handled in this switch statement. In this case, we'll log a warning and default to - // disabling encryption, since this mirrors the functionality introduced in 3.0. - log.warn( format( "Unhandled encryption level %s - assuming DISABLED.", encryptionLevel.name() ) ); - requireEncryption = false; - sslCtx = null; - break; - } - - return new SocketTransport( connConfig.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging, - throttleGroup, handlerFactory ); - } ) ); - - return connectors; + return config.enabledBoltConnectors() + .stream() + .collect( toMap( identity(), connector -> createProtocolInitializer( connector, handlerFactory, throttleGroup, boltLogging, log ) ) ); } - private SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config ) + private ProtocolInitializer createProtocolInitializer( BoltConnector connector, BoltProtocolPipelineInstallerFactory handlerFactory, + TransportThrottleGroup throttleGroup, BoltMessageLogging boltLogging, Log log ) + { + SslContext sslCtx; + boolean requireEncryption; + BoltConnector.EncryptionLevel encryptionLevel = config.get( connector.encryption_level ); + SslPolicyLoader sslPolicyLoader = dependencyResolver.resolveDependency( SslPolicyLoader.class ); + + switch ( encryptionLevel ) + { + case REQUIRED: + // Encrypted connections are mandatory, a self-signed certificate may be generated. + requireEncryption = true; + sslCtx = createSslContext( sslPolicyLoader, config ); + break; + case OPTIONAL: + // Encrypted connections are optional, a self-signed certificate may be generated. + requireEncryption = false; + sslCtx = createSslContext( sslPolicyLoader, config ); + break; + case DISABLED: + // Encryption is turned off, no self-signed certificate will be generated. + requireEncryption = false; + sslCtx = null; + break; + default: + // In the unlikely event that we happen to fall through to the default option here, + // there is a mismatch between the BoltConnector.EncryptionLevel enum and the options + // handled in this switch statement. In this case, we'll log a warning and default to + // disabling encryption, since this mirrors the functionality introduced in 3.0. + log.warn( "Unhandled encryption level %s - assuming DISABLED.", encryptionLevel.name() ); + requireEncryption = false; + sslCtx = null; + break; + } + + ListenSocketAddress listenAddress = config.get( connector.listen_address ); + return new SocketTransport( connector.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging, + throttleGroup, handlerFactory ); + } + + private static SslContext createSslContext( SslPolicyLoader sslPolicyFactory, Config config ) { try { String policyName = config.get( Settings.ssl_policy ); if ( policyName == null ) { - throw new IllegalArgumentException( "No SSL policy has been configured for bolt" ); + throw new IllegalArgumentException( "No SSL policy has been configured for Bolt server" ); } return sslPolicyFactory.getPolicy( policyName ).nettyServerContext(); } catch ( Exception e ) { - throw new RuntimeException( "Failed to initialize SSL encryption support, which is required to start " + - "this connector. Error was: " + e.getMessage(), e ); + throw new RuntimeException( "Failed to initialize SSL encryption support, which is required to start this connector. " + + "Error was: " + e.getMessage(), e ); } } - private Authentication authentication( AuthManager authManager, UserManagerSupplier userManagerSupplier ) + private Authentication createAuthentication() { - return new BasicAuthentication( authManager, userManagerSupplier ); + return new BasicAuthentication( dependencyResolver.resolveDependency( AuthManager.class ), + dependencyResolver.resolveDependency( UserManagerSupplier.class ) ); } - private static BoltProtocolPipelineInstallerFactory createHandlerFactory( BoltConnectionFactory connectionFactory, - TransportThrottleGroup throttleGroup, LogService logService ) + private BoltProtocolPipelineInstallerFactory createHandlerFactory( BoltConnectionFactory connectionFactory, TransportThrottleGroup throttleGroup ) { return new DefaultBoltProtocolPipelineInstallerFactory( connectionFactory, throttleGroup, logService ); } + + private BoltFactory createBoltFactory( Authentication authentication ) + { + BoltConnectionTracker connectionTracker = dependencyResolver.resolveDependency( BoltConnectionTracker.class ); + return new BoltFactoryImpl( db, usageData, availabilityGuard, authentication, connectionTracker, config, logService ); + } } diff --git a/community/bolt/src/main/resources/META-INF/services/org.neo4j.kernel.extension.KernelExtensionFactory b/community/bolt/src/main/resources/META-INF/services/org.neo4j.kernel.extension.KernelExtensionFactory deleted file mode 100644 index ef396fbb84e50..0000000000000 --- a/community/bolt/src/main/resources/META-INF/services/org.neo4j.kernel.extension.KernelExtensionFactory +++ /dev/null @@ -1 +0,0 @@ -org.neo4j.bolt.BoltKernelExtension diff --git a/community/neo4j/src/main/java/org/neo4j/graphdb/facade/GraphDatabaseFacadeFactory.java b/community/neo4j/src/main/java/org/neo4j/graphdb/facade/GraphDatabaseFacadeFactory.java index 944383c8aab8f..59613fc889e0f 100644 --- a/community/neo4j/src/main/java/org/neo4j/graphdb/facade/GraphDatabaseFacadeFactory.java +++ b/community/neo4j/src/main/java/org/neo4j/graphdb/facade/GraphDatabaseFacadeFactory.java @@ -25,11 +25,12 @@ import java.util.function.Function; import java.util.function.Supplier; -import org.neo4j.graphdb.facade.spi.ClassicCoreSPI; +import org.neo4j.bolt.BoltKernelExtension; import org.neo4j.graphdb.DependencyResolver; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Path; import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.facade.spi.ClassicCoreSPI; import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.module.DataSourceModule; import org.neo4j.graphdb.factory.module.EditionModule; @@ -177,6 +178,7 @@ public GraphDatabaseFacade initFacade( File storeDir, Config config, final Depen final DataSourceModule dataSource = createDataSource( platform, edition, queryEngine::get, procedures ); + platform.life.add( createBoltServer( platform ) ); platform.life.add( new VmPauseMonitorComponent( config, platform.logging.getInternalLog( VmPauseMonitorComponent.class ), platform.jobScheduler ) ); platform.life.add( new PublishPageCacheTracerMetricsAfterStart( platform.tracers.pageCursorTracerSupplier ) ); platform.life.add( new DatabaseAvailability( platform.availabilityGuard, platform.transactionMonitor, @@ -346,4 +348,11 @@ private static Procedures setupProcedures( PlatformModule platform, EditionModul return procedures; } + + private static BoltKernelExtension createBoltServer( PlatformModule platform ) + { + return new BoltKernelExtension( platform.graphDatabaseFacade, platform.fileSystem, platform.jobScheduler, platform.availabilityGuard, + platform.connectorPortRegister, platform.usageData, platform.config, platform.clock, platform.monitors, platform.logging, + platform.dependencies ); + } } diff --git a/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/PlatformModule.java b/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/PlatformModule.java index 89dbf068c19f4..ed051995fced6 100644 --- a/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/PlatformModule.java +++ b/community/neo4j/src/main/java/org/neo4j/graphdb/factory/module/PlatformModule.java @@ -129,6 +129,10 @@ public class PlatformModule public final CollectionsFactorySupplier collectionsFactorySupplier; + public final UsageData usageData; + + public final ConnectorPortRegister connectorPortRegister; + public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databaseInfo, GraphDatabaseFacadeFactory.Dependencies externalDependencies, GraphDatabaseFacade graphDatabaseFacade ) { @@ -163,7 +167,8 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa dependencies.satisfyDependency( recoveryCleanupWorkCollector ); // Database system information, used by UDC - dependencies.satisfyDependency( life.add( new UsageData( jobScheduler ) ) ); + usageData = new UsageData( jobScheduler ); + dependencies.satisfyDependency( life.add( usageData ) ); // If no logging was passed in from the outside then create logging and register // with this life @@ -212,7 +217,8 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa storeCopyCheckPointMutex = new StoreCopyCheckPointMutex(); dependencies.satisfyDependency( storeCopyCheckPointMutex ); - dependencies.satisfyDependency( new ConnectorPortRegister() ); + connectorPortRegister = new ConnectorPortRegister(); + dependencies.satisfyDependency( connectorPortRegister ); eventHandlers = new KernelEventHandlers( logging.getInternalLog( KernelEventHandlers.class ) );