Skip to content

Commit

Permalink
Prestart core threads and update config options
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Apr 5, 2018
1 parent c632588 commit b15a70b
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 24 deletions.
Expand Up @@ -54,10 +54,15 @@ public CachedThreadPoolExecutorFactory( Log log, RejectedExecutionHandler reject
}

@Override
public ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory )
public ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, boolean startCoreThreads, ThreadFactory threadFactory )
{
ThreadPool result = new ThreadPool( corePoolSize, maxPoolSize, keepAlive, createTaskQueue( queueSize ), threadFactory, rejectionHandler );

if ( startCoreThreads )
{
result.prestartAllCoreThreads();
}

return result;
}

Expand Down
Expand Up @@ -254,7 +254,7 @@ public void handleSchedulingError( Throwable t )
{
error = Neo4jError.from( Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description() );
message = String.format( "Unable to schedule bolt session '%s' for execution since there are no available threads to " +
"serve it at the moment. You can retry at a later time or consider increasing max pool / queue size for bolt connector(s).", id() );
"serve it at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).", id() );
}
else
{
Expand Down
Expand Up @@ -83,7 +83,7 @@ public String connector()
@Override
public void start()
{
threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize,
threadPool = executorFactory.create( corePoolSize, maxPoolSize, keepAlive, queueSize, true,
new NameAppendingThreadFactory( connector, scheduler.threadFactory( JobScheduler.Groups.boltWorker ) ) );
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public void start()
BoltScheduler boltScheduler =
new ExecutorBoltScheduler( connector.key(), executorFactory, scheduler, logService, config.get( connector.thread_pool_core_size ),
config.get( connector.thread_pool_max_size ), config.get( connector.thread_pool_keep_alive ),
config.get( connector.thread_pool_queue_size ), forkJoinThreadPool );
config.get( connector.unsupported_thread_pool_queue_size ), forkJoinThreadPool );
boltScheduler.start();
boltSchedulers.put( connector.key(), boltScheduler );
} );
Expand Down
Expand Up @@ -26,6 +26,6 @@
public interface ExecutorFactory
{

ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, ThreadFactory threadFactory );
ExecutorService create( int corePoolSize, int maxPoolSize, Duration keepAlive, int queueSize, boolean startCoreThreads, ThreadFactory threadFactory );

}
Expand Up @@ -55,6 +55,7 @@
import static org.junit.Assert.fail;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory.SYNCHRONOUS_QUEUE;
import static org.neo4j.bolt.runtime.CachedThreadPoolExecutorFactory.UNBOUNDED_QUEUE;

