Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log errors when WorkItems execute for longer than a specified time #164

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -44,7 +44,6 @@
}

public class BotRunner {

enum TaskPhases {
BEGIN,
END
@@ -91,14 +90,14 @@ public void run() {

// Some of the pending items may now be eligible for execution
var candidateItems = pending.entrySet().stream()
.filter(e -> !e.getValue().isPresent() || !active.contains(e.getValue().get()))
.filter(e -> e.getValue().isEmpty() || !active.containsKey(e.getValue().get()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());

// Try the candidates against the current active set
for (var candidate : candidateItems) {
boolean maySubmit = true;
for (var activeItem : active) {
for (var activeItem : active.keySet()) {
if (!activeItem.concurrentWith(candidate)) {
// Still can't run this candidate, leave it pending
log.finer("Cannot submit candidate " + candidate + " - not concurrent with " + activeItem);
@@ -110,23 +109,21 @@ public void run() {
if (maySubmit) {
pending.remove(candidate);
executor.submit(new RunnableWorkItem(candidate));
active.add(candidate);
active.put(candidate, Instant.now());
log.finer("Submitting candidate: " + candidate);
}
}
}

}
}

private final Map<WorkItem, Optional<WorkItem>> pending;
private final Set<WorkItem> active;
private final Map<WorkItem, Instant> active;
private final Deque<Path> scratchPaths;

private void submitOrSchedule(WorkItem item) {

synchronized (executor) {
for (var activeItem : active) {
for (var activeItem : active.keySet()) {
if (!activeItem.concurrentWith(item)) {

for (var pendingItem : pending.entrySet()) {
@@ -146,12 +143,11 @@ private void submitOrSchedule(WorkItem item) {
}

executor.submit(new RunnableWorkItem(item));
active.add(item);
active.put(item, Instant.now());
}
}

private void drain(Duration timeout) throws TimeoutException {

Instant start = Instant.now();

while (Instant.now().isBefore(start.plus(timeout))) {
@@ -181,6 +177,7 @@ private void drain(Duration timeout) throws TimeoutException {
}
try {
Thread.sleep(1);
watchdog();
} catch (InterruptedException e) {
log.warning("Exception during queue drain");
log.throwing("BotRunner", "drain", e);
@@ -200,7 +197,7 @@ public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
this.bots = bots;

pending = new HashMap<>();
active = new HashSet<>();
active = new HashMap<>();
scratchPaths = new LinkedList<>();

for (int i = 0; i < config.concurrency(); ++i) {
@@ -229,6 +226,20 @@ private void checkPeriodicItems() {
}
}

private void watchdog() {
synchronized (executor) {
for (var activeItem : active.entrySet()) {
var activeDuration = Duration.between(activeItem.getValue(), Instant.now());
if (activeDuration.compareTo(config.watchdogTimeout()) > 0) {
log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration +
" - this may be an error!");
// Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough
activeItem.setValue(Instant.now());
}
}
}
}

private void processRestRequest(JSONValue request) {
log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
log.fine("Request: " + request);
@@ -263,6 +274,8 @@ public void run() {
}
}

executor.scheduleAtFixedRate(this::watchdog, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(this::checkPeriodicItems, 0,
config.scheduledExecutionPeriod().toMillis(), TimeUnit.MILLISECONDS);

@@ -263,4 +263,13 @@ Path scratchFolder() {
}
return Optional.of(config.get("webhooks").get("port").asInt());
}

Duration watchdogTimeout() {
if (!config.contains("runner") || !config.get("runner").contains("watchdog")) {
log.info("No WorkItem watchdog timeout defined, using default value");
return Duration.ofHours(1);
} else {
return Duration.parse(config.get("runner").get("watchdog").asString());
}
}
}
@@ -22,17 +22,19 @@
*/
package org.openjdk.skara.bot;

import org.openjdk.skara.json.JSON;

import org.junit.jupiter.api.*;
import org.openjdk.skara.host.HostedRepository;
import org.openjdk.skara.json.*;

import java.nio.file.*;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.logging.*;

import static org.junit.jupiter.api.Assertions.*;

class TestWorkItem implements WorkItem {
private final ConcurrencyCheck concurrencyCheck;
private final String description;
@@ -75,12 +77,36 @@ public String toString() {
}
}

class TestBlockedWorkItem implements WorkItem {
private final CountDownLatch countDownLatch;

TestBlockedWorkItem(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public boolean concurrentWith(WorkItem other) {
return false;
}

@Override
public void run(Path scratchPath) {
System.out.println("Starting to wait...");;
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Done waiting");
}
}

class TestBot implements Bot {

private final List<WorkItem> items;
private final Supplier<List<WorkItem>> itemSupplier;

TestBot(TestWorkItem... items) {
TestBot(WorkItem... items) {
this.items = Arrays.asList(items);
itemSupplier = null;
}
@@ -122,6 +148,14 @@ private BotRunnerConfiguration config() {
}
}

private BotRunnerConfiguration config(String json) {
var config = JSON.parse(json).asObject();
try {
return BotRunnerConfiguration.parse(config);
} catch (ConfigurationError configurationError) {
throw new RuntimeException(configurationError);
}
}
@Test
void simpleConcurrent() throws TimeoutException {
var item1 = new TestWorkItem(i -> true, "Item 1");
@@ -131,8 +165,8 @@ void simpleConcurrent() throws TimeoutException {

runner.runOnce(Duration.ofSeconds(10));

Assertions.assertTrue(item1.hasRun);
Assertions.assertTrue(item2.hasRun);
assertTrue(item1.hasRun);
assertTrue(item2.hasRun);
}

@Test
@@ -144,8 +178,8 @@ void simpleSerial() throws TimeoutException {

runner.runOnce(Duration.ofSeconds(10));

Assertions.assertTrue(item1.hasRun);
Assertions.assertTrue(item2.hasRun);
assertTrue(item1.hasRun);
assertTrue(item2.hasRun);
}

@Test
@@ -160,7 +194,7 @@ void moreItemsThanScratchPaths() throws TimeoutException {
runner.runOnce(Duration.ofSeconds(10));

for (var item : items) {
Assertions.assertTrue(item.hasRun);
assertTrue(item.hasRun);
}
}

@@ -195,8 +229,8 @@ void periodItemsThrow() throws TimeoutException {
Assertions.assertFalse(item2.hasRun);

new BotRunner(config(), List.of(bot)).runOnce(Duration.ofSeconds(10));
Assertions.assertTrue(item1.hasRun);
Assertions.assertTrue(item2.hasRun);
assertTrue(item1.hasRun);
assertTrue(item2.hasRun);
}

@Test
@@ -210,10 +244,10 @@ void discardAdditionalBlockedItems() throws TimeoutException {

runner.runOnce(Duration.ofSeconds(10));

Assertions.assertTrue(item1.hasRun);
assertTrue(item1.hasRun);
Assertions.assertFalse(item2.hasRun);
Assertions.assertFalse(item3.hasRun);
Assertions.assertTrue(item4.hasRun);
assertTrue(item4.hasRun);
}

@Test
@@ -230,12 +264,44 @@ void dontDiscardDifferentBlockedItems() throws TimeoutException {

runner.runOnce(Duration.ofSeconds(10));

Assertions.assertTrue(item1.hasRun);
assertTrue(item1.hasRun);
Assertions.assertFalse(item2.hasRun);
Assertions.assertFalse(item3.hasRun);
Assertions.assertTrue(item4.hasRun);
assertTrue(item4.hasRun);
Assertions.assertFalse(item5.hasRun);
Assertions.assertFalse(item6.hasRun);
Assertions.assertTrue(item7.hasRun);
assertTrue(item7.hasRun);
}

@Test
void watchdogTrigger() throws TimeoutException {
var countdownLatch = new CountDownLatch(1);
var item = new TestBlockedWorkItem(countdownLatch);
var bot = new TestBot(item);
var runner = new BotRunner(config("{ \"runner\": { \"watchdog\": \"PT0.01S\" } }"), List.of(bot));

var errors = new ArrayList<String>();
var log = Logger.getLogger("org.openjdk.skara.bot");
log.addHandler(new Handler() {
@Override
public void publish(LogRecord record) {
if (record.getLevel().equals(Level.SEVERE)) {
errors.add(record.getMessage());
}
}

@Override
public void flush() {
}

@Override
public void close() throws SecurityException {
}
});

assertThrows(TimeoutException.class, () -> runner.runOnce(Duration.ofMillis(100)));
assertTrue(errors.size() > 0);
assertTrue(errors.size() <= 10);
countdownLatch.countDown();
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.