Skip to content

Commit

Permalink
Collect all JobScheduler groups and make them constants.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Aug 6, 2018
1 parent 89509b7 commit 27a629e
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 85 deletions.
Expand Up @@ -45,7 +45,7 @@ final class Group
private final AtomicInteger threadCounter = new AtomicInteger(); private final AtomicInteger threadCounter = new AtomicInteger();
private final String name; private final String name;


public Group( String name ) private Group( String name )
{ {
Objects.requireNonNull( name, "Group name cannot be null." ); Objects.requireNonNull( name, "Group name cannot be null." );
this.name = name; this.name = name;
Expand Down Expand Up @@ -98,6 +98,12 @@ public int hashCode()
*/ */
class Groups class Groups
{ {
/**
* This group is used by the JobScheduler implementation itself, for the thread or threads that are in charge of the timely execution of delayed or
* recurring tasks.
*/
public static final Group taskScheduler = new Group( "Scheduler" );

/** Background index population */ /** Background index population */
public static final Group indexPopulation = new Group( "IndexPopulation" ); public static final Group indexPopulation = new Group( "IndexPopulation" );


Expand All @@ -122,8 +128,8 @@ class Groups
/** /**
* Gathers approximated data about the underlying data store. * Gathers approximated data about the underlying data store.
*/ */
public static final Group indexSamplingController = new Group( "IndexSamplingController" );
public static final Group indexSampling = new Group( "IndexSampling" ); public static final Group indexSampling = new Group( "IndexSampling" );
public static final Group indexSamplingController = indexSampling;


/** /**
* Rotates internal diagnostic logs * Rotates internal diagnostic logs
Expand All @@ -133,17 +139,17 @@ class Groups
/** /**
* Rotates query logs * Rotates query logs
*/ */
public static final Group queryLogRotation = new Group( "queryLogRotation" ); public static final Group queryLogRotation = internalLogRotation;


/** /**
* Rotates bolt message logs * Rotates bolt message logs
*/ */
public static final Group boltLogRotation = new Group( "BoltLogRotation" ); public static final Group boltLogRotation = internalLogRotation;


/** /**
* Rotates metrics csv files * Rotates metrics csv files
*/ */
public static final Group metricsLogRotations = new Group( "MetricsLogRotations" ); public static final Group metricsLogRotations = internalLogRotation;


/** /**
* Checkpoint and store flush * Checkpoint and store flush
Expand All @@ -155,6 +161,17 @@ class Groups
*/ */
public static final Group raftLogPruning = new Group( "RaftLogPruning" ); public static final Group raftLogPruning = new Group( "RaftLogPruning" );


/**
* Raft timers.
*/
public static final Group raft = new Group( "RaftTimer" );
public static final Group raftBatchHandler = new Group( "RaftBatchHandler" );
public static final Group raftReaderPoolPruner = new Group( "RaftReaderPoolPruner" );
public static final Group topologyHealth = new Group( "HazelcastHealth" );
public static final Group topologyKeepAlive = new Group( "KeepAlive" );
public static final Group topologyRefresh = new Group( "TopologyRefresh" );
public static final Group membershipWaiter = new Group( "MembershipWaiter" );

/** /**
* Network IO threads for the Bolt protocol. * Network IO threads for the Bolt protocol.
*/ */
Expand All @@ -168,7 +185,7 @@ class Groups
/** /**
* Snapshot downloader * Snapshot downloader
*/ */
public static final Group downloadSnapshot = new JobScheduler.Group( "DownloadSnapshot" ); public static final Group downloadSnapshot = new Group( "DownloadSnapshot" );


/** /**
* UDC timed events. * UDC timed events.
Expand All @@ -180,11 +197,6 @@ class Groups
*/ */
public static final Group storageMaintenance = new Group( "StorageMaintenance" ); public static final Group storageMaintenance = new Group( "StorageMaintenance" );


/**
* Raft timers.
*/
public static final Group raft = new Group( "RaftTimer" );

/** /**
* Native security. * Native security.
*/ */
Expand Down Expand Up @@ -223,7 +235,7 @@ class Groups
/** /**
* Bolt scheduler worker * Bolt scheduler worker
*/ */
public static Group boltWorker = new Group( "BoltWorker" ); public static final Group boltWorker = new Group( "BoltWorker" );


private Groups() private Groups()
{ {
Expand Down
Expand Up @@ -37,7 +37,6 @@
public class CentralJobScheduler extends LifecycleAdapter implements JobScheduler public class CentralJobScheduler extends LifecycleAdapter implements JobScheduler
{ {
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final Group SCHEDULER_GROUP = new Group( "Scheduler" );


private final TimeBasedTaskScheduler scheduler; private final TimeBasedTaskScheduler scheduler;
private final Thread schedulerThread; private final Thread schedulerThread;
Expand Down Expand Up @@ -73,7 +72,7 @@ public CentralJobScheduler()
workStealingExecutors = new ConcurrentHashMap<>( 1 ); workStealingExecutors = new ConcurrentHashMap<>( 1 );
topLevelGroup = new TopLevelGroup(); topLevelGroup = new TopLevelGroup();
pools = new ThreadPoolManager( topLevelGroup ); pools = new ThreadPoolManager( topLevelGroup );
ThreadFactory threadFactory = new GroupedDaemonThreadFactory( SCHEDULER_GROUP, topLevelGroup ); ThreadFactory threadFactory = new GroupedDaemonThreadFactory( Groups.taskScheduler, topLevelGroup );
scheduler = new TimeBasedTaskScheduler( Clocks.nanoClock(), pools ); scheduler = new TimeBasedTaskScheduler( Clocks.nanoClock(), pools );


// The scheduler thread runs at slightly elevated priority for timeliness, and is started in init(). // The scheduler thread runs at slightly elevated priority for timeliness, and is started in init().
Expand Down
Expand Up @@ -127,7 +127,7 @@ public void shouldRunWithDelay() throws Throwable


long time = System.nanoTime(); long time = System.nanoTime();


scheduler.schedule( new JobScheduler.Group( "group" ), () -> scheduler.schedule( indexPopulation, () ->
{ {
runTime.set( System.nanoTime() ); runTime.set( System.nanoTime() );
latch.countDown(); latch.countDown();
Expand All @@ -144,7 +144,6 @@ public void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs()
life.start(); life.start();


List<JobHandle> handles = new ArrayList<>( 30 ); List<JobHandle> handles = new ArrayList<>( 30 );
JobScheduler.Group group = new JobScheduler.Group( "test" );
AtomicLong startedCounter = new AtomicLong(); AtomicLong startedCounter = new AtomicLong();
BinaryLatch blockLatch = new BinaryLatch(); BinaryLatch blockLatch = new BinaryLatch();
Runnable task = () -> Runnable task = () ->
Expand All @@ -155,15 +154,15 @@ public void longRunningScheduledJobsMustNotDelayOtherLongRunningJobs()


for ( int i = 0; i < 10; i++ ) for ( int i = 0; i < 10; i++ )
{ {
handles.add( scheduler.schedule( group, task, 0, TimeUnit.MILLISECONDS ) ); handles.add( scheduler.schedule( indexPopulation, task, 0, TimeUnit.MILLISECONDS ) );
} }
for ( int i = 0; i < 10; i++ ) for ( int i = 0; i < 10; i++ )
{ {
handles.add( scheduler.scheduleRecurring( group, task, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); handles.add( scheduler.scheduleRecurring( indexPopulation, task, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) );
} }
for ( int i = 0; i < 10; i++ ) for ( int i = 0; i < 10; i++ )
{ {
handles.add( scheduler.scheduleRecurring( group, task, 0, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) ); handles.add( scheduler.scheduleRecurring( indexPopulation, task, 0, Integer.MAX_VALUE, TimeUnit.MILLISECONDS ) );
} }


long deadline = TimeUnit.SECONDS.toNanos( 10 ) + System.nanoTime(); long deadline = TimeUnit.SECONDS.toNanos( 10 ) + System.nanoTime();
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.scheduler.JobScheduler.Group;
import org.neo4j.scheduler.JobScheduler.JobHandle; import org.neo4j.scheduler.JobScheduler.JobHandle;
import org.neo4j.time.FakeClock; import org.neo4j.time.FakeClock;
import org.neo4j.util.concurrent.BinaryLatch; import org.neo4j.util.concurrent.BinaryLatch;
Expand All @@ -40,6 +39,7 @@
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.neo4j.scheduler.JobScheduler.Groups.taskScheduler;


public class TimeBasedTaskSchedulerTest public class TimeBasedTaskSchedulerTest
{ {
Expand All @@ -48,7 +48,6 @@ public class TimeBasedTaskSchedulerTest
private TimeBasedTaskScheduler scheduler; private TimeBasedTaskScheduler scheduler;
private AtomicInteger counter; private AtomicInteger counter;
private Semaphore semaphore; private Semaphore semaphore;
private Group group;


@Before @Before
public void setUp() public void setUp()
Expand All @@ -58,7 +57,6 @@ public void setUp()
scheduler = new TimeBasedTaskScheduler( clock, pools ); scheduler = new TimeBasedTaskScheduler( clock, pools );
counter = new AtomicInteger(); counter = new AtomicInteger();
semaphore = new Semaphore( 0 ); semaphore = new Semaphore( 0 );
group = new Group( "test" );
} }


@After @After
Expand Down Expand Up @@ -93,7 +91,7 @@ private void assertSemaphoreAcquire() throws InterruptedException
@Test @Test
public void mustDelayExecution() throws Exception public void mustDelayExecution() throws Exception
{ {
JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 );
scheduler.tick(); scheduler.tick();
assertThat( counter.get(), is( 0 ) ); assertThat( counter.get(), is( 0 ) );
clock.forward( 99, TimeUnit.NANOSECONDS ); clock.forward( 99, TimeUnit.NANOSECONDS );
Expand All @@ -108,8 +106,8 @@ public void mustDelayExecution() throws Exception
@Test @Test
public void mustOnlyScheduleTasksThatAreDue() throws Exception public void mustOnlyScheduleTasksThatAreDue() throws Exception
{ {
JobHandle handle1 = scheduler.submit( group, () -> counter.addAndGet( 10 ), 100, 0 ); JobHandle handle1 = scheduler.submit( taskScheduler, () -> counter.addAndGet( 10 ), 100, 0 );
JobHandle handle2 = scheduler.submit( group, () -> counter.addAndGet( 100 ), 200, 0 ); JobHandle handle2 = scheduler.submit( taskScheduler, () -> counter.addAndGet( 100 ), 200, 0 );
scheduler.tick(); scheduler.tick();
assertThat( counter.get(), is( 0 ) ); assertThat( counter.get(), is( 0 ) );
clock.forward( 199, TimeUnit.NANOSECONDS ); clock.forward( 199, TimeUnit.NANOSECONDS );
Expand All @@ -125,22 +123,22 @@ public void mustOnlyScheduleTasksThatAreDue() throws Exception
@Test @Test
public void mustNotRescheduleDelayedTasks() throws Exception public void mustNotRescheduleDelayedTasks() throws Exception
{ {
JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
handle.waitTermination(); handle.waitTermination();
assertThat( counter.get(), is( 1 ) ); assertThat( counter.get(), is( 1 ) );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
handle.waitTermination(); handle.waitTermination();
pools.getThreadPool( group ).shutDown(); pools.getThreadPool( taskScheduler ).shutDown();
assertThat( counter.get(), is( 1 ) ); assertThat( counter.get(), is( 1 ) );
} }


@Test @Test
public void mustRescheduleRecurringTasks() throws Exception public void mustRescheduleRecurringTasks() throws Exception
{ {
scheduler.submit( group, semaphore::release, 100, 100 ); scheduler.submit( taskScheduler, semaphore::release, 100, 100 );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
assertSemaphoreAcquire(); assertSemaphoreAcquire();
Expand All @@ -157,7 +155,7 @@ public void mustNotRescheduleRecurringTasksThatThrows() throws Exception
semaphore.release(); semaphore.release();
throw new RuntimeException( "boom" ); throw new RuntimeException( "boom" );
}; };
JobHandle handle = scheduler.submit( group, runnable, 100, 100 ); JobHandle handle = scheduler.submit( taskScheduler, runnable, 100, 100 );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
assertSemaphoreAcquire(); assertSemaphoreAcquire();
Expand All @@ -183,14 +181,14 @@ public void mustNotStartRecurringTasksWherePriorExecutionHasNotYetFinished()
counter.incrementAndGet(); counter.incrementAndGet();
semaphore.acquireUninterruptibly(); semaphore.acquireUninterruptibly();
}; };
scheduler.submit( group, runnable, 100, 100 ); scheduler.submit( taskScheduler, runnable, 100, 100 );
for ( int i = 0; i < 4; i++ ) for ( int i = 0; i < 4; i++ )
{ {
scheduler.tick(); scheduler.tick();
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
} }
semaphore.release( Integer.MAX_VALUE ); semaphore.release( Integer.MAX_VALUE );
pools.getThreadPool( group ).shutDown(); pools.getThreadPool( taskScheduler ).shutDown();
assertThat( counter.get(), is( 1 ) ); assertThat( counter.get(), is( 1 ) );
} }


Expand All @@ -200,8 +198,8 @@ public void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception
BinaryLatch latch = new BinaryLatch(); BinaryLatch latch = new BinaryLatch();
Runnable longRunning = latch::await; Runnable longRunning = latch::await;
Runnable shortRunning = semaphore::release; Runnable shortRunning = semaphore::release;
scheduler.submit( group, longRunning, 100, 100 ); scheduler.submit( taskScheduler, longRunning, 100, 100 );
scheduler.submit( group, shortRunning, 100, 100 ); scheduler.submit( taskScheduler, shortRunning, 100, 100 );
for ( int i = 0; i < 4; i++ ) for ( int i = 0; i < 4; i++ )
{ {
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
Expand All @@ -215,14 +213,14 @@ public void longRunningTasksMustNotDelayExecutionOfOtherTasks() throws Exception
public void delayedTasksMustNotRunIfCancelledFirst() throws Exception public void delayedTasksMustNotRunIfCancelledFirst() throws Exception
{ {
List<Boolean> cancelListener = new ArrayList<>(); List<Boolean> cancelListener = new ArrayList<>();
JobHandle handle = scheduler.submit( group, counter::incrementAndGet, 100, 0 ); JobHandle handle = scheduler.submit( taskScheduler, counter::incrementAndGet, 100, 0 );
handle.registerCancelListener( cancelListener::add ); handle.registerCancelListener( cancelListener::add );
clock.forward( 90, TimeUnit.NANOSECONDS ); clock.forward( 90, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
handle.cancel( false ); handle.cancel( false );
clock.forward( 10, TimeUnit.NANOSECONDS ); clock.forward( 10, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
pools.getThreadPool( group ).shutDown(); pools.getThreadPool( taskScheduler ).shutDown();
assertThat( counter.get(), is( 0 ) ); assertThat( counter.get(), is( 0 ) );
assertThat( cancelListener, contains( Boolean.FALSE ) ); assertThat( cancelListener, contains( Boolean.FALSE ) );
try try
Expand All @@ -245,7 +243,7 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException
counter.incrementAndGet(); counter.incrementAndGet();
semaphore.release(); semaphore.release();
}; };
JobHandle handle = scheduler.submit( group, recurring, 100, 100 ); JobHandle handle = scheduler.submit( taskScheduler, recurring, 100, 100 );
handle.registerCancelListener( cancelListener::add ); handle.registerCancelListener( cancelListener::add );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
Expand All @@ -258,7 +256,7 @@ public void recurringTasksMustStopWhenCancelled() throws InterruptedException
scheduler.tick(); scheduler.tick();
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
pools.getThreadPool( group ).shutDown(); pools.getThreadPool( taskScheduler ).shutDown();
assertThat( counter.get(), is( 2 ) ); assertThat( counter.get(), is( 2 ) );
assertThat( cancelListener, contains( Boolean.TRUE ) ); assertThat( cancelListener, contains( Boolean.TRUE ) );
} }
Expand All @@ -271,7 +269,7 @@ public void overdueRecurringTasksMustStartAsSoonAsPossible()
counter.incrementAndGet(); counter.incrementAndGet();
semaphore.acquireUninterruptibly(); semaphore.acquireUninterruptibly();
}; };
JobHandle handle = scheduler.submit( group, recurring, 100, 100 ); JobHandle handle = scheduler.submit( taskScheduler, recurring, 100, 100 );
clock.forward( 100, TimeUnit.NANOSECONDS ); clock.forward( 100, TimeUnit.NANOSECONDS );
scheduler.tick(); scheduler.tick();
while ( counter.get() < 1 ) while ( counter.get() < 1 )
Expand Down
Expand Up @@ -170,7 +170,7 @@ private LifecycleMessageHandler<ReceivedInstantClusterIdAwareMessage<?>> createM
private ComposableMessageHandler createBatchingHandler( Config config ) private ComposableMessageHandler createBatchingHandler( Config config )
{ {
Function<Runnable,ContinuousJob> jobFactory = runnable -> new ContinuousJob( Function<Runnable,ContinuousJob> jobFactory = runnable -> new ContinuousJob(
platformModule.jobScheduler.threadFactory( new JobScheduler.Group( "raft-batch-handler" ) ), runnable, platformModule.jobScheduler.threadFactory( JobScheduler.Groups.raftBatchHandler ), runnable,
logProvider ); logProvider );


BoundedPriorityQueue.Config inQueueConfig = new BoundedPriorityQueue.Config( config.get( raft_in_queue_size ), BoundedPriorityQueue.Config inQueueConfig = new BoundedPriorityQueue.Config( config.get( raft_in_queue_size ),
Expand Down
Expand Up @@ -118,7 +118,7 @@ public synchronized void start() throws IOException, DamagedLogStorageException,
rotateSegment( state.appendIndex, state.appendIndex, state.terms.latest() ); rotateSegment( state.appendIndex, state.appendIndex, state.terms.latest() );
} }


readerPoolPruner = scheduler.scheduleRecurring( new JobScheduler.Group( "reader-pool-pruner" ), readerPoolPruner = scheduler.scheduleRecurring( JobScheduler.Groups.raftReaderPoolPruner,
() -> readerPool.prune( READER_POOL_MAX_AGE, MINUTES ), READER_POOL_MAX_AGE, READER_POOL_MAX_AGE, MINUTES ); () -> readerPool.prune( READER_POOL_MAX_AGE, MINUTES ), READER_POOL_MAX_AGE, READER_POOL_MAX_AGE, MINUTES );
} }


Expand Down
Expand Up @@ -91,7 +91,7 @@ CompletableFuture<Boolean> waitUntilCaughtUpMember( RaftMachine raft )
Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier ); Evaluator evaluator = new Evaluator( raft, catchUpFuture, dbHealthSupplier );


JobScheduler.JobHandle jobHandle = jobScheduler.schedule( JobScheduler.JobHandle jobHandle = jobScheduler.schedule(
new JobScheduler.Group( getClass().toString() ), JobScheduler.Groups.membershipWaiter,
evaluator, currentCatchupDelayInMs, MILLISECONDS ); evaluator, currentCatchupDelayInMs, MILLISECONDS );


catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) ); catchUpFuture.whenComplete( ( result, e ) -> jobHandle.cancel( true ) );
Expand Down Expand Up @@ -132,7 +132,7 @@ else if ( iAmAVotingMember() && caughtUpWithLeader() )
{ {
currentCatchupDelayInMs += SECONDS.toMillis( 1 ); currentCatchupDelayInMs += SECONDS.toMillis( 1 );
long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag; long longerDelay = currentCatchupDelayInMs < maxCatchupLag ? currentCatchupDelayInMs : maxCatchupLag;
jobScheduler.schedule( new JobScheduler.Group( MembershipWaiter.class.toString() ), this, jobScheduler.schedule( JobScheduler.Groups.membershipWaiter, this,
longerDelay, MILLISECONDS ); longerDelay, MILLISECONDS );
} }
} }
Expand Down
Expand Up @@ -179,8 +179,8 @@ public void init0()
@Override @Override
public void start0() public void start0()
{ {
keepAliveJob = scheduler.scheduleRecurring( "KeepAlive", timeToLive / 3, this::keepReadReplicaAlive ); keepAliveJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyKeepAlive, timeToLive / 3, this::keepReadReplicaAlive );
refreshTopologyJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, () -> { refreshTopologyJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyRefresh, refreshPeriod, () -> {
this.refreshTopology(); this.refreshTopology();
this.refreshRoles(); this.refreshRoles();
} ); } );
Expand Down
Expand Up @@ -185,7 +185,7 @@ public void start0()
{ {
return; return;
} }
refreshJob = scheduler.scheduleRecurring( "TopologyRefresh", refreshPeriod, refreshJob = scheduler.scheduleRecurring( JobScheduler.Groups.topologyRefresh, refreshPeriod,
HazelcastCoreTopologyService.this::refreshTopology ); HazelcastCoreTopologyService.this::refreshTopology );
log.info( "Cluster discovery service started" ); log.info( "Cluster discovery service started" );
} ); } );
Expand Down Expand Up @@ -296,7 +296,7 @@ private HazelcastInstance createHazelcastInstance()
logConnectionInfo( initialMembers ); logConnectionInfo( initialMembers );
c.addListenerConfig( new ListenerConfig( new OurMembershipListener() ) ); c.addListenerConfig( new ListenerConfig( new OurMembershipListener() ) );


JobScheduler.JobHandle logJob = scheduler.schedule( "HazelcastHealth", HAZELCAST_IS_HEALTHY_TIMEOUT_MS, JobScheduler.JobHandle logJob = scheduler.schedule( JobScheduler.Groups.topologyHealth, HAZELCAST_IS_HEALTHY_TIMEOUT_MS,
() -> log.warn( "The server has not been able to connect in a timely fashion to the " + () -> log.warn( "The server has not been able to connect in a timely fashion to the " +
"cluster. Please consult the logs for more details. Rebooting the server may " + "cluster. Please consult the logs for more details. Rebooting the server may " +
"solve the problem." ) ); "solve the problem." ) );
Expand Down
Expand Up @@ -46,16 +46,14 @@ public RobustJobSchedulerWrapper( JobScheduler delegate, Log log )
this.log = log; this.log = log;
} }


public JobScheduler.JobHandle schedule( String name, long delayMillis, ThrowingAction<Exception> action ) public JobScheduler.JobHandle schedule( JobScheduler.Group group, long delayMillis, ThrowingAction<Exception> action )
{ {
return delegate.schedule( new JobScheduler.Group( name ), return delegate.schedule( group, () -> withErrorHandling( action ), delayMillis, MILLISECONDS );
() -> withErrorHandling( action ), delayMillis, MILLISECONDS );
} }


public JobScheduler.JobHandle scheduleRecurring( String name, long periodMillis, ThrowingAction<Exception> action ) public JobScheduler.JobHandle scheduleRecurring( JobScheduler.Group group, long periodMillis, ThrowingAction<Exception> action )
{ {
return delegate.scheduleRecurring( new JobScheduler.Group( name ), return delegate.scheduleRecurring( group, () -> withErrorHandling( action ), periodMillis, MILLISECONDS );
() -> withErrorHandling( action ), periodMillis, MILLISECONDS );
} }


/** /**
Expand Down

0 comments on commit 27a629e

Please sign in to comment.