diff --git a/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java b/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java index 49df96e64c599..c8a8a2eed9828 100644 --- a/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java +++ b/community/common/src/main/java/org/neo4j/scheduler/JobScheduler.java @@ -45,7 +45,7 @@ final class Group private final AtomicInteger threadCounter = new AtomicInteger(); private final String name; - public Group( String name ) + private Group( String name ) { Objects.requireNonNull( name, "Group name cannot be null." ); this.name = name; @@ -98,6 +98,12 @@ public int hashCode() */ class Groups { + /** + * This group is used by the JobScheduler implementation itself, for the thread or threads that are in charge of the timely execution of delayed or + * recurring tasks. + */ + public static final Group taskScheduler = new Group( "Scheduler" ); + /** Background index population */ public static final Group indexPopulation = new Group( "IndexPopulation" ); @@ -122,8 +128,8 @@ class Groups /** * Gathers approximated data about the underlying data store. */ - public static final Group indexSamplingController = new Group( "IndexSamplingController" ); public static final Group indexSampling = new Group( "IndexSampling" ); + public static final Group indexSamplingController = indexSampling; /** * Rotates internal diagnostic logs @@ -133,17 +139,17 @@ class Groups /** * Rotates query logs */ - public static final Group queryLogRotation = new Group( "queryLogRotation" ); + public static final Group queryLogRotation = internalLogRotation; /** * Rotates bolt message logs */ - public static final Group boltLogRotation = new Group( "BoltLogRotation" ); + public static final Group boltLogRotation = internalLogRotation; /** * Rotates metrics csv files */ - public static final Group metricsLogRotations = new Group( "MetricsLogRotations" ); + public static final Group metricsLogRotations = internalLogRotation; /** * Checkpoint and store flush @@ -155,6 +161,17 @@ class Groups */ public static final Group raftLogPruning = new Group( "RaftLogPruning" ); + /** + * Raft timers. + */ + public static final Group raft = new Group( "RaftTimer" ); + public static final Group raftBatchHandler = new Group( "RaftBatchHandler" ); + public static final Group raftReaderPoolPruner = new Group( "RaftReaderPoolPruner" ); + public static final Group topologyHealth = new Group( "HazelcastHealth" ); + public static final Group topologyKeepAlive = new Group( "KeepAlive" ); + public static final Group topologyRefresh = new Group( "TopologyRefresh" ); + public static final Group membershipWaiter = new Group( "MembershipWaiter" ); + /** * Network IO threads for the Bolt protocol. */ @@ -168,7 +185,7 @@ class Groups /** * Snapshot downloader */ - public static final Group downloadSnapshot = new JobScheduler.Group( "DownloadSnapshot" ); + public static final Group downloadSnapshot = new Group( "DownloadSnapshot" ); /** * UDC timed events. @@ -180,11 +197,6 @@ class Groups */ public static final Group storageMaintenance = new Group( "StorageMaintenance" ); - /** - * Raft timers. - */ - public static final Group raft = new Group( "RaftTimer" ); - /** * Native security. */ @@ -223,7 +235,7 @@ class Groups /** * Bolt scheduler worker */ - public static Group boltWorker = new Group( "BoltWorker" ); + public static final Group boltWorker = new Group( "BoltWorker" ); private Groups() { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java index affa4c8d4fc2f..0466959824c9f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/scheduler/CentralJobScheduler.java @@ -37,7 +37,6 @@ public class CentralJobScheduler extends LifecycleAdapter implements JobScheduler { private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); - private static final Group SCHEDULER_GROUP = new Group( "Scheduler" ); private final TimeBasedTaskScheduler scheduler; private final Thread schedulerThread; @@ -73,7 +72,7 @@ public CentralJobScheduler() workStealingExecutors = new ConcurrentHashMap<>( 1 ); topLevelGroup = new TopLevelGroup(); pools = new ThreadPoolManager( topLevelGroup ); - ThreadFactory threadFactory = new GroupedDaemonThreadFactory( SCHEDULER_GROUP, topLevelGroup ); + ThreadFactory threadFactory = new GroupedDaemonThreadFactory( Groups.taskScheduler, topLevelGroup ); scheduler = new TimeBasedTaskScheduler( Clocks.nanoClock(), pools ); // The scheduler thread runs at slightly elevated priority for timeliness, and is started in init(). diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java index 01d50634df032..b1348ed7c1ad8 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/CentralJobSchedulerTest.java @@ -127,7 +127,7 @@ public void shouldRunWithDelay() throws Throwable long time = System.nanoTime(); - scheduler.schedule( new JobScheduler.Group( "group" ), () -> + scheduler.schedule( indexPopulation, () -> { runTime.set( System.nanoTime() ); latch.countDown(); @@ -144,7 +144,6 @@ public void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs() life.start(); List handles = new ArrayList<>( 30 ); - JobScheduler.Group group = new JobScheduler.Group( "test" ); AtomicLong startedCounter = new AtomicLong(); BinaryLatch blockLatch = new BinaryLatch(); Runnable task = () -> @@ -155,15 +154,15 @@ public void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs() for ( int i = 0; i < 10; i++ ) { - handles.add( scheduler.schedule( group, task, 0, TimeUnit.MILLISECONDS ) ); + handles.add( scheduler.schedule( indexPopulation, task, 0, TimeUnit.MILLISECONDS ) ); } for ( int i = 0; i < 10; i++ ) { - handles.add( scheduler.scheduleRecurring( group, task, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); + handles.add( scheduler.scheduleRecurring( indexPopulation, task, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); } for ( int i = 0; i < 10; i++ ) { - handles.add( scheduler.scheduleRecurring( group, task, 0, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); + handles.add( scheduler.scheduleRecurring( indexPopulation, task, 0, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); } long deadline = TimeUnit.SECONDS.toNanos( 10 ) + System.nanoTime(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java index 517c181dad213..11c9df96750aa 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/scheduler/TimeBasedTaskSchedulerTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.neo4j.scheduler.JobScheduler.Group; import org.neo4j.scheduler.JobScheduler.JobHandle; import org.neo4j.time.FakeClock; import org.neo4j.util.concurrent.BinaryLatch; @@ -40,6 +39,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import static org.neo4j.scheduler.JobScheduler.Groups.taskScheduler; public class TimeBasedTaskSchedulerTest { @@ -48,7 +48,6 @@ public class TimeBasedTaskSchedulerTest private TimeBasedTaskScheduler scheduler; private AtomicInteger counter; private Semaphore semaphore; - private Group group; @Before public void setUp() @@ -58,7 +57,6 @@ public void setUp() scheduler = new TimeBasedTaskScheduler( clock, pools ); counter = new AtomicInteger(); semaphore = new Semaphore( 0 ); - group = new Group( "test" ); } @After @@ -93,7 +91,7 @@ private void assertSemaphoreAcquire() throws InterruptedException @Test public void mustDelayExecution() throws Exception { - JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); + JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 ); scheduler.tick(); assertThat( counter.get(), is( 0 ) ); clock.forward( 99, TimeUnit.NANOSECONDS ); @@ -108,8 +106,8 @@ public void mustDelayExecution() throws Exception @Test public void mustOnlyScheduleTasksThatAreDue() throws Exception { - JobHandle handle1 = scheduler.submit( group, () -> counter.addAndGet( 10 ), 100, 0 ); - JobHandle handle2 = scheduler.submit( group, () -> counter.addAndGet( 100 ), 200, 0 ); + JobHandle handle1 = scheduler.submit( taskScheduler, () -> counter.addAndGet( 10 ), 100, 0 ); + JobHandle handle2 = scheduler.submit( taskScheduler, () -> counter.addAndGet( 100 ), 200, 0 ); scheduler.tick(); assertThat( counter.get(), is( 0 ) ); clock.forward( 199, TimeUnit.NANOSECONDS ); @@ -125,7 +123,7 @@ public void mustOnlyScheduleTasksThatAreDue() throws Exception @Test public void mustNotRescheduleDelayedTasks() throws Exception { - JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); + JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 ); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); handle.waitTermination(); @@ -133,14 +131,14 @@ public void mustNotRescheduleDelayedTasks() throws Exception clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); handle.waitTermination(); - pools.getThreadPool( group ).shutDown(); + pools.getThreadPool( taskScheduler ).shutDown(); assertThat( counter.get(), is( 1 ) ); } @Test public void mustRescheduleRecurringTasks() throws Exception { - scheduler.submit( group, semaphore::release, 100, 100 ); + scheduler.submit( taskScheduler, semaphore::release, 100, 100 ); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); assertSemaphoreAcquire(); @@ -157,7 +155,7 @@ public void mustNotRescheduleRecurringTasksThatThrows() throws Exception semaphore.release(); throw new RuntimeException( "boom" ); }; - JobHandle handle = scheduler.submit( group, runnable, 100, 100 ); + JobHandle handle = scheduler.submit( taskScheduler, runnable, 100, 100 ); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); assertSemaphoreAcquire(); @@ -183,14 +181,14 @@ public void mustNotStartRecurringTasksWherePriorExecutionHasNotYetFinished() counter.incrementAndGet(); semaphore.acquireUninterruptibly(); }; - scheduler.submit( group, runnable, 100, 100 ); + scheduler.submit( taskScheduler, runnable, 100, 100 ); for ( int i = 0; i < 4; i++ ) { scheduler.tick(); clock.forward( 100, TimeUnit.NANOSECONDS ); } semaphore.release( Integer.MAX_VALUE ); - pools.getThreadPool( group ).shutDown(); + pools.getThreadPool( taskScheduler ).shutDown(); assertThat( counter.get(), is( 1 ) ); } @@ -200,8 +198,8 @@ public void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception BinaryLatch latch = new BinaryLatch(); Runnable longRunning = latch::await; Runnable shortRunning = semaphore::release; - scheduler.submit( group, longRunning, 100, 100 ); - scheduler.submit( group, shortRunning, 100, 100 ); + scheduler.submit( taskScheduler, longRunning, 100, 100 ); + scheduler.submit( taskScheduler, shortRunning, 100, 100 ); for ( int i = 0; i < 4; i++ ) { clock.forward( 100, TimeUnit.NANOSECONDS ); @@ -215,14 +213,14 @@ public void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception public void delayedTasksMustNotRunIfCancelledFirst() throws Exception { List cancelListener = new ArrayList<>(); - JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); + JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 ); handle.registerCancelListener( cancelListener::add ); clock.forward( 90, TimeUnit.NANOSECONDS ); scheduler.tick(); handle.cancel( false ); clock.forward( 10, TimeUnit.NANOSECONDS ); scheduler.tick(); - pools.getThreadPool( group ).shutDown(); + pools.getThreadPool( taskScheduler ).shutDown(); assertThat( counter.get(), is( 0 ) ); assertThat( cancelListener, contains( Boolean.FALSE ) ); try @@ -245,7 +243,7 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException counter.incrementAndGet(); semaphore.release(); }; - JobHandle handle = scheduler.submit( group, recurring, 100, 100 ); + JobHandle handle = scheduler.submit( taskScheduler, recurring, 100, 100 ); handle.registerCancelListener( cancelListener::add ); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); @@ -258,7 +256,7 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException scheduler.tick(); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); - pools.getThreadPool( group ).shutDown(); + pools.getThreadPool( taskScheduler ).shutDown(); assertThat( counter.get(), is( 2 ) ); assertThat( cancelListener, contains( Boolean.TRUE ) ); } @@ -271,7 +269,7 @@ public void overdueRecurringTasksMustStartAsSoonAsPossible() counter.incrementAndGet(); semaphore.acquireUninterruptibly(); }; - JobHandle handle = scheduler.submit( group, recurring, 100, 100 ); + JobHandle handle = scheduler.submit( taskScheduler, recurring, 100, 100 ); clock.forward( 100, TimeUnit.NANOSECONDS ); scheduler.tick(); while ( counter.get() < 1 ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java index 571ec1cc9800a..de428392435dc 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/RaftServerModule.java @@ -170,7 +170,7 @@ private LifecycleMessageHandler> createM private ComposableMessageHandler createBatchingHandler( Config config ) { Function jobFactory = runnable -> new ContinuousJob( - platformModule.jobScheduler.threadFactory( new JobScheduler.Group( "raft-batch-handler" ) ), runnable, + platformModule.jobScheduler.threadFactory( JobScheduler.Groups.raftBatchHandler ), runnable, logProvider ); BoundedPriorityQueue.Config inQueueConfig = new BoundedPriorityQueue.Config( config.get( raft_in_queue_size ), diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java index c8f1dafa45ab6..b2e14d87a4a02 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/log/segmented/SegmentedRaftLog.java @@ -118,7 +118,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException, rotateSegment( state.appendIndex, state.appendIndex, state.terms.latest() ); } - readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner" ), + readerPoolPruner = scheduler.scheduleRecurring( JobScheduler.Groups.raftReaderPoolPruner, () -> readerPool.prune( READER_POOL_MAX_AGE, MINUTES ), READER_POOL_MAX_AGE, READER_POOL_MAX_AGE, MINUTES ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java index 9a8c8c137b4e5..406ac9100b8d4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/membership/MembershipWaiter.java @@ -91,7 +91,7 @@ CompletableFuture waitUntilCaughtUpMember( RaftMachine raft ) Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); JobScheduler.JobHandle jobHandle = jobScheduler.schedule( - new JobScheduler.Group( getClass().toString() ), + JobScheduler.Groups.membershipWaiter, evaluator, currentCatchupDelayInMs, MILLISECONDS ); catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); @@ -132,7 +132,7 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() ) { currentCatchupDelayInMs += SECONDS.toMillis( 1 ); long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag; - jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString() ), this, + jobScheduler.schedule( JobScheduler.Groups.membershipWaiter, this, longerDelay, MILLISECONDS ); } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java index b8bf917c6d823..e5d6be22fd71c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastClient.java @@ -179,8 +179,8 @@ public void init0() @Override public void start0() { - keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive ); - refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, () -> { + keepAliveJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyKeepAlive, timeToLive / 3, this::keepReadReplicaAlive ); + refreshTopologyJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyRefresh, refreshPeriod, () -> { this.refreshTopology(); this.refreshRoles(); } ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java index 0e7cbaaeadcd2..55dee53b9503f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/discovery/HazelcastCoreTopologyService.java @@ -185,7 +185,7 @@ public void start0() { return; } - refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, + refreshJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyRefresh, refreshPeriod, HazelcastCoreTopologyService.this::refreshTopology ); log.info( "Cluster discovery service started" ); } ); @@ -296,7 +296,7 @@ private HazelcastInstance createHazelcastInstance() logConnectionInfo( initialMembers ); c.addListenerConfig( new ListenerConfig( new OurMembershipListener() ) ); - JobScheduler.JobHandle logJob = scheduler.schedule( "HazelcastHealth", HAZELCAST_IS_HEALTHY_TIMEOUT_MS, + JobScheduler.JobHandle logJob = scheduler.schedule( JobScheduler.Groups.topologyHealth, HAZELCAST_IS_HEALTHY_TIMEOUT_MS, () -> log.warn( "The server has not been able to connect in a timely fashion to the " + "cluster. Please consult the logs for more details. Rebooting the server may " + "solve the problem." ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java index 870c7d16561c0..8994588ce7305 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapper.java @@ -46,16 +46,14 @@ public RobustJobSchedulerWrapper( JobScheduler delegate, Log log ) this.log = log; } - public JobScheduler.JobHandle schedule( String name, long delayMillis, ThrowingAction action ) + public JobScheduler.JobHandle schedule( JobScheduler.Group group, long delayMillis, ThrowingAction action ) { - return delegate.schedule( new JobScheduler.Group( name ), - () -> withErrorHandling( action ), delayMillis, MILLISECONDS ); + return delegate.schedule( group, () -> withErrorHandling( action ), delayMillis, MILLISECONDS ); } - public JobScheduler.JobHandle scheduleRecurring( String name, long periodMillis, ThrowingAction action ) + public JobScheduler.JobHandle scheduleRecurring( JobScheduler.Group group, long periodMillis, ThrowingAction action ) { - return delegate.scheduleRecurring( new JobScheduler.Group( name ), - () -> withErrorHandling( action ), periodMillis, MILLISECONDS ); + return delegate.scheduleRecurring( group, () -> withErrorHandling( action ), periodMillis, MILLISECONDS ); } /** diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java index 382fa8c3af66a..e23b056fe3e4c 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/ContinuousJobTest.java @@ -31,17 +31,16 @@ import org.neo4j.kernel.impl.scheduler.CentralJobScheduler; import org.neo4j.kernel.lifecycle.Lifespan; import org.neo4j.logging.NullLogProvider; -import org.neo4j.scheduler.JobScheduler.Group; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.neo4j.scheduler.JobScheduler.Groups.raftBatchHandler; public class ContinuousJobTest { private static final long DEFAULT_TIMEOUT_MS = 15_000; - private final Group jobGroup = new Group( "test" ); private final CentralJobScheduler scheduler = new CentralJobScheduler(); @Test @@ -52,7 +51,7 @@ public void shouldRunJobContinuously() throws Throwable Runnable task = latch::countDown; ContinuousJob continuousJob = - new ContinuousJob( scheduler.threadFactory( jobGroup ), task, NullLogProvider.getInstance() ); + new ContinuousJob( scheduler.threadFactory( raftBatchHandler ), task, NullLogProvider.getInstance() ); // when try ( Lifespan ignored = new Lifespan( scheduler, continuousJob ) ) @@ -75,7 +74,7 @@ public void shouldTerminateOnStop() throws Exception }; ContinuousJob continuousJob = - new ContinuousJob( scheduler.threadFactory( jobGroup ), task, NullLogProvider.getInstance() ); + new ContinuousJob( scheduler.threadFactory( raftBatchHandler ), task, NullLogProvider.getInstance() ); // when long startTime = System.currentTimeMillis(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerServiceTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerServiceTest.java index f325fc5c450ae..a9fcf0f587165 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerServiceTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerServiceTest.java @@ -48,7 +48,7 @@ public class TimerServiceTest { - private final JobScheduler.Group group = new JobScheduler.Group( "Test" ); + private final JobScheduler.Group group = JobScheduler.Groups.raft; private final TimeoutHandler handlerA = mock( TimeoutHandler.class ); private final TimeoutHandler handlerB = mock( TimeoutHandler.class ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerTest.java index d12d529edbf7c..292337bb12e07 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/schedule/TimerTest.java @@ -46,7 +46,6 @@ public void shouldHandleConcurrentResetAndInvocationOfHandler() { // given CentralJobScheduler scheduler = lifeRule.add( new CentralJobScheduler() ); - JobScheduler.Group group = new JobScheduler.Group( "test" ); BinaryLatch invoked = new BinaryLatch(); BinaryLatch done = new BinaryLatch(); @@ -57,7 +56,7 @@ public void shouldHandleConcurrentResetAndInvocationOfHandler() done.await(); }; - Timer timer = new Timer( () -> "test", scheduler, getInstance(), group, handler ); + Timer timer = new Timer( () -> "test", scheduler, getInstance(), JobScheduler.Groups.raft, handler ); timer.set( new FixedTimeout( 0, SECONDS ) ); invoked.await(); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapperTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapperTest.java index 0377006dfc780..c6bf43d8fda46 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapperTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/helper/RobustJobSchedulerWrapperTest.java @@ -39,6 +39,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.neo4j.scheduler.JobScheduler.Groups.topologyHealth; +import static org.neo4j.scheduler.JobScheduler.Groups.topologyRefresh; import static org.neo4j.test.assertion.Assert.assertEventually; public class RobustJobSchedulerWrapperTest @@ -68,12 +70,11 @@ public void oneOffJobWithExceptionShouldLog() throws Exception IllegalStateException e = new IllegalStateException(); // when - JobHandle jobHandle = robustWrapper.schedule( "JobName", 100, () -> - { - count.incrementAndGet(); - throw e; - } - ); + JobHandle jobHandle = robustWrapper.schedule( topologyHealth, 100, () -> + { + count.incrementAndGet(); + throw e; + } ); // then assertEventually( "run count", count::get, Matchers.equalTo( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); @@ -92,18 +93,17 @@ public void recurringJobWithExceptionShouldKeepRunning() throws Exception // when int nRuns = 100; - JobHandle jobHandle = robustWrapper.scheduleRecurring( "JobName", 1, () -> - { - if ( count.get() < nRuns ) - { - count.incrementAndGet(); - throw e; - } - } - ); + JobHandle jobHandle = robustWrapper.scheduleRecurring( topologyRefresh, 1, () -> + { + if ( count.get() < nRuns ) + { + count.incrementAndGet(); + throw e; + } + } ); // then - assertEventually( "run count", count::get, Matchers.equalTo( nRuns ), DEFAULT_TIMEOUT_MS , MILLISECONDS ); + assertEventually( "run count", count::get, Matchers.equalTo( nRuns ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); jobHandle.cancel( true ); verify( log, timeout( DEFAULT_TIMEOUT_MS ).times( nRuns ) ).warn( "Uncaught exception", e ); } @@ -118,18 +118,17 @@ public void recurringJobWithErrorShouldStop() throws Exception Error e = new Error(); // when - JobHandle jobHandle = robustWrapper.scheduleRecurring( "JobName", 1, () -> - { - count.incrementAndGet(); - throw e; - } - ); + JobHandle jobHandle = robustWrapper.scheduleRecurring( topologyRefresh, 1, () -> + { + count.incrementAndGet(); + throw e; + } ); // when Thread.sleep( 50 ); // should not keep increasing during this time // then - assertEventually( "run count", count::get, Matchers.equalTo( 1 ), DEFAULT_TIMEOUT_MS , MILLISECONDS ); + assertEventually( "run count", count::get, Matchers.equalTo( 1 ), DEFAULT_TIMEOUT_MS, MILLISECONDS ); jobHandle.cancel( true ); verify( log, timeout( DEFAULT_TIMEOUT_MS ).times( 1 ) ).error( "Uncaught error rethrown", e ); } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultConversationSPITest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultConversationSPITest.java index 080f93cc4d65a..1bb60135bd256 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultConversationSPITest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/cluster/DefaultConversationSPITest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.neo4j.scheduler.JobScheduler.Groups.slaveLocksTimeout; @RunWith( MockitoJUnitRunner.class ) public class DefaultConversationSPITest @@ -60,9 +61,8 @@ public void testAcquireClient() public void testScheduleRecurringJob() { Runnable job = mock( Runnable.class ); - JobScheduler.Group group = new JobScheduler.Group( "group" ); - conversationSpi.scheduleRecurringJob( group, 0, job ); + conversationSpi.scheduleRecurringJob( slaveLocksTimeout, 0, job ); - verify( jobScheduler ).scheduleRecurring( group, job, 0, TimeUnit.MILLISECONDS ); + verify( jobScheduler ).scheduleRecurring( slaveLocksTimeout, job, 0, TimeUnit.MILLISECONDS ); } }