fix: make cron job claiming transactional with claim timeout#134
Conversation
The `FOR UPDATE SKIP LOCKED` in `GetNextJobsToRun` ran outside any transaction, so no row lock was ever acquired. Multiple workers could claim and execute the same job simultaneously.

Changes:
- Add `claimed_until` column to `cron_jobs` for a visibility timeout
- Replace `GetNextJobsToRun` with `ClaimNextJobsToRun`: a transactional `SELECT ... FOR UPDATE SKIP LOCKED` + `UPDATE` that sets a far-future `next_run_at` and `claimed_until`, all in one transaction
- Add `CompleteJobRun`: atomic `INSERT` of the run record + `UPDATE` of `next_run_at`
- Add `ReapStaleClaims`: recover jobs whose claim expired mid-execution
- Remove fire-and-forget goroutine dispatch; run jobs synchronously
- Add a reaper ticker to `CronWorker` for stale-claim recovery
- Add an exported `ClaimTimeout` constant for test use

Fixes finding #7 from project-review-findings.
Pull request overview
This PR fixes distributed cron execution by making job claiming and completion transactional, adding a claim timeout mechanism for crash recovery, and updating the worker/repository APIs accordingly.
Changes:
- Add `claimed_until` to cron jobs (schema + domain) to support claim timeouts and stale-claim recovery.
- Replace the non-atomic "get due jobs + run + update" flow with transactional `ClaimNextJobsToRun` and `CompleteJobRun`.
- Update `CronWorker` to claim jobs, run them synchronously, and periodically reap stale claims.
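The transactional claim described above can be sketched as a single SQL statement (a simplified, hypothetical version; the actual query, batch size, and claim window in `cron_repo.go` may differ):

```sql
-- Claim up to 10 due, unclaimed jobs atomically: the inner SELECT takes row
-- locks (FOR UPDATE SKIP LOCKED skips rows already locked by other workers),
-- and the UPDATE records the claim before COMMIT makes the rows visible again.
BEGIN;

UPDATE cron_jobs
SET next_run_at   = NOW() + INTERVAL '1 year',   -- far-future sentinel
    claimed_until = NOW() + INTERVAL '5 minutes' -- visibility timeout (ClaimTimeout)
WHERE id IN (
    SELECT id
    FROM cron_jobs
    WHERE status = 'ACTIVE'
      AND next_run_at <= NOW()
      AND (claimed_until IS NULL OR claimed_until <= NOW())
    ORDER BY next_run_at ASC
    LIMIT 10
    FOR UPDATE SKIP LOCKED
)
RETURNING id;

COMMIT;
```

Because losing workers skip locked rows rather than block on them, concurrent pollers partition the due jobs instead of racing for the same ones.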
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/repositories/postgres/migrations/104_add_cron_claim_fields.up.sql | Adds claimed_until column and index to support claim tracking. |
| internal/repositories/postgres/migrations/104_add_cron_claim_fields.down.sql | Rolls back the claimed_until column (but currently leaves the new index behind). |
| internal/repositories/postgres/cron_repo.go | Implements transactional claim/complete/reap flows for cron jobs. |
| internal/repositories/postgres/cron_repo_test.go | Updates integration test to use ClaimNextJobsToRun. |
| internal/core/services/mock_util_test.go | Updates MockCronRepo to new repository interface. |
| internal/core/services/cron_worker.go | Updates worker to claim jobs, complete runs transactionally, and add a reaper ticker. |
| internal/core/services/cron_worker_test.go | Updates worker tests to the new repo methods and sync execution model. |
| internal/core/ports/cron.go | Updates CronRepository interface for transactional claiming/completion and reaping. |
| internal/core/domain/cron.go | Adds ClaimedUntil to CronJob for visibility timeout tracking. |
```sql
-- +goose Down
```
The down migration drops the claimed_until column but leaves the partial index (idx_cron_jobs_claimed_until) behind. Rollbacks will fail to fully revert schema changes and will keep an unused index around; drop the index in the down migration as well.
```diff
 -- +goose Down
+DROP INDEX IF EXISTS idx_cron_jobs_claimed_until;
```
```go
func (r *PostgresCronRepository) scanCronJobWithTenant(row pgx.Row) (*domain.CronJob, error) {
	var job domain.CronJob
	var status string
	err := row.Scan(
		&job.ID,
		&job.UserID,
		&job.Name,
		&job.Schedule,
		&job.TargetURL,
		&job.TargetMethod,
		&job.TargetPayload,
		&status,
		&job.LastRunAt,
		&job.NextRunAt,
		&job.TenantID,
		&job.CreatedAt,
		&job.UpdatedAt,
	)
```
scanCronJobWithTenant scans tenant_id into domain.CronJob.TenantID (uuid.UUID). In the schema tenant_id is nullable (added in a later migration) and the current CreateJob insert doesn’t set tenant_id, so rows can contain NULL and this Scan will error ("cannot scan NULL into *uuid.UUID"), breaking ClaimNextJobsToRun. Either ensure cron_jobs.tenant_id is always populated (insert + backfill + NOT NULL) or scan into a nullable type and map NULL to a valid value.
```go
	count := 0
	for rows.Next() {
		count++
	}
```
ReapStaleClaims counts returned rows but never checks rows.Err(). If iteration stops due to a driver/network error, this will incorrectly commit and return a partial count with nil error. Check rows.Err() before committing (or use Exec + RowsAffected to avoid RETURNING/row iteration altogether).
```diff
 	}
+	if err := rows.Err(); err != nil {
+		return 0, err
+	}
```
```go
	jobs, err := r.scanCronJobsWithTenant(rows)
	if err != nil {
		rows.Close()
```
ClaimNextJobsToRun: scanCronJobsWithTenant already defers rows.Close(). The additional rows.Close() on the error path is redundant and can be removed to avoid double-closing and keep row lifecycle management in one place.
```diff
-		rows.Close()
```
```go
	sched, _ := w.parser.Parse(job.Schedule)
	now := time.Now()
	nextRun := sched.Next(now)
```
completeRun ignores the error from w.parser.Parse(job.Schedule). If a job somehow has an invalid schedule (e.g., created before validation was added or corrupted data), sched will be nil and sched.Next(...) will panic, potentially crashing the worker. Handle the parse error and choose a safe next_run_at (or mark the job failed/paused) instead of ignoring it.
```diff
-	sched, _ := w.parser.Parse(job.Schedule)
-	now := time.Now()
-	nextRun := sched.Next(now)
+	now := time.Now()
+	nextRun := now.Add(24 * time.Hour)
+	sched, err := w.parser.Parse(job.Schedule)
+	if err != nil {
+		log.Printf("CronWorker: invalid schedule for job %s: %q: %v; using fallback next run at %s", job.ID, job.Schedule, err, nextRun.Format(time.RFC3339))
+	} else {
+		nextRun = sched.Next(now)
+	}
```
```diff
 func (w *CronWorker) ProcessJobs(ctx context.Context) {
-	jobs, err := w.repo.GetNextJobsToRun(ctx)
+	jobs, err := w.repo.ClaimNextJobsToRun(ctx, ClaimTimeout)
 	if err != nil {
-		log.Printf("CronWorker: failed to fetch jobs: %v", err)
+		log.Printf("CronWorker: failed to claim jobs: %v", err)
 		return
 	}

 	for _, job := range jobs {
-		go w.runJob(context.Background(), job)
+		w.runJob(ctx, job)
 	}
```
ProcessJobs executes claimed jobs sequentially. Since ClaimNextJobsToRun can claim up to 10 jobs with a fixed ClaimTimeout, later jobs in the batch may not start/finish before their claimed_until expires (e.g., 10×30s HTTP timeout ≈ 5m), allowing other workers to reclaim and run duplicates. Also, long job execution blocks the main Run loop (delaying ctx cancellation and the reaper). Consider executing jobs concurrently with a bounded worker pool and/or claiming only as many jobs as can complete within the claim timeout.
- `cron_worker.go`: handle the parse error on an invalid schedule with a 24h fallback instead of panicking on a nil dereference
- `cron_repo.go`: remove the redundant `rows.Close()` on the error path, since `scanCronJobsWithTenant` already defers `rows.Close()`
- `cron_repo.go`: check `rows.Err()` before committing in `ReapStaleClaims` to avoid returning a partial count on a driver/network error
- `104_add_cron_claim_fields.down.sql`: drop the index before dropping the column
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
docs/database.md:655
- The documented `cron_jobs`/`cron_job_runs` schemas here don't match the actual migrations under `internal/repositories/postgres/migrations/` (e.g., the migrations create `cron_job_runs(status_code, response, duration_ms, started_at)` and add `cron_jobs.tenant_id` as nullable with no default). Please update this doc block to reflect the real DB schema so operators/users don't rely on incorrect columns/defaults.
```sql
CREATE TABLE cron_jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    tenant_id UUID NOT NULL DEFAULT auth.jwt_token().tenant_id,
    name VARCHAR(255) NOT NULL,
    schedule VARCHAR(100) NOT NULL,
    target_url TEXT NOT NULL,
    target_method VARCHAR(10) NOT NULL DEFAULT 'POST',
    target_payload TEXT,
    status VARCHAR(50) NOT NULL DEFAULT 'ACTIVE',
    last_run_at TIMESTAMPTZ,
    next_run_at TIMESTAMPTZ,
    claimed_until TIMESTAMPTZ, -- visibility timeout for distributed claiming
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(user_id, name)
);

CREATE TABLE cron_job_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id UUID NOT NULL REFERENCES cron_jobs(id) ON DELETE CASCADE,
    status VARCHAR(50) NOT NULL,
    started_at TIMESTAMPTZ NOT NULL,
    ended_at TIMESTAMPTZ,
    output TEXT,
    error TEXT
);
```
```go
	selectQuery := `
		SELECT id, user_id, name, schedule, target_url, target_method, target_payload,
		       status, last_run_at, next_run_at, tenant_id, created_at, updated_at
		FROM cron_jobs
		WHERE status = 'ACTIVE'
		  AND next_run_at <= NOW()
		  AND (claimed_until IS NULL OR claimed_until <= NOW())
		ORDER BY next_run_at ASC
		LIMIT 10
		FOR UPDATE SKIP LOCKED
```
ClaimNextJobsToRun selects/scans tenant_id into CronJob.TenantID (a non-nullable uuid.UUID). In this repo's migrations, cron_jobs.tenant_id was added as a nullable column (071_add_tenant_id_to_resources.up.sql) and CreateJob does not insert a tenant_id, so newly created jobs can have NULL tenant_id and this scan will fail at runtime. Either ensure tenant_id is always populated at write-time (and ideally make it NOT NULL in the DB), or avoid selecting/scanning tenant_id here unless it’s guaranteed non-NULL (e.g., scan into a nullable type/COALESCE).
- **Worker**: A background goroutine (`CronWorker`) polls every 10 seconds for due jobs via `ClaimNextJobsToRun`.
- **Transactional claiming**: Jobs are claimed atomically using `FOR UPDATE SKIP LOCKED` inside a `BEGIN...COMMIT` transaction. The `next_run_at` is set to a far-future value (1 year) and a `claimed_until` timestamp is recorded to prevent double-execution across workers.
- **Atomic completion**: After execution, `CompleteJobRun` atomically inserts the run record and advances `next_run_at` to the true next scheduled time, in a single transaction.
- **Crash recovery**: A reaper runs every minute to reset stale claims where `claimed_until` has expired (worker died mid-execution). Those jobs are immediately re-queued.
- **Parsing**: Uses the `robfig/cron/v3` library for reliable cron expression parsing.
- **Execution**: All jobs are currently "HTTP Targets"; they trigger an HTTP/REST call to a user-defined URL.

## Features
- **Run History**: Every execution's status, code, and duration are recorded in `cron_job_runs`.
- **States**: Jobs can be `ACTIVE` or `PAUSED`.
- **Multi-worker safe**: Due to transactional claiming with claim timeouts, the same job cannot be executed by multiple workers simultaneously.
The docs claim the claim timeout mechanism means the same job "cannot be executed by multiple workers simultaneously" / prevents double-execution across workers. With a finite claimed_until, if a run exceeds the timeout (or the reaper fires early), another worker can reclaim and execute the same job. Consider clarifying this as “best-effort within the claim timeout” and noting that ClaimTimeout must exceed worst-case execution time to avoid duplicates.
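The crash-recovery step the docs describe can be sketched in SQL (a hypothetical version; the real `ReapStaleClaims` query may differ in details such as the reset value and whether it returns the affected rows):

```sql
-- Re-queue jobs whose claim expired: clearing claimed_until and pulling
-- next_run_at back to NOW() makes them immediately eligible for claiming
-- again by ClaimNextJobsToRun.
UPDATE cron_jobs
SET claimed_until = NULL,
    next_run_at   = NOW()
WHERE status = 'ACTIVE'
  AND claimed_until IS NOT NULL
  AND claimed_until <= NOW();
```

Note the duplicate-execution caveat above applies here too: if a worker is merely slow rather than dead, reaping its still-running job re-opens it for claiming, so `ClaimTimeout` must comfortably exceed worst-case execution time.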
Summary

- `FOR UPDATE SKIP LOCKED` was useless: it ran outside any transaction, so no row lock was ever acquired, and multiple workers could execute the same job.
- `ClaimNextJobsToRun` now uses `BEGIN → SELECT FOR UPDATE SKIP LOCKED → UPDATE claimed_until + far-future next_run_at → COMMIT` in one transaction.
- `CompleteJobRun` combines the run-record insert and the job-state update in one transaction; no more separate non-atomic `SaveJobRun` + `UpdateJob` calls.
- `ReapStaleClaims` resets jobs where `claimed_until` expired mid-execution (worker died). The reaper runs every minute.
- `runJob` is now synchronous; completion is tracked before returning.

Changes

- `domain/cron.go`: add `ClaimedUntil` field
- `ports/cron.go`: replace `GetNextJobsToRun`/`SaveJobRun` with `ClaimNextJobsToRun`/`CompleteJobRun`/`ReapStaleClaims`
- `cron_repo.go`: implement `ClaimNextJobsToRun`, `CompleteJobRun`, `ReapStaleClaims`
- `cron_worker.go`: use `ClaimNextJobsToRun`, synchronous execution, add reaper ticker
- `cron_worker_test.go`: update worker tests
- `mock_util_test.go`: update `MockCronRepo` to the new interface
- `cron_repo_test.go`: update integration test
- `migrations/104_*`: add `claimed_until` column
Test plan

- `go build ./...`
- `go test ./internal/core/services/... -short -run TestCronWorker`

Fixes finding #7.