Skip to content

Commit

Permalink
feat: Add tracking of heartbeat results and executions with stale hea…
Browse files Browse the repository at this point in the history
…rtbeat (#432)

Add tracking of heartbeat results and executions with stale heartbeat to
enable acting on failing heartbeats.

* Configurable number of heartbeats before considered dead (minimum is
4)
    * New method `schedulerBuilder.missedHeartbeatsLimit(<limit>)`
* Increase default limit for missed heartbeats before considered dead
from 4 to 6
* Track the number of failed heartbeats and make it available to users
* New ExecutionContext method,
`ctx.getCurrentlyExecuting().getHeartbeatState().getFractionDead()`
* New scheduler methods for checking for stale heartbeats
    * `scheduler.getCurrentlyExecutingWithStaleHeartbeat()`
*
`scheduler.getCurrentlyExecuting().get(0).getHeartbeatState().getFractionDead()`
* New `SchedulerStatsEvent`s in `StatsRegistry`: `FAILED_HEARTBEAT`,
`FAILED_MULTIPLE_HEARTBEATS` (replaces `UNEXPECTED_ERROR` for failed
heartbeats)
* New example `HeartbeatMonitoringMain`



## Fixes
* #402 


## Reminders
- [x] Added/ran automated tests
- [x] Update README and/or examples
- [x] Ran `mvn spotless:apply`
  • Loading branch information
kagkarlsson committed Mar 11, 2024
1 parent cadf433 commit 823c9d2
Show file tree
Hide file tree
Showing 27 changed files with 626 additions and 56 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ this, but testing has shown this is prone to deadlocks and thus not recommended
:gear: `.heartbeatInterval(Duration)`<br/>
How often to update the heartbeat timestamp for running executions. Default `5m`.

:gear: `.missedHeartbeatsLimit(int)`<br/>
How many heartbeats may be missed before the execution is considered dead. Default `1`.

:gear: `.schedulerName(SchedulerName)`<br/>
Name of this scheduler-instance. The name is stored in the database when an execution is picked by a scheduler.
Default `<hostname>`.
Expand Down Expand Up @@ -479,9 +482,13 @@ Use-cases might be:

### Dead executions

During execution, the scheduler regularly updates a heartbeat-time for the task-execution. If an execution is marked as executing, but is not receiving updates to the heartbeat-time, it will be considered a _dead execution_ after time X. That may for example happen if the JVM running the scheduler suddenly exits.
During execution, the scheduler regularly updates a heartbeat-time for the task-execution.
If an execution is marked as executing, but is not receiving updates to the heartbeat-time,
it will be considered a _dead execution_ after time X. That may for example happen if the
JVM running the scheduler suddenly exits.

When a dead execution is found, the `Task`is consulted to see what should be done. A dead `RecurringTask` is typically rescheduled to `now()`.
When a dead execution is found, the `Task`is consulted to see what should be done. A dead
`RecurringTask` is typically rescheduled to `now()`.

## Performance

Expand Down
1 change: 0 additions & 1 deletion db-scheduler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
<java8-matchers.version>1.6</java8-matchers.version>
<equals-verifier.version>3.14.2</equals-verifier.version>
<bytebuddy.version>1.14.5</bytebuddy.version>
<micro-jdbc.version>0.6.0</micro-jdbc.version>
<zonky-pg-embedded.version>2.0.4</zonky-pg-embedded.version>
<postgresql.version>42.5.1</postgresql.version>
<slf4j.version>1.7.36</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class CurrentlyExecuting {
private final Execution execution;
private final Clock clock;
private final Instant startTime;
private final HeartbeatState heartbeatState;

public CurrentlyExecuting(Execution execution, Clock clock) {
public CurrentlyExecuting(Execution execution, Clock clock, HeartbeatConfig heartbeatConfig) {
this.execution = execution;
this.clock = clock;
this.startTime = clock.now();
this.heartbeatState = new HeartbeatState(clock, startTime, heartbeatConfig);
}

public Duration getDuration() {
Expand All @@ -42,4 +44,12 @@ public TaskInstance getTaskInstance() {
public Execution getExecution() {
return execution;
}

public HeartbeatState getHeartbeatState() {
return heartbeatState;
}

public void heartbeat(boolean successful, Instant now) {
heartbeatState.heartbeat(successful, now);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ExecutePicked implements Runnable {
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
private final Clock clock;
private HeartbeatConfig heartbeatConfig;
private final Execution pickedExecution;

public ExecutePicked(
Expand All @@ -51,6 +52,7 @@ public ExecutePicked(
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
Clock clock,
HeartbeatConfig heartbeatConfig,
Execution pickedExecution) {
this.executor = executor;
this.taskRepository = taskRepository;
Expand All @@ -61,23 +63,26 @@ public ExecutePicked(
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
this.clock = clock;
this.heartbeatConfig = heartbeatConfig;
this.pickedExecution = pickedExecution;
}

@Override
public void run() {
// FIXLATER: need to cleanup all the references back to scheduler fields
final UUID executionId =
executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock));
CurrentlyExecuting currentlyExecuting =
new CurrentlyExecuting(pickedExecution, clock, heartbeatConfig);
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);

try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution);
executePickedExecution(pickedExecution, currentlyExecuting);
} finally {
executor.removeCurrentlyProcessing(executionId);
}
}

private void executePickedExecution(Execution execution) {
private void executePickedExecution(Execution execution, CurrentlyExecuting currentlyExecuting) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error(
Expand All @@ -94,7 +99,8 @@ private void executePickedExecution(Execution execution) {
task.get()
.execute(
execution.taskInstance,
new ExecutionContext(schedulerState, execution, schedulerClient));
new ExecutionContext(
schedulerState, execution, schedulerClient, currentlyExecuting));
LOG.debug("Execution done: " + execution);

complete(completion, execution, executionStarted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class FetchCandidates implements PollStrategy {
private final Clock clock;
private final PollingStrategyConfig pollingStrategyConfig;
private final Runnable triggerCheckForNewExecutions;
private HeartbeatConfig heartbeatConfig;
AtomicInteger currentGenerationNumber = new AtomicInteger(0);
private final int lowerLimit;
private final int upperLimit;
Expand All @@ -53,7 +54,8 @@ public FetchCandidates(
TaskResolver taskResolver,
Clock clock,
PollingStrategyConfig pollingStrategyConfig,
Runnable triggerCheckForNewExecutions) {
Runnable triggerCheckForNewExecutions,
HeartbeatConfig heartbeatConfig) {
this.executor = executor;
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
Expand All @@ -65,6 +67,7 @@ public FetchCandidates(
this.clock = clock;
this.pollingStrategyConfig = pollingStrategyConfig;
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
this.heartbeatConfig = heartbeatConfig;
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
// FIXLATER: this is not "upper limit", but rather nr of executions to get. those already in
// queue will become stale
Expand Down Expand Up @@ -106,6 +109,7 @@ public void run() {
schedulerState,
failureLogger,
clock,
heartbeatConfig,
picked)
.run());
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.time.Duration;

public class HeartbeatConfig {

public final Duration heartbeatInterval;
public final int missedHeartbeatsLimit;
public final Duration maxAgeBeforeConsideredDead;

public HeartbeatConfig(
Duration heartbeatInterval, int missedHeartbeatsLimit, Duration maxAgeBeforeConsideredDead) {
this.heartbeatInterval = heartbeatInterval;
this.missedHeartbeatsLimit = missedHeartbeatsLimit;
this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.time.Duration;
import java.time.Instant;

public class HeartbeatState {
private Clock clock;
private final Instant startTime;
private final HeartbeatConfig heartbeatConfig;
private int heartbeatSuccessesSinceLastFailure = 0;
private int heartbeatFailuresSinceLastSuccess = 0;
private Instant heartbeatLastSuccess;
private Instant heartbeatLastFailure;

public HeartbeatState(Clock clock, Instant startTime, HeartbeatConfig heartbeatConfig) {
this.clock = clock;
this.startTime = startTime;
this.heartbeatConfig = heartbeatConfig;
heartbeatLastSuccess = startTime;
}

public boolean hasStaleHeartbeat() {
Duration sinceLastSuccess = Duration.between(heartbeatLastSuccess, clock.now());

long heartbeatMillis = heartbeatConfig.heartbeatInterval.toMillis();
long millisUntilConsideredStale =
heartbeatMillis + Math.min(10_000, (int) (heartbeatMillis * 0.25));
return heartbeatFailuresSinceLastSuccess > 0
|| sinceLastSuccess.toMillis() > millisUntilConsideredStale;
}

public double getFractionDead() {
Duration sinceLastSuccess = Duration.between(heartbeatLastSuccess, clock.now());
return (double) sinceLastSuccess.toMillis()
/ heartbeatConfig.maxAgeBeforeConsideredDead.toMillis();
}

public int getFailedHeartbeats() {
return heartbeatFailuresSinceLastSuccess;
}

public void heartbeat(boolean successful, Instant lastHeartbeatAttempt) {
if (successful) {
heartbeatLastSuccess = lastHeartbeatAttempt;
heartbeatSuccessesSinceLastFailure++;
heartbeatFailuresSinceLastSuccess = 0;
} else {
heartbeatLastFailure = lastHeartbeatAttempt;
heartbeatSuccessesSinceLastFailure = 0;
heartbeatFailuresSinceLastSuccess++;
}
}

public String describe() {
return "HeartbeatState{"
+ "successesSinceLastFailure="
+ heartbeatSuccessesSinceLastFailure
+ ", failuresSinceLastSuccess="
+ heartbeatFailuresSinceLastSuccess
+ ", lastSuccess="
+ heartbeatLastSuccess
+ ", lastFailure="
+ heartbeatLastFailure
+ ", missedHeartbeatsLimit="
+ heartbeatConfig.missedHeartbeatsLimit
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class LockAndFetchCandidates implements PollStrategy {
private final Clock clock;
private final PollingStrategyConfig pollingStrategyConfig;
private final Runnable triggerCheckForNewExecutions;
private HeartbeatConfig maxAgeBeforeConsideredDead;
private final int lowerLimit;
private final int upperLimit;
private AtomicBoolean moreExecutionsInDatabase = new AtomicBoolean(false);
Expand All @@ -51,7 +52,8 @@ public LockAndFetchCandidates(
TaskResolver taskResolver,
Clock clock,
PollingStrategyConfig pollingStrategyConfig,
Runnable triggerCheckForNewExecutions) {
Runnable triggerCheckForNewExecutions,
HeartbeatConfig maxAgeBeforeConsideredDead) {
this.executor = executor;
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
Expand All @@ -63,6 +65,7 @@ public LockAndFetchCandidates(
this.clock = clock;
this.pollingStrategyConfig = pollingStrategyConfig;
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead;
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
}
Expand Down Expand Up @@ -106,6 +109,7 @@ public void run() {
schedulerState,
failureLogger,
clock,
maxAgeBeforeConsideredDead,
picked),
() -> {
if (moreExecutionsInDatabase.get()
Expand Down

0 comments on commit 823c9d2

Please sign in to comment.