From b15a70bc012be4bbfc838f3c7e1c0256be35ef77 Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Mon, 26 Mar 2018 14:36:49 +0100 Subject: [PATCH] Prestart core threads and update config options --- .../CachedThreadPoolExecutorFactory.java | 7 +++- .../bolt/runtime/DefaultBoltConnection.java | 2 +- .../bolt/runtime/ExecutorBoltScheduler.java | 2 +- .../ExecutorBoltSchedulerProvider.java | 2 +- .../neo4j/bolt/runtime/ExecutorFactory.java | 2 +- .../CachedThreadPoolExecutorFactoryTest.java | 37 +++++++++++++++---- .../runtime/ExecutorBoltSchedulerTest.java | 11 +++--- .../neo4j/kernel/api/exceptions/Status.java | 2 +- .../factory/GraphDatabaseSettings.java | 7 ++-- .../kernel/configuration/BoltConnector.java | 6 ++- 10 files changed, 54 insertions(+), 24 deletions(-) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactory.java index 8590463d33daf..7f7ba525f9a93 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactory.java @@ -54,10 +54,15 @@ public CachedThreadPoolExecutorFactory( Log log, RejectedExecutionHandler reject } @Override - public ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory ) + public ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, boolean startCoreThreads, ThreadFactory threadFactory ) { ThreadPool result = new ThreadPool( corePoolSize, maxPoolSize, keepAlive, createTaskQueue( queueSize ), threadFactory, rejectionHandler ); + if ( startCoreThreads ) + { + result.prestartAllCoreThreads(); + } + return result; } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/DefaultBoltConnection.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/DefaultBoltConnection.java index dae4def227ea4..dfbcd0ef44e0e 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/DefaultBoltConnection.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/DefaultBoltConnection.java @@ -254,7 +254,7 @@ public void handleSchedulingError( Throwable t ) { error = Neo4jError.from( Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description() ); message = String.format( "Unable to schedule bolt session '%s' for execution since there are no available threads to " + - "serve it at the moment. You can retry at a later time or consider increasing max pool / queue size for bolt connector(s).", id() ); + "serve it at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).", id() ); } else { diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltScheduler.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltScheduler.java index 6555ec24c954c..b7fae347fa15c 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltScheduler.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltScheduler.java @@ -83,7 +83,7 @@ public String connector() @Override public void start() { - threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize, + threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize, true, new NameAppendingThreadFactory( connector, scheduler.threadFactory( JobScheduler.Groups.boltWorker ) ) ); } diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerProvider.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerProvider.java index a6a88ed141e0a..5a3e53676630f 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerProvider.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerProvider.java @@ -60,7 +60,7 @@ public void start() BoltScheduler boltScheduler = new ExecutorBoltScheduler( connector.key(), executorFactory, scheduler, logService, config.get( connector.thread_pool_core_size ), config.get( connector.thread_pool_max_size ), config.get( connector.thread_pool_keep_alive ), - config.get( connector.thread_pool_queue_size ), forkJoinThreadPool ); + config.get( connector.unsupported_thread_pool_queue_size ), forkJoinThreadPool ); boltScheduler.start(); boltSchedulers.put( connector.key(), boltScheduler ); } ); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorFactory.java b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorFactory.java index a96dc8e9d5b76..5f8ea5185f0ca 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorFactory.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/runtime/ExecutorFactory.java @@ -26,6 +26,6 @@ public interface ExecutorFactory { - ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory ); + ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, boolean startCoreThreads, ThreadFactory threadFactory ); } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactoryTest.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactoryTest.java index 8676719b73e64..fc12346e29d35 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactoryTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/CachedThreadPoolExecutorFactoryTest.java @@ -55,6 +55,7 @@ import static org.junit.Assert.fail; import static org.junit.runners.Parameterized.Parameter; import static org.junit.runners.Parameterized.Parameters; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory.SYNCHRONOUS_QUEUE; import static org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory.UNBOUNDED_QUEUE; @@ -91,7 +92,7 @@ public void cleanup() @Test public void createShouldAssignCorrectQueue() { - executorService = factory.create( 0, 1, Duration.ZERO, queueSize, newThreadFactory() ); + executorService = factory.create( 0, 1, Duration.ZERO, queueSize, false, newThreadFactory() ); if ( executorService instanceof ThreadPoolExecutor ) { @@ -119,7 +120,7 @@ public void createShouldAssignCorrectQueue() @Test public void createShouldCreateExecutor() { - executorService = factory.create( 0, 1, Duration.ZERO, queueSize, newThreadFactory() ); + executorService = factory.create( 0, 1, Duration.ZERO, queueSize, false, newThreadFactory() ); assertNotNull( executorService ); assertFalse( executorService.isShutdown() ); @@ -131,7 +132,7 @@ public void createShouldNotCreateExecutorWhenCorePoolSizeIsNegative() { try { - factory.create( -1, 10, Duration.ZERO, 0, newThreadFactory() ); + factory.create( -1, 10, Duration.ZERO, 0, false, newThreadFactory() ); fail( "should throw exception" ); } catch ( IllegalArgumentException ex ) @@ -145,7 +146,7 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsNegative() { try { - factory.create( 0, -1, Duration.ZERO, 0, newThreadFactory() ); + factory.create( 0, -1, Duration.ZERO, 0, false, newThreadFactory() ); fail( "should throw exception" ); } catch ( IllegalArgumentException ex ) @@ -159,7 +160,7 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsZero() { try { - factory.create( 0, 0, Duration.ZERO, 0, newThreadFactory() ); + factory.create( 0, 0, Duration.ZERO, 0, false, newThreadFactory() ); fail( "should throw exception" ); } catch ( IllegalArgumentException ex ) @@ -168,12 +169,32 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsZero() } } + @Test + public void createShouldStartCoreThreadsIfAsked() + { + AtomicInteger threadCounter = new AtomicInteger(); + + factory.create( 5, 10, Duration.ZERO, 0, true, newThreadFactoryWithCounter( threadCounter ) ); + + assertEquals( 5, threadCounter.get() ); + } + + @Test + public void createShouldNotStartCoreThreadsIfNotAsked() + { + AtomicInteger threadCounter = new AtomicInteger(); + + factory.create( 5, 10, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) ); + + assertEquals( 0, threadCounter.get() ); + } + @Test public void createShouldNotCreateExecutorWhenMaxPoolSizeIsLessThanCorePoolSize() { try { - factory.create( 10, 5, Duration.ZERO, 0, newThreadFactory() ); + factory.create( 10, 5, Duration.ZERO, 0, false, newThreadFactory() ); fail( "should throw exception" ); } catch ( IllegalArgumentException ex ) @@ -188,7 +209,7 @@ public void createdExecutorShouldExecuteSubmittedTasks() throws Exception AtomicBoolean exitCondition = new AtomicBoolean( false ); AtomicInteger threadCounter = new AtomicInteger( 0 ); - executorService = factory.create( 0, 1, Duration.ZERO, 0, newThreadFactoryWithCounter( threadCounter ) ); + executorService = factory.create( 0, 1, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) ); assertNotNull( executorService ); assertEquals( 0, threadCounter.get() ); @@ -209,7 +230,7 @@ public void createdExecutorShouldFavorPoolSizes() AtomicBoolean exitCondition = new AtomicBoolean( false ); AtomicInteger threadCounter = new AtomicInteger( 0 ); - executorService = factory.create( 0, 5, Duration.ZERO, 0, newThreadFactoryWithCounter( threadCounter ) ); + executorService = factory.create( 0, 5, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) ); assertNotNull( executorService ); assertEquals( 0, threadCounter.get() ); diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java index 19c4e11a292f4..15ec6b1cf871a 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/ExecutorBoltSchedulerTest.java @@ -56,6 +56,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -93,7 +94,7 @@ public void cleanup() throws Throwable public void initShouldCreateThreadPool() throws Throwable { ExecutorFactory mockExecutorFactory = mock( ExecutorFactory.class ); - when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), any() ) ).thenReturn( Executors.newCachedThreadPool() ); + when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), anyBoolean(), any() ) ).thenReturn( Executors.newCachedThreadPool() ); ExecutorBoltScheduler scheduler = new ExecutorBoltScheduler( CONNECTOR_KEY, mockExecutorFactory, jobScheduler, logService, 0, 10, Duration.ofMinutes( 1 ), 0, ForkJoinPool.commonPool() ); @@ -101,7 +102,7 @@ public void initShouldCreateThreadPool() throws Throwable scheduler.start(); verify( jobScheduler ).threadFactory( JobScheduler.Groups.boltWorker ); - verify( mockExecutorFactory, times( 1 ) ).create( anyInt(), anyInt(), any( Duration.class ), anyInt(), any( ThreadFactory.class ) ); + verify( mockExecutorFactory, times( 1 ) ).create( anyInt(), anyInt(), any( Duration.class ), anyInt(), anyBoolean(), any( ThreadFactory.class ) ); } @Test @@ -109,7 +110,7 @@ public void shutdownShouldTerminateThreadPool() throws Throwable { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ExecutorFactory mockExecutorFactory = mock( ExecutorFactory.class ); - when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), any() ) ).thenReturn( cachedThreadPool ); + when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), anyBoolean(), any() ) ).thenReturn( cachedThreadPool ); ExecutorBoltScheduler scheduler = new ExecutorBoltScheduler( CONNECTOR_KEY, mockExecutorFactory, jobScheduler, logService, 0, 10, Duration.ofMinutes( 1 ), 0, ForkJoinPool.commonPool() ); @@ -234,7 +235,8 @@ public void destroyedShouldCancelActiveWorkItem() throws Throwable String id = UUID.randomUUID().toString(); BoltConnection connection = newConnection( id ); AtomicBoolean exitCondition = new AtomicBoolean(); - when( connection.processNextBatch() ).thenAnswer( inv -> { + when( connection.processNextBatch() ).thenAnswer( inv -> + { processNextBatchCount.incrementAndGet(); return awaitExit( exitCondition ); } ); @@ -324,5 +326,4 @@ private static boolean awaitExit( AtomicBoolean exitCondition ) Predicates.awaitForever( () -> Thread.currentThread().isInterrupted() || exitCondition.get(), 500, MILLISECONDS ); return true; } - } diff --git a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java index 2d9808936fe93..ae64e406d2f50 100644 --- a/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java +++ b/community/common/src/main/java/org/neo4j/kernel/api/exceptions/Status.java @@ -108,7 +108,7 @@ enum Request implements Status "The client made a request but did not consume outgoing buffers in a timely fashion." ), NoThreadsAvailable( TransientError, // TODO: see above "There are no available threads to serve this request at the moment. You can retry at a later time " + - "or consider increasing max pool / queue size for bolt connector(s)." ); + "or consider increasing max thread pool size for bolt connector(s)." ); private final Code code; @Override diff --git a/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java b/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java index 2fc78870d0d8f..fa43a8001e377 100644 --- a/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java +++ b/community/kernel/src/main/java/org/neo4j/graphdb/factory/GraphDatabaseSettings.java @@ -836,15 +836,16 @@ public enum LabelIndex public static final Setting bolt_write_throttle = setting( "unsupported.dbms.bolt.write_throttle", BOOLEAN, TRUE ); @Description( "When the size (in bytes) of write buffers, used by bolt's network layer, " + - "grows beyond this value bolt channel will advertise itself as unwritable and bolt worker " + - "threads will block until it becomes writable again." ) + "grows beyond this value bolt channel will advertise itself as unwritable and will block " + + "related processing thread until it becomes writable again." ) @Internal public static final Setting bolt_write_buffer_high_water_mark = buildSetting( "unsupported.dbms.bolt.write_throttle.high_watermark", INTEGER, String.valueOf( ByteUnit.kibiBytes( 512 ) ) ).constraint( range( (int) ByteUnit.kibiBytes( 64 ), Integer.MAX_VALUE ) ).build(); @Description( "When the size (in bytes) of write buffers, previously advertised as unwritable, " + - "gets below this value bolt channel will re-advertise itself as writable and blocked bolt worker " + "threads will resume execution." ) + "gets below this value bolt channel will re-advertise itself as writable and blocked processing " + + "thread will resume execution." ) @Internal public static final Setting bolt_write_buffer_low_water_mark = buildSetting( "unsupported.dbms.bolt.write_throttle.low_watermark", INTEGER, String.valueOf( ByteUnit.kibiBytes( 128 ) ) ).constraint( diff --git a/community/kernel/src/main/java/org/neo4j/kernel/configuration/BoltConnector.java b/community/kernel/src/main/java/org/neo4j/kernel/configuration/BoltConnector.java index 8b697093b03d8..7c5397893161e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/configuration/BoltConnector.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/configuration/BoltConnector.java @@ -22,6 +22,7 @@ import java.time.Duration; import org.neo4j.configuration.Description; +import org.neo4j.configuration.Internal; import org.neo4j.configuration.ReplacedBy; import org.neo4j.graphdb.config.Setting; import org.neo4j.helpers.AdvertisedSocketAddress; @@ -66,7 +67,8 @@ public class BoltConnector extends Connector public final Setting thread_pool_keep_alive; @Description( "The queue size of the thread pool bound to this connector (-1 for unbounded, 0 for direct handoff, > 0 for bounded)" ) - public final Setting thread_pool_queue_size; + @Internal + public final Setting unsupported_thread_pool_queue_size; // Used by config doc generator public BoltConnector() @@ -89,7 +91,7 @@ public BoltConnector( String key ) this.thread_pool_core_size = group.scope( setting( "thread_pool_core_size", INTEGER, String.valueOf( 10 ) ) ); this.thread_pool_max_size = group.scope( setting( "thread_pool_max_size", INTEGER, String.valueOf( 400 ) ) ); this.thread_pool_keep_alive = group.scope( setting( "thread_pool_keep_alive", DURATION, "5m" ) ); - this.thread_pool_queue_size = group.scope( setting( "thread_pool_queue_size", INTEGER, String.valueOf( 0 ) ) ); + this.unsupported_thread_pool_queue_size = group.scope( setting( "unsupported_thread_pool_queue_size", INTEGER, String.valueOf( 0 ) ) ); } public enum EncryptionLevel