Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1191: CommitCommentWorkItem overwhelms scheduler #1223

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -176,7 +176,7 @@ private void runMeasured() {
synchronized (executor) {
if (scratchPaths.isEmpty()) {
log.finer("No scratch paths available - postponing " + item);
pending.put(item, Optional.empty());
addPending(item, null);
return;
}
scratchPath = scratchPaths.removeFirst();
@@ -208,7 +208,7 @@ private void runMeasured() {

synchronized (executor) {
scratchPaths.addLast(scratchPath);
active.remove(item);
done(item);

// Some of the pending items may now be eligible for execution
var candidateItems = pending.entrySet().stream()
@@ -229,10 +229,8 @@ private void runMeasured() {
}

if (maySubmit) {
pending.remove(candidate);
RunnableWorkItem runnableWorkItem = new RunnableWorkItem(candidate);
executor.submit(runnableWorkItem);
active.put(candidate, runnableWorkItem);
removePending(candidate);
submit(candidate);
log.finer("Submitting candidate: " + candidate);
}
}
@@ -250,6 +248,16 @@ private void runMeasured() {
Counter.name("skara_runner_scheduled").labels("bot", "work_item").register();
private static final Counter.WithTwoLabels DISCARDED_COUNTER =
Counter.name("skara_runner_discarded").labels("bot", "work_item").register();
/**
* Gauge that tracks the number of active WorkItems for each kind
*/
private final Gauge.WithTwoLabels activeGauge =
Gauge.name("skara_runner_active").labels("bot", "work_item").register();
/**
* Gauge that tracks the number of pending WorkItems for each kind
*/
private final Gauge.WithTwoLabels pendingGauge =
Gauge.name("skara_runner_pending").labels("bot", "work_item").register();

private void submitOrSchedule(WorkItem item) {
SCHEDULED_COUNTER.labels(item.botName(), item.workItemName()).inc();
@@ -263,23 +271,57 @@ private void submitOrSchedule(WorkItem item) {
log.finer("Discarding obsoleted item " + pendingItem +
" in favor of item " + item);
DISCARDED_COUNTER.labels(item.botName(), item.workItemName()).inc();
pending.remove(pendingItem);
removePending(pendingItem);
// There can't be more than one
break;
}
}

pending.put(item, Optional.of(activeItem));
addPending(item, activeItem);
return;
}
}

RunnableWorkItem runnableWorkItem = new RunnableWorkItem(item);
executor.submit(runnableWorkItem);
active.put(item, runnableWorkItem);
submit(item);
}
}

/**
* Called to add a WorkItem to the pending queue
* @param item Item to queue
* @param activeItem Optional active item that this item is waiting for
*/
private void addPending(WorkItem item, WorkItem activeItem) {
pending.put(item, Optional.ofNullable(activeItem));
pendingGauge.labels(item.botName(), item.workItemName()).inc();
}

/**
* Called to remove an item from the pending queue.
*/
private void removePending(WorkItem item) {
pending.remove(item);
pendingGauge.labels(item.botName(), item.workItemName()).dec();
}

/**
* Called to submit a WorkItem for execution
*/
private void submit(WorkItem item) {
RunnableWorkItem runnableWorkItem = new RunnableWorkItem(item);
executor.submit(runnableWorkItem);
active.put(item, runnableWorkItem);
activeGauge.labels(item.botName(), item.workItemName()).inc();
}

/**
* Called when a WorkItem is done executing
*/
private void done(WorkItem item) {
active.remove(item);
activeGauge.labels(item.botName(), item.workItemName()).dec();
}

private void drain(Duration timeout) throws TimeoutException {
Instant start = Instant.now();

@@ -53,7 +53,7 @@ public boolean concurrentWith(WorkItem other) {
return true;
}

return !repo.webUrl().equals(((CSRBot) other).repo.webUrl());
return !repo.isSame(((CSRBot) other).repo);
}

private String describe(PullRequest pr) {
@@ -81,11 +81,10 @@ public boolean allowedInCommit() {

@Override
public boolean concurrentWith(WorkItem other) {
if (!(other instanceof CommitCommandWorkItem)) {
if (!(other instanceof CommitCommandWorkItem otherItem)) {
return true;
}
CommitCommandWorkItem otherItem = (CommitCommandWorkItem) other;
if (!bot.repo().webUrl().equals(otherItem.bot.repo().webUrl())) {
if (!bot.repo().isSame(otherItem.bot.repo())) {
return true;
}
if (!commitComment.id().equals(otherItem.commitComment.id())) {
@@ -49,7 +49,13 @@ class CommitCommentsWorkItem implements WorkItem {

@Override
public boolean concurrentWith(WorkItem other) {
return true;
if (!(other instanceof CommitCommentsWorkItem otherItem)) {
return true;
}
if (!repo.isSame(otherItem.repo)) {
return true;
}
return false;
}

private boolean isAncestor(ReadOnlyRepository repo, Hash ancestor, Hash descendant) {
@@ -131,4 +131,11 @@ default URI reviewUrl(Hash hash) {

return null;
}

/**
* Returns true if this HostedRepository represents the same repo as the other.
*/
default boolean isSame(HostedRepository other) {
return name().equals(other.name()) && forge().name().equals(other.forge().name());
}
}
@@ -168,8 +168,6 @@ public interface PullRequest extends Issue {
* Returns true if this PullRequest represents the same pull request as the other.
*/
default boolean isSame(PullRequest other) {
return id().equals(other.id())
&& repository().name().equals(other.repository().name())
&& repository().forge().name().equals(other.repository().forge().name());
return id().equals(other.id()) && repository().isSame(other.repository());
}
}