pj-rehearse: add concurrency control, changed-files prefilter, and drop notifications#5165
Conversation
|
Pipeline controller notification For optional jobs, comment This repository is configured in: automatic mode |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository YAML (base), Central YAML (inherited) Review profile: CHILL Plan: Enterprise Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds a bounded webhook handler dispatcher with Prometheus metrics, queuing, and drop notifications; and adds prefilter helpers to skip job-analysis when no rehearsal-relevant files changed, wiring those checks into issue-comment and affected-job flows. ChangesHandler Admission Control & Request Drop Notifications
Job Analysis Early-Skip Optimization
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 10 | ❌ 4❌ Failed checks (3 warnings, 1 inconclusive)
✅ Passed checks (10 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/pj-rehearse/main.go`:
- Around line 90-100: The queued request currently holds its queued slot until
run() returns because dequeue() is deferred; change this so the dequeued metric
is released as soon as execution starts by removing the top-level defer
d.dequeue() and instead calling d.dequeue() immediately after successfully
acquiring the execution slot (i.e., right after the case that does
d.executionSlots <- struct{}{}), then call d.run(logger, handler) as before;
keep the existing timeout/drop branch (droppedHandlerRequests, onDrop,
queueTimeout) untouched so dropped requests still increment metrics and trigger
onDrop.
In `@cmd/pj-rehearse/server_test.go`:
- Around line 268-272: The test uses time.Sleep to wait for the goroutine
started with d.dispatch to acquire a slot (via the blocker closure), which
causes flakiness; instead add explicit synchronization: create a ready (or
queued) channel that the dispatching goroutine closes or sends on when it has
acquired the slot or enqueued the job, have the test wait on that channel (with
a timeout) rather than sleeping, and apply the same ready/queued signaling
replacement for the other dispatcher tests that currently use Sleep (the tests
invoking d.dispatch and blocker at the other locations).
In `@cmd/pj-rehearse/server.go`:
- Around line 534-537: The diff currently uses a branch ref name (baseRef) in
shouldAnalyzeRehearsals which can be stale; change the function to diff against
the exact base commit SHA used by prepareCandidate: update
shouldAnalyzeRehearsals signature to accept the base commit SHA (e.g., baseSHA
string) instead of or in addition to baseRef, and replace the
repoClient.Diff(baseRef, "HEAD") call with repoClient.Diff(baseSHA, "HEAD");
ensure callers (including prepareCandidate) pass the same SHA that was fetched
and used for the rebase so both functions use the identical commit SHA.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Central YAML (inherited)
Review profile: CHILL
Plan: Enterprise
Run ID: 02c61700-85e4-496b-ae8f-7f70bb1a65d3
📒 Files selected for processing (3)
cmd/pj-rehearse/main.gocmd/pj-rehearse/server.gocmd/pj-rehearse/server_test.go
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
cmd/pj-rehearse/server.go (1)
531-535:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDiff against the rebased base SHA, not the branch name.
prepareCandidate()checks out the exact base SHA from GitHub, but this helper diffsbaseRef(for example,main) againstHEAD. If the local branch ref is stale or missing, the prefilter can skip rehearsals or fail even though the rebase succeeded. Pass the same base SHA thatprepareCandidate()used.Suggested fix
-func shouldAnalyzeRehearsals(repoClient git.RepoClient, baseRef string, includeRegistryChanges bool, logger *logrus.Entry) (bool, error) { - changedFiles, err := repoClient.Diff(baseRef, "HEAD") +func shouldAnalyzeRehearsals(repoClient git.RepoClient, baseSHA string, includeRegistryChanges bool, logger *logrus.Entry) (bool, error) { + changedFiles, err := repoClient.Diff(baseSHA, "HEAD") if err != nil { - return false, fmt.Errorf("failed to diff changed files from base ref %s: %w", baseRef, err) + return false, fmt.Errorf("failed to diff changed files from base SHA %s: %w", baseSHA, err) }Then update the callers to pass the exact base SHA used during
prepareCandidate().🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cmd/pj-rehearse/server.go` around lines 531 - 535, shouldAnalyzeRehearsals currently diffs the named baseRef (e.g., "main") against HEAD which can be stale; change it to accept and use the exact base SHA that prepareCandidate() checks out so the diff is performed against the rebased commit, and update all callers to pass that same base SHA (ensure the function signature and callers reference shouldAnalyzeRehearsals and prepareCandidate consistently so the exact base SHA is propagated).cmd/pj-rehearse/main.go (1)
82-95:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRelease the queue slot as soon as execution starts.
A request that leaves the queue still counts as queued until
run()returns becausedequeue()is deferred. That inflatespj_rehearse_handlers_queuedand can triggerqueue_fulldrops while nothing is actually waiting.Suggested fix
if !d.tryQueue() { droppedHandlerRequests.WithLabelValues("queue_full").Inc() logger.WithField("max_queue", cap(d.queueSlots)).Warn("Dropping webhook request because handler queue is full") if onDrop != nil { onDrop("queue_full") } return } - defer d.dequeue() select { case d.executionSlots <- struct{}{}: + d.dequeue() d.run(logger, handler) case <-time.After(d.queueTimeout): + d.dequeue() droppedHandlerRequests.WithLabelValues("queue_timeout").Inc() logger.WithField("timeout", d.queueTimeout).Warn("Dropping webhook request because it waited too long in queue") if onDrop != nil { onDrop("queue_timeout") } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cmd/pj-rehearse/main.go` around lines 82 - 95, The deferred dequeue keeps the queue slot occupied until run() returns; remove the defer d.dequeue() and instead release the queue slot as soon as execution starts by calling d.dequeue() immediately after successfully acquiring an execution slot (i.e., inside the select case after sending to d.executionSlots and before calling d.run), so queued metrics (d.queueSlots / pj_rehearse_handlers_queued) reflect actual waiting requests; adjust any error paths similarly to ensure the slot is always released when execution is handed off.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/pj-rehearse/main.go`:
- Around line 390-404: The event registrations currently submit every PR and
issue-comment event into dispatcher.dispatch even when the action would be
ignored; before calling dispatcher.dispatch in
eventServer.RegisterHandlePullRequestEvent and RegisterHandleIssueCommentEvent,
filter by the GitHub event action (e.g., only allow
"opened"/"reopened"/"synchronize" for PRs that should trigger
s.handlePullRequestCreation or s.handleNewPush, and only allow relevant comment
actions/commands for issue comments) and return early if not relevant so
dispatcher.dispatch is never invoked; update the anonymous handlers wrapping
s.handlePullRequestCreation, s.handleNewPush, and s.handleIssueComment to
perform that pre-check and only call notifyDroppedRequest when an event that
would have been dispatched is actually dropped by the dispatcher.
- Around line 108-116: The worker goroutine started in handlerDispatcher.run
must recover from panics thrown by the callback to avoid crashing the process;
update the anonymous goroutine that calls handler() to wrap the handler
invocation with a deferred recover() that catches any panic, logs the panic and
stack trace (use the same logger variable) and allows the goroutine to clean up
(maintaining the existing deferred concurrentHandlersInFlight.Dec(),
<-d.executionSlots and close(done) behavior); specifically modify the anonymous
func in run to add a defer that calls recover(), formats the panic and
runtime.Stack() and emits a logger.Errorf or logger.WithField(...).Errorf so the
panic is recorded but does not kill the process.
---
Duplicate comments:
In `@cmd/pj-rehearse/main.go`:
- Around line 82-95: The deferred dequeue keeps the queue slot occupied until
run() returns; remove the defer d.dequeue() and instead release the queue slot
as soon as execution starts by calling d.dequeue() immediately after
successfully acquiring an execution slot (i.e., inside the select case after
sending to d.executionSlots and before calling d.run), so queued metrics
(d.queueSlots / pj_rehearse_handlers_queued) reflect actual waiting requests;
adjust any error paths similarly to ensure the slot is always released when
execution is handed off.
In `@cmd/pj-rehearse/server.go`:
- Around line 531-535: shouldAnalyzeRehearsals currently diffs the named baseRef
(e.g., "main") against HEAD which can be stale; change it to accept and use the
exact base SHA that prepareCandidate() checks out so the diff is performed
against the rebased commit, and update all callers to pass that same base SHA
(ensure the function signature and callers reference shouldAnalyzeRehearsals and
prepareCandidate consistently so the exact base SHA is propagated).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Central YAML (inherited)
Review profile: CHILL
Plan: Enterprise
Run ID: c1c84ebe-fb99-4f49-b011-cc5cd43c5b22
📒 Files selected for processing (3)
cmd/pj-rehearse/main.gocmd/pj-rehearse/server.gocmd/pj-rehearse/server_test.go
There was a problem hiding this comment.
♻️ Duplicate comments (2)
cmd/pj-rehearse/main.go (2)
108-124:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAdd panic recovery in the worker goroutine.
A panic in
handler()will crash the plugin process. Wrap withrecover()to log and continue.Suggested fix
func (d *handlerDispatcher) run(logger *logrus.Entry, handler func()) { done := make(chan struct{}) go func() { concurrentHandlersInFlight.Inc() defer concurrentHandlersInFlight.Dec() - defer func() { <-d.executionSlots }() - defer close(done) + defer func() { + if r := recover(); r != nil { + logger.WithField("panic", r).Error("webhook handler panicked") + } + <-d.executionSlots + close(done) + }() handler() }()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cmd/pj-rehearse/main.go` around lines 108 - 124, The goroutine started in handlerDispatcher.run can panic inside handler(), which will crash the process; wrap the goroutine's body with a defer that recovers from panic, increments/observes the existing metrics as needed, and logs the panic and stack using the provided logger (use logger.WithField/WithError and include debug stack) so the process continues; ensure recovery runs before the other defers that decrement concurrentHandlersInFlight and release d.executionSlots and that close(done) still executes after recovery.
390-404:⚠️ Potential issue | 🟠 Major | ⚡ Quick winFilter irrelevant events before entering the dispatcher.
Currently all PR and issue-comment events are dispatched, but handlers no-op for most actions (e.g.,
closed,labeled). Under load this wastes dispatcher capacity and may post drop notifications for events that would have been ignored.Suggested fix
eventServer.RegisterHandlePullRequestEvent(func(l *logrus.Entry, event github.PullRequestEvent) { + if event.Action != github.PullRequestActionOpened && event.Action != github.PullRequestActionSynchronize { + return + } dispatcher.dispatch(l, func() { s.handlePullRequestCreation(l, event) s.handleNewPush(l, event) }, func(reason string) { notifyDroppedRequest(s.ghc, event.Repo.Owner.Login, event.Repo.Name, event.Number, event.PullRequest.User.Login, reason, o.queueTimeoutMinutes, false, l) }) }) eventServer.RegisterHandleIssueCommentEvent(func(l *logrus.Entry, event github.IssueCommentEvent) { + if !event.Issue.IsPullRequest() || event.Action != github.IssueCommentActionCreated { + return + } dispatcher.dispatch(l, func() { s.handleIssueComment(l, event) }, func(reason string) { notifyDroppedRequest(s.ghc, event.Repo.Owner.Login, event.Repo.Name, event.Issue.Number, event.Comment.User.Login, reason, o.queueTimeoutMinutes, true, l) }) })🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@cmd/pj-rehearse/main.go` around lines 390 - 404, Filter out irrelevant GitHub event actions before calling dispatcher to avoid wasting capacity and spurious drop notifications: inside the RegisterHandlePullRequestEvent callback check event.Action and only call dispatcher.dispatch for PR actions your handlers expect (e.g., "opened", "reopened", "synchronize", "edited"/"ready_for_review" as applicable) and inside RegisterHandleIssueCommentEvent only dispatch for relevant comment actions (e.g., "created", and optionally "edited") and, if appropriate, only for comments on pull requests; keep the existing notifyDroppedRequest usage for events you intentionally skip so dropped-notify behavior remains correct. Target the callbacks registered here (the anonymous funcs passed to eventServer.RegisterHandlePullRequestEvent and RegisterHandleIssueCommentEvent) and gate the dispatcher.dispatch invocation based on event.Action and whether the Issue is a PR.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@cmd/pj-rehearse/main.go`:
- Around line 108-124: The goroutine started in handlerDispatcher.run can panic
inside handler(), which will crash the process; wrap the goroutine's body with a
defer that recovers from panic, increments/observes the existing metrics as
needed, and logs the panic and stack using the provided logger (use
logger.WithField/WithError and include debug stack) so the process continues;
ensure recovery runs before the other defers that decrement
concurrentHandlersInFlight and release d.executionSlots and that close(done)
still executes after recovery.
- Around line 390-404: Filter out irrelevant GitHub event actions before calling
dispatcher to avoid wasting capacity and spurious drop notifications: inside the
RegisterHandlePullRequestEvent callback check event.Action and only call
dispatcher.dispatch for PR actions your handlers expect (e.g., "opened",
"reopened", "synchronize", "edited"/"ready_for_review" as applicable) and inside
RegisterHandleIssueCommentEvent only dispatch for relevant comment actions
(e.g., "created", and optionally "edited") and, if appropriate, only for
comments on pull requests; keep the existing notifyDroppedRequest usage for
events you intentionally skip so dropped-notify behavior remains correct. Target
the callbacks registered here (the anonymous funcs passed to
eventServer.RegisterHandlePullRequestEvent and RegisterHandleIssueCommentEvent)
and gate the dispatcher.dispatch invocation based on event.Action and whether
the Issue is a PR.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository YAML (base), Central YAML (inherited)
Review profile: CHILL
Plan: Enterprise
Run ID: 1979fce6-49e3-45e0-ba01-6ccd28420ab9
📒 Files selected for processing (3)
cmd/pj-rehearse/main.gocmd/pj-rehearse/server.gocmd/pj-rehearse/server_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- cmd/pj-rehearse/server_test.go
…op notifications
pj-rehearse was vulnerable to memory spikes (50+ GB) under load because
it handled an unlimited number of webhook requests concurrently, each
loading full CI/Prow configurations.
This commit introduces three features to mitigate the problem:
1. Handler dispatcher with bounded concurrency and queuing:
- Configurable max concurrent handlers (default 5), max queued (50),
queue timeout (5m), and execution timeout (15m) via CLI flags.
- Requests that exceed the queue capacity or wait too long are dropped
with a GitHub comment notifying the user.
- Separate Prometheus metrics for in-flight handlers, queued requests,
drops (by reason), and execution timeouts.
2. Changed-files prefilter (implements DPTP-2888):
- Before loading full configs, diffs the PR against the base branch to
check if any changed files fall under ci-operator/config,
ci-operator/jobs, core-services/prow/02_config, or
ci-operator/step-registry.
- If no rehearsal-relevant paths changed, skips DetermineAffectedJobs
entirely, avoiding the expensive config load.
3. Merged PR event handlers into a single registration:
- handlePullRequestCreation and handleNewPush are now dispatched as
one unit, preventing a single PR event from consuming two dispatcher
slots.
Co-authored-by Cursor
Signed-off-by: Jakub Guzik <jguzik@redhat.com>
|
/lgtm |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: deepsm007, jmguzik The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
/test images |
|
@hector-vido: Overrode contexts on behalf of hector-vido: ci/prow/breaking-changes DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
Pipeline controller notification No second-stage tests were triggered for this PR. This can happen when:
Use |
|
@jmguzik: all tests passed! Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
pj-rehearse was vulnerable to memory spikes (50+ GB) under load because it handled an unlimited number of webhook requests concurrently, each loading full CI/Prow configurations.
This commit introduces three features to mitigate the problem:
Handler dispatcher with bounded concurrency and queuing:
Changed-files prefilter (implements DPTP-2888):
Merged PR event handlers into a single registration:
Co-authored-by Cursor
Summary by CodeRabbit
New Features
Improvements
Tests