Expand Down Expand Up @@ -91,7 +92,7 @@ public void cleanup()
@Test
public void createShouldAssignCorrectQueue()
{
executorService = factory.create( 0, 1, Duration.ZERO, queueSize, newThreadFactory() );
executorService = factory.create( 0, 1, Duration.ZERO, queueSize, false, newThreadFactory() );

if ( executorService instanceof ThreadPoolExecutor )
{
Expand Down Expand Up @@ -119,7 +120,7 @@ public void createShouldAssignCorrectQueue()
@Test
public void createShouldCreateExecutor()
{
executorService = factory.create( 0, 1, Duration.ZERO, queueSize, newThreadFactory() );
executorService = factory.create( 0, 1, Duration.ZERO, queueSize, false, newThreadFactory() );

assertNotNull( executorService );
assertFalse( executorService.isShutdown() );
Expand All @@ -131,7 +132,7 @@ public void createShouldNotCreateExecutorWhenCorePoolSizeIsNegative()
{
try
{
factory.create( -1, 10, Duration.ZERO, 0, newThreadFactory() );
factory.create( -1, 10, Duration.ZERO, 0, false, newThreadFactory() );
fail( "should throw exception" );
}
catch ( IllegalArgumentException ex )
Expand All @@ -145,7 +146,7 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsNegative()
{
try
{
factory.create( 0, -1, Duration.ZERO, 0, newThreadFactory() );
factory.create( 0, -1, Duration.ZERO, 0, false, newThreadFactory() );
fail( "should throw exception" );
}
catch ( IllegalArgumentException ex )
Expand All @@ -159,7 +160,7 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsZero()
{
try
{
factory.create( 0, 0, Duration.ZERO, 0, newThreadFactory() );
factory.create( 0, 0, Duration.ZERO, 0, false, newThreadFactory() );
fail( "should throw exception" );
}
catch ( IllegalArgumentException ex )
Expand All @@ -168,12 +169,32 @@ public void createShouldNotCreateExecutorWhenMaxPoolSizeIsZero()
}
}

@Test
public void createShouldStartCoreThreadsIfAsked()
{
AtomicInteger threadCounter = new AtomicInteger();

factory.create( 5, 10, Duration.ZERO, 0, true, newThreadFactoryWithCounter( threadCounter ) );

assertEquals( 5, threadCounter.get() );
}

@Test
public void createShouldNotStartCoreThreadsIfNotAsked()
{
AtomicInteger threadCounter = new AtomicInteger();

factory.create( 5, 10, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) );

assertEquals( 0, threadCounter.get() );
}

@Test
public void createShouldNotCreateExecutorWhenMaxPoolSizeIsLessThanCorePoolSize()
{
try
{
factory.create( 10, 5, Duration.ZERO, 0, newThreadFactory() );
factory.create( 10, 5, Duration.ZERO, 0, false, newThreadFactory() );
fail( "should throw exception" );
}
catch ( IllegalArgumentException ex )
Expand All @@ -188,7 +209,7 @@ public void createdExecutorShouldExecuteSubmittedTasks() throws Exception
AtomicBoolean exitCondition = new AtomicBoolean( false );
AtomicInteger threadCounter = new AtomicInteger( 0 );

executorService = factory.create( 0, 1, Duration.ZERO, 0, newThreadFactoryWithCounter( threadCounter ) );
executorService = factory.create( 0, 1, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) );

assertNotNull( executorService );
assertEquals( 0, threadCounter.get() );
Expand All @@ -209,7 +230,7 @@ public void createdExecutorShouldFavorPoolSizes()
AtomicBoolean exitCondition = new AtomicBoolean( false );
AtomicInteger threadCounter = new AtomicInteger( 0 );

executorService = factory.create( 0, 5, Duration.ZERO, 0, newThreadFactoryWithCounter( threadCounter ) );
executorService = factory.create( 0, 5, Duration.ZERO, 0, false, newThreadFactoryWithCounter( threadCounter ) );

assertNotNull( executorService );
assertEquals( 0, threadCounter.get() );
Expand Down
Expand Up @@ -56,6 +56,7 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -93,23 +94,23 @@ public void cleanup() throws Throwable
public void initShouldCreateThreadPool() throws Throwable
{
ExecutorFactory mockExecutorFactory = mock( ExecutorFactory.class );
when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), any() ) ).thenReturn( Executors.newCachedThreadPool() );
when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), anyBoolean(), any() ) ).thenReturn( Executors.newCachedThreadPool() );
ExecutorBoltScheduler scheduler =
new ExecutorBoltScheduler( CONNECTOR_KEY, mockExecutorFactory, jobScheduler, logService, 0, 10, Duration.ofMinutes( 1 ), 0,
ForkJoinPool.commonPool() );

scheduler.start();

verify( jobScheduler ).threadFactory( JobScheduler.Groups.boltWorker );
verify( mockExecutorFactory, times( 1 ) ).create( anyInt(), anyInt(), any( Duration.class ), anyInt(), any( ThreadFactory.class ) );
verify( mockExecutorFactory, times( 1 ) ).create( anyInt(), anyInt(), any( Duration.class ), anyInt(), anyBoolean(), any( ThreadFactory.class ) );
}

