@@ -58,7 +58,7 @@ enum TaskPhases {
END
}
private AtomicInteger workIdCounter = new AtomicInteger ();
private final AtomicInteger workIdCounter = new AtomicInteger ();
private class RunnableWorkItem implements Runnable {
private static final Counter .WithThreeLabels EXCEPTIONS_COUNTER =
@@ -71,6 +71,11 @@ private class RunnableWorkItem implements Runnable {
Gauge .name ("skara_runner_allocated_bytes" ).labels ("bot" , "work_item" ).register ();
private final WorkItem item ;
private final int workId = workIdCounter .incrementAndGet ();
private final Instant createTime = Instant .now ();
// This gets updated by the watchdog when a timeout occurs to avoid
// repeating the timeout log messages too often.
private Instant timeoutWarningTime = createTime ;
RunnableWorkItem (WorkItem wrappedItem ) {
item = wrappedItem ;
@@ -179,7 +184,7 @@ private void runMeasured() {
Collection <WorkItem > followUpItems = null ;
try (var __ = new LogContext (Map .of ("work_item" , item .toString (),
"work_id" , String .valueOf (workIdCounter . incrementAndGet () )))) {
"work_id" , String .valueOf (workId )))) {
log .log (Level .FINE , "Executing item " + item + " on repository " + scratchPath , TaskPhases .BEGIN );
try {
followUpItems = item .run (scratchPath );
@@ -225,17 +230,20 @@ private void runMeasured() {
if (maySubmit ) {
pending .remove (candidate );
executor .submit (new RunnableWorkItem (candidate ));
active .put (candidate , Instant .now ());
RunnableWorkItem runnableWorkItem = new RunnableWorkItem (candidate );
executor .submit (runnableWorkItem );
active .put (candidate , runnableWorkItem );
log .finer ("Submitting candidate: " + candidate );
}
}
}
}
}
// Mapping of pending items to the active item preventing them from running
private final Map <WorkItem , Optional <WorkItem >> pending ;
private final Map <WorkItem , Instant > active ;
// Mapping of active WorkItem to their RunnableWorkItem
private final Map <WorkItem , RunnableWorkItem > active ;
private final Deque <Path > scratchPaths ;
private static final Counter .WithTwoLabels SCHEDULED_COUNTER =
@@ -249,13 +257,13 @@ private void submitOrSchedule(WorkItem item) {
for (var activeItem : active .keySet ()) {
if (!activeItem .concurrentWith (item )) {
for (var pendingItem : pending .entrySet ()) {
for (var pendingItem : pending .keySet ()) {
// If there are pending items of the same type that we cannot run concurrently with, replace them.
if (item .replaces (pendingItem . getKey () )) {
log .finer ("Discarding obsoleted item " + pendingItem . getKey () +
if (item .replaces (pendingItem )) {
log .finer ("Discarding obsoleted item " + pendingItem +
" in favor of item " + item );
DISCARDED_COUNTER .labels (item .botName (), item .workItemName ()).inc ();
pending .remove (pendingItem . getKey () );
pending .remove (pendingItem );
// There can't be more than one
break ;
}
@@ -266,8 +274,9 @@ private void submitOrSchedule(WorkItem item) {
}
}
executor .submit (new RunnableWorkItem (item ));
active .put (item , Instant .now ());
RunnableWorkItem runnableWorkItem = new RunnableWorkItem (item );
executor .submit (runnableWorkItem );
active .put (item , runnableWorkItem );
}
}
@@ -370,13 +379,14 @@ private void checkPeriodicItems() {
private void itemWatchdog () {
synchronized (executor ) {
for (var activeItem : active .entrySet ()) {
var activeDuration = Duration .between (activeItem .getValue (), Instant .now ());
if (activeDuration .compareTo (watchdogWarnTimeout ) > 0 ) {
log .severe ("Item " + activeItem .getKey () + " has been active more than " + activeDuration +
" - this may be an error!" );
for (var activeRunnableItem : active .values ()) {
Instant now = Instant .now ();
var timeoutDuration = Duration .between (activeRunnableItem .timeoutWarningTime , now );
if (timeoutDuration .compareTo (watchdogWarnTimeout ) > 0 ) {
log .severe ("Item " + activeRunnableItem .item + " with workId " + activeRunnableItem .workId + " has been active more than " +
Duration .between (activeRunnableItem .createTime , now ) + " - this may be an error!" );
// Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
activeItem . setValue ( Instant . now ()) ;
activeRunnableItem . timeoutWarningTime = now ;
}
}
// Inform the global watchdog that the scheduler is still executing items
03e8d58
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review
Issues