refactor: reconcile events in parallel #4178

Merged
chrisgacsal merged 5 commits into main from refactor/event-reconcile
Apr 27, 2026

Conversation

@chrisgacsal
Collaborator

@chrisgacsal chrisgacsal commented Apr 20, 2026

Overview

Use worker pool for processing in-flight notification events in parallel.

Fixes #(issue)

Notes for reviewer

Summary by CodeRabbit

  • New Features
    • Configurable ReconcilerWorkers to control concurrent notification processing.
    • Notification reconciliation now uses a bounded worker pool for improved throughput; a sensible default is auto-detected from host parallelism.
  • Bug Fixes
    • Improved lock handling during reconciliation to reduce conflicts and failures.
  • Tests
    • Sample configuration/test updated to include reconcilerWorkers.

@chrisgacsal chrisgacsal self-assigned this Apr 20, 2026
@chrisgacsal chrisgacsal requested a review from a team as a code owner April 20, 2026 08:02
@chrisgacsal chrisgacsal added the release-note/misc Miscellaneous changes label Apr 20, 2026
@coderabbitai
Contributor

coderabbitai Bot commented Apr 20, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

New configurable ReconcilerWorkers (defaulting to runtime GOMAXPROCS) added; reconciliation now runs with a bounded worker pool (semaphore + goroutines + WaitGroup). Notification event handler wiring updated to accept a Postgres pgdriver.Driver and initialize a session lockr passed into the event handler.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration & Tests: app/config/notification.go, app/config/config_test.go, app/config/testdata/complete.yaml | Add ReconcilerWorkers int to notification config, register notification.reconcilerWorkers default, update test expectation and test YAML (reconcilerWorkers: 10). |
| Common wiring: app/common/notification.go, cmd/server/wire_gen.go | NewNotificationEventHandler signature gains driver *pgdriver.Driver; initializes Postgres session lockr and passes ReconcilerWorkers and Lockr into eventhandler.New. Injector callsite updated. |
| Defaults & Config struct: openmeter/notification/eventhandler.go, openmeter/notification/eventhandler/handler.go | Add DefaultReconcilerWorkers = runtime.GOMAXPROCS(0); eventhandler.Config gains ReconcilerWorkers int; New defaults zero to DefaultReconcilerWorkers and sets handler workerPoolSize. |
| Reconciliation runtime: openmeter/notification/eventhandler/reconcile.go | Handler.Reconcile switched from transaction-based single-threaded flow to concurrent processing: uses TryLockWithScopes, acquires/releases lock, uses a weighted semaphore (workerPoolSize) to bound concurrency, spawns goroutines for reconcileEvent, logs errors inside goroutines, and waits with a WaitGroup; computes nextAttemptBefore once per reconciliation. |

Sequence Diagram(s)

sequenceDiagram
  participant Scheduler as Scheduler (Ticker)
  participant Handler as EventHandler
  participant Store as EventStore
  participant Lockr as SessionLockr
  participant WorkerPool as Semaphore (workerPoolSize)
  participant Reconciler as reconcileEvent
  participant Logger as Logger

  Scheduler->>Handler: Trigger Reconcile()
  Handler->>Lockr: TryLockWithScopes()
  alt lock acquired
    Handler->>Store: ListEvents(nextAttemptBefore, page...)
    Store-->>Handler: Events page
    loop each event
      Handler->>WorkerPool: Acquire slot (weighted)
      WorkerPool-->>Handler: slot granted
      Handler->>Reconciler: spawn goroutine -> reconcileEvent(event)
      Reconciler-->>Logger: log error (if any)
      Reconciler->>WorkerPool: release slot
    end
    Handler->>Handler: Wait for goroutines (WaitGroup)
    Handler->>Lockr: Release()
  else lock not acquired
    Handler->>Logger: log lock acquisition failure
  end
  Handler-->>Scheduler: Reconcile finished
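
The lock branch of the diagram can be sketched as follows. This is an illustration only: `sync.Mutex.TryLock` stands in for the Postgres session lock, and the `reconciler` type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// reconciler guards each run with a lock. sync.Mutex is only a stand-in
// for the Postgres session lock shown in the diagram.
type reconciler struct {
	lock sync.Mutex
	runs int
}

// Reconcile reports false, without doing any work, if another run
// currently holds the lock.
func (r *reconciler) Reconcile(work func()) bool {
	if !r.lock.TryLock() {
		return false
	}
	defer r.lock.Unlock()

	r.runs++
	work()
	return true
}

func main() {
	r := &reconciler{}
	outer := r.Reconcile(func() {
		// A second attempt while the lock is held is skipped.
		fmt.Println("nested acquired:", r.Reconcile(func() {}))
	})
	fmt.Println("outer acquired:", outer, "runs:", r.runs)
}
```

Running it prints `nested acquired: false` followed by `outer acquired: true runs: 1`, which matches the "lock not acquired" branch: the overlapping run returns without reconciling anything.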

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested reviewers

  • gergely-kurucz-konghq
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 20.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately captures the main change: refactoring event reconciliation to use parallel processing via a worker pool. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
app/config/notification.go (1)

31-42: ⚠️ Potential issue | 🟡 Minor

Consider validating ReconcilerWorkers is non-negative.

The value is eventually passed into semaphore.NewWeighted(int64(config.ReconcilerWorkers)). If a user sets this to a negative number in config, Acquire(ctx, 1) can never succeed and will block until the context is cancelled. Since 0 is already remapped to the default in eventhandler.New, a config-time guard would give a clearer error than a silent hang.

🛡️ Suggested validation
 func (c NotificationConfiguration) Validate() error {
 	var errs []error

 	if err := c.Consumer.Validate(); err != nil {
 		errs = append(errs, errorsx.WithPrefix(err, "consumer"))
 	}

+	if c.ReconcilerWorkers < 0 {
+		errs = append(errs, errors.New("reconcilerWorkers must be non-negative"))
+	}
+
 	return errors.Join(errs...)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/config/notification.go` around lines 31 - 42, The Validate method for
NotificationConfiguration should reject negative ReconcilerWorkers to prevent
semaphore.NewWeighted from being given a negative value; update
NotificationConfiguration.Validate() to check c.ReconcilerWorkers and return an
error (with a clear prefix like "reconciler_workers") if it's negative (allow
zero since eventhandler.New remaps zero to default), so callers get a
config-time error instead of runtime hangs when
semaphore.NewWeighted(int64(config.ReconcilerWorkers)) is used.
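
For reference, a self-contained sketch of the config-time guard suggested above. `NotificationConfig` is a pared-down, hypothetical stand-in for the real NotificationConfiguration, and the error wording is an assumption rather than the project's actual message.

```go
package main

import (
	"errors"
	"fmt"
)

// NotificationConfig is a hypothetical stand-in for the real configuration
// struct; only the field under discussion is modelled.
type NotificationConfig struct {
	ReconcilerWorkers int
}

// Validate rejects negative worker counts. Zero stays valid because the
// handler constructor is described as remapping it to the default.
func (c NotificationConfig) Validate() error {
	var errs []error

	if c.ReconcilerWorkers < 0 {
		errs = append(errs, fmt.Errorf("reconcilerWorkers must be non-negative, got %d", c.ReconcilerWorkers))
	}

	// errors.Join returns nil when no errors were collected.
	return errors.Join(errs...)
}

func main() {
	for _, n := range []int{-1, 0, 10} {
		fmt.Printf("%d -> %v\n", n, NotificationConfig{ReconcilerWorkers: n}.Validate())
	}
}
```

Only the `-1` case yields an error; `0` and `10` both validate cleanly.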
openmeter/notification/eventhandler/reconcile.go (2)

100-135: ⚠️ Potential issue | 🔴 Critical

🐛 Critical: data race on the shared err variable.

err is declared once by out, err := h.repo.ListEvents(...) at line 101 and is then reassigned on line 119 (err = workerPool.Acquire(...)) and again inside the goroutine on line 127 (if err = h.reconcileEvent(...)). With multiple goroutines in flight plus the outer loop still iterating, several writers are racing on the same variable. This is both a real race (will fire under -race) and a correctness issue — a goroutine's error could overwrite the outer err or vice versa.

Use fresh locals:

🐛 Proposed fix
-				for _, event := range out.Items {
-					err = workerPool.Acquire(ctx, 1)
-					if err != nil {
-						return fmt.Errorf("failed to acquire worker from pool: %w", err)
-					}
-
-					wg.Go(func() {
-						defer workerPool.Release(1)
-
-						if err = h.reconcileEvent(ctx, &event); err != nil {
-							h.logger.ErrorContext(ctx, "failed to reconcile notification event",
-								"namespace", event.Namespace,
-								"notification.event.id", event.ID,
-								"error", err.Error(),
-							)
-						}
-					})
-				}
+				for _, event := range out.Items {
+					if err := workerPool.Acquire(ctx, 1); err != nil {
+						wg.Wait() // drain in-flight work before returning
+						return fmt.Errorf("failed to acquire worker from pool: %w", err)
+					}
+
+					event := event // belt-and-braces; safe on Go 1.22+
+					wg.Go(func() {
+						defer workerPool.Release(1)
+
+						if err := h.reconcileEvent(ctx, &event); err != nil {
+							h.logger.ErrorContext(ctx, "failed to reconcile notification event",
+								"namespace", event.Namespace,
+								"notification.event.id", event.ID,
+								"error", err.Error(),
+							)
+						}
+					})
+				}

Note the extra wg.Wait() before returning on Acquire failure — otherwise the function returns (and the transaction rolls back) while goroutines are still using ctx/tx.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 100 - 135, The
code races on the shared err and range variable; fix by using fresh locals and
waiting on the waitgroup before returning on Acquire failure: replace uses of
the shared err with new local variables (e.g. localErr :=
workerPool.Acquire(...)) and inside the goroutine use a new local error variable
(e.g. if err := h.reconcileEvent(ctx, &localEvent); err != nil { ... }) so you
don't write to the outer err, and capture the loop value with a copy (e.g.
localEvent := event and pass &localEvent to h.reconcileEvent) to avoid the
range-variable address reuse; also if workerPool.Acquire returns an error, call
wg.Wait() before returning to ensure all started goroutines complete.

71-148: ⚠️ Potential issue | 🟠 Major

⚠️ Concurrent use of the same transaction across goroutines.

The whole loop runs inside transaction.RunWithNoValue(ctx, h.repo, ...), which sets a tx driver on the context. When h.reconcileEvent is invoked from multiple goroutines all sharing that same ctx, every event's DB work (e.g., UpdateEventDeliveryStatus) picks up the same underlying tx/connection via GetDriverFromContext. PostgreSQL connections and ent transactions aren't safe for concurrent use — you can get "conn busy" errors, interleaved queries, or subtle corruption.

A few options to fix:

  1. Run each reconcileEvent in its own transaction — start a fresh tx inside each worker goroutine. The outer wrapper would then only scope the list + the lock.
  2. Keep the outer tx only for locking and listing, then spawn reconcileEvent goroutines outside that tx scope (each one doing its own tx as needed).
  3. If the intent is to reconcile everything atomically, parallelism on the same tx won't work — the loop needs to stay sequential.

Also, consider swapping the manual semaphore + WaitGroup for golang.org/x/sync/errgroup with g.SetLimit(int(h.workerPoolSize)). It handles the limit, wait, and early-cancel-on-error cleanly in one shot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 71 - 148, The
current Handler.Reconcile runs the listing and then spawns goroutines that call
h.reconcileEvent while still inside transaction.RunWithNoValue(ctx, h.repo,
...), causing every worker to share the same DB tx/connection (via
GetDriverFromContext) which is unsafe; fix by limiting the outer transaction to
only the lock+list (move transaction.RunWithNoValue so it does the lock and
ListEvents only), then spawn workers outside that tx and have each worker start
its own transaction for DB work (or alternatively always call a per-worker repo
method that begins/commits its own tx inside h.reconcileEvent). Also replace the
current incorrect sync.WaitGroup usage and manual semaphore with
golang.org/x/sync/errgroup: use eg, set eg.SetLimit(int(h.workerPoolSize)) and launch
goroutines with eg.Go to manage concurrency and error propagation instead of
workerPool/semaphore.NewWeighted and wg.Go/wg.Done.
🧹 Nitpick comments (3)
openmeter/notification/eventhandler/handler.go (1)

135-137: Default fallback is good — consider also guarding against negative values.

If a user supplies a negative ReconcilerWorkers, it propagates all the way into semaphore.NewWeighted and will cause Acquire to hang forever. Either treat <= 0 as "use default" here, or validate at the config layer (see the separate comment on app/config/notification.go).

♻️ Minimal in-place guard
-	if config.ReconcilerWorkers == 0 {
+	if config.ReconcilerWorkers <= 0 {
 		config.ReconcilerWorkers = notification.DefaultReconcilerWorkers
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/handler.go` around lines 135 - 137, The
code currently only replaces ReconcilerWorkers when it equals 0, allowing
negative values to reach semaphore.NewWeighted and hang Acquire; change the
guard in the handler that sets config.ReconcilerWorkers so that any value <= 0
is replaced with notification.DefaultReconcilerWorkers (i.e., check
config.ReconcilerWorkers <= 0 instead of == 0), or alternatively add validation
earlier in the config parsing to reject negative ReconcilerWorkers; update any
related comments/tests to reflect the <= 0 behavior and ensure
semaphore.NewWeighted always receives a positive value.
openmeter/notification/eventhandler/reconcile.go (2)

90-98: Nice micro-optimization on nextAttemptBefore.

Hoisting clock.Now().Add(-1 * nextAttemptDelay) out of the pagination loop makes the cutoff consistent across pages, which is actually a small correctness win (previously a slow reconciler could drift its cutoff between pages and miss/re-see rows at page boundaries). 👍

One tiny thought: if h.workerPoolSize is ever 0 (e.g. someone constructs Handler without going through New), semaphore.NewWeighted(0) will make Acquire(1) block forever. Since the New path already defaults it, this is only a defensive concern — still might be worth a guard or a doc comment on the struct field.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 90 - 98, The
semaphore is created with semaphore.NewWeighted(h.workerPoolSize) which will
deadlock Acquire(1) if h.workerPoolSize is 0; guard by clamping or defaulting
the size when creating the semaphore (e.g. compute a local size :=
h.workerPoolSize; if size <= 0 { size = 1 }) and pass that to
semaphore.NewWeighted, or return an error before reconciling when
h.workerPoolSize is invalid; reference h.workerPoolSize, semaphore.NewWeighted
and Acquire to locate the change and consider adding a short struct doc comment
about the non-zero requirement.

71-148: Recommended refactor: errgroup with SetLimit makes this a lot cleaner.

As a follow-up idea (not blocking), the combination of weighted semaphore + WaitGroup + manual acquire/release here is exactly what errgroup.Group.SetLimit was built for. Sketch:

import "golang.org/x/sync/errgroup"

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(int(h.workerPoolSize))

// ... pagination loop ...
for _, event := range out.Items {
    event := event
    g.Go(func() error {
        if err := h.reconcileEvent(gctx, &event); err != nil {
            h.logger.ErrorContext(gctx, "failed to reconcile notification event",
                "namespace", event.Namespace,
                "notification.event.id", event.ID,
                "error", err.Error(),
            )
        }
        return nil // swallow per-event errors as before
    })
}

if err := g.Wait(); err != nil {
    return err
}

Fewer moving parts, correct wait semantics by construction, and easy to later promote to "cancel on first error" if desired. Combine this with the per-goroutine tx fix from the other comment and the whole loop becomes quite tidy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 71 - 148, The
Reconcile function uses a weighted semaphore + sync.WaitGroup pattern that is
verbose and error-prone; replace this with golang.org/x/sync/errgroup by
creating g, gctx := errgroup.WithContext(ctx), call
g.SetLimit(int(h.workerPoolSize)), then inside the pagination loop for each
event capture event := event and use g.Go(func() error { if err :=
h.reconcileEvent(gctx, &event); err != nil { h.logger.ErrorContext(gctx, ... ) }
return nil }), and after the loop replace the incorrect wg.Done() with if err :=
g.Wait(); err != nil { return err } to preserve concurrency limits and correct
wait semantics (refer to Reconcile, reconcileEvent, workerPool/semaphore usage
and wg).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@openmeter/notification/eventhandler/reconcile.go`:
- Around line 143-146: The "Wait for all workers to finish" block incorrectly
calls wg.Done() which causes a negative WaitGroup counter or returns before
workers finish; replace the call with wg.Wait() so the current goroutine
actually waits for the worker goroutines to complete, and verify the existing
worker creation (the variable wg and any use of wg.Add/wg.Go) still matches
(i.e., remove any extra Done calls and ensure each worker performs its own Done
if you used Add manually or rely on wg.Go which handles Add/Done).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 8a7baa46-7f7e-4b5b-87c3-a88f7978ff2c

📥 Commits

Reviewing files that changed from the base of the PR and between 13d183b and 7a855fb.

📒 Files selected for processing (7)
  • app/common/notification.go
  • app/config/config_test.go
  • app/config/notification.go
  • app/config/testdata/complete.yaml
  • openmeter/notification/eventhandler.go
  • openmeter/notification/eventhandler/handler.go
  • openmeter/notification/eventhandler/reconcile.go

Comment thread openmeter/notification/eventhandler/reconcile.go
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
openmeter/notification/eventhandler/reconcile.go (3)

100-148: ⚠️ Potential issue | 🟡 Minor

🟠 Per-event errors are now silently swallowed from the EventReconciler contract

Previously reconcileEvent errors propagated out of Reconcile and got logged by the ticker loop in handler.go (and surfaced through the EventReconciler interface). Now they're only logged inside the goroutine, so:

  • the interface still declares Reconcile(ctx) error, but only infra failures (lock, ListEvents, Acquire) can ever produce a non-nil return;
  • any caller relying on the returned error for metrics/alerting will silently see "success" even when every event in a page failed.

If that's intentional (seems to match the PR intent of not letting one bad event abort the batch), cool — but consider aggregating per-event failures into a joined error on return, or at minimum incrementing a failure counter/metric so this doesn't become invisible. Worth a line of code comment too for the next person reading this.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 100 - 148, The
Reconcile implementation swallows per-event errors (errors from h.reconcileEvent
called in the goroutine) so Reconcile only returns infra errors; fix by
collecting per-event failures and surface them after wg.Wait: e.g., create a
concurrency-safe error aggregator (atomic counter or errors channel) referenced
near workerPool.Acquire / wg.Go, increment or send when reconcileEvent returns
an error (keep the existing h.logger.ErrorContext), then after wg.Wait check the
aggregator and return a combined error (or fmt.Errorf("n events failed")) so
Reconcile (and the EventReconciler) reflects per-event failures; ensure symbols
mentioned are reconcileEvent, Reconcile, EventReconciler, workerPool.Acquire,
wg.Go and preserve current logging.

118-145: ⚠️ Potential issue | 🟠 Major

🟠 Early return on Acquire failure leaks in-flight goroutines past the tx

If workerPool.Acquire returns an error (e.g. ctx cancelled), we bail out of the transaction function at line 121 without ever hitting wg.Wait() at line 145. The tx then rolls back / commits while previously-spawned workers are still happily using its ctx and tx-bound repo. At best you get log spam from failed queries on a closed tx; at worst, undefined behavior depending on the driver.

Suggested shape
-			for {
+			var loopErr error
+			for {
 				out, err := h.repo.ListEvents(ctx, notification.ListEventsInput{
 					...
 				})
 				if err != nil {
-					return fmt.Errorf("failed to fetch notification delivery statuses for reconciliation: %w", err)
+					loopErr = fmt.Errorf("failed to fetch notification delivery statuses for reconciliation: %w", err)
+					break
 				}
 				...
 				for _, event := range out.Items {
 					if err := workerPool.Acquire(ctx, 1); err != nil {
-						return fmt.Errorf("failed to acquire worker from pool: %w", err)
+						loopErr = fmt.Errorf("failed to acquire worker from pool: %w", err)
+						break
 					}
 					...
 				}
 				...
 			}
 
-			// Wait for all workers to finish
 			wg.Wait()
-			return nil
+			return loopErr

Same idea: always wg.Wait() on the way out so no goroutine outlives the enclosing tx.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 118 - 145,
When workerPool.Acquire(ctx, 1) fails we must not return immediately because
spawned goroutines may still be using the transaction context; instead capture
the acquire error (e.g. set a local acquireErr), stop creating new goroutines
(break the loop), and after the loop call wg.Wait() before returning the error.
Concretely: in the loop around out.Items, replace the early return on
workerPool.Acquire with logic that assigns the error to a new variable
(acquireErr), breaks out of the pagination/item loop, and after wg.Wait() check
and return acquireErr (or the original err) so all goroutines using
reconcileEvent and workerPool.Release complete before the transaction function
exits.

73-148: ⚠️ Potential issue | 🔴 Critical

Concurrent database transaction access + data race on error variable

The whole paging loop runs inside transaction.RunWithNoValue, which means every goroutine spawned via wg.Go shares the same transaction context. Here's the thing: database/sql.Tx isn't goroutine-safe, so concurrent reads/writes from different goroutines can corrupt the driver state or cause interleaved statement issues.

Additionally, there are two concrete bugs:

  1. Data race on err (line 119 and 127): The outer err is written to from within the goroutine at line 127 without synchronization. Should use := inside the goroutine to create a local variable instead.

  2. Goroutine leak (line 120-121): If workerPool.Acquire fails, the function returns early without calling wg.Wait(), so any goroutines already spawned will keep running after the function exits and the transaction is committed/closed.

The safest fix: move the parallelism outside the transaction. Fetch the page within a short transaction, commit it, then fan out the reconcileEvent calls as independent operations (each can manage its own transaction if needed). This sidesteps the concurrency issue entirely and avoids the goroutine leak.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/notification/eventhandler/reconcile.go` around lines 73 - 148, The
paging + worker fan-out is running inside transaction.RunWithNoValue which
allows goroutines to share the same DB transaction and causes unsafe concurrent
access plus a data race on the outer err and potential goroutine leaks; fix by
moving parallel reconciliation outside the transaction: use RunWithNoValue only
to acquire the lock and fetch a single page (via h.repo.ListEvents) then
commit/return and spawn the workerPool/wg and call h.reconcileEvent for that
page outside the transaction (each reconcileEvent call should open its own
transaction if needed); also ensure the goroutine uses a local error variable
(use := inside the goroutine) and never return from the function before wg.Wait
completes (i.e., don't spawn workers while still in the transaction and avoid
early returns on workerPool.Acquire failures).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@openmeter/notification/eventhandler/reconcile.go`:
- Around line 118-134: The goroutine is writing to the loop-scoped err variable
causing a data race; fix by shadowing err inside the closure and by capturing
the per-iteration event copy. Specifically, in the loop around
workerPool.Acquire and wg.Go, create local copies (e.g., ev := event and use a
locally declared err within the goroutine) so the goroutine calls
h.reconcileEvent(ctx, &ev) and assigns to its own err variable (not the outer
one); leave workerPool.Release(1) and the logging call intact but reference the
local err when logging.

---

Outside diff comments:
In `@openmeter/notification/eventhandler/reconcile.go`:
- Around line 100-148: The Reconcile implementation swallows per-event errors
(errors from h.reconcileEvent called in the goroutine) so Reconcile only returns
infra errors; fix by collecting per-event failures and surface them after
wg.Wait: e.g., create a concurrency-safe error aggregator (atomic counter or
errors channel) referenced near workerPool.Acquire / wg.Go, increment or send
when reconcileEvent returns an error (keep the existing h.logger.ErrorContext),
then after wg.Wait check the aggregator and return a combined error (or
fmt.Errorf("n events failed")) so Reconcile (and the EventReconciler) reflects
per-event failures; ensure symbols mentioned are reconcileEvent, Reconcile,
EventReconciler, workerPool.Acquire, wg.Go and preserve current logging.
- Around line 118-145: When workerPool.Acquire(ctx, 1) fails we must not return
immediately because spawned goroutines may still be using the transaction
context; instead capture the acquire error (e.g. set a local acquireErr), stop
creating new goroutines (break the loop), and after the loop call wg.Wait()
before returning the error. Concretely: in the loop around out.Items, replace
the early return on workerPool.Acquire with logic that assigns the error to a
new variable (acquireErr), breaks out of the pagination/item loop, and after
wg.Wait() check and return acquireErr (or the original err) so all goroutines
using reconcileEvent and workerPool.Release complete before the transaction
function exits.
- Around line 73-148: The paging + worker fan-out is running inside
transaction.RunWithNoValue which allows goroutines to share the same DB
transaction and causes unsafe concurrent access plus a data race on the outer
err and potential goroutine leaks; fix by moving parallel reconciliation outside
the transaction: use RunWithNoValue only to acquire the lock and fetch a single
page (via h.repo.ListEvents) then commit/return and spawn the workerPool/wg and
call h.reconcileEvent for that page outside the transaction (each reconcileEvent
call should open its own transaction if needed); also ensure the goroutine uses
a local error variable (use := inside the goroutine) and never return from the
function before wg.Wait completes (i.e., don't spawn workers while still in the
transaction and avoid early returns on workerPool.Acquire failures).
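
The three points above can be sketched together, under stated assumptions: the page of events is taken as already fetched (for example in a short transaction that has since committed), a buffered channel stands in for the weighted semaphore, and `event`, `listPage`, `reconcileEvent`, `reconcilePage`, and `demo` are hypothetical names. The sketch collects per-event errors, remembers an acquire failure instead of returning early, and always waits for in-flight workers before returning.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for the notification event type.
type event struct{ ID int }

// listPage is hypothetical: it stands in for one page of events that was
// fetched before the fan-out starts.
func listPage() []event {
	return []event{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}}
}

// reconcileEvent is a hypothetical stand-in; even IDs fail in this sketch.
func reconcileEvent(_ context.Context, ev event) error {
	if ev.ID%2 == 0 {
		return fmt.Errorf("event %d: simulated failure", ev.ID)
	}
	return nil
}

// reconcilePage fans out over one page with at most `workers` goroutines
// (assumes workers >= 1) and returns all failures joined into one error.
func reconcilePage(ctx context.Context, events []event, workers int) error {
	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		errs       []error
		acquireErr error
	)
	slots := make(chan struct{}, workers)

loop:
	for _, ev := range events {
		// Stop scheduling new work once the context is done, but do not
		// return yet: workers that already started must finish first.
		if err := ctx.Err(); err != nil {
			acquireErr = err
			break loop
		}
		select {
		case slots <- struct{}{}:
		case <-ctx.Done():
			acquireErr = ctx.Err()
			break loop
		}
		wg.Add(1)
		go func(ev event) {
			defer wg.Done()
			defer func() { <-slots }()
			if err := reconcileEvent(ctx, ev); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(ev)
	}

	wg.Wait() // every started goroutine has finished past this point
	// errors.Join drops nil values and returns nil if nothing failed.
	return errors.Join(append(errs, acquireErr)...)
}

// demo reports whether per-event failures surface and whether a cancelled
// context is reported after waiting.
func demo() (failed bool, cancelled bool) {
	failed = reconcilePage(context.Background(), listPage(), 2) != nil

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = errors.Is(reconcilePage(ctx, listPage(), 2), context.Canceled)
	return failed, cancelled
}

func main() {
	failed, cancelled := demo()
	fmt.Println("per-event failures surfaced:", failed)
	fmt.Println("cancellation reported:", cancelled)
}
```

Returning a joined error is one option; a plain failure count, as the review also suggests, would work equally well if callers only need to know that something failed.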

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6f9c8ab2-2972-4850-a188-872fda6f54f3

📥 Commits

Reviewing files that changed from the base of the PR and between 7a855fb and f80df86.

📒 Files selected for processing (7)
  • app/common/notification.go
  • app/config/config_test.go
  • app/config/notification.go
  • app/config/testdata/complete.yaml
  • openmeter/notification/eventhandler.go
  • openmeter/notification/eventhandler/handler.go
  • openmeter/notification/eventhandler/reconcile.go
✅ Files skipped from review due to trivial changes (2)
  • app/config/testdata/complete.yaml
  • app/config/config_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • openmeter/notification/eventhandler.go
  • app/config/notification.go
  • openmeter/notification/eventhandler/handler.go
  • app/common/notification.go

Comment thread openmeter/notification/eventhandler/reconcile.go Outdated
@chrisgacsal chrisgacsal force-pushed the refactor/event-reconcile branch from d0193ca to bcb80d9 Compare April 27, 2026 19:49
@chrisgacsal chrisgacsal marked this pull request as ready for review April 27, 2026 19:49
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/common/notification.go`:
- Around line 79-80: Validate config.ReconcilerWorkers before assigning it to
ReconcilerWorkers: ensure it is a positive integer (or at least non-negative
depending on intended semantics) and reject or clamp invalid values so the
reconciler never receives a negative semaphore size; update the initialization
that sets ReconcilerWorkers (where config.ReconcilerWorkers is assigned
alongside Lockr/sessionLockr) to either return an error for bad config or
default to a safe minimum (e.g., 1) so the weighted semaphore used by the
reconciler cannot be constructed with a negative capacity.
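
A small sketch of the two options mentioned above (reject or clamp), assuming that any value below 1 is invalid. `NotificationConfig`, `Validate`, and `clampWorkers` are hypothetical names for illustration and are not taken from the PR.

```go
package main

import "fmt"

// NotificationConfig is a hypothetical config struct carrying only the
// field discussed in the review.
type NotificationConfig struct {
	ReconcilerWorkers int
}

// Validate rejects worker counts below 1 (assumption: zero is treated as
// invalid too, since a pool with no capacity could never run a worker).
func (c NotificationConfig) Validate() error {
	if c.ReconcilerWorkers < 1 {
		return fmt.Errorf("reconcilerWorkers must be at least 1, got %d", c.ReconcilerWorkers)
	}
	return nil
}

// clampWorkers is the alternative: fall back to a safe minimum of 1
// instead of returning an error.
func clampWorkers(n int) int {
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	for _, n := range []int{-3, 0, 4} {
		cfg := NotificationConfig{ReconcilerWorkers: n}
		fmt.Printf("n=%d validate=%v clamped=%d\n", n, cfg.Validate(), clampWorkers(n))
	}
}
```

Rejecting makes a misconfiguration visible at startup, while clamping keeps the service running with reduced parallelism; which is preferable depends on how the configuration is managed.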

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 009e3c5e-6534-440b-8ccb-b0d91ba1098e

📥 Commits

Reviewing files that changed from the base of the PR and between 0bd359c and bcb80d9.

📒 Files selected for processing (5)
  • app/common/notification.go
  • app/config/config_test.go
  • app/config/notification.go
  • app/config/testdata/complete.yaml
  • cmd/server/wire_gen.go
✅ Files skipped from review due to trivial changes (1)
  • app/config/testdata/complete.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
  • app/config/config_test.go
  • app/config/notification.go

Comment thread app/common/notification.go
@chrisgacsal chrisgacsal merged commit 5d6b2c1 into main Apr 27, 2026
28 checks passed
@chrisgacsal chrisgacsal deleted the refactor/event-reconcile branch April 27, 2026 22:02

Labels

release-note/misc Miscellaneous changes

3 participants