fix: handle recovery in queue processing #57
Conversation
Summary by CodeRabbit
Walkthrough

Renamed the job error handler to the exported `HandlerError` and wrapped job processing in a recover handler.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Queue
    participant Goroutine
    participant Job
    participant Logger
    Queue->>Goroutine: spawn job execution
    activate Goroutine
    Note right of Goroutine: defer { recover -> format log } (NEW)
    Goroutine->>Job: invoke Process()
    alt Process returns error
        Job->>Job: HandlerError(reason)
        Job-->>Goroutine: status handled
    else Process panics
        Goroutine->>Logger: recover panic -> fmt.Sprintf -> q.formatLog (NEW)
        Logger-->>Goroutine: fatal/log emitted
    end
    deactivate Goroutine
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ❌ failed checks (1 warning, 1 inconclusive); ✅ passed checks (1 passed)
Force-pushed from 1c917fc to 278848f (Compare)
Codecov Report: ❌ Patch coverage is
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
queue.go (1)
256-266: Data race on the `finishedJob` slice in `Retry()`.

Multiple goroutines append to `finishedJob` without synchronization. This is a race and can corrupt the slice.

```diff
-	var finishedJob []string
+	var finishedJob []string
+	var mu sync.Mutex
 	for i := range numJobs {
 		job := numJobs[i]
 		wg.Add(1)
 		go func(job *Job) {
 			defer wg.Done()
 			q.jobFnc(job)
 			if job.IsFinished() {
-				finishedJob = append(finishedJob, job.Id)
+				mu.Lock()
+				finishedJob = append(finishedJob, job.Id)
+				mu.Unlock()
 			}
 		}(job)
 	}
```
🧹 Nitpick comments (4)
queue.go (1)
181-183: Avoid deferring cancel() inside the processing loop.

Deferring cancel in the loop delays timer cleanup until Run returns, leaking timers per batch. Call cancel() after the select instead.

```diff
 	ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout)
-	defer cancel()
 	...
 	select {
 	case <-done:
 		q.formatLog(LoggerInfo, "All jobs done\n")
 	case <-ctx.Done():
 		q.MarkJobFailedTimeout(numJobs)
 	}
+	// Clean up the timer promptly per-iteration
+	cancel()
```

queue_test.go (1)
181-183: Test_Crash may still fail if recovered panics are logged with Fatal.

If the production code logs recovered panics with LoggerFatal, the process exits (os.Exit) and tests abort. After applying the non-fatal logging fix in queue.go, this should be stable. If you need a stopgap, set Logger to LoggerInfo or LoggerDisabled here.

```diff
 userQueue := queue.New("crash", &queue.Options{
@@
-	RetryFailures: 3,
+	RetryFailures: 3,
+	// Optional: keep CI resilient even if logging behavior changes
+	// Logger: queue.LoggerInfo,
 })
```

Also applies to: 194-197
job.go (2)
73-79: Optional: capture stack trace for diagnostics on panic.

You already convert the panic into a failure via HandlerError. Capturing debug.Stack() here improves postmortem debugging.

```diff
 import (
 	"context"
 	"fmt"
+	"runtime/debug"
 	"time"
 )
@@
 	defer func() {
 		if r := recover(); r != nil {
-			failedReason := fmt.Sprintf("%v", r)
-			job.HandlerError(failedReason)
+			job.Stacktrace = append(job.Stacktrace, string(debug.Stack()))
+			job.HandlerError(fmt.Sprintf("%v", r))
 		}
 	}()
```
89-105: Use the queue's context when storing errors.

HandlerError should use job.queue.ctx (or a passed-in ctx) instead of context.Background(), to respect caller cancellation and timeouts.

