diff --git a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java index 86b948d61c820..61367085c357c 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/BoltKernelExtension.java @@ -45,9 +45,6 @@ import org.neo4j.bolt.transport.TransportThrottleGroup; import org.neo4j.bolt.v1.runtime.BoltFactory; import org.neo4j.bolt.v1.runtime.BoltFactoryImpl; -import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory; -import org.neo4j.bolt.v1.runtime.WorkerFactory; -import org.neo4j.bolt.v1.runtime.concurrent.ThreadedWorkerFactory; import org.neo4j.configuration.Description; import org.neo4j.configuration.LoadableConfig; import org.neo4j.graphdb.GraphDatabaseService; @@ -138,7 +135,8 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) LogService logService = dependencies.logService(); Clock clock = dependencies.clock(); SslPolicyLoader sslPolicyFactory = dependencies.sslPolicyFactory(); - Log log = logService.getInternalLog( WorkerFactory.class ); + Log log = logService.getInternalLog( BoltKernelExtension.class ); + Log userLog = logService.getUserLog( BoltKernelExtension.class ); LifeSupport life = new LifeSupport(); @@ -163,7 +161,7 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) { NettyServer server = new NettyServer( scheduler.threadFactory( boltNetworkIO ), createConnectors( config, sslPolicyFactory, logService, log, boltLogging, throttleGroup, handlerFactory ), connectionRegister, - logService.getUserLog( WorkerFactory.class ) ); + userLog ); life.add( server ); log.info( "Bolt Server extension loaded." ); } @@ -171,13 +169,6 @@ public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) return life; } - protected WorkerFactory createWorkerFactory( BoltFactory boltFactory, JobScheduler scheduler, - Dependencies dependencies, LogService logService, Clock clock ) - { - WorkerFactory threadedWorkerFactory = new ThreadedWorkerFactory( boltFactory, scheduler, logService, clock ); - return new MonitoredWorkerFactory( dependencies.monitors(), threadedWorkerFactory, clock ); - } - private BoltConnectionFactory createConnectionFactory( BoltFactory boltFactory, BoltSchedulerProvider schedulerProvider, Dependencies dependencies, LogService logService, Clock clock ) { @@ -223,8 +214,6 @@ private Map createConnectors( Config config, break; } - logService.getUserLog( WorkerFactory.class ).info( "Bolt enabled on %s.", listenAddress ); - return new SocketTransport( connConfig.key(), listenAddress, sslCtx, requireEncryption, logService.getInternalLogProvider(), boltLogging, throttleGroup, handlerFactory ); } ) ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java index 62f0c0a11f123..7df2efc00649d 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java @@ -24,7 +24,6 @@ import org.neo4j.bolt.logging.BoltMessageLogger; import org.neo4j.bolt.runtime.BoltConnection; -import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.Neo4jError; import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.cypher.result.QueryResult; diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java index 6f29cf67379f8..33682fff0ae37 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java @@ -26,7 +26,6 @@ import org.neo4j.bolt.runtime.BoltConnection; import org.neo4j.bolt.v1.packstream.PackOutputClosedException; import org.neo4j.bolt.v1.runtime.BoltResponseHandler; -import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.Neo4jError; import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.logging.Log; diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorker.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorker.java deleted file mode 100644 index cf77dde681888..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorker.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -public interface BoltWorker -{ - /** - * Add a new job to the job queue. - * - * @param job the {@link Job} to add - */ - void enqueue( Job job ); - - /** - * Interrupt and stop the current action but remain open for new actions. - */ - void interrupt(); - - /** - * Shut down the worker. - */ - void halt(); - -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorkerQueueMonitor.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorkerQueueMonitor.java deleted file mode 100644 index a8b548d9c2469..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltWorkerQueueMonitor.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -import java.util.Collection; - -public interface BoltWorkerQueueMonitor -{ - - void enqueued( Job job ); - - void dequeued( Job job ); - - void drained( Collection jobs ); - -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/MonitoredWorkerFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/MonitoredWorkerFactory.java deleted file mode 100644 index ee5a0f33b1c65..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/MonitoredWorkerFactory.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -import java.time.Clock; - -import org.neo4j.bolt.BoltChannel; -import org.neo4j.kernel.monitoring.Monitors; - -/** - * Thin wrapper around {@link WorkerFactory} that adds monitoring capabilities, which - * means Bolt can be introspected at runtime by adding Monitor listeners. - * - * This adds no overhead if no listeners are registered. - */ -public class MonitoredWorkerFactory implements WorkerFactory -{ - private final SessionMonitor monitor; - private final WorkerFactory delegate; - private final Clock clock; - private final Monitors monitors; - - public MonitoredWorkerFactory( Monitors monitors, WorkerFactory delegate, Clock clock ) - { - this.delegate = delegate; - this.clock = clock; - this.monitors = monitors; - this.monitor = this.monitors.newMonitor( SessionMonitor.class ); - } - - @Override - public BoltWorker newWorker( BoltChannel boltChannel, BoltWorkerQueueMonitor queueMonitor ) - { - if ( monitors.hasListeners( SessionMonitor.class ) ) - { - return new MonitoredBoltWorker( monitor, delegate.newWorker( boltChannel, queueMonitor ), clock ); - } - return delegate.newWorker( boltChannel, queueMonitor ); - } - - static class MonitoredBoltWorker implements BoltWorker - { - private final SessionMonitor monitor; - private final BoltWorker delegate; - private final Clock clock; - - MonitoredBoltWorker( SessionMonitor monitor, BoltWorker delegate, Clock clock ) - { - this.monitor = monitor; - this.delegate = delegate; - this.clock = clock; - - this.monitor.sessionStarted(); - } - - @Override - public void enqueue( Job job ) - { - monitor.messageReceived(); - long start = clock.millis(); - delegate.enqueue( session -> - { - long queueTime = clock.millis() - start; - monitor.processingStarted( queueTime ); - job.perform( session ); - monitor.processingDone( (clock.millis() - start) - queueTime ); - } ); - } - - @Override - public void interrupt() - { - delegate.interrupt(); - } - - @Override - public void halt() - { - delegate.halt(); - } - } - - /** - * For monitoring the Bolt protocol, implementing and registering this monitor allows - * tracking requests arriving via the Bolt protocol and the queuing and processing times - * of those requests. - */ - public interface SessionMonitor - { - /** - * Called when a new Bolt session (backed by a {@link BoltWorker}) is started. - */ - void sessionStarted(); - - /** - * Called whenever a request is received. This happens after a request is - * deserialized, but before it is queued pending processing. - */ - void messageReceived(); - - /** - * Called after a request is done queueing, right before the worker thread takes on the request - * @param queueTime time between {@link #messageReceived()} and this call, in milliseconds - */ - void processingStarted( long queueTime ); - - /** - * Called after a request has been processed by the worker thread - this will - * be called independent of if the request is successful, failed or ignored. - * @param processingTime time between {@link #processingStarted(long)} and this call, in milliseconds - */ - void processingDone( long processingTime ); - } -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/WorkerFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/WorkerFactory.java deleted file mode 100644 index 1605017965817..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/WorkerFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -import org.neo4j.bolt.BoltChannel; - -/** - * Creates {@link BoltWorker}s. Implementations of this interface can decorate queues and their jobs - * to monitor activity and enforce constraints. - */ -public interface WorkerFactory -{ - default BoltWorker newWorker( BoltChannel boltChannel ) - { - return newWorker( boltChannel, null ); - } - - /** - * @param boltChannel channel over which Bolt messages can be exchanged - * @return a new job queue - */ - BoltWorker newWorker( BoltChannel boltChannel, BoltWorkerQueueMonitor queueMonitor ); -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java deleted file mode 100644 index 2074210a18500..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime.concurrent; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.neo4j.bolt.v1.runtime.BoltConnectionAuthFatality; -import org.neo4j.bolt.v1.runtime.BoltConnectionFatality; -import org.neo4j.bolt.v1.runtime.BoltProtocolBreachFatality; -import org.neo4j.bolt.v1.runtime.BoltStateMachine; -import org.neo4j.bolt.v1.runtime.BoltWorker; -import org.neo4j.bolt.v1.runtime.BoltWorkerQueueMonitor; -import org.neo4j.bolt.v1.runtime.Job; -import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.logging.Log; - -/** - * Executes incoming Bolt requests for a given connection. - */ -class RunnableBoltWorker implements Runnable, BoltWorker -{ - private static final int workQueueMaxBatchSize = Integer.getInteger( "org.neo4j.bolt.workQueueMaxBatchSize", 100 ); - static final int workQueuePollDuration = Integer.getInteger( "org.neo4j.bolt.workQueuePollDuration", 10 ); - - private final BlockingQueue jobQueue = new LinkedBlockingQueue<>(); - private final BoltStateMachine machine; - private final Log log; - private final Log userLog; - private final BoltWorkerQueueMonitor queueMonitor; - - private volatile boolean keepRunning = true; - - RunnableBoltWorker( BoltStateMachine machine, LogService logging ) - { - this( machine, logging, null ); - } - - RunnableBoltWorker( BoltStateMachine machine, LogService logging, BoltWorkerQueueMonitor queueMonitor ) - { - this.machine = machine; - this.log = logging.getInternalLog( getClass() ); - this.userLog = logging.getUserLog( getClass() ); - this.queueMonitor = queueMonitor; - } - - /** - * Accept a command to be executed at some point in the future. This will get queued and executed as soon as - * possible. - * - * @param job an operation to be performed on the session - */ - @Override - public void enqueue( Job job ) - { - try - { - jobQueue.put( job ); - notifyEnqueued( job ); - } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - throw new RuntimeException( "Worker interrupted while queueing request, the session may have been " + - "forcibly closed, or the database may be shutting down." ); - } - } - - @Override - public void run() - { - List batch = new ArrayList<>( workQueueMaxBatchSize ); - - try - { - while ( keepRunning ) - { - Job job = jobQueue.poll( workQueuePollDuration, TimeUnit.SECONDS ); - if ( job != null ) - { - notifyDequeued( job ); - execute( job ); - - for ( int jobCount = jobQueue.drainTo( batch, workQueueMaxBatchSize ); keepRunning && jobCount > 0; - jobCount = jobQueue.drainTo( batch, workQueueMaxBatchSize ) ) - { - notifyDrained( batch ); - executeBatch( batch ); - } - } - else - { - machine.validateTransaction(); - } - } - } - catch ( BoltConnectionAuthFatality e ) - { - // this is logged in the SecurityLog - } - catch ( BoltProtocolBreachFatality e ) - { - log.error( "Bolt protocol breach in session '" + machine.key() + "'", e ); - } - catch ( InterruptedException e ) - { - log.info( "Worker for session '" + machine.key() + "' interrupted probably due to server shutdown." ); - } - catch ( Throwable t ) - { - userLog.error( "Worker for session '" + machine.key() + "' crashed.", t ); - } - finally - { - closeStateMachine(); - } - } - - private void executeBatch( List batch ) throws BoltConnectionFatality - { - for ( int i = 0; keepRunning && i < batch.size(); i++ ) - { - execute( batch.get( i ) ); - } - batch.clear(); - } - - private void execute( Job job ) throws BoltConnectionFatality - { - job.perform( machine ); - } - - @Override - public void interrupt() - { - machine.interrupt(); - } - - @Override - public void halt() - { - try - { - // Notify the state machine that it should terminate. - // We can't close it here because this method can be called from a different thread. - // State machine will be closed when this worker exits. - machine.terminate(); - } - finally - { - keepRunning = false; - } - } - - private void closeStateMachine() - { - try - { - // Attempt to close the state machine, as an effort to release locks and other resources - machine.close(); - } - catch ( Throwable t ) - { - log.error( "Unable to close Bolt session '" + machine.key() + "'", t ); - } - } - - private void notifyEnqueued( Job job ) - { - if ( queueMonitor != null ) - { - queueMonitor.enqueued( job ); - } - } - - private void notifyDequeued( Job job ) - { - if ( queueMonitor != null ) - { - queueMonitor.dequeued( job ); - } - } - - private void notifyDrained( List jobs ) - { - if ( queueMonitor != null ) - { - queueMonitor.drained( jobs ); - } - } - -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/ThreadedWorkerFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/ThreadedWorkerFactory.java deleted file mode 100644 index c18e2d0a5ba00..0000000000000 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/ThreadedWorkerFactory.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime.concurrent; - -import java.time.Clock; - -import org.neo4j.bolt.BoltChannel; -import org.neo4j.bolt.v1.runtime.BoltFactory; -import org.neo4j.bolt.v1.runtime.BoltStateMachine; -import org.neo4j.bolt.v1.runtime.BoltWorker; -import org.neo4j.bolt.v1.runtime.BoltWorkerQueueMonitor; -import org.neo4j.bolt.v1.runtime.WorkerFactory; -import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.scheduler.JobScheduler; - -import static org.neo4j.helpers.collection.MapUtil.stringMap; -import static org.neo4j.scheduler.JobScheduler.Group.THREAD_ID; -import static org.neo4j.scheduler.JobScheduler.Groups.sessionWorker; - -/** - * A {@link WorkerFactory} implementation that creates one thread for every session started, requests are then executed - * in the session-specific thread. - * - * This resolves a tricky issue where sharing threads for multiple sessions can cause complex deadlocks. It does so - * at the expense of creating, potentially, many threads. However, this approach is much less complex than using - * a thread pool, and is the preferred approach of several highly scalable relational databases. - * - * If we find ourselves with tens of thousands of concurrent sessions per neo4j instance, we may want to introduce an - * alternate strategy. - */ -public class ThreadedWorkerFactory implements WorkerFactory -{ - private final BoltFactory connector; - private final JobScheduler scheduler; - private final LogService logging; - private final Clock clock; - - public ThreadedWorkerFactory( BoltFactory connector, JobScheduler scheduler, LogService logging, Clock clock ) - { - this.connector = connector; - this.scheduler = scheduler; - this.logging = logging; - this.clock = clock; - } - - @Override - public BoltWorker newWorker( BoltChannel boltChannel, BoltWorkerQueueMonitor queueMonitor ) - { - BoltStateMachine machine = connector.newMachine( boltChannel, clock ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logging, queueMonitor ); - - scheduler.schedule( sessionWorker, worker, stringMap( THREAD_ID, machine.key() ) ); - - return worker; - } -} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java index a7b625ac7b063..9652e635d0cd7 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/transport/BoltMessagingProtocolHandlerImpl.java @@ -32,7 +32,6 @@ import org.neo4j.bolt.v1.messaging.BoltMessageRouter; import org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter; import org.neo4j.bolt.v1.messaging.Neo4jPack; -import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.logging.Log; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java index 19c6170dc039b..e710477815246 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/transport/DefaultBoltProtocolHandlerFactoryTest.java @@ -34,7 +34,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandlerTest.java index 0d37dc4cd27d8..81de88e0f0d0e 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandlerTest.java @@ -25,7 +25,6 @@ import org.neo4j.bolt.runtime.BoltConnection; import org.neo4j.bolt.v1.packstream.PackOutputClosedException; -import org.neo4j.bolt.v1.runtime.BoltWorker; import org.neo4j.bolt.v1.runtime.Neo4jError; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.logging.AssertableLogProvider; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/MonitoredBoltWorkerFactoryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/MonitoredBoltWorkerFactoryTest.java deleted file mode 100644 index 532f2e2acbbe1..0000000000000 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/MonitoredBoltWorkerFactoryTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import org.neo4j.bolt.BoltChannel; -import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.MonitoredBoltWorker; -import org.neo4j.kernel.monitoring.Monitors; -import org.neo4j.time.Clocks; -import org.neo4j.time.FakeClock; - -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.neo4j.bolt.testing.NullResponseHandler.nullResponseHandler; -import static org.neo4j.time.Clocks.systemClock; - -public class MonitoredBoltWorkerFactoryTest -{ - private static final BoltChannel boltChannel = mock( BoltChannel.class ); - - @Test - public void shouldSignalReceivedStartAndComplete() - { - // given - FakeClock clock = Clocks.fakeClock(); - - WorkerFactory delegate = mock( WorkerFactory.class ); - BoltStateMachine machine = mock( BoltStateMachine.class ); - when( delegate.newWorker( same( boltChannel ), any() ) ) - .thenReturn( new BoltWorker() - { - @Override - public void enqueue( Job job ) - { - clock.forward( 1337, TimeUnit.MILLISECONDS ); - try - { - job.perform( machine ); - } - catch ( BoltConnectionFatality connectionFatality ) - { - throw new RuntimeException( connectionFatality ); - } - } - - @Override - public void interrupt() - { - throw new RuntimeException(); - } - - @Override - public void halt() - { - throw new RuntimeException(); - } - - } ); - - Monitors monitors = new Monitors(); - CountingSessionMonitor monitor = new CountingSessionMonitor(); - monitors.addMonitorListener( monitor ); - - MonitoredWorkerFactory workerFactory = new MonitoredWorkerFactory( monitors, delegate, clock ); - BoltWorker worker = workerFactory.newWorker( boltChannel ); - - // when - worker.enqueue( stateMachine -> - { - stateMachine.run( "hello", null, nullResponseHandler() ); - clock.forward( 1338, TimeUnit.MILLISECONDS ); - } ); - - // then - assertEquals( 1, monitor.messagesReceived ); - assertEquals( 1337, monitor.queueTime ); - assertEquals( 1338, monitor.processingTime ); - } - - @Test - public void shouldReportStartedSessions() - { - int workersCount = 42; - - Monitors monitors = new Monitors(); - CountingSessionMonitor monitor = new CountingSessionMonitor(); - monitors.addMonitorListener( monitor ); - - WorkerFactory mockWorkers = mock( WorkerFactory.class ); - when( mockWorkers.newWorker( boltChannel ) ).thenReturn( mock( BoltWorker.class ) ); - - MonitoredWorkerFactory workerFactory = new MonitoredWorkerFactory( monitors, mockWorkers, systemClock() ); - - for ( int i = 0; i < workersCount; i++ ) - { - workerFactory.newWorker( boltChannel ); - } - - assertEquals( workersCount, monitor.sessionsStarted ); - } - - @Test - public void shouldNotWrapWithMonitoredSessionIfNobodyIsListening() - { - // Given - // Monitoring adds GC overhead, so we only want to do the work involved - // if someone has actually registered a listener. We still allow plugging - // monitoring in at runtime, but it will only apply to sessions started - // after monitor listeners are added - WorkerFactory workerFactory = mock( WorkerFactory.class ); - BoltWorker boltWorker = mock( BoltWorker.class ); - when( workerFactory.newWorker( boltChannel, null ) ).thenReturn( boltWorker ); - - Monitors monitors = new Monitors(); - MonitoredWorkerFactory monitoredWorkerFactory = new MonitoredWorkerFactory( monitors, workerFactory, Clocks.fakeClock() ); - - // When - - BoltWorker worker = monitoredWorkerFactory.newWorker( boltChannel ); - - // Then - assertEquals( boltWorker, worker ); - - // But when I register a listener - monitors.addMonitorListener( new CountingSessionMonitor() ); - - // Then new sessions should be monitored - assertThat( monitoredWorkerFactory.newWorker( boltChannel ), instanceOf( MonitoredBoltWorker.class ) ); - } - - private static class CountingSessionMonitor implements MonitoredWorkerFactory.SessionMonitor - { - long sessionsStarted; - long messagesReceived; - long queueTime; - long processingTime; - - @Override - public void sessionStarted() - { - sessionsStarted++; - } - - @Override - public void messageReceived() - { - messagesReceived++; - } - - @Override - public void processingStarted( long queueTime ) - { - this.queueTime += queueTime; - } - - @Override - public void processingDone( long processingTime ) - { - this.processingTime += processingTime; - } - } -} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/SynchronousBoltWorker.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/SynchronousBoltWorker.java deleted file mode 100644 index c2850509fdb03..0000000000000 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/SynchronousBoltWorker.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime; - -public class SynchronousBoltWorker implements BoltWorker -{ - private final BoltStateMachine machine; - - public SynchronousBoltWorker( BoltStateMachine machine ) - { - this.machine = machine; - } - - @Override - public void enqueue( Job job ) - { - try - { - job.perform( machine ); - } - catch ( BoltConnectionFatality connectionFatality ) - { - throw new RuntimeException( connectionFatality ); - } - } - - @Override - public void interrupt() - { - machine.interrupt(); - } - - @Override - public void halt() - { - machine.close(); - } -} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java deleted file mode 100644 index 873328967735a..0000000000000 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.bolt.v1.runtime.concurrent; - -import org.apache.commons.lang3.mutable.MutableBoolean; -import org.junit.Before; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.neo4j.bolt.v1.runtime.BoltConnectionAuthFatality; -import org.neo4j.bolt.v1.runtime.BoltProtocolBreachFatality; -import org.neo4j.bolt.v1.runtime.BoltStateMachine; -import org.neo4j.bolt.v1.runtime.BoltWorkerQueueMonitor; -import org.neo4j.bolt.v1.runtime.Job; -import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.kernel.impl.logging.NullLogService; -import org.neo4j.logging.AssertableLogProvider; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.neo4j.logging.AssertableLogProvider.inLog; - -public class RunnableBoltWorkerTest -{ - - private AssertableLogProvider internalLog; - private AssertableLogProvider userLog; - private LogService logService; - private BoltStateMachine machine; - - @Before - public void setup() - { - internalLog = new AssertableLogProvider(); - userLog = new AssertableLogProvider(); - logService = mock( LogService.class ); - when( logService.getUserLogProvider() ).thenReturn( userLog ); - when( logService.getUserLog( RunnableBoltWorker.class ) ) - .thenReturn( userLog.getLog( RunnableBoltWorker.class ) ); - when( logService.getInternalLogProvider() ).thenReturn( internalLog ); - when( logService.getInternalLog( RunnableBoltWorker.class ) ) - .thenReturn( internalLog.getLog( RunnableBoltWorker.class ) ); - machine = mock( BoltStateMachine.class ); - when( machine.key() ).thenReturn( "test-session" ); - } - - @Test - public void shouldExecuteWorkWhenRun() throws Throwable - { - // Given - RunnableBoltWorker worker = new RunnableBoltWorker( machine, NullLogService.getInstance() ); - worker.enqueue( s -> s.run( "Hello, world!", null, null ) ); - worker.enqueue( s -> worker.halt() ); - - // When - worker.run(); - - // Then - verify( machine ).run( "Hello, world!", null, null ); - verify( machine ).terminate(); - verify( machine ).close(); - verifyNoMoreInteractions( machine ); - } - - @Test - public void errorThrownDuringExecutionShouldCauseSessionClose() - { - // Given - RunnableBoltWorker worker = new RunnableBoltWorker( machine, NullLogService.getInstance() ); - worker.enqueue( s -> - { - throw new RuntimeException( "It didn't work out." ); - } ); - - // When - worker.run(); - - // Then - verify( machine ).close(); - } - - @Test - public void authExceptionShouldNotBeLoggedHere() - { - // Given - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - worker.enqueue( s -> - { - throw new BoltConnectionAuthFatality( "fatality" ); - } ); - - // When - worker.run(); - - // Then - verify( machine ).close(); - internalLog.assertNone( inLog( RunnableBoltWorker.class ).any() ); - userLog.assertNone( inLog( RunnableBoltWorker.class ).any() ); - } - - @Test - public void protocolBreachesShouldBeLoggedWithStackTraces() - { - // Given - BoltProtocolBreachFatality error = new BoltProtocolBreachFatality( "protocol breach fatality" ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - worker.enqueue( s -> - { - throw error; - } ); - - // When - worker.run(); - - // Then - verify( machine ).close(); - internalLog.assertExactly( inLog( RunnableBoltWorker.class ) - .error( equalTo( "Bolt protocol breach in session 'test-session'" ), equalTo( error ) ) ); - userLog.assertNone( inLog( RunnableBoltWorker.class ).any() ); - } - - @Test - public void haltShouldTerminateButNotCloseTheStateMachine() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.halt(); - - verify( machine ).terminate(); - verify( machine, never() ).close(); - } - - @Test - public void workerCanBeHaltedMultipleTimes() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.halt(); - worker.halt(); - worker.halt(); - - verify( machine, times( 3 ) ).terminate(); - verify( machine, never() ).close(); - - worker.run(); - - verify( machine ).close(); - } - - @Test - public void stateMachineIsClosedOnExit() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.enqueue( machine1 -> - { - machine1.run( "RETURN 1", null, null ); - worker.enqueue( machine2 -> - { - machine2.run( "RETURN 1", null, null ); - worker.enqueue( machine3 -> - { - worker.halt(); - worker.enqueue( machine4 -> fail( "Should not be executed" ) ); - } ); - } ); - } ); - - worker.run(); - - verify( machine ).close(); - } - - @Test - public void stateMachineNotClosedOnHalt() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.halt(); - - verify( machine, never() ).close(); - } - - @Test - public void stateMachineInterrupted() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.interrupt(); - - verify( machine ).interrupt(); - } - - @Test - public void stateMachineCloseFailureIsLogged() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - RuntimeException closeError = new RuntimeException( "Oh!" ); - doThrow( closeError ).when( machine ).close(); - - worker.enqueue( s -> worker.halt() ); - worker.run(); - - internalLog.assertExactly( inLog( RunnableBoltWorker.class ).error( - equalTo( "Unable to close Bolt session 'test-session'" ), - equalTo( closeError ) ) ); - } - - @Test - public void haltIsRespected() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - - worker.enqueue( machine1 -> - worker.enqueue( machine2 -> - worker.enqueue( machine3 -> - { - worker.halt(); - verify( machine ).terminate(); - worker.enqueue( machine4 -> fail( "Should not be executed" ) ); - } ) ) ); - - worker.run(); - - verify( machine ).close(); - } - - @Test - public void runDoesNothingAfterHalt() - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - MutableBoolean jobWasExecuted = new MutableBoolean(); - worker.enqueue( machine1 -> - { - jobWasExecuted.setTrue(); - fail( "Should not be executed" ); - } ); - - worker.halt(); - worker.run(); - - assertFalse( jobWasExecuted.booleanValue() ); - verify( machine ).close(); - } - - @Test - public void shouldValidateTransaction() throws Exception - { - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); - Future workerFuture = Executors.newSingleThreadExecutor().submit( worker ); - - Thread.sleep( Duration.ofSeconds( RunnableBoltWorker.workQueuePollDuration ).toMillis() ); - - worker.halt(); - workerFuture.get(); - - verify( machine, atLeastOnce() ).validateTransaction(); - } - - @Test - public void shouldNotNotifyMonitorWhenNothingEnqueued() - { - BoltWorkerQueueMonitor monitor = mock( BoltWorkerQueueMonitor.class ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService, monitor ); - - verify( monitor, never() ).enqueued( any( Job.class ) ); - verify( monitor, never() ).dequeued( any( Job.class ) ); - verify( monitor, never() ).drained( any( Collection.class ) ); - } - - @Test - public void shouldNotifyMonitorWhenQueued() - { - BoltWorkerQueueMonitor monitor = mock( BoltWorkerQueueMonitor.class ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService, monitor ); - Job job = s -> s.run( "Hello world", null, null ); - - worker.enqueue( job ); - - verify( monitor ).enqueued( job ); - } - - @Test - public void shouldNotifyMonitorWhenDequeued() - { - BoltWorkerQueueMonitor monitor = mock( BoltWorkerQueueMonitor.class ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService, monitor ); - Job job = s -> s.run( "Hello world", null, null ); - - worker.enqueue( job ); - worker.enqueue( s -> worker.halt() ); - worker.run(); - - verify( monitor ).enqueued( job ); - verify( monitor ).dequeued( job ); - } - - @Test - public void shouldNotifyMonitorWhenDrained() - { - List drainedJobs = new ArrayList<>(); - BoltWorkerQueueMonitor monitor = newMonitor( drainedJobs ); - RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService, monitor ); - Job job1 = s -> s.run( "Hello world 1", null, null ); - Job job2 = s -> s.run( "Hello world 1", null, null ); - Job job3 = s -> s.run( "Hello world 1", null, null ); - Job haltJob = s -> worker.halt(); - - worker.enqueue( job1 ); - worker.enqueue( job2 ); - worker.enqueue( job3 ); - worker.enqueue( haltJob ); - worker.run(); - - verify( monitor ).enqueued( job1 ); - verify( monitor ).enqueued( job2 ); - verify( monitor ).enqueued( job3 ); - verify( monitor ).dequeued( job1 ); - verify( monitor ).drained( anyCollection() ); - - assertThat( drainedJobs, hasSize( 3 ) ); - assertThat( drainedJobs, contains( job2, job3, haltJob ) ); - } - - private static BoltWorkerQueueMonitor newMonitor( final List drained ) - { - BoltWorkerQueueMonitor monitor = mock( BoltWorkerQueueMonitor.class ); - - doAnswer( invocation -> - { - final Collection jobs = invocation.getArgument( 0 ); - drained.addAll( jobs ); - return null; - } ).when( monitor ).drained( anyListOf( Job.class ) ); - - return monitor; - } - -} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java index b42c2ce8ffc44..70e2df1cace6a 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/BoltThrottleMaxDurationIT.java @@ -39,7 +39,6 @@ import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.runtime.BoltConnection; -import org.neo4j.bolt.v1.runtime.WorkerFactory; import org.neo4j.bolt.v1.transport.socket.client.SecureSocketConnection; import org.neo4j.bolt.v1.transport.socket.client.SocketConnection; import org.neo4j.bolt.v1.transport.socket.client.TransportConnection; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java index ccc9a9c824e01..5af0bd2dbd50e 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/FragmentedMessageDeliveryTest.java @@ -44,7 +44,6 @@ import org.neo4j.bolt.v1.packstream.BufferedChannelOutput; import org.neo4j.bolt.v1.runtime.BoltResponseHandler; import org.neo4j.bolt.v1.runtime.BoltStateMachine; -import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl; import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.NullLogService; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java index 3455e0f16d606..21b1ffcb2eb9b 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/SocketTransportHandlerTest.java @@ -41,7 +41,6 @@ import org.neo4j.bolt.v1.messaging.Neo4jPack; import org.neo4j.bolt.v1.messaging.Neo4jPackV1; import org.neo4j.bolt.v1.runtime.BoltStateMachine; -import org.neo4j.bolt.v1.runtime.SynchronousBoltWorker; import org.neo4j.bolt.v1.transport.BoltMessagingProtocolHandlerImpl; import org.neo4j.bolt.v2.messaging.Neo4jPackV2; import org.neo4j.kernel.impl.logging.NullLogService; diff --git a/community/shell/src/test/java/org/neo4j/shell/StartClientIT.java b/community/shell/src/test/java/org/neo4j/shell/StartClientIT.java index e6263101f1812..30235da32e1e1 100644 --- a/community/shell/src/test/java/org/neo4j/shell/StartClientIT.java +++ b/community/shell/src/test/java/org/neo4j/shell/StartClientIT.java @@ -33,7 +33,7 @@ import java.io.Serializable; import java.rmi.RemoteException; -import org.neo4j.bolt.v1.runtime.WorkerFactory; +import org.neo4j.bolt.BoltKernelExtension; import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Transaction; import org.neo4j.logging.AssertableLogProvider; @@ -216,7 +216,7 @@ protected GraphDatabaseShellServer getGraphDatabaseShellServer( File path, boole getClass().getResource( "/config-with-bolt-connector.conf" ).getFile()}, mock( CtrlCHandler.class ) ); try { - log.assertNone( inLog( startsWith( WorkerFactory.class.getPackage().getName() ) ).any() ); + log.assertNone( inLog( startsWith( BoltKernelExtension.class.getPackage().getName() ) ).any() ); } finally { diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/db/BoltMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/db/BoltMetrics.java index b3a01363c63af..c513fe65e512a 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/db/BoltMetrics.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/db/BoltMetrics.java @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.neo4j.bolt.runtime.BoltConnectionMetricsMonitor; -import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory; import org.neo4j.kernel.impl.annotations.Documented; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.monitoring.Monitors; diff --git a/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java b/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java index 1ea57550cb2ac..5d8b3087e0d31 100644 --- a/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java +++ b/integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java @@ -28,9 +28,8 @@ import java.time.Clock; import java.util.function.Consumer; +import org.neo4j.bolt.runtime.BoltConnectionMetricsMonitor; import org.neo4j.bolt.v1.runtime.BoltFactory; -import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.SessionMonitor; -import org.neo4j.bolt.v1.runtime.WorkerFactory; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; @@ -54,8 +53,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT; @@ -84,35 +81,11 @@ public void shutdownDb() IOUtils.closeAllSilently( driver ); } - @Test - public void throwsWhenWorkerCreationFails() - { - WorkerFactory workerFactory = mock( WorkerFactory.class ); - when( workerFactory.newWorker( any(), any() ) ).thenThrow( new IllegalStateException( "Oh!" ) ); - - BoltKernelExtension extension = new BoltKernelExtensionWithWorkerFactory( workerFactory ); - - int port = PortAuthority.allocatePort(); - - db = startDbWithBolt( new GraphDatabaseFactoryWithCustomBoltKernelExtension( extension ), port ); - - try - { - // attempt to create a driver when server is unavailable - driver = createDriver( port ); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( ServiceUnavailableException.class ) ); - } - } - @Test public void throwsWhenMonitoredWorkerCreationFails() { ThrowingSessionMonitor sessionMonitor = new ThrowingSessionMonitor(); - sessionMonitor.throwInSessionStarted(); + sessionMonitor.throwInConnectionOpened(); Monitors monitors = newMonitorsSpy( sessionMonitor ); int port = PortAuthority.allocatePort(); @@ -139,13 +112,13 @@ public void throwsWhenInitMessageReceiveFails() @Test public void throwsWhenInitMessageProcessingFailsToStart() { - throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInProcessingStarted, false ); + throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInMessageProcessingStarted, false ); } @Test public void throwsWhenInitMessageProcessingFailsToComplete() { - throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInProcessingDone, true ); + throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInMessageProcessingCompleted, true ); } @Test @@ -157,13 +130,13 @@ public void throwsWhenRunMessageReceiveFails() @Test public void throwsWhenRunMessageProcessingFailsToStart() { - throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingStarted ); + throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInMessageProcessingStarted ); } @Test public void throwsWhenRunMessageProcessingFailsToComplete() { - throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingDone ); + throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInMessageProcessingCompleted ); } private void throwsWhenInitMessageFails( Consumer monitorSetup, @@ -266,39 +239,34 @@ private static Monitors newMonitorsSpy( ThrowingSessionMonitor sessionMonitor ) Monitors monitors = spy( new Monitors() ); // it is not allowed to throw exceptions from monitors // make the given sessionMonitor be returned as is, without any proxying - when( monitors.newMonitor( SessionMonitor.class ) ).thenReturn( sessionMonitor ); - when( monitors.hasListeners( SessionMonitor.class ) ).thenReturn( true ); + when( monitors.newMonitor( BoltConnectionMetricsMonitor.class ) ).thenReturn( sessionMonitor ); + when( monitors.hasListeners( BoltConnectionMetricsMonitor.class ) ).thenReturn( true ); return monitors; } - private static class BoltKernelExtensionWithWorkerFactory extends BoltKernelExtension + private static class ThrowingSessionMonitor implements BoltConnectionMetricsMonitor { - final WorkerFactory workerFactory; + volatile boolean throwInConnectionOpened; + volatile boolean throwInMessageReceived; + volatile boolean throwInMessageProcessingStarted; + volatile boolean throwInMessageProcessingCompleted; - BoltKernelExtensionWithWorkerFactory( WorkerFactory workerFactory ) + @Override + public void connectionOpened() { - this.workerFactory = workerFactory; + throwIfNeeded( throwInConnectionOpened ); } @Override - protected WorkerFactory createWorkerFactory( BoltFactory boltFactory, JobScheduler scheduler, - Dependencies dependencies, LogService logService, Clock clock ) + public void connectionActivated() { - return workerFactory; - } - } - private static class ThrowingSessionMonitor implements SessionMonitor - { - volatile boolean throwInSessionStarted; - volatile boolean throwInMessageReceived; - volatile boolean throwInProcessingStarted; - volatile boolean throwInProcessingDone; + } @Override - public void sessionStarted() + public void connectionWaiting() { - throwIfNeeded( throwInSessionStarted ); + } @Override @@ -308,20 +276,32 @@ public void messageReceived() } @Override - public void processingStarted( long queueTime ) + public void messageProcessingStarted( long queueTime ) + { + throwIfNeeded( throwInMessageProcessingStarted ); + } + + @Override + public void messageProcessingCompleted( long processingTime ) { - throwIfNeeded( throwInProcessingStarted ); + throwIfNeeded( throwInMessageProcessingCompleted ); } @Override - public void processingDone( long processingTime ) + public void messageProcessingFailed() { - throwIfNeeded( throwInProcessingDone ); + + } + + @Override + public void connectionClosed() + { + } - void throwInSessionStarted() + void throwInConnectionOpened() { - throwInSessionStarted = true; + throwInConnectionOpened = true; } void throwInMessageReceived() @@ -329,14 +309,14 @@ void throwInMessageReceived() throwInMessageReceived = true; } - void throwInProcessingStarted() + void throwInMessageProcessingStarted() { - throwInProcessingStarted = true; + throwInMessageProcessingStarted = true; } - void throwInProcessingDone() + void throwInMessageProcessingCompleted() { - throwInProcessingDone = true; + throwInMessageProcessingCompleted = true; } void throwIfNeeded( boolean shouldThrow )