fix(job-queue): follow-up correctness fixes to PR #511#513
Conversation
…te abort check Three follow-up fixes to PR #511 that the commit message claimed but that did not land in the code: 1. RateLimiter slot leak (CRITICAL): validateJobState() failures now call limiter.release() (not complete()) before rethrow, and the outer finally is gated on a limiterReleased flag so the slot is not double-decremented. Without this, every DEADLINE-EXCEEDED or pre-aborted job permanently consumed a RateLimiter window slot. 2. disableJob dispatch (HIGH): JobDisabledError now routes through disableJob() instead of falling into failJob(), so attempting to disable a job no longer clobbers the DISABLED status with FAILED. The H5 atomic-disable code path is now reachable. 3. Pre-execute abort check (HIGH): createAbortController(job.id) is moved before validateJobState() so the pre-execute abort flag check at the top of validateJobState (which reads activeJobAbortControllers) is reachable. Previously the controller was created after validation, making the branch dead code. Adds JobQueueWorker.test.ts with regressions for all three.
… backfill Two follow-up fixes to PR #511: 1. IndexedDbQueueStorage.abort(PENDING) (CRITICAL): replaced `this.complete(job)` with a direct `put()` that sets status=FAILED, abort_requested_at, completed_at WITHOUT bumping attempts. Matches the cross-backend contract verified in InMemoryQueueStorage/PostgresQueueStorage and asserted in a new genericJobQueueTests case. 2. IndexedDB v2 migration (CRITICAL): v2 previously copied only run_after → visible_at, leaving run_attempts, last_ran_at, max_retries, worker_id orphaned on upgrade. The storage layer reads the post-rename names, so existing browser-deployed queues silently lost retry budgets, last-attempt timestamps, and lease ownership on first migration. Extend the cursor body to migrate all five renames idempotently. Move the queue_status_visible_at createIndex into the terminal cursor branch so the index is built off post-migration rows. Adds IndexedDbQueueMigrations.integration test for the five-rename case and a cross-backend abort(PENDING) attempts-stability assertion.
PR #511 lowered the default retry budget from 20 (Postgres) / 23 (SQLite v1) to 10. Postgres v3 explicitly applied `ALTER COLUMN max_attempts SET DEFAULT 10`; SQLite v3 renamed the column but did not adjust the default. Fresh SQLite installs ended up at default 23, Postgres at 10, so callers omitting maxAttempts got divergent retry behavior across backends. SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, so the fix uses the documented 12-step table-rebuild procedure (https://www.sqlite.org/lang_altertable.html#otheralter): build the new CREATE TABLE statement from the post-rename PRAGMA table_info, swap max_attempts's default literal, copy rows, drop old, rename new, recreate indexes. The rebuild is gated on the current default not already being '10' so re-running v3 is idempotent. Extend the migrations parity integration test to compare DEFAULTS, not just column names, so future drift is caught.
…ackends PR #511 added Number.isFinite guards to Supabase only; Postgres, SQLite, InMemoryQueueStorage, and IndexedDB passed leaseMs / ms directly into new Date(Date.now() + ms).toISOString() (yields "Invalid Date" for NaN/Infinity, poisoning the row) or into parameterized SQL fragments (runtime error for non-finite). A negative leaseMs immediately re-expires the lease a worker just claimed. Extract validation into a shared validateLeaseMs() helper in @workglow/job-queue; call it at the top of every next() and extendLease() across all 5 backends. Migrate Supabase from its inline Error throw to the shared RangeError so all backends report the same exception type. ms === 0 remains valid (instant expiry). Adds a cross-backend contract test that runs against every backend via the shared genericJobQueueTests harness.
@workglow/cli
@workglow/ai
@workglow/browser-control
@workglow/indexeddb
@workglow/javascript
@workglow/job-queue
@workglow/knowledge-base
@workglow/mcp
@workglow/storage
@workglow/task-graph
@workglow/tasks
@workglow/util
workglow
@workglow/anthropic
@workglow/bun-webview
@workglow/chrome-ai
@workglow/electron
@workglow/google-gemini
@workglow/huggingface-inference
@workglow/huggingface-transformers
@workglow/node-llama-cpp
@workglow/ollama
@workglow/openai
@workglow/playwright
@workglow/postgres
@workglow/sqlite
@workglow/supabase
@workglow/tf-mediapipe
commit: |
Coverage Report
File CoverageNo changed files found. |
There was a problem hiding this comment.
Pull request overview
Four follow-up correctness fixes for PR #511's job-queue refactor: a RateLimiter slot leak plus two unreachable dispatch branches in JobQueueWorker.processSingleJob, an IndexedDB v2 backfill that previously dropped four of five renamed fields, an IndexedDB abort(PENDING) that incorrectly bumped attempts, a SQLite v3 max_attempts default mismatch with Postgres, and missing leaseMs/extendLease input validation across non-Supabase backends consolidated into a shared validateLeaseMs helper.
Changes:
JobQueueWorker.processSingleJob: release limiter slot on validation failure (gated bylimiterReleased), routeJobDisabledErrorthroughdisableJob, register abort controller beforevalidateJobState.- IndexedDB v2 migration: backfill all five legacy field renames in one cursor pass and defer
queue_status_visible_atindex creation;IndexedDbQueueStorage.abort(PENDING)writes directly viaput()instead ofcomplete()to avoid bumpingattempts. - SQLite v3 migration: rebuild table via 12-step procedure to lower
max_attemptsdefault 23→10 for Postgres parity; new sharedvalidateLeaseMs(throwsRangeError) wired into Postgres/SQLite/InMemory/IndexedDB/Supabasenext()andextendLease().
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/job-queue/src/job/JobQueueWorker.ts | Three regressions fixed in processSingleJob: limiter release on validation failure, JobDisabledError dispatch, pre-execute abort controller registration. |
| packages/job-queue/src/queue-storage/validateLeaseMs.ts | New shared helper throwing RangeError for non-finite/negative ms. |
| packages/job-queue/src/common.ts | Re-export the new helper. |
| packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts | Call validateLeaseMs in next / extendLease. |
| packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts | Validate lease ms inputs; abort(PENDING) uses put() to avoid bumping attempts. |
| packages/indexeddb/src/migrations/indexedDbQueueMigrations.ts | v2 migrates all five legacy renames; index creation moved into terminal cursor branch. |
| providers/postgres/src/job-queue/PostgresQueueStorage.ts | Validate lease ms inputs. |
| providers/sqlite/src/job-queue/SqliteQueueStorage.ts | Validate lease ms inputs. |
| providers/sqlite/src/migrations/sqliteQueueMigrations.ts | v3 rebuilds table to set max_attempts DEFAULT 10 for Postgres parity. |
| providers/supabase/src/job-queue/SupabaseQueueStorage.ts | Replace inline Error throws with shared validateLeaseMs. |
| packages/test/src/test/job-queue/JobQueueWorker.test.ts | New regression tests for the three processSingleJob fixes. |
| packages/test/src/test/job-queue/genericJobQueueTests.ts | Cross-backend lease-ms validation tests and abort(PENDING) no-bump contract test. |
| packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts | Verifies all five legacy renames are migrated. |
| packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts | Cross-backend defaults parity test (max_attempts=10, attempts=0). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| it("pre-execute abort flag is observed during validateJobState", async () => { | ||
| // Before the fix, createAbortController() ran AFTER validateJobState, | ||
| // so the activeJobAbortControllers.get(...).signal.aborted branch in | ||
| // validateJobState was dead code. With the controller registered | ||
| // first, an abort fired before start() / right at start time is | ||
| // observable and the job fails with AbortSignalJobError + | ||
| // abort_requested_at set. | ||
| const server = new JobQueueServer<TI, TO, TJob>(TJob, { | ||
| storage: storage as any, | ||
| queueName, | ||
| pollIntervalMs: 5, | ||
| stopTimeoutMs: 0, | ||
| }); | ||
| const client = new JobQueueClient<TI, TO>({ storage: storage as any, queueName }); | ||
| client.attach(server); | ||
|
|
||
| const handle = await client.send({ taskType: "long_running", data: "pre-abort" }); | ||
| // Abort before any worker has claimed the row — this PENDING-abort | ||
| // path sets the row to FAILED with abort_requested_at set immediately. | ||
| await storage.abort(handle.id); | ||
|
|
||
| await server.start(); | ||
|
|
||
| const reached = await waitUntil(async () => { | ||
| const j = await storage.get(handle.id); | ||
| return ( | ||
| j?.status === JobStatus.FAILED || | ||
| j?.status === JobStatus.COMPLETED || | ||
| j?.status === JobStatus.DISABLED | ||
| ); | ||
| }); | ||
| expect(reached).toBe(true); | ||
|
|
||
| await server.stop(); | ||
|
|
||
| const final = await storage.get(handle.id); | ||
| expect(final?.status).toBe(JobStatus.FAILED); | ||
| expect(final?.abort_requested_at).toBeTruthy(); | ||
| }); |
| const postRenameInfos: ColInfo[] = db | ||
| .prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`) | ||
| .all(); | ||
| const maxAttemptsCol = postRenameInfos.find((c) => c.name === "max_attempts"); | ||
| if (maxAttemptsCol && maxAttemptsCol.dflt_value !== "10") { | ||
| // Build a new CREATE TABLE statement from the post-rename | ||
| // table_info, swapping max_attempts's default. Preserving the | ||
| // existing types / NOT NULL / PK / other defaults keeps the | ||
| // rebuild a true no-op for every other column. | ||
| const columnDefs = postRenameInfos | ||
| .map((c) => { | ||
| const parts: string[] = [c.name, c.type || ""]; | ||
| if (c.pk) { | ||
| parts.push("PRIMARY KEY"); | ||
| } | ||
| if (c.notnull) { | ||
| parts.push("NOT NULL"); | ||
| } | ||
| const dflt = | ||
| c.name === "max_attempts" ? "10" : c.dflt_value !== null ? c.dflt_value : null; | ||
| if (dflt !== null) { | ||
| parts.push(`DEFAULT ${dflt}`); | ||
| } | ||
| return parts.filter((p) => p.length > 0).join(" "); | ||
| }) | ||
| .join(",\n "); | ||
| const colList = postRenameInfos.map((c) => c.name).join(", "); | ||
| const newTable = `${tableName}__new_v3`; | ||
| db.exec(` | ||
| CREATE TABLE ${newTable} ( | ||
| ${columnDefs} | ||
| ); | ||
| INSERT INTO ${newTable} (${colList}) SELECT ${colList} FROM ${tableName}; | ||
| DROP TABLE ${tableName}; | ||
| ALTER TABLE ${newTable} RENAME TO ${tableName}; | ||
|
|
||
| CREATE INDEX IF NOT EXISTS job_queue_fetcher${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, status, visible_at); | ||
| CREATE INDEX IF NOT EXISTS job_queue_fingerprint${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, fingerprint, status); | ||
| CREATE INDEX IF NOT EXISTS job_queue_job_run_id${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, job_run_id); | ||
| `); | ||
| } |
|
@copilot apply changes based on the comments in this thread |
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>
Applied the thread follow-ups in 7eebbda. The worker regression test now aborts the active controller before validation so it actually proves the pre-execute path, and the SQLite v3 rebuild now uses a shared canonical CREATE TABLE helper instead of reconstructing DDL from PRAGMA metadata. |
Four follow-up correctness fixes for PR #511 (refactor(job-queue): drop legacy limiter methods, fix QueuedExecutionStrategy release, rename releaseClaim). Each fix is a separate commit so they can be reviewed and reverted independently.
Commit 1 —
fix(job-queue): RateLimiter slot leak, disableJob dispatch, pre-execute abort checkThree regressions in
JobQueueWorker.processSingleJobthat PR #511's commit message claimed to fix but that did not land in the code:validateJobState()failures now calllimiter.release()(notcomplete()) before rethrow, and the outerfinallyis gated on alimiterReleasedflag so the slot is not double-decremented. Without this, every DEADLINE-EXCEEDED or pre-aborted job permanently consumed aRateLimiterwindow slot (complete()is a no-op forRateLimiter; the slot only ages out of the window naturally — but the slot was never released).disableJobdispatch (HIGH).JobDisabledErrornow routes throughdisableJob()instead of falling into the genericfailJob()branch. The H5 atomic-disable code path was previously unreachable; disabling a job mid-flight clobbered theDISABLEDstatus withFAILED.createAbortController(job.id)is now created BEFOREvalidateJobState()so theactiveJobAbortControllers.get(job.id)?.signal.abortedcheck at the top ofvalidateJobStateis reachable. Previously the controller was created after validation, making the branch dead code.Tests:
packages/test/src/test/job-queue/JobQueueWorker.test.ts(new file, three regressions, one per fix).Commit 2 —
fix(job-queue,indexeddb): abort(PENDING) attempts bump + v2 migration backfillIndexedDbQueueStorage.abort(PENDING)(CRITICAL). Replacedthis.complete(job)with a directput()that setsstatus=FAILED,abort_requested_at,completed_atWITHOUT bumpingattempts. Matches the cross-backend contract verified inInMemoryQueueStorage/PostgresQueueStorage.run_after → visible_at, leavingrun_attempts,last_ran_at,max_retries,worker_idorphaned on upgrade. The storage layer reads the post-rename names, so existing browser-deployed queues silently lost retry budgets, last-attempt timestamps, and lease ownership on first migration. Extend the cursor body to migrate all five renames idempotently and move thequeue_status_visible_atcreateIndexinto the terminalcursor === nullbranch so the index is built off post-migration rows.Tests: new
it("v2 migrates all five legacy field renames", ...)inpackages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts; newit("abort(PENDING) does not bump attempts (cross-backend contract)", ...)inside the existingH1+H4describe block inpackages/test/src/test/job-queue/genericJobQueueTests.ts(runs against every backend).Commit 3 —
fix(sqlite): v3 max_attempts default = 10 to match Postgres parityPR #511 lowered the default retry budget from 20 (Postgres) / 23 (SQLite v1) to 10. Postgres v3 explicitly applied
ALTER COLUMN max_attempts SET DEFAULT 10; SQLite v3 renamed the column but did not adjust the default. Fresh SQLite installs ended up at default 23, Postgres at 10 — callers omittingmaxAttemptsgot divergent retry behavior across backends.SQLite has no
ALTER COLUMN ... SET DEFAULTsyntax, so the fix uses the documented 12-step table-rebuild procedure: build the newCREATE TABLEstatement from the post-renamePRAGMA table_info, swapmax_attempts's default literal, copy rows, drop old, rename new, recreate indexes. Gated on the current default not already being'10'so re-running v3 is idempotent.Tests: extended
packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.tswith a newdescribe("queue migrations: cross-backend default parity", ...)block that assertsmax_attempts default === 10andattempts default === 0on both Postgres (information_schema.columns) and SQLite (PRAGMA table_info).Commit 4 —
fix(job-queue): validate leaseMs / extendLease ms inputs across all backendsPR #511 added
Number.isFiniteguards to Supabase only; Postgres, SQLite,InMemoryQueueStorage, and IndexedDB passedleaseMs/msdirectly intonew Date(Date.now() + ms).toISOString()(yields"Invalid Date"forNaN/Infinity, poisoninglease_expires_at) or into parameterized SQL fragments (runtime error for non-finite). A negativeleaseMsimmediately re-expired the lease a worker just claimed.New shared helper
validateLeaseMs()atpackages/job-queue/src/queue-storage/validateLeaseMs.ts, exported from@workglow/job-queue. Called at the top of everynext()andextendLease()across all 5 backends. Supabase's inlineErrorthrows migrated to the sharedRangeErrorso all backends report the same exception type.ms === 0remains valid (instant expiry).Tests: new
describe("leaseMs / extendLease input validation (PR #511 follow-up)", ...)block insidepackages/test/src/test/job-queue/genericJobQueueTests.ts— runs against every backend, covers negative/NaN/Infinity rejections onnext()andextendLease()plus theleaseMs: 0accept case.Verification
bun run build:types— clean (30 tasks)bun scripts/test.ts queue vitest— 398 passed, 6 skippedbun scripts/test.ts storage vitest— 1226 passed, 13 skippedTest files added / modified
packages/test/src/test/job-queue/JobQueueWorker.test.tspackages/job-queue/src/queue-storage/validateLeaseMs.tspackages/test/src/test/job-queue/genericJobQueueTests.ts(cross-backend regressions)packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.tspackages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.tsReference: PR #511 (#511) —
69c1bd1.Generated by Claude Code