```diff
-	_, err := client.HSet(context.Background(), job.queue.Name, job.Id, job.FailedReason).Result()
+	_, err := client.HSet(job.queue.ctx, job.queue.Name, job.Id, job.FailedReason).Result()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (4)
- go.mod (1 hunks)
- job.go (2 hunks)
- queue.go (2 hunks)
- queue_test.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
queue.go (1)
constant.go (1)
`LoggerFatal` (8-8)
queue_test.go (1)
queue.go (1)
`AddJobOptions` (110-114)
🔇 Additional comments (1)
go.mod (1)
3-5: Verify minimum Go 1.24 across CI/dev; forward compatibility with 1.25+ confirmed.

The `go 1.24.0` directive is correct, but the concern about "builds will fail" without go1.24.x is imprecise. Builds succeed on go 1.25+ (verified: a system running go 1.25.1 resolves all modules and compiles successfully). The actual requirement is go ≥ 1.24.0; versions older than 1.24.0 will cause build failures. Edge cases to verify in your CI fleet:
- If any runner is on macOS 11: Go 1.25+ drops macOS 11 support; stay on 1.24.x or upgrade runners to macOS 12+.
- If tests are floating-point sensitive (AMD64): Go 1.25+ uses FMA instructions, which may alter precision; confirm test stability.
- If using cgo with C bindings: Go 1.25 enables ASAN leak detection by default; may flag previously silent leaks (disable with ASAN_OPTIONS if needed).
Otherwise, ensuring runners have go1.24.0+ (including 1.25+) is sufficient.
```go
defer func() {
	if r := recover(); r != nil {
		failedReason := fmt.Sprintf("%v", r)
		q.formatLog(LoggerFatal, "Error when processing job: %v\n", failedReason)
	}
}()
```
Recovered panic should not terminate the process; also mark the job failed/delayed.
- log.Fatalf in a recover path exits the process, defeating recovery and likely breaking Test_Crash.
- The job isn’t marked failed/delayed here, so an early panic (before job.Process) leaves it stuck in Wait.
Fix by logging non-fatally and recording the failure via HandlerError:
```diff
-	defer func() {
-		if r := recover(); r != nil {
-			failedReason := fmt.Sprintf("%v", r)
-			q.formatLog(LoggerFatal, "Error when processing job: %v\n", failedReason)
-		}
-	}()
+	defer func() {
+		if r := recover(); r != nil {
+			// Ensure the job transitions out of Wait/Active
+			job.HandlerError(fmt.Sprint(r))
+			// Do not exit the process; just log the incident
+			q.formatLog(LoggerInfo, "Recovered panic while processing job %s: %v\n", job.Id, r)
+		}
+	}()
```
Actionable comments posted: 0
🧹 Nitpick comments (1)
job.go (1)
73-78: Consider capturing the stack trace for panics.

The defer-recover block correctly handles panics, but it only captures the panic value without the stack trace. The `Job` struct has a `Stacktrace` field (line 27) that could be populated here for better debugging. Apply this diff to capture the stack trace:

```diff
+import (
+	"runtime/debug"
+)
+
 	defer func() {
 		if r := recover(); r != nil {
 			failedReason := fmt.Sprintf("%v", r)
+			job.Stacktrace = append(job.Stacktrace, string(debug.Stack()))
 			job.HandlerError(failedReason)
 		}
 	}()
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (4)
- go.mod (1 hunks)
- job.go (2 hunks)
- queue.go (3 hunks)
- queue_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- queue.go
- queue_test.go
- go.mod
🔇 Additional comments (3)
job.go (3)
69-87: LGTM: Consistent error handling flow.

The `Process` method now handles both panic recovery (line 76) and normal errors (line 85) through the same `HandlerError` method, providing a consistent error handling path.
89-89: Verify that exporting HandlerError is intentional and necessary.

Based on a search of the codebase, `HandlerError` is not called from any external packages; it is only used internally within `job.go`. Go best practice keeps methods unexported when they are not needed to satisfy an interface or external callers. Consider whether this method should remain exported or be reverted to an unexported method, as minimizing the exported API surface keeps the package's usage clear and its intentions explicit.
94-94: Breaking change confirmed: Redis error storage key was simplified, but the collision concern is unsupported.

The change from `<queue>store` to `<queue>` is intentional (commit 278848f). However, the review's concern about collisions is unfounded: no other data in the codebase uses `queue.Name` as a Redis key. The application itself won't break, since the error data is never read back. Verify: is this simplification intentional? If so, document it as a breaking change for any external tools reading the old `<queue>store` key and provide migration guidance for existing deployments.
No description provided.