fix: add circuit breaker to table-diff to prevent OOM on node failure #111

mason-sharp merged 1 commit into main
Conversation
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
Up to standards ✅

| Metric | Result |
|---|---|
| Issues | 🟢 |
| Duplication | 0 |
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/consistency/mtree/merkle.go (1)
708-743: Consider adding direct tests for the fetch-row SQL builders.

This PR updates the schema/table contract for `buildFetchRowsSQLSimple` and `buildFetchRowsSQLComposite` too, but the visible test coverage only locks in `buildRowHashQuery`. A small unit test around qualified-table generation and placeholder ordering here would make this harder to regress.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/consistency/mtree/merkle.go` around lines 708 - 743, Add unit tests for buildFetchRowsSQLSimple and buildFetchRowsSQLComposite that assert the generated SQL string includes the sanitized qualified table name, the selectCols prefix (commit_ts and node_origin), the ORDER BY token with the provided orderBy value, and that placeholder numbering and args ordering match the input keys; specifically test: (1) buildFetchRowsSQLSimple with one and multiple scalar keys to verify placeholders $1..$N and args ordering, and (2) buildFetchRowsSQLComposite with multi-column PKs and multiple key tuples to verify tuple column order, placeholder sequence ($1..$M across tuples) and corresponding args flattening. Ensure tests check both the returned query string and the args slice to guard against regressions in identifier qualification and placeholder mapping.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/consistency/diff/table_diff.go`:
- Around line 113-115: Reset the circuit-breaker state at the start of
ExecuteTask: clear firstError (guarded by firstErrorMu) and unset errorRecorded
(atomic.Bool) in the same place you reset totalDiffRows and diffLimitTriggered
so a reused TableDiffTask won't short-circuit subsequent runs; ensure
ExecuteTask initializes firstError under firstErrorMu and calls
errorRecorded.Store(false) before worker paths that call shouldStop() execute.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 558a3e26-5de7-4a29-a71f-095acfadf764
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (12)

- .github/workflows/test.yml
- Dockerfile
- db/queries/queries.go
- go.mod
- internal/consistency/diff/table_diff.go
- internal/consistency/diff/table_rerun.go
- internal/consistency/mtree/merkle.go
- internal/consistency/mtree/merkle_test.go
- internal/consistency/repair/stale_repair.go
- internal/consistency/repair/table_repair.go
- internal/infra/cdc/listen.go
- internal/infra/db/auth.go
`5b2c9d6` to `05b4099` (Compare)
🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)
1846-1889: Optional: consider per-query timeouts for split-path queries to fully close the OOM loop.

The initial hash phase uses `context.WithTimeout(t.Ctx, 60*time.Second)` (L1518) and the sub-range hashes do the same (L2098), but `countQuery` (L1848) and `medianQuery` (L1878, L1885) in `generateSubRanges` still run on raw `t.Ctx`. If a node is degraded but not outright closing connections, these can block longer than intended before any error is recorded to trip the circuit breaker. In practice the new `shouldStop()` gate at L2050 prevents new entries into this path once any error is recorded, so this PR doesn't regress anything; just flagging as a follow-up for full symmetry with the hash phase.

Possible follow-up

```diff
-	err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) // nosemgrep
+	countCtx, cancelCount := context.WithTimeout(t.Ctx, 60*time.Second)
+	err := pool.QueryRow(countCtx, countQuery, args...).Scan(&count) // nosemgrep
+	cancelCount()
```

Same treatment for the two `medianQuery` scans.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/consistency/diff/table_diff.go` around lines 1846 - 1889, generateSubRanges uses pool.QueryRow with the parent context for countQuery and medianQuery (and the two medianQuery scans); wrap each of those calls in a short per-query context timeout (e.g. ctx, cancel := context.WithTimeout(t.Ctx, 60*time.Second); defer cancel()) and pass that ctx into pool.QueryRow instead of t.Ctx. Update the countQuery QueryRow call and both medianQuery QueryRow.Scan branches (single-PK and multi-PK scan) to use the new timeout context so long-running or blocked queries return promptly. Ensure you import context if not already present.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 93c07043-2d06-4691-b951-598b2d9f7b4e
📒 Files selected for processing (1)
internal/consistency/diff/table_diff.go
When a node starts timing out during recursive diff, workers had no mechanism to stop: they continued iterating through every remaining sub-range, each waiting up to 60 s for a context deadline, accumulating error objects and log output until OOM.

- Add shouldStop() combining the existing row-limit check with a new error-recorded check, so all workers bail out once any node error is recorded.
- Replace all shouldStopDueToLimit() calls in the recursive diff path (recursiveDiff + mismatch dispatch loop) with shouldStop().
- Extend the circuit breaker to the initial hash phase: worker goroutines now drain hashTaskQueue without doing work once shouldStop() is true.
- Change continue → return on hash errors inside the sub-range loop so a goroutine stops immediately rather than grinding through remaining sub-ranges.
- Back hasError() with an atomic.Bool (errorRecorded) set in recordError, eliminating mutex acquisition on every sub-range iteration in the common no-error path.
- Reset errorRecorded and firstError at the start of ExecuteTask so a reused TableDiffTask does not short-circuit subsequent runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
`05b4099` to `7b0e63a` (Compare)
🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)
1514-1518: Optional: `return` instead of draining the remaining hash queue.

When `shouldStop()` flips to true, each worker still iterates every remaining task just to call `bar.Increment()` and `continue`. Because `hashTaskQueue` is buffered to `totalHashTasks` (line 1508), the dispatcher at lines 1541–1546 will never block even if all workers exit, and `initialHashWg.Wait()` still completes after `close(hashTaskQueue)`. Swapping `continue` for `return` lets workers exit immediately on trip, which is marginally tidier for very wide range sets. The progress bar being left partially filled is inconsequential since `mpb.BarRemoveOnComplete()` is set on this bar. Not a correctness issue; feel free to defer.

Suggested change

```diff
 for task := range hashTaskQueue {
 	if t.shouldStop() {
-		bar.Increment()
-		continue
+		return
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/consistency/diff/table_diff.go` around lines 1514 - 1518, The worker loop reading from hashTaskQueue currently calls bar.Increment() and continue when t.shouldStop() is true; change that behavior to call bar.Increment() and then return so the goroutine exits immediately (locate the loop that ranges over hashTaskQueue and the t.shouldStop() check in table_diff.go), ensuring workers stop without draining the remaining buffered tasks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: abb111de-463e-42dc-8ddd-ae114727745b
📒 Files selected for processing (1)
internal/consistency/diff/table_diff.go
Summary
When a node starts timing out during a table diff, workers had no mechanism to stop — they continued iterating through every remaining sub-range (each waiting up to 60 s on a context deadline), accumulating error objects until OOM.
- Add `shouldStop()` combining the existing row-limit check with a new error-recorded flag, replacing all `shouldStopDueToLimit()` calls in the recursive diff path so all workers bail out once any node error is set.
- Extend the circuit breaker to the initial hash phase: worker goroutines stop doing work once `shouldStop()` is true.
- Change `continue` → `return` on hash errors in the sub-range loop so a goroutine stops immediately rather than grinding through remaining sub-ranges.
- Back `hasError()` with an `atomic.Bool` to avoid mutex acquisition on every sub-range iteration in the common no-error path.

Fixes ACE-184.