@Test
public void shutdownShouldTerminateThreadPool() throws Throwable
{
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorFactory mockExecutorFactory = mock( ExecutorFactory.class );
when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), any() ) ).thenReturn( cachedThreadPool );
when( mockExecutorFactory.create( anyInt(), anyInt(), any(), anyInt(), anyBoolean(), any() ) ).thenReturn( cachedThreadPool );
ExecutorBoltScheduler scheduler =
new ExecutorBoltScheduler( CONNECTOR_KEY, mockExecutorFactory, jobScheduler, logService, 0, 10, Duration.ofMinutes( 1 ), 0,
ForkJoinPool.commonPool() );
Expand Down Expand Up @@ -234,7 +235,8 @@ public void destroyedShouldCancelActiveWorkItem() throws Throwable
String id = UUID.randomUUID().toString();
BoltConnection connection = newConnection( id );
AtomicBoolean exitCondition = new AtomicBoolean();
when( connection.processNextBatch() ).thenAnswer( inv -> {
when( connection.processNextBatch() ).thenAnswer( inv ->
{
processNextBatchCount.incrementAndGet();
return awaitExit( exitCondition );
} );
Expand Down Expand Up @@ -324,5 +326,4 @@ private static boolean awaitExit( AtomicBoolean exitCondition )
Predicates.awaitForever( () -> Thread.currentThread().isInterrupted() || exitCondition.get(), 500, MILLISECONDS );
return true;
}

}
Expand Up @@ -108,7 +108,7 @@ enum Request implements Status
"The client made a request but did not consume outgoing buffers in a timely fashion." ),
NoThreadsAvailable( TransientError, // TODO: see above
"There are no available threads to serve this request at the moment. You can retry at a later time " +
"or consider increasing max pool / queue size for bolt connector(s)." );
"or consider increasing max thread pool size for bolt connector(s)." );
private final Code code;

@Override
Expand Down
Expand Up @@ -836,15 +836,16 @@ public enum LabelIndex
public static final Setting<Boolean> bolt_write_throttle = setting( "unsupported.dbms.bolt.write_throttle", BOOLEAN, TRUE );

@Description( "When the size (in bytes) of write buffers, used by bolt's network layer, " +
"grows beyond this value bolt channel will advertise itself as unwritable and bolt worker " +
"threads will block until it becomes writable again." )
"grows beyond this value bolt channel will advertise itself as unwritable and will block " +
"related processing thread until it becomes writable again." )
@Internal
public static final Setting<Integer> bolt_write_buffer_high_water_mark =
buildSetting( "unsupported.dbms.bolt.write_throttle.high_watermark", INTEGER, String.valueOf( ByteUnit.kibiBytes( 512 ) ) ).constraint(
range( (int) ByteUnit.kibiBytes( 64 ), Integer.MAX_VALUE ) ).build();

@Description( "When the size (in bytes) of write buffers, previously advertised as unwritable, " +
"gets below this value bolt channel will re-advertise itself as writable and blocked bolt worker " + "threads will resume execution." )
"gets below this value bolt channel will re-advertise itself as writable and blocked processing " +
"thread will resume execution." )
@Internal
public static final Setting<Integer> bolt_write_buffer_low_water_mark =
buildSetting( "unsupported.dbms.bolt.write_throttle.low_watermark", INTEGER, String.valueOf( ByteUnit.kibiBytes( 128 ) ) ).constraint(
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;

import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal;
import org.neo4j.configuration.ReplacedBy;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.helpers.AdvertisedSocketAddress;
Expand Down Expand Up @@ -66,7 +67,8 @@ public class BoltConnector extends Connector
public final Setting<Duration> thread_pool_keep_alive;

@Description( "The queue size of the thread pool bound to this connector (-1 for unbounded, 0 for direct handoff, > 0 for bounded)" )
public final Setting<Integer> thread_pool_queue_size;
@Internal
public final Setting<Integer> unsupported_thread_pool_queue_size;

// Used by config doc generator
public BoltConnector()
Expand All @@ -89,7 +91,7 @@ public BoltConnector( String key )
this.thread_pool_core_size = group.scope( setting( "thread_pool_core_size", INTEGER, String.valueOf( 10 ) ) );
this.thread_pool_max_size = group.scope( setting( "thread_pool_max_size", INTEGER, String.valueOf( 400 ) ) );
this.thread_pool_keep_alive = group.scope( setting( "thread_pool_keep_alive", DURATION, "5m" ) );
this.thread_pool_queue_size = group.scope( setting( "thread_pool_queue_size", INTEGER, String.valueOf( 0 ) ) );
this.unsupported_thread_pool_queue_size = group.scope( setting( "unsupported_thread_pool_queue_size", INTEGER, String.valueOf( 0 ) ) );
}

public enum EncryptionLevel
Expand Down

0 comments on commit b15a70b

Please sign in to comment.