Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1180: Improve logging when WorkItems run for too long #1222

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -58,7 +58,7 @@ enum TaskPhases {
END
}

private AtomicInteger workIdCounter = new AtomicInteger();
private final AtomicInteger workIdCounter = new AtomicInteger();

private class RunnableWorkItem implements Runnable {
private static final Counter.WithThreeLabels EXCEPTIONS_COUNTER =
@@ -71,6 +71,11 @@ private class RunnableWorkItem implements Runnable {
Gauge.name("skara_runner_allocated_bytes").labels("bot", "work_item").register();

private final WorkItem item;
private final int workId = workIdCounter.incrementAndGet();
private final Instant createTime = Instant.now();
// This gets updated by the watchdog when a timeout occurs to avoid
// repeating the timeout log messages too often.
private Instant timeoutWarningTime = createTime;

RunnableWorkItem(WorkItem wrappedItem) {
item = wrappedItem;
@@ -179,7 +184,7 @@ private void runMeasured() {

Collection<WorkItem> followUpItems = null;
try (var __ = new LogContext(Map.of("work_item", item.toString(),
"work_id", String.valueOf(workIdCounter.incrementAndGet())))) {
"work_id", String.valueOf(workId)))) {
log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN);
try {
followUpItems = item.run(scratchPath);
@@ -225,17 +230,20 @@ private void runMeasured() {

if (maySubmit) {
pending.remove(candidate);
executor.submit(new RunnableWorkItem(candidate));
active.put(candidate, Instant.now());
RunnableWorkItem runnableWorkItem = new RunnableWorkItem(candidate);
executor.submit(runnableWorkItem);
active.put(candidate, runnableWorkItem);
log.finer("Submitting candidate: " + candidate);
}
}
}
}
}

// Mapping of pending items to the active item preventing them from running
private final Map<WorkItem, Optional<WorkItem>> pending;
private final Map<WorkItem, Instant> active;
// Mapping of active WorkItem to their RunnableWorkItem
private final Map<WorkItem, RunnableWorkItem> active;
private final Deque<Path> scratchPaths;

private static final Counter.WithTwoLabels SCHEDULED_COUNTER =
@@ -249,13 +257,13 @@ private void submitOrSchedule(WorkItem item) {
for (var activeItem : active.keySet()) {
if (!activeItem.concurrentWith(item)) {

for (var pendingItem : pending.entrySet()) {
for (var pendingItem : pending.keySet()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (item.replaces(pendingItem.getKey())) {
log.finer("Discarding obsoleted item " + pendingItem.getKey() +
if (item.replaces(pendingItem)) {
log.finer("Discarding obsoleted item " + pendingItem +
" in favor of item " + item);
DISCARDED_COUNTER.labels(item.botName(), item.workItemName()).inc();
pending.remove(pendingItem.getKey());
pending.remove(pendingItem);
// There can't be more than one
break;
}
@@ -266,8 +274,9 @@ private void submitOrSchedule(WorkItem item) {
}
}

executor.submit(new RunnableWorkItem(item));
active.put(item, Instant.now());
RunnableWorkItem runnableWorkItem = new RunnableWorkItem(item);
executor.submit(runnableWorkItem);
active.put(item, runnableWorkItem);
}
}

@@ -370,13 +379,14 @@ private void checkPeriodicItems() {

private void itemWatchdog() {
synchronized (executor) {
for (var activeItem : active.entrySet()) {
var activeDuration = Duration.between(activeItem.getValue(), Instant.now());
if (activeDuration.compareTo(watchdogWarnTimeout) > 0) {
log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
" - this may be an error!");
for (var activeRunnableItem : active.values()) {
Instant now = Instant.now();
var timeoutDuration = Duration.between(activeRunnableItem.timeoutWarningTime, now);
if (timeoutDuration.compareTo(watchdogWarnTimeout) > 0) {
log.severe("Item " + activeRunnableItem.item + " with workId " + activeRunnableItem.workId + " has been active more than " +
Duration.between(activeRunnableItem.createTime, now) + " - this may be an error!");
// Reset the warning timestamp to avoid continuous reporting - once every watchdogWarnTimeout is enough
activeItem.setValue(Instant.now());
activeRunnableItem.timeoutWarningTime = now;
}
}
// Inform the global watchdog that the scheduler is still executing items