From 5b2b0b9645a6bd0ca08b9ab8b05bbe9c283d5bf7 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Fri, 19 May 2017 09:06:47 +0200 Subject: [PATCH] Remove NEW_THREAD scheduling strategy from Noe4jJobScheduler Since the best way to create a new thread by using the scheduler is to use the thread factory, this changes will make more straightforward how to create new thread using the scheduler. And also note that the NEW_THREAD strategy was unsupported in several schedule methods. --- .../org/neo4j/scheduler/JobScheduler.java | 60 +++++--------- .../kernel/impl/util/Neo4jJobScheduler.java | 55 +------------ .../impl/util/Neo4jJobSchedulerTest.java | 81 ++----------------- .../core/consensus/ContinuousJob.java | 32 ++++---- .../log/segmented/SegmentedRaftLog.java | 5 +- .../membership/MembershipWaiter.java | 7 +- .../DelayedRenewableTimeoutService.java | 5 +- .../core/server/CoreServerModule.java | 8 +- .../helper/RobustJobSchedulerWrapper.java | 7 +- .../core/consensus/ContinuousJobTest.java | 14 ++-- 10 files changed, 67 insertions(+), 207 deletions(-) diff --git a/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java b/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java index d99b5ac1fcc7d..11e41ff16c94a 100644 --- a/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java +++ b/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java @@ -29,22 +29,11 @@ import org.neo4j.kernel.lifecycle.Lifecycle; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.NEW_THREAD; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; - /** * To be expanded, the idea here is to have a database-global service for running jobs, handling jobs crashing and so on. */ public interface JobScheduler extends Lifecycle { - enum SchedulingStrategy - { - /** Create a new thread each time a job is scheduled */ - NEW_THREAD, - /** Run the job from a pool of threads, shared among all groups with this strategy */ - POOLED - } - /** * Represents a common group of jobs, defining how they should be scheduled. */ @@ -53,14 +42,12 @@ class Group public static final String THREAD_ID = "thread-id"; public static final Map NO_METADATA = Collections.emptyMap(); + private final AtomicInteger threadCounter = new AtomicInteger(); private final String name; - private final SchedulingStrategy strategy; - private final AtomicInteger threadCounter = new AtomicInteger( 0 ); - public Group( String name, SchedulingStrategy strategy ) + public Group( String name ) { this.name = name; - this.strategy = strategy; } public String name() @@ -68,11 +55,6 @@ public String name() return name; } - public SchedulingStrategy strategy() - { - return strategy; - } - /** * Name a new thread. This method may or may not be used, it is up to the scheduling strategy to decide * to honor this. @@ -98,89 +80,89 @@ public String threadName( Map metadata ) class Groups { /** Session workers, these perform the work of actually executing client queries. */ - public static final Group sessionWorker = new Group( "Session", NEW_THREAD ); + public static final Group sessionWorker = new Group( "Session" ); /** Background index population */ - public static final Group indexPopulation = new Group( "IndexPopulation", POOLED ); + public static final Group indexPopulation = new Group( "IndexPopulation" ); /** Push transactions from master to slaves */ - public static final Group masterTransactionPushing = new Group( "TransactionPushing", POOLED ); + public static final Group masterTransactionPushing = new Group( "TransactionPushing" ); /** * Rolls back idle transactions on the server. */ - public static final Group serverTransactionTimeout = new Group( "ServerTransactionTimeout", POOLED ); + public static final Group serverTransactionTimeout = new Group( "ServerTransactionTimeout" ); /** * Aborts idle slave lock sessions on the master. */ - public static final Group slaveLocksTimeout = new Group( "SlaveLocksTimeout", POOLED ); + public static final Group slaveLocksTimeout = new Group( "SlaveLocksTimeout" ); /** * Pulls updates from the master. */ - public static final Group pullUpdates = new Group( "PullUpdates", POOLED ); + public static final Group pullUpdates = new Group( "PullUpdates" ); /** * Gathers approximated data about the underlying data store. */ - public static final Group indexSamplingController = new Group( "IndexSamplingController", POOLED ); - public static final Group indexSampling = new Group( "IndexSampling", POOLED ); + public static final Group indexSamplingController = new Group( "IndexSamplingController" ); + public static final Group indexSampling = new Group( "IndexSampling" ); /** * Rotates internal diagnostic logs */ - public static final Group internalLogRotation = new Group( "InternalLogRotation", POOLED ); + public static final Group internalLogRotation = new Group( "InternalLogRotation" ); /** * Rotates query logs */ - public static final Group queryLogRotation = new Group( "queryLogRotation", POOLED ); + public static final Group queryLogRotation = new Group( "queryLogRotation" ); /** * Checkpoint and store flush */ - public static final Group checkPoint = new Group( "CheckPoint", POOLED ); + public static final Group checkPoint = new Group( "CheckPoint" ); /** * Raft Log pruning */ - public static final Group raftLogPruning = new Group( "RaftLogPruning", POOLED ); + public static final Group raftLogPruning = new Group( "RaftLogPruning" ); /** * Network IO threads for the Bolt protocol. */ - public static final Group boltNetworkIO = new Group( "BoltNetworkIO", NEW_THREAD ); + public static final Group boltNetworkIO = new Group( "BoltNetworkIO" ); /** * Reporting thread for Metrics events */ - public static final Group metricsEvent = new Group( "MetricsEvent", POOLED ); + public static final Group metricsEvent = new Group( "MetricsEvent" ); /** * UDC timed events. */ - public static Group udc = new Group( "UsageDataCollection", POOLED ); + public static Group udc = new Group( "UsageDataCollection" ); /** * Storage maintenance. */ - public static Group storageMaintenance = new Group( "StorageMaintenance", POOLED ); + public static Group storageMaintenance = new Group( "StorageMaintenance" ); /** * Native security. */ - public static Group nativeSecurity = new Group( "NativeSecurity", POOLED ); + public static Group nativeSecurity = new Group( "NativeSecurity" ); /** * File watch service group */ - public static Group fileWatch = new Group( "FileWatcher", NEW_THREAD ); + public static Group fileWatch = new Group( "FileWatcher" ); /** * Recovery cleanup. */ - public static Group recoveryCleanup = new Group( "RecoveryCleanup", POOLED ); + public static Group recoveryCleanup = new Group( "RecoveryCleanup" ); private Groups() { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Neo4jJobScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Neo4jJobScheduler.java index ff59759056813..6c8e7b11cf83c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Neo4jJobScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/Neo4jJobScheduler.java @@ -86,17 +86,7 @@ public JobHandle schedule( Group group, Runnable job, Map metadat throw new RejectedExecutionException( "Scheduler is not started" ); } - switch ( group.strategy() ) - { - case POOLED: - return register( new PooledJobHandle( this.globalPool.submit( job ) ) ); - case NEW_THREAD: - Thread thread = createNewThread( group, job, metadata ); - thread.start(); - return new SingleThreadHandle( thread ); - default: - throw new IllegalArgumentException( "Unsupported strategy for scheduling job: " + group.strategy() ); - } + return register( new PooledJobHandle( this.globalPool.submit( job ) ) ); } private JobHandle register( PooledJobHandle pooledJobHandle ) @@ -136,27 +126,15 @@ public JobHandle scheduleRecurring( Group group, final Runnable runnable, long p @Override public JobHandle scheduleRecurring( Group group, final Runnable runnable, long initialDelay, long period, - TimeUnit timeUnit ) + TimeUnit timeUnit ) { - switch ( group.strategy() ) - { - case POOLED: - return new PooledJobHandle( scheduledExecutor.scheduleAtFixedRate( runnable, initialDelay, period, timeUnit ) ); - default: - throw new IllegalArgumentException( "Unsupported strategy to use for recurring jobs: " + group.strategy() ); - } + return new PooledJobHandle( scheduledExecutor.scheduleAtFixedRate( runnable, initialDelay, period, timeUnit ) ); } @Override public JobHandle schedule( Group group, final Runnable runnable, long initialDelay, TimeUnit timeUnit ) { - switch ( group.strategy() ) - { - case POOLED: - return new PooledJobHandle( scheduledExecutor.schedule( runnable, initialDelay, timeUnit ) ); - default: - throw new IllegalArgumentException( "Unsupported strategy to use for delayed jobs: " + group.strategy() ); - } + return new PooledJobHandle( scheduledExecutor.schedule( runnable, initialDelay, timeUnit ) ); } @Override @@ -267,29 +245,4 @@ public void registerCancelListener( CancelListener listener ) cancelListeners.add( listener ); } } - - private static class SingleThreadHandle implements JobHandle - { - private final Thread thread; - - SingleThreadHandle( Thread thread ) - { - this.thread = thread; - } - - @Override - public void cancel( boolean mayInterruptIfRunning ) - { - if ( mayInterruptIfRunning ) - { - thread.interrupt(); - } - } - - @Override - public void waitTermination() throws InterruptedException - { - thread.join(); - } - } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/util/Neo4jJobSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/Neo4jJobSchedulerTest.java index 7045e93c5d724..8fee03ded7dde 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/util/Neo4jJobSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/Neo4jJobSchedulerTest.java @@ -20,11 +20,8 @@ package org.neo4j.kernel.impl.util; import org.junit.After; -import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -34,9 +31,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; +import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler.JobHandle; -import org.neo4j.kernel.lifecycle.LifeSupport; + import static java.lang.Thread.sleep; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; @@ -46,11 +44,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.neo4j.helpers.Exceptions.launderedException; -import static org.neo4j.helpers.collection.MapUtil.stringMap; -import static org.neo4j.scheduler.JobScheduler.Group.THREAD_ID; import static org.neo4j.scheduler.JobScheduler.Groups.indexPopulation; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.NEW_THREAD; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; import static org.neo4j.test.ReflectionUtil.replaceValueInPrivateField; public class Neo4jJobSchedulerTest @@ -119,41 +113,6 @@ public void shouldCancelRecurringJob() throws Exception assertThat( invocations.get(), equalTo( recorded ) ); } - @Test - public void shouldRunJobInNewThread() throws Throwable - { - // Given - life.start(); - - // We start a thread that will signal when it's running, and remain running until we tell it to stop. - // This way we can check and make sure a thread with the name we expect is live and well - final CountDownLatch threadStarted = new CountDownLatch( 1 ); - final CountDownLatch unblockThread = new CountDownLatch( 1 ); - - // When - scheduler.schedule( new JobScheduler.Group( "MyGroup", NEW_THREAD ), - waitForLatch( threadStarted, unblockThread ), stringMap( THREAD_ID, "MyTestThread" ) ); - threadStarted.await(); - - // Then - try - { - String threadName = "neo4j.MyGroup-MyTestThread"; - for ( String name : threadNames() ) - { - if ( name.equals( threadName ) ) - { - return; - } - } - Assert.fail( "Expected a thread named '" + threadName + "' in " + threadNames() ); - } - finally - { - unblockThread.countDown(); - } - } - @Test public void shouldRunWithDelay() throws Throwable { @@ -165,14 +124,10 @@ public void shouldRunWithDelay() throws Throwable long time = System.nanoTime(); - scheduler.schedule( new JobScheduler.Group( "group", POOLED ), new Runnable() + scheduler.schedule( new JobScheduler.Group( "group" ), () -> { - @Override - public void run() - { - runTime.set( System.nanoTime() ); - latch.countDown(); - } + runTime.set( System.nanoTime() ); + latch.countDown(); }, 100, TimeUnit.MILLISECONDS ); latch.await(); @@ -236,32 +191,6 @@ public void shouldNotifyCancelListeners() throws Exception neo4jJobScheduler.shutdown(); } - private List threadNames() - { - List names = new ArrayList<>(); - for ( Thread thread : Thread.getAllStackTraces().keySet() ) - { - names.add( thread.getName() ); - } - return names; - } - - private Runnable waitForLatch( final CountDownLatch threadStarted, final CountDownLatch runUntil ) - { - return () -> - { - try - { - threadStarted.countDown(); - runUntil.await(); - } - catch ( InterruptedException e ) - { - throw new RuntimeException( e ); - } - }; - } - private void awaitFirstInvocation() { while ( invocations.get() == 0 ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ContinuousJob.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ContinuousJob.java index cf1bb45cdd702..12ca8cfe290c0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ContinuousJob.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ContinuousJob.java @@ -19,7 +19,8 @@ */ package org.neo4j.causalclustering.core.consensus; -import org.neo4j.scheduler.JobScheduler; +import java.util.concurrent.ThreadFactory; + import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; @@ -30,19 +31,14 @@ */ public class ContinuousJob extends LifecycleAdapter { - private final AbortableJob abortableJob = new AbortableJob(); - private final JobScheduler scheduler; - private final JobScheduler.Group group; - private final Runnable task; + private final AbortableJob abortableJob; private final Log log; + private final Thread thread; - private JobScheduler.JobHandle jobHandle; - - public ContinuousJob( JobScheduler scheduler, JobScheduler.Group group, Runnable task, LogProvider logProvider ) + public ContinuousJob( ThreadFactory threadFactory, Runnable task, LogProvider logProvider ) { - this.scheduler = scheduler; - this.group = group; - this.task = task; + this.abortableJob = new AbortableJob( task ); + this.thread = threadFactory.newThread( abortableJob ); this.log = logProvider.getLog( getClass() ); } @@ -50,21 +46,27 @@ public ContinuousJob( JobScheduler scheduler, JobScheduler.Group group, Runnable public void start() throws Throwable { abortableJob.keepRunning = true; - jobHandle = scheduler.schedule( group, abortableJob ); + thread.start(); } @Override public void stop() throws Throwable { - log.info( "ContinuousJob " + group.name() + " stopping" ); + log.info( "ContinuousJob " + thread.getName() + " stopping" ); abortableJob.keepRunning = false; - jobHandle.waitTermination(); + thread.join(); } - private class AbortableJob implements Runnable + private static class AbortableJob implements Runnable { + private final Runnable task; private volatile boolean keepRunning; + AbortableJob( Runnable task ) + { + this.task = task; + } + @Override public void run() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java index 54ecc1f9c0fd3..a443cda68c804 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java @@ -31,13 +31,12 @@ import org.neo4j.causalclustering.messaging.marshalling.ChannelMarshal; import org.neo4j.cursor.IOCursor; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; /** * The segmented RAFT log is an append only log supporting the operations required to support @@ -116,7 +115,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException, rotateSegment( state.appendIndex, state.appendIndex, state.terms.latest() ); } - readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner", POOLED ), + readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner" ), () -> readerPool.prune( READER_POOL_MAX_AGE, MINUTES ), READER_POOL_MAX_AGE, READER_POOL_MAX_AGE, MINUTES ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java index c8e7cec326f6a..056fb59034afc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java @@ -26,14 +26,13 @@ import org.neo4j.causalclustering.core.consensus.RaftMachine; import org.neo4j.causalclustering.core.consensus.state.ExposedRaftState; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; /** * Waits until member has "fully joined" the raft membership. @@ -77,7 +76,7 @@ CompletableFuture waitUntilCaughtUpMember( RaftMachine raft ) Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); JobScheduler.JobHandle jobHandle = jobScheduler.schedule( - new JobScheduler.Group( getClass().toString(), POOLED ), + new JobScheduler.Group( getClass().toString() ), evaluator, currentCatchupDelayInMs, MILLISECONDS ); catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); @@ -117,7 +116,7 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() ) { currentCatchupDelayInMs += SECONDS.toMillis( 1 ); long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag; - jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString(), POOLED ), this, + jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString() ), this, longerDelay, MILLISECONDS ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/schedule/DelayedRenewableTimeoutService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/schedule/DelayedRenewableTimeoutService.java index e1b5bdf98d84e..39336b895cedf 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/schedule/DelayedRenewableTimeoutService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/schedule/DelayedRenewableTimeoutService.java @@ -30,14 +30,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; import static java.lang.System.nanoTime; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; /** * A bare bones, wall clock based implementation of the {@link RenewableTimeoutService}. It uses a scheduled thread @@ -174,7 +173,7 @@ public void init() @Override public void start() { - jobHandle = scheduler.scheduleRecurring( new JobScheduler.Group( "Scheduler", POOLED ), this, TIMER_RESOLUTION, + jobHandle = scheduler.scheduleRecurring( new JobScheduler.Group( "Scheduler" ), this, TIMER_RESOLUTION, TIMER_RESOLUTION_UNIT ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index 829b193cef299..52b3fae48a539 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -69,15 +69,13 @@ import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.state.DataSourceManager; import org.neo4j.kernel.impl.util.Dependencies; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; +import org.neo4j.scheduler.JobScheduler; import org.neo4j.time.Clocks; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.NEW_THREAD; - public class CoreServerModule { public static final String CLUSTER_ID_NAME = "cluster-id"; @@ -222,8 +220,8 @@ private OnlineBackupKernelExtension pickBackupExtension( NeoStoreDataSource data // batches messages from raft server -> core state // core state will drop messages if not ready life.add( batchingMessageHandler ); - life.add( new ContinuousJob( jobScheduler, new JobScheduler.Group( "raft-batch-handler", NEW_THREAD ), - batchingMessageHandler, logProvider ) ); + final JobScheduler.Group group = new JobScheduler.Group( "raft-batch-handler" ); + life.add( new ContinuousJob( jobScheduler.threadFactory( group ), batchingMessageHandler, logProvider ) ); life.add( raftServer ); // must start before core state so that it can trigger snapshot downloads when necessary life.add( coreLife ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java index d6c0e9a73c05b..d918afc571021 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java @@ -22,11 +22,10 @@ import java.util.concurrent.CancellationException; import org.neo4j.function.ThrowingAction; -import org.neo4j.scheduler.JobScheduler; import org.neo4j.logging.Log; +import org.neo4j.scheduler.JobScheduler; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.neo4j.scheduler.JobScheduler.SchedulingStrategy.POOLED; /** * A robust job catches and logs any exceptions, but keeps running if the job @@ -48,13 +47,13 @@ public RobustJobSchedulerWrapper( JobScheduler delegate, Log log ) public JobScheduler.JobHandle schedule( String name, long delayMillis, ThrowingAction action ) { - return delegate.schedule( new JobScheduler.Group( name, POOLED ), + return delegate.schedule( new JobScheduler.Group( name ), () -> withErrorHandling( action ), delayMillis, MILLISECONDS ); } public JobScheduler.JobHandle scheduleRecurring( String name, long periodMillis, ThrowingAction action ) { - return delegate.scheduleRecurring( new JobScheduler.Group( name, POOLED ), + return delegate.scheduleRecurring( new JobScheduler.Group( name ), () -> withErrorHandling( action ), periodMillis, MILLISECONDS ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java index 90e4018b3fc19..30973fae69bc2 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java @@ -25,11 +25,10 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.locks.LockSupport; -import org.neo4j.scheduler.JobScheduler.Group; -import org.neo4j.scheduler.JobScheduler.SchedulingStrategy; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.logging.NullLogProvider; +import org.neo4j.scheduler.JobScheduler.Group; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.lessThan; @@ -39,7 +38,8 @@ public class ContinuousJobTest { private static final long DEFAULT_TIMEOUT_MS = 15_000; - private Group jobGroup = new Group( "test", SchedulingStrategy.NEW_THREAD ); + private final Group jobGroup = new Group( "test" ); + private final Neo4jJobScheduler scheduler = new Neo4jJobScheduler(); @Test public void shouldRunJobContinuously() throws Throwable @@ -48,8 +48,8 @@ public void shouldRunJobContinuously() throws Throwable CountDownLatch latch = new CountDownLatch( 10 ); Runnable task = latch::countDown; - Neo4jJobScheduler scheduler = new Neo4jJobScheduler(); - ContinuousJob continuousJob = new ContinuousJob( scheduler, jobGroup, task, NullLogProvider.getInstance() ); + ContinuousJob continuousJob = + new ContinuousJob( scheduler.threadFactory( jobGroup ), task, NullLogProvider.getInstance() ); // when try ( Lifespan ignored = new Lifespan( scheduler, continuousJob ) ) @@ -71,8 +71,8 @@ public void shouldTerminateOnStop() throws Exception semaphore.release(); }; - Neo4jJobScheduler scheduler = new Neo4jJobScheduler(); - ContinuousJob continuousJob = new ContinuousJob( scheduler, jobGroup, task, NullLogProvider.getInstance() ); + ContinuousJob continuousJob = + new ContinuousJob( scheduler.threadFactory( jobGroup ), task, NullLogProvider.getInstance() ); // when long startTime = System.currentTimeMillis();