From 677796920c719a5308f16a8a2323679c64123d31 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 19 May 2026 08:31:32 +0000 Subject: [PATCH 1/7] fix(job-queue): RateLimiter slot leak, disableJob dispatch, pre-execute abort check Three follow-up fixes to PR #511 that the commit message claimed but that did not land in the code: 1. RateLimiter slot leak (CRITICAL): validateJobState() failures now call limiter.release() (not complete()) before rethrow, and the outer finally is gated on a limiterReleased flag so the slot is not double-decremented. Without this, every DEADLINE-EXCEEDED or pre-aborted job permanently consumed a RateLimiter window slot. 2. disableJob dispatch (HIGH): JobDisabledError now routes through disableJob() instead of falling into failJob(), so attempting to disable a job no longer clobbers the DISABLED status with FAILED. The H5 atomic-disable code path is now reachable. 3. Pre-execute abort check (HIGH): createAbortController(job.id) is moved before validateJobState() so the pre-execute abort flag check at the top of validateJobState (which reads activeJobAbortControllers) is reachable. Previously the controller was created after validation, making the branch dead code. Adds JobQueueWorker.test.ts with regressions for all three. --- packages/job-queue/src/job/JobQueueWorker.ts | 34 ++- .../src/test/job-queue/JobQueueWorker.test.ts | 234 ++++++++++++++++++ 2 files changed, 261 insertions(+), 7 deletions(-) create mode 100644 packages/test/src/test/job-queue/JobQueueWorker.test.ts diff --git a/packages/job-queue/src/job/JobQueueWorker.ts b/packages/job-queue/src/job/JobQueueWorker.ts index f4bbb5d92..cf04441fc 100644 --- a/packages/job-queue/src/job/JobQueueWorker.ts +++ b/packages/job-queue/src/job/JobQueueWorker.ts @@ -615,20 +615,31 @@ export class JobQueueWorker< }) : undefined; + let limiterReleased = false; try { // The limiter slot was already atomically reserved by tryAcquire() in - // the main loop (or processNext), so we no longer call recordJobStart - // here — doing so would double-count. + // the main loop (or processNext). We register the abort controller + // BEFORE validateJobState so the controller is observable from inside + // validateJobState (which checks activeJobAbortControllers for a + // pre-execute abort flag). Previously the controller was created + // AFTER validation, making that abort-before-execute branch dead code. + const abortController = this.createAbortController(job.id); + try { await this.validateJobState(job); } catch (validationErr) { - // Throw — the outer finally block's limiter.complete() will release - // the slot. Do NOT call limiter.release() here too; that would - // double-decrement the counter and admit one extra concurrent job. + // RateLimiter.complete() is a no-op on the success/error paths + // (the slot only ages out of the window naturally) — so on a + // validation failure here we MUST call release() to actually free + // the slot. Without this, a DEADLINE-EXCEEDED or pre-aborted job + // permanently consumed a RateLimiter window slot. The + // limiterReleased flag gates the outer finally's complete() call so + // we don't double-handle the slot. + await this.limiter.release(limiterToken); + limiterReleased = true; throw validationErr; } - const abortController = this.createAbortController(job.id); this.events.emit("job_start", job.id); let leaseInterval: ReturnType | undefined; @@ -701,13 +712,22 @@ export class JobQueueWorker< }); span?.setStatus(SpanStatusCode.UNSET); } + } else if (error instanceof JobDisabledError) { + // Route through disableJob so the row transitions to DISABLED + // (not FAILED). Without this branch, attempting to disable a job + // mid-flight clobbered the DISABLED status with FAILED and the + // H5 atomic-disable code path was unreachable. + await this.disableJob(job); + span?.setStatus(SpanStatusCode.UNSET); } else { await this.failJob(job, error); span?.setStatus(SpanStatusCode.ERROR, error.message); } span?.setAttributes({ "workglow.job.error": spanErrorMessage }); } finally { - await this.limiter.complete(limiterToken); + if (!limiterReleased) { + await this.limiter.complete(limiterToken); + } span?.end(); // Guard against a concurrent processSingleJob for the same jobId (which // can start before this finally block runs, e.g. after a reschedule). diff --git a/packages/test/src/test/job-queue/JobQueueWorker.test.ts b/packages/test/src/test/job-queue/JobQueueWorker.test.ts new file mode 100644 index 000000000..f381450a6 --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueWorker.test.ts @@ -0,0 +1,234 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { IJobExecuteContext } from "@workglow/job-queue"; +import { + InMemoryQueueStorage, + InMemoryRateLimiterStorage, + Job, + JobDisabledError, + JobQueueClient, + JobQueueServer, + JobStatus, + RateLimiter, +} from "@workglow/job-queue"; +import { setLogger, sleep, uuid4 } from "@workglow/util"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { getTestingLogger } from "../../binding/TestingLogger"; + +interface TI { + readonly taskType?: string; + readonly data?: string; + readonly [key: string]: unknown; +} +interface TO { + readonly result?: string; + readonly [key: string]: unknown; +} + +/** + * Test job that supports a few input flavours. + * - `long_running` — hangs on a promise that only resolves when its signal fires. + * - `disable_on_abort` — throws JobDisabledError when its signal fires (used to + * exercise the JobDisabledError dispatch branch in processSingleJob's catch). + * - anything else — resolves immediately. + */ +class TJob extends Job { + public override async execute(input: TI, context: IJobExecuteContext): Promise { + if (input.taskType === "disable_on_abort") { + return new Promise((_, reject) => { + context.signal.addEventListener( + "abort", + () => reject(new JobDisabledError(`Job ${String(input.data)} was disabled`)), + { once: true } + ); + }); + } + if (input.taskType === "long_running") { + return new Promise((_, reject) => { + context.signal.addEventListener("abort", () => reject(new Error("Aborted")), { + once: true, + }); + }); + } + return { result: "done" }; + } +} + +async function waitUntil( + predicate: () => boolean | Promise, + ceilingMs = 1000, + stepMs = 5 +): Promise { + const deadline = Date.now() + ceilingMs; + while (Date.now() < deadline) { + if (await predicate()) return true; + await sleep(stepMs); + } + return false; +} + +describe("JobQueueWorker — PR #511 follow-up regressions", () => { + setLogger(getTestingLogger()); + let storage: InMemoryQueueStorage; + let queueName: string; + + beforeEach(async () => { + queueName = `worker-followup-${uuid4()}`; + storage = new InMemoryQueueStorage(queueName); + await storage.migrate(); + }); + + afterEach(async () => { + await storage.deleteAll(); + }); + + it("DEADLINE-EXCEEDED job releases its RateLimiter slot", async () => { + // Two-slot rate limiter. Without the fix, a job whose deadline_at is in + // the past consumes one slot at tryAcquire and never returns it (the + // outer finally calls limiter.complete(), which is a no-op for + // RateLimiter). The fix calls limiter.release() before rethrowing + // the validation error. + const limiter = new RateLimiter(new InMemoryRateLimiterStorage(), queueName, { + maxExecutions: 2, + windowSizeInSeconds: 60, + }); + + const server = new JobQueueServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + limiter, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + // Insert a job whose deadline is already in the past so validateJobState + // rejects with PermanentJobError ("has exceeded its deadline"). We use + // storage.add directly because client.send only accepts a positive + // timeoutSeconds. + const pastDeadline = new Date(Date.now() - 60_000).toISOString(); + const id = await storage.add({ + input: { taskType: "deadline-exceeded", data: "x" }, + visible_at: null, + completed_at: null, + deadline_at: pastDeadline, + } as any); + + await server.start(); + + // Wait for the job to leave PENDING (it should fail validation and reach + // FAILED very quickly). + const left = await waitUntil(async () => { + const j = await storage.get(id); + return !!j && j.status !== JobStatus.PENDING; + }); + expect(left).toBe(true); + + // Give the finally block a tick to run. + await sleep(20); + + // Both slots must still be free — the failed validation must not have + // burned a slot. + const t1 = await limiter.tryAcquire(); + const t2 = await limiter.tryAcquire(); + expect(t1).not.toBeNull(); + expect(t2).not.toBeNull(); + + await limiter.release(t1); + await limiter.release(t2); + + await server.stop(); + }); + + it("disabling a job mid-flight transitions it to DISABLED, not FAILED", async () => { + // The user task throws JobDisabledError when its abort signal fires — + // the realistic shape of consumer code that detects disablement on + // re-checking state. Without the JobDisabledError dispatch branch in + // processSingleJob's catch, this error fell into the generic failJob + // branch and clobbered status to FAILED. + const server = new JobQueueServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + await server.start(); + + const handle = await client.send({ taskType: "disable_on_abort", data: "disable-me" }); + + // Wait for PROCESSING. + const reached = await waitUntil(async () => { + const j = await storage.get(handle.id); + return j?.status === JobStatus.PROCESSING; + }); + expect(reached).toBe(true); + + // Flag the row as DISABLED in storage and signal the worker to abort it. + await storage.saveStatus(handle.id, JobStatus.DISABLED); + // Restore PROCESSING so the worker's checkForAbortingJobs (which peeks + // PROCESSING rows) observes abort_requested_at and fires the abort + // controller — the user task then throws JobDisabledError. + await storage.saveStatus(handle.id, JobStatus.PROCESSING); + await storage.abort(handle.id); + + // Wait for terminal state — DISABLED. + const disabled = await waitUntil(async () => { + const j = await storage.get(handle.id); + return j?.status === JobStatus.DISABLED; + }); + + await server.stop(); + expect(disabled).toBe(true); + + const final = await storage.get(handle.id); + expect(final?.status).toBe(JobStatus.DISABLED); + }); + + it("pre-execute abort flag is observed during validateJobState", async () => { + // Before the fix, createAbortController() ran AFTER validateJobState, + // so the activeJobAbortControllers.get(...).signal.aborted branch in + // validateJobState was dead code. With the controller registered + // first, an abort fired before start() / right at start time is + // observable and the job fails with AbortSignalJobError + + // abort_requested_at set. + const server = new JobQueueServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + const handle = await client.send({ taskType: "long_running", data: "pre-abort" }); + // Abort before any worker has claimed the row — this PENDING-abort + // path sets the row to FAILED with abort_requested_at set immediately. + await storage.abort(handle.id); + + await server.start(); + + const reached = await waitUntil(async () => { + const j = await storage.get(handle.id); + return ( + j?.status === JobStatus.FAILED || + j?.status === JobStatus.COMPLETED || + j?.status === JobStatus.DISABLED + ); + }); + expect(reached).toBe(true); + + await server.stop(); + + const final = await storage.get(handle.id); + expect(final?.status).toBe(JobStatus.FAILED); + expect(final?.abort_requested_at).toBeTruthy(); + }); +}); From e0a2d4706489204d63a5aee8c6b8eed71715951a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 19 May 2026 08:35:02 +0000 Subject: [PATCH 2/7] fix(job-queue,indexeddb): abort(PENDING) attempts bump + v2 migration backfill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two follow-up fixes to PR #511: 1. IndexedDbQueueStorage.abort(PENDING) (CRITICAL): replaced `this.complete(job)` with a direct `put()` that sets status=FAILED, abort_requested_at, completed_at WITHOUT bumping attempts. Matches the cross-backend contract verified in InMemoryQueueStorage/PostgresQueueStorage and asserted in a new genericJobQueueTests case. 2. IndexedDB v2 migration (CRITICAL): v2 previously copied only run_after → visible_at, leaving run_attempts, last_ran_at, max_retries, worker_id orphaned on upgrade. The storage layer reads the post-rename names, so existing browser-deployed queues silently lost retry budgets, last-attempt timestamps, and lease ownership on first migration. Extend the cursor body to migrate all five renames idempotently. Move the queue_status_visible_at createIndex into the terminal cursor branch so the index is built off post-migration rows. Adds IndexedDbQueueMigrations.integration test for the five-rename case and a cross-backend abort(PENDING) attempts-stability assertion. --- .../src/job-queue/IndexedDbQueueStorage.ts | 5 +- .../migrations/indexedDbQueueMigrations.ts | 68 +++++--- .../test/job-queue/genericJobQueueTests.ts | 21 +++ ...dexedDbQueueMigrations.integration.test.ts | 149 ++++++++++++++++++ 4 files changed, 224 insertions(+), 19 deletions(-) diff --git a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts index 14e1da9b1..c17900c3e 100644 --- a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts +++ b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts @@ -537,7 +537,10 @@ export class IndexedDbQueueStorage implements IQueueStorage { const cursor = cursorReq.result; - if (!cursor) return; + if (!cursor) { + // Recreate the renamed index under the new key path. Skip if a + // previous partial run already created it — IDB forbids two + // indexes sharing a name and would throw mid-upgrade. + if (!Array.from(store.indexNames).includes("queue_status_visible_at")) { + store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), { + unique: false, + }); + } + return; + } const row = cursor.value as Record | null; - if (row && row.visible_at === undefined && row.run_after !== undefined) { - row.visible_at = row.run_after; - cursor.update(row); + if (row) { + let dirty = false; + if (row.run_after !== undefined && row.visible_at === undefined) { + row.visible_at = row.run_after; + delete row.run_after; + dirty = true; + } + if (row.run_attempts !== undefined && row.attempts === undefined) { + row.attempts = row.run_attempts; + delete row.run_attempts; + dirty = true; + } + if (row.last_ran_at !== undefined && row.last_attempted_at === undefined) { + row.last_attempted_at = row.last_ran_at; + delete row.last_ran_at; + dirty = true; + } + if (row.max_retries !== undefined && row.max_attempts === undefined) { + row.max_attempts = row.max_retries; + delete row.max_retries; + dirty = true; + } + if (row.worker_id !== undefined && row.lease_owner === undefined) { + row.lease_owner = row.worker_id; + delete row.worker_id; + dirty = true; + } + if (dirty) cursor.update(row); } cursor.continue(); }; - - // Recreate the index under the new name keyed on `visible_at`. - // Skip if a previous partial run already created it (defensive — IDB - // forbids two indexes sharing a name and would throw mid-upgrade). - if (!Array.from(store.indexNames).includes("queue_status_visible_at")) { - store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), { - unique: false, - }); - } }, }, ]; diff --git a/packages/test/src/test/job-queue/genericJobQueueTests.ts b/packages/test/src/test/job-queue/genericJobQueueTests.ts index e27d37be9..381597c36 100644 --- a/packages/test/src/test/job-queue/genericJobQueueTests.ts +++ b/packages/test/src/test/job-queue/genericJobQueueTests.ts @@ -1343,5 +1343,26 @@ export function runGenericJobQueueTests( expect(second?.attempts).toBe(attemptsBeforeReclaim + 1); expect(second?.abort_requested_at ?? null).toBe(null); }); + + it("abort(PENDING) does not bump attempts (cross-backend contract)", async () => { + // Regression for PR #511 follow-up: IndexedDbQueueStorage.abort(PENDING) + // previously routed through complete() which bumps attempts. The + // cross-backend contract (InMemory/Postgres) is that aborting a row + // the worker never claimed must NOT consume retry budget — the worker + // never actually attempted execution. + const handle = await client.send({ taskType: "task1", data: "abort-pending-no-bump" }); + const id = handle.id; + const before = await storage.get(id); + expect(before?.status).toBe(JobStatus.PENDING); + expect(before?.attempts ?? 0).toBe(0); + + await storage.abort(id); + + const after = await storage.get(id); + expect(after?.status).toBe(JobStatus.FAILED); + expect(after?.abort_requested_at).toBeTruthy(); + expect(after?.completed_at).toBeTruthy(); + expect(after?.attempts ?? 0).toBe(0); + }); }); } diff --git a/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts b/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts index 56611db31..e845bd85a 100644 --- a/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts +++ b/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts @@ -152,4 +152,153 @@ describe("indexedDB queue migrations: v1 → v2 rename + backfill", () => { }; }); }); + + it("v2 migrates all five legacy field renames", async () => { + // Regression for PR #511 follow-up: v2 previously only backfilled + // run_after → visible_at. The other four renames (run_attempts → attempts, + // last_ran_at → last_attempted_at, max_retries → max_attempts, + // worker_id → lease_owner) were silently dropped, so existing + // browser-deployed queues lost retry budgets, last-attempt timestamps, + // and lease ownership on first migration. + const dbName = `wglw_idb_qm5_${Math.random().toString(36).slice(2)}`; + const tableName = "jobs"; + + // ── (1) Hand-build a v1 DB with a row that uses all five legacy names. + await new Promise((resolve, reject) => { + const req = indexedDB.open(dbName, 2); + req.onerror = () => reject(req.error); + req.onupgradeneeded = () => { + const db = req.result; + if (!db.objectStoreNames.contains("_storage_migrations")) { + db.createObjectStore("_storage_migrations", { keyPath: ["component", "version"] }); + } + if (!db.objectStoreNames.contains(tableName)) { + const store = db.createObjectStore(tableName, { keyPath: "id" }); + store.createIndex("queue_status", ["queue", "status"], { unique: false }); + store.createIndex("queue_status_run_after", ["queue", "status", "run_after"], { + unique: false, + }); + store.createIndex("queue_job_run_id", ["queue", "job_run_id"], { unique: false }); + store.createIndex("queue_fingerprint_status", ["queue", "fingerprint", "status"], { + unique: false, + }); + } + }; + req.onsuccess = () => { + const db = req.result; + const tx = db.transaction(["_storage_migrations", tableName], "readwrite"); + tx.oncomplete = () => { + db.close(); + resolve(); + }; + tx.onerror = () => { + db.close(); + reject(tx.error); + }; + tx.objectStore("_storage_migrations").put({ + component: `queue:indexeddb:${tableName}`, + version: 1, + description: "Create queue object store + indexes", + appliedAt: new Date().toISOString(), + }); + // Seed a row with ALL five legacy field names populated, NO post-rename + // names. The migration must move every value and delete every old key. + tx.objectStore(tableName).put({ + id: 42, + queue: "test", + fingerprint: "fp42", + job_run_id: "jr42", + status: "PENDING", + input: "{}", + run_after: "2026-02-01T00:00:00.000Z", + run_attempts: 3, + last_ran_at: "2026-01-31T12:00:00.000Z", + max_retries: 7, + worker_id: "worker-legacy-1", + }); + }; + }); + + // ── (2) Run the migration chain. + const runner = new IndexedDbMigrationRunner(dbName); + await runner.run(indexedDbQueueMigrations(tableName, [])); + + // ── (3) Verify all five renames landed: new keys carry the values, + // legacy keys are deleted, and the queue_status_visible_at + // index can serve a range query keyed on the migrated + // [queue, status, visible_at] tuple. + await new Promise((resolve, reject) => { + const req = indexedDB.open(dbName); + req.onerror = () => reject(req.error); + req.onsuccess = () => { + const db = req.result; + try { + const tx = db.transaction([tableName], "readonly"); + const store = tx.objectStore(tableName); + + const getReq = store.get(42); + getReq.onsuccess = () => { + try { + const row = getReq.result as Record | undefined; + expect(row).toBeDefined(); + + // All five new keys carry the migrated values. + expect(row!.visible_at).toBe("2026-02-01T00:00:00.000Z"); + expect(row!.attempts).toBe(3); + expect(row!.last_attempted_at).toBe("2026-01-31T12:00:00.000Z"); + expect(row!.max_attempts).toBe(7); + expect(row!.lease_owner).toBe("worker-legacy-1"); + + // All five legacy keys are gone. + expect(row!.run_after).toBeUndefined(); + expect(row!.run_attempts).toBeUndefined(); + expect(row!.last_ran_at).toBeUndefined(); + expect(row!.max_retries).toBeUndefined(); + expect(row!.worker_id).toBeUndefined(); + } catch (e) { + reject(e); + return; + } + + // Index range query on the migrated tuple returns the row. + try { + const idx = store.index("queue_status_visible_at"); + const range = IDBKeyRange.bound( + ["test", "PENDING", "2026-01-01T00:00:00.000Z"], + ["test", "PENDING", "2026-12-31T00:00:00.000Z"] + ); + const cReq = idx.openCursor(range); + cReq.onsuccess = () => { + const cursor = cReq.result; + try { + expect(cursor).not.toBeNull(); + expect((cursor!.value as { id: number }).id).toBe(42); + } catch (e) { + reject(e); + } + }; + tx.oncomplete = () => { + db.close(); + resolve(); + }; + tx.onerror = () => { + db.close(); + reject(tx.error); + }; + } catch (e) { + db.close(); + reject(e); + } + }; + getReq.onerror = () => { + db.close(); + reject(getReq.error); + }; + } catch (e) { + db.close(); + reject(e); + } + }; + }); + }); }); From 475e252eec9079296056d296c4ef03fffe27f5f7 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 19 May 2026 08:39:10 +0000 Subject: [PATCH 3/7] fix(sqlite): v3 max_attempts default = 10 to match Postgres parity PR #511 lowered the default retry budget from 20 (Postgres) / 23 (SQLite v1) to 10. Postgres v3 explicitly applied `ALTER COLUMN max_attempts SET DEFAULT 10`; SQLite v3 renamed the column but did not adjust the default. Fresh SQLite installs ended up at default 23, Postgres at 10, so callers omitting maxAttempts got divergent retry behavior across backends. SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, so the fix uses the documented 12-step table-rebuild procedure (https://www.sqlite.org/lang_altertable.html#otheralter): build the new CREATE TABLE statement from the post-rename PRAGMA table_info, swap max_attempts's default literal, copy rows, drop old, rename new, recreate indexes. The rebuild is gated on the current default not already being '10' so re-running v3 is idempotent. Extend the migrations parity integration test to compare DEFAULTS, not just column names, so future drift is caught. --- .../queueMigrationsParity.integration.test.ts | 65 ++++++++++++++++++ .../src/migrations/sqliteQueueMigrations.ts | 67 +++++++++++++++++-- 2 files changed, 127 insertions(+), 5 deletions(-) diff --git a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts index ff565787d..c87fa1c2e 100644 --- a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts +++ b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts @@ -131,3 +131,68 @@ describe("sqlite queue migrations: v1→v2→v3 schema parity", () => { } }); }); + +/** + * PR #511 follow-up: SQLite v3 originally renamed `max_retries → max_attempts` + * but did not adjust the column default, leaving SQLite fresh installs at + * `DEFAULT 23` while Postgres landed on `DEFAULT 10`. Callers omitting + * `maxAttempts` got divergent retry behavior across backends. This block + * compares the post-migration column DEFAULTS (not just the column names) + * so future drift is caught. + */ +describe("queue migrations: cross-backend default parity", () => { + it("max_attempts default is 10 on both Postgres and SQLite after v3", async () => { + // ── (1) Fresh PGlite + run all Postgres queue migrations. + const pg = new PGlite(); + // ── (2) Fresh SQLite :memory: + run all SQLite queue migrations. + await Sqlite.init(); + const sqlite = new Sqlite.Database(":memory:"); + try { + await new PostgresMigrationRunner(pg as unknown as Pool).run( + postgresQueueMigrations("jobs", []) + ); + await new SqliteMigrationRunner(sqlite).run(sqliteQueueMigrations("jobs", [])); + + // ── (3) Read Postgres defaults via information_schema.columns. + const pgRows = ( + await (pg as unknown as Pool).query<{ + column_name: string; + column_default: string | null; + }>( + `SELECT column_name, column_default FROM information_schema.columns + WHERE table_name = $1 AND table_schema = current_schema() + AND column_name IN ('max_attempts', 'attempts')`, + ["jobs"] + ) + ).rows; + const pgMaxAttempts = pgRows.find((r) => r.column_name === "max_attempts"); + const pgAttempts = pgRows.find((r) => r.column_name === "attempts"); + expect(pgMaxAttempts).toBeDefined(); + expect(pgAttempts).toBeDefined(); + + // ── (4) Read SQLite defaults via PRAGMA table_info(jobs). + type SqliteCol = { name: string; dflt_value: string | null }; + const sqliteRows = sqlite + .prepare<[], SqliteCol>(`PRAGMA table_info(jobs)`) + .all() + .filter((r: SqliteCol) => r.name === "max_attempts" || r.name === "attempts"); + const sqliteMaxAttempts = sqliteRows.find((r) => r.name === "max_attempts"); + const sqliteAttempts = sqliteRows.find((r) => r.name === "attempts"); + expect(sqliteMaxAttempts).toBeDefined(); + expect(sqliteAttempts).toBeDefined(); + + // ── (5) Both backends MUST agree on the integer values of the + // defaults. Number(...) coerces the SQL-literal string forms + // ("10", "0") into JS numbers so the comparison is back-end-agnostic. + expect(Number(pgMaxAttempts!.column_default)).toBe(10); + expect(Number(sqliteMaxAttempts!.dflt_value)).toBe(10); + + // Sanity check: `attempts` defaults to 0 on both backends. + expect(Number(pgAttempts!.column_default)).toBe(0); + expect(Number(sqliteAttempts!.dflt_value)).toBe(0); + } finally { + await pg.close(); + sqlite.close(); + } + }); +}); diff --git a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts index 78e174851..6e1913e83 100644 --- a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts +++ b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts @@ -84,15 +84,22 @@ export function sqliteQueueMigrations( component, version: 3, description: - "Rename run_after→visible_at, last_ran_at→last_attempted_at, run_attempts→attempts, max_retries→max_attempts, worker_id→lease_owner; drop run_after-keyed index and recreate visible_at-keyed", + "Rename run_after→visible_at, last_ran_at→last_attempted_at, run_attempts→attempts, max_retries→max_attempts, worker_id→lease_owner; drop run_after-keyed index and recreate visible_at-keyed; lower max_attempts DEFAULT 23→10 to match Postgres parity", up(db: Sqlite.Database) { // PRAGMA table_info guards each rename so fresh installs (which // arrive at v3 having just created the v1 schema in this same // migration run) are no-ops here. - const cols: string[] = db - .prepare<[], { name: string }>(`PRAGMA table_info(${tableName})`) - .all() - .map((r) => r.name); + type ColInfo = { + readonly name: string; + readonly type: string; + readonly notnull: number; + readonly dflt_value: string | null; + readonly pk: number; + }; + const colInfos: ColInfo[] = db + .prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`) + .all(); + const cols: string[] = colInfos.map((r) => r.name); const renames: [string, string][] = [ ["run_after", "visible_at"], ["last_ran_at", "last_attempted_at"], @@ -114,6 +121,56 @@ export function sqliteQueueMigrations( DROP INDEX IF EXISTS job_queue_fetcher${indexSuffix}_idx; CREATE INDEX IF NOT EXISTS job_queue_fetcher${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, status, visible_at); `); + + // Postgres v3 explicitly applied `ALTER COLUMN max_attempts SET + // DEFAULT 10`; SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, + // so the original v3 migration left the SQLite default at 23 from + // v1's CREATE TABLE. Fresh SQLite installs ended up at default 23 + // while Postgres was 10 — callers omitting `maxAttempts` got + // divergent retry behavior across backends. Rebuild the table with + // the correct default using SQLite's documented 12-step procedure + // (https://www.sqlite.org/lang_altertable.html#otheralter). + const postRenameInfos: ColInfo[] = db + .prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`) + .all(); + const maxAttemptsCol = postRenameInfos.find((c) => c.name === "max_attempts"); + if (maxAttemptsCol && maxAttemptsCol.dflt_value !== "10") { + // Build a new CREATE TABLE statement from the post-rename + // table_info, swapping max_attempts's default. Preserving the + // existing types / NOT NULL / PK / other defaults keeps the + // rebuild a true no-op for every other column. + const columnDefs = postRenameInfos + .map((c) => { + const parts: string[] = [c.name, c.type || ""]; + if (c.pk) { + parts.push("PRIMARY KEY"); + } + if (c.notnull) { + parts.push("NOT NULL"); + } + const dflt = + c.name === "max_attempts" ? "10" : c.dflt_value !== null ? c.dflt_value : null; + if (dflt !== null) { + parts.push(`DEFAULT ${dflt}`); + } + return parts.filter((p) => p.length > 0).join(" "); + }) + .join(",\n "); + const colList = postRenameInfos.map((c) => c.name).join(", "); + const newTable = `${tableName}__new_v3`; + db.exec(` + CREATE TABLE ${newTable} ( + ${columnDefs} + ); + INSERT INTO ${newTable} (${colList}) SELECT ${colList} FROM ${tableName}; + DROP TABLE ${tableName}; + ALTER TABLE ${newTable} RENAME TO ${tableName}; + + CREATE INDEX IF NOT EXISTS job_queue_fetcher${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, status, visible_at); + CREATE INDEX IF NOT EXISTS job_queue_fingerprint${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, fingerprint, status); + CREATE INDEX IF NOT EXISTS job_queue_job_run_id${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, job_run_id); + `); + } }, }, ]; From 386cd3b13621824c7129cc1e23f0dfef6a569ead Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 19 May 2026 08:44:51 +0000 Subject: [PATCH 4/7] fix(job-queue): validate leaseMs / extendLease ms inputs across all backends PR #511 added Number.isFinite guards to Supabase only; Postgres, SQLite, InMemoryQueueStorage, and IndexedDB passed leaseMs / ms directly into new Date(Date.now() + ms).toISOString() (yields "Invalid Date" for NaN/Infinity, poisoning the row) or into parameterized SQL fragments (runtime error for non-finite). A negative leaseMs immediately re-expires the lease a worker just claimed. Extract validation into a shared validateLeaseMs() helper in @workglow/job-queue; call it at the top of every next() and extendLease() across all 5 backends. Migrate Supabase from its inline Error throw to the shared RangeError so all backends report the same exception type. ms === 0 remains valid (instant expiry). Adds a cross-backend contract test that runs against every backend via the shared genericJobQueueTests harness. --- .../src/job-queue/IndexedDbQueueStorage.ts | 6 ++- packages/job-queue/src/common.ts | 1 + .../src/queue-storage/InMemoryQueueStorage.ts | 3 ++ .../src/queue-storage/validateLeaseMs.ts | 17 ++++++ .../test/job-queue/genericJobQueueTests.ts | 53 +++++++++++++++++++ .../src/job-queue/PostgresQueueStorage.ts | 4 +- .../src/job-queue/SqliteQueueStorage.ts | 4 +- .../src/job-queue/SupabaseQueueStorage.ts | 12 ++--- 8 files changed, 89 insertions(+), 11 deletions(-) create mode 100644 packages/job-queue/src/queue-storage/validateLeaseMs.ts diff --git a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts index c17900c3e..eb112af09 100644 --- a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts +++ b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import { HybridSubscriptionManager } from "@workglow/storage"; import { createServiceToken, deepEqual, makeFingerprint, uuid4 } from "@workglow/util"; import { IndexedDbMigrationRunner } from "../migrations/IndexedDbMigrationRunner"; @@ -266,11 +266,12 @@ export class IndexedDbQueueStorage implements IQueueStorage | undefined> { + const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const db = await this.getDb(); const tx = db.transaction(this.tableName, "readwrite"); const store = tx.objectStore(this.tableName); const now = new Date().toISOString(); - const leaseMs = opts?.leaseMs ?? 30000; const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); const prefixKeyValues = this.getPrefixKeyValues(); @@ -421,6 +422,7 @@ export class IndexedDbQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const job = await this.get(id); if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) { throw new Error( diff --git a/packages/job-queue/src/common.ts b/packages/job-queue/src/common.ts index 57031b177..83b6e1c75 100644 --- a/packages/job-queue/src/common.ts +++ b/packages/job-queue/src/common.ts @@ -33,6 +33,7 @@ export * from "./queue-storage/InMemoryMessageQueue"; export * from "./queue-storage/InMemoryQueueStorage"; export * from "./queue-storage/TelemetryQueueStorage"; export * from "./queue-storage/createInMemoryQueue"; +export * from "./queue-storage/validateLeaseMs"; export * from "./queue-storage/wrapQueueStorage"; export * from "./rate-limiter-storage/IRateLimiterStorage"; diff --git a/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts b/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts index 6a3b87320..abf00ce0a 100644 --- a/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts +++ b/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts @@ -20,6 +20,7 @@ import { QueueStorageOptions, QueueSubscribeOptions, } from "./IQueueStorage"; +import { validateLeaseMs } from "./validateLeaseMs"; /** * Event listeners for queue storage events @@ -147,6 +148,7 @@ export class InMemoryQueueStorage implements IQueueStorage | undefined> { await sleep(0); const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const now = new Date().toISOString(); const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); @@ -192,6 +194,7 @@ export class InMemoryQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); await sleep(0); const job = this.jobQueue.find((j) => j.id === id && this.matchesPrefixes(j)); if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) { diff --git a/packages/job-queue/src/queue-storage/validateLeaseMs.ts b/packages/job-queue/src/queue-storage/validateLeaseMs.ts new file mode 100644 index 000000000..31c7a869e --- /dev/null +++ b/packages/job-queue/src/queue-storage/validateLeaseMs.ts @@ -0,0 +1,17 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Validates `leaseMs` / `extendLease ms` inputs across all job-queue + * backends. NaN/Infinity produce "Invalid Date" ISO strings that + * poison `lease_expires_at`; negative values immediately re-expire + * the lease a worker just claimed. `0` is permitted (instant expiry). + */ +export function validateLeaseMs(ms: number, fieldName = "leaseMs"): void { + if (!Number.isFinite(ms) || ms < 0) { + throw new RangeError(`${fieldName} must be a non-negative finite number; got ${ms}`); + } +} diff --git a/packages/test/src/test/job-queue/genericJobQueueTests.ts b/packages/test/src/test/job-queue/genericJobQueueTests.ts index 381597c36..e4605abb2 100644 --- a/packages/test/src/test/job-queue/genericJobQueueTests.ts +++ b/packages/test/src/test/job-queue/genericJobQueueTests.ts @@ -606,6 +606,59 @@ export function runGenericJobQueueTests( }); }); + describe("leaseMs / extendLease input validation (PR #511 follow-up)", () => { + // PR #511 added Number.isFinite guards to Supabase only; the other + // backends silently produced "Invalid Date" ISO strings (poisoning + // lease_expires_at) or runtime SQL errors. These tests assert the + // unified RangeError contract across every backend exercised by this + // generic suite. + it("next(): negative leaseMs rejects with RangeError", async () => { + await expect(storage.next("rangeerr-worker", { leaseMs: -1 })).rejects.toThrow(RangeError); + }); + + it("next(): NaN leaseMs rejects with RangeError", async () => { + await expect(storage.next("rangeerr-worker", { leaseMs: Number.NaN })).rejects.toThrow( + RangeError + ); + }); + + it("next(): Infinity leaseMs rejects with RangeError", async () => { + await expect( + storage.next("rangeerr-worker", { leaseMs: Number.POSITIVE_INFINITY }) + ).rejects.toThrow(RangeError); + }); + + it("next(): leaseMs === 0 is accepted (instant expiry)", async () => { + // 0 is permitted — it means "lease expires immediately", which is a + // valid (if unusual) configuration. The call must not throw a + // RangeError; it may legitimately return a job or undefined depending + // on what's enqueued. + let threw: unknown = null; + try { + await storage.next("zerolease-worker", { leaseMs: 0 }); + } catch (e) { + threw = e; + } + expect(threw).toBeNull(); + }); + + it("extendLease(): negative ms rejects with RangeError", async () => { + await expect(storage.extendLease("any-id", "any-worker", -1)).rejects.toThrow(RangeError); + }); + + it("extendLease(): NaN ms rejects with RangeError", async () => { + await expect(storage.extendLease("any-id", "any-worker", Number.NaN)).rejects.toThrow( + RangeError + ); + }); + + it("extendLease(): Infinity ms rejects with RangeError", async () => { + await expect( + storage.extendLease("any-id", "any-worker", Number.POSITIVE_INFINITY) + ).rejects.toThrow(RangeError); + }); + }); + describe("Same-process optimizations", () => { const itFastWake = skipFastWakeTests ? it.skip : it; diff --git a/providers/postgres/src/job-queue/PostgresQueueStorage.ts b/providers/postgres/src/job-queue/PostgresQueueStorage.ts index c382ae812..4b2271675 100644 --- a/providers/postgres/src/job-queue/PostgresQueueStorage.ts +++ b/providers/postgres/src/job-queue/PostgresQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import type { Pool } from "@workglow/postgres/storage"; import { assertPrefixesSafe, @@ -238,6 +238,7 @@ export class PostgresQueueStorage implements IQueueStorage | undefined> { const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); // Parameters: $1=PROCESSING, $2=now+leaseMs interval, $3=queue, $4=workerId, $5=PENDING, $6=PROCESSING, $7+=prefix params const { conditions: prefixConditions, params: prefixParams } = this.buildPrefixWhereClause(7); const result = await this.db.query< @@ -296,6 +297,7 @@ export class PostgresQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const { conditions: prefixConditions, params: prefixParams } = this.buildPrefixWhereClause(5); const result = await this.db.query( `UPDATE ${this.tableName} diff --git a/providers/sqlite/src/job-queue/SqliteQueueStorage.ts b/providers/sqlite/src/job-queue/SqliteQueueStorage.ts index 20345efee..1e9a3cf3e 100644 --- a/providers/sqlite/src/job-queue/SqliteQueueStorage.ts +++ b/providers/sqlite/src/job-queue/SqliteQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import type { Sqlite } from "@workglow/sqlite/storage"; import { assertPrefixesSafe, @@ -358,6 +358,7 @@ export class SqliteQueueStorage implements IQueueStorage | undefined> { const now = new Date().toISOString(); const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); const prefixConditions = this.buildPrefixWhereClause(); const prefixParams = this.getPrefixParamValues(); @@ -429,6 +430,7 @@ export class SqliteQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const leaseExpiry = new Date(Date.now() + ms).toISOString(); const prefixConditions = this.buildPrefixWhereClause(); const prefixParams = this.getPrefixParamValues(); diff --git a/providers/supabase/src/job-queue/SupabaseQueueStorage.ts b/providers/supabase/src/job-queue/SupabaseQueueStorage.ts index 7aad321a6..5856278e9 100644 --- a/providers/supabase/src/job-queue/SupabaseQueueStorage.ts +++ b/providers/supabase/src/job-queue/SupabaseQueueStorage.ts @@ -14,7 +14,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import { PollingSubscriptionManager } from "@workglow/storage"; import { createServiceToken, deepEqual, makeFingerprint, uuid4 } from "@workglow/util"; @@ -369,9 +369,7 @@ export class SupabaseQueueStorage implements IQueueStorage | undefined> { const leaseMs = opts?.leaseMs ?? 30000; - if (!Number.isFinite(leaseMs) || leaseMs < 0) { - throw new Error(`Invalid leaseMs: ${leaseMs}`); - } + validateLeaseMs(leaseMs, "leaseMs"); const prefixConditions = this.buildPrefixWhereSql(); const validatedQueueName = this.validateSqlValue(this.queueName, "queueName"); const validatedWorkerId = this.validateSqlValue(workerId, "workerId"); @@ -425,15 +423,15 @@ export class SupabaseQueueStorage implements IQueueStorage { + // Validate lease arg FIRST so callers get a consistent RangeError across + // backends regardless of whether the id happens to be invalid too. + validateLeaseMs(ms, "ms"); const validatedWorkerId = this.validateSqlValue(workerId, "workerId"); const escapedWorkerId = this.escapeSqlString(validatedWorkerId); const numericId = Number(id); if (!Number.isFinite(numericId)) { throw new Error(`Invalid job id: ${id}`); } - if (!Number.isFinite(ms) || ms < 0) { - throw new Error(`Invalid lease extension ms: ${ms}`); - } const prefixConditions = this.buildPrefixWhereSql(); From fdeca2fc63d48e7b3d8070d2539ff16d3e30bce8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 15:45:22 +0000 Subject: [PATCH 5/7] fix(sqlite): harden v3 rebuild and tighten follow-up tests Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com> --- .../src/test/job-queue/JobQueueWorker.test.ts | 44 +++++++--- .../queueMigrationsParity.integration.test.ts | 47 +++++++++++ .../src/migrations/sqliteQueueMigrations.ts | 81 ++++++++++--------- 3 files changed, 122 insertions(+), 50 deletions(-) diff --git a/packages/test/src/test/job-queue/JobQueueWorker.test.ts b/packages/test/src/test/job-queue/JobQueueWorker.test.ts index f381450a6..980becd1f 100644 --- a/packages/test/src/test/job-queue/JobQueueWorker.test.ts +++ b/packages/test/src/test/job-queue/JobQueueWorker.test.ts @@ -12,6 +12,7 @@ import { JobDisabledError, JobQueueClient, JobQueueServer, + JobQueueWorker, JobStatus, RateLimiter, } from "@workglow/job-queue"; @@ -37,7 +38,10 @@ interface TO { * - anything else — resolves immediately. */ class TJob extends Job { + public static executeCalls = 0; + public override async execute(input: TI, context: IJobExecuteContext): Promise { + TJob.executeCalls += 1; if (input.taskType === "disable_on_abort") { return new Promise((_, reject) => { context.signal.addEventListener( @@ -77,6 +81,7 @@ describe("JobQueueWorker — PR #511 follow-up regressions", () => { let queueName: string; beforeEach(async () => { + TJob.executeCalls = 0; queueName = `worker-followup-${uuid4()}`; storage = new InMemoryQueueStorage(queueName); await storage.migrate(); @@ -193,13 +198,34 @@ describe("JobQueueWorker — PR #511 follow-up regressions", () => { }); it("pre-execute abort flag is observed during validateJobState", async () => { - // Before the fix, createAbortController() ran AFTER validateJobState, - // so the activeJobAbortControllers.get(...).signal.aborted branch in - // validateJobState was dead code. With the controller registered - // first, an abort fired before start() / right at start time is - // observable and the job fails with AbortSignalJobError + - // abort_requested_at set. - const server = new JobQueueServer(TJob, { + class PreAbortedWorker extends JobQueueWorker { + protected override createAbortController(jobId: unknown): AbortController { + const controller = super.createAbortController(jobId); + controller.abort(); + return controller; + } + } + + class PreAbortedServer extends JobQueueServer { + protected override createWorker(): JobQueueWorker { + return new PreAbortedWorker(this.jobClass, { + messageQueue: this.messageQueue, + jobStore: this.jobStore, + queueName: this.queueName, + limiter: this.limiter, + pollIntervalMs: this.pollIntervalMs, + stopTimeoutMs: this.stopTimeoutMs, + deadLetter: this.deadLetter, + prefetch: this.prefetch, + }); + } + } + + // Before the fix, createAbortController() ran AFTER validateJobState, so + // aborting the controller here would have been too late to trip the + // activeJobAbortControllers.get(...).signal.aborted branch. The worker + // would enter execute() instead of failing during validation. + const server = new PreAbortedServer(TJob, { storage: storage as any, queueName, pollIntervalMs: 5, @@ -209,9 +235,6 @@ describe("JobQueueWorker — PR #511 follow-up regressions", () => { client.attach(server); const handle = await client.send({ taskType: "long_running", data: "pre-abort" }); - // Abort before any worker has claimed the row — this PENDING-abort - // path sets the row to FAILED with abort_requested_at set immediately. - await storage.abort(handle.id); await server.start(); @@ -230,5 +253,6 @@ describe("JobQueueWorker — PR #511 follow-up regressions", () => { const final = await storage.get(handle.id); expect(final?.status).toBe(JobStatus.FAILED); expect(final?.abort_requested_at).toBeTruthy(); + expect(TJob.executeCalls).toBe(0); }); }); diff --git a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts index c87fa1c2e..1b68d6c78 100644 --- a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts +++ b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts @@ -196,3 +196,50 @@ describe("queue migrations: cross-backend default parity", () => { } }); }); + +describe("sqlite queue migrations: canonical v3 schema", () => { + it("rebuilds the table from the explicit post-v3 CREATE TABLE statement", async () => { + await Sqlite.init(); + const sqlite = new Sqlite.Database(":memory:"); + try { + await new SqliteMigrationRunner(sqlite).run(sqliteQueueMigrations("jobs", [])); + + const row = sqlite + .prepare<[{ readonly name: string }], { readonly sql: string | null }>( + "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" + ) + .get("jobs"); + expect(row?.sql).toBeDefined(); + + const normalizeSql = (sql: string): string => sql.replace(/\s+/g, " ").trim(); + const expectedSql = `CREATE TABLE "jobs" ( + id INTEGER PRIMARY KEY, + fingerprint TEXT NOT NULL, + queue TEXT NOT NULL, + job_run_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'PENDING', + input TEXT NOT NULL, + output TEXT, + attempts INTEGER DEFAULT 0, + max_attempts INTEGER DEFAULT 10, + visible_at TEXT NOT NULL, + last_attempted_at TEXT, + created_at TEXT NOT NULL, + completed_at TEXT, + deadline_at TEXT, + error TEXT, + error_code TEXT, + progress REAL DEFAULT 0, + progress_message TEXT DEFAULT '', + progress_details TEXT NULL, + lease_owner TEXT, + abort_requested_at TEXT, + lease_expires_at TEXT + )`; + + expect(normalizeSql(row!.sql!)).toBe(normalizeSql(expectedSql)); + } finally { + sqlite.close(); + } + }); +}); diff --git a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts index 6e1913e83..3d984f9fd 100644 --- a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts +++ b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts @@ -32,6 +32,30 @@ export function sqliteQueueMigrations( const prefixColumnsSql = buildPrefixColumnsSql(SqliteDialect, prefixes); const prefixIndexPrefix = getPrefixIndexPrefix(prefixes); const indexSuffix = getPrefixIndexSuffix(prefixes); + const postV3ColumnSql = `${prefixColumnsSql}fingerprint TEXT NOT NULL, + queue TEXT NOT NULL, + job_run_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'PENDING', + input TEXT NOT NULL, + output TEXT, + attempts INTEGER DEFAULT 0, + max_attempts INTEGER DEFAULT 10, + visible_at TEXT NOT NULL, + last_attempted_at TEXT, + created_at TEXT NOT NULL, + completed_at TEXT, + deadline_at TEXT, + error TEXT, + error_code TEXT, + progress REAL DEFAULT 0, + progress_message TEXT DEFAULT '', + progress_details TEXT NULL, + lease_owner TEXT, + abort_requested_at TEXT, + lease_expires_at TEXT`; + const postV3ColumnList = `${prefixes.map((p) => p.name).join(", ")}${ + prefixes.length > 0 ? ", " : "" + }fingerprint, queue, job_run_id, status, input, output, attempts, max_attempts, visible_at, last_attempted_at, created_at, completed_at, deadline_at, error, error_code, progress, progress_message, progress_details, lease_owner, abort_requested_at, lease_expires_at`; return [ { @@ -89,16 +113,8 @@ export function sqliteQueueMigrations( // PRAGMA table_info guards each rename so fresh installs (which // arrive at v3 having just created the v1 schema in this same // migration run) are no-ops here. - type ColInfo = { - readonly name: string; - readonly type: string; - readonly notnull: number; - readonly dflt_value: string | null; - readonly pk: number; - }; - const colInfos: ColInfo[] = db - .prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`) - .all(); + type ColInfo = { readonly name: string }; + const colInfos: ColInfo[] = db.prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`).all(); const cols: string[] = colInfos.map((r) => r.name); const renames: [string, string][] = [ ["run_after", "visible_at"], @@ -130,39 +146,24 @@ export function sqliteQueueMigrations( // divergent retry behavior across backends. Rebuild the table with // the correct default using SQLite's documented 12-step procedure // (https://www.sqlite.org/lang_altertable.html#otheralter). - const postRenameInfos: ColInfo[] = db - .prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`) - .all(); - const maxAttemptsCol = postRenameInfos.find((c) => c.name === "max_attempts"); - if (maxAttemptsCol && maxAttemptsCol.dflt_value !== "10") { - // Build a new CREATE TABLE statement from the post-rename - // table_info, swapping max_attempts's default. Preserving the - // existing types / NOT NULL / PK / other defaults keeps the - // rebuild a true no-op for every other column. - const columnDefs = postRenameInfos - .map((c) => { - const parts: string[] = [c.name, c.type || ""]; - if (c.pk) { - parts.push("PRIMARY KEY"); - } - if (c.notnull) { - parts.push("NOT NULL"); - } - const dflt = - c.name === "max_attempts" ? "10" : c.dflt_value !== null ? c.dflt_value : null; - if (dflt !== null) { - parts.push(`DEFAULT ${dflt}`); - } - return parts.filter((p) => p.length > 0).join(" "); - }) - .join(",\n "); - const colList = postRenameInfos.map((c) => c.name).join(", "); - const newTable = `${tableName}__new_v3`; + const tableSqlRow = db + .prepare<[{ readonly name: string }], { readonly sql: string | null }>( + "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" + ) + .get(tableName); + if (tableSqlRow?.sql && !tableSqlRow.sql.includes("max_attempts INTEGER DEFAULT 10")) { + // Rebuild from the canonical post-v3 CREATE TABLE statement instead + // of reconstructing from PRAGMA table_info. table_info drops + // metadata such as explicit NULL/COLLATE/CHECK clauses, so deriving + // DDL from it would silently erase future schema details during this + // rebuild step. + const newTable = `"${tableName}__new_v3"`; db.exec(` CREATE TABLE ${newTable} ( - ${columnDefs} + id INTEGER PRIMARY KEY, + ${postV3ColumnSql} ); - INSERT INTO ${newTable} (${colList}) SELECT ${colList} FROM ${tableName}; + INSERT INTO ${newTable} (${postV3ColumnList}) SELECT ${postV3ColumnList} FROM ${tableName}; DROP TABLE ${tableName}; ALTER TABLE ${newTable} RENAME TO ${tableName}; From 187388d147de7d9fc1b2717324a507da2daf5722 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 15:49:19 +0000 Subject: [PATCH 6/7] test(sqlite): share canonical v3 schema helper Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com> --- .../queueMigrationsParity.integration.test.ts | 29 +-------- .../src/migrations/sqliteQueueMigrations.ts | 64 +++++++++++-------- 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts index 1b68d6c78..3e1cad787 100644 --- a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts +++ b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts @@ -8,7 +8,7 @@ import { PGlite } from "@electric-sql/pglite"; import { postgresQueueMigrations } from "@workglow/postgres/job-queue"; import type { Pool } from "@workglow/postgres/storage"; import { PostgresMigrationRunner } from "@workglow/postgres/storage"; -import { sqliteQueueMigrations } from "@workglow/sqlite/job-queue"; +import { buildSqliteQueuePostV3TableSql, sqliteQueueMigrations } from "@workglow/sqlite/job-queue"; import { Sqlite, SqliteMigrationRunner } from "@workglow/sqlite/storage"; import { describe, expect, it } from "vitest"; @@ -212,32 +212,7 @@ describe("sqlite queue migrations: canonical v3 schema", () => { expect(row?.sql).toBeDefined(); const normalizeSql = (sql: string): string => sql.replace(/\s+/g, " ").trim(); - const expectedSql = `CREATE TABLE "jobs" ( - id INTEGER PRIMARY KEY, - fingerprint TEXT NOT NULL, - queue TEXT NOT NULL, - job_run_id TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'PENDING', - input TEXT NOT NULL, - output TEXT, - attempts INTEGER DEFAULT 0, - max_attempts INTEGER DEFAULT 10, - visible_at TEXT NOT NULL, - last_attempted_at TEXT, - created_at TEXT NOT NULL, - completed_at TEXT, - deadline_at TEXT, - error TEXT, - error_code TEXT, - progress REAL DEFAULT 0, - progress_message TEXT DEFAULT '', - progress_details TEXT NULL, - lease_owner TEXT, - abort_requested_at TEXT, - lease_expires_at TEXT - )`; - - expect(normalizeSql(row!.sql!)).toBe(normalizeSql(expectedSql)); + expect(normalizeSql(row!.sql!)).toBe(normalizeSql(buildSqliteQueuePostV3TableSql("jobs", ""))); } finally { sqlite.close(); } diff --git a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts index 3d984f9fd..75c3c7bcc 100644 --- a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts +++ b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts @@ -14,25 +14,8 @@ import { SqliteDialect, } from "@workglow/storage"; -/** - * Initial migration set for the SQLite queue table identified by `tableName`. - * - * v1 is FROZEN byte-for-byte against the pre-PR shape — it creates the - * `run_after`/`run_attempts`/`max_retries`/`last_ran_at`/`worker_id` - * columns and the `run_after`-keyed index. Renames and the index swap - * live in v3, guarded by `PRAGMA table_info` lookups so fresh installs - * (which still run v1 → v2 → v3) end up at the same final schema as - * already-migrated DBs. - */ -export function sqliteQueueMigrations( - tableName: string, - prefixes: readonly PrefixColumn[] -): IMigration[] { - const component = `queue:sqlite:${tableName}`; - const prefixColumnsSql = buildPrefixColumnsSql(SqliteDialect, prefixes); - const prefixIndexPrefix = getPrefixIndexPrefix(prefixes); - const indexSuffix = getPrefixIndexSuffix(prefixes); - const postV3ColumnSql = `${prefixColumnsSql}fingerprint TEXT NOT NULL, +export function buildSqliteQueuePostV3ColumnSql(prefixColumnsSql: string): string { + return `${prefixColumnsSql}fingerprint TEXT NOT NULL, queue TEXT NOT NULL, job_run_id TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'PENDING', @@ -53,9 +36,43 @@ export function sqliteQueueMigrations( lease_owner TEXT, abort_requested_at TEXT, lease_expires_at TEXT`; - const postV3ColumnList = `${prefixes.map((p) => p.name).join(", ")}${ +} + +export function buildSqliteQueuePostV3ColumnList(prefixes: readonly PrefixColumn[]): string { + return `${prefixes.map((p) => p.name).join(", ")}${ prefixes.length > 0 ? ", " : "" }fingerprint, queue, job_run_id, status, input, output, attempts, max_attempts, visible_at, last_attempted_at, created_at, completed_at, deadline_at, error, error_code, progress, progress_message, progress_details, lease_owner, abort_requested_at, lease_expires_at`; +} + +export function buildSqliteQueuePostV3TableSql( + tableName: string, + prefixColumnsSql: string +): string { + return `CREATE TABLE "${tableName}" ( + id INTEGER PRIMARY KEY, + ${buildSqliteQueuePostV3ColumnSql(prefixColumnsSql)} + )`; +} + +/** + * Initial migration set for the SQLite queue table identified by `tableName`. + * + * v1 is FROZEN byte-for-byte against the pre-PR shape — it creates the + * `run_after`/`run_attempts`/`max_retries`/`last_ran_at`/`worker_id` + * columns and the `run_after`-keyed index. Renames and the index swap + * live in v3, guarded by `PRAGMA table_info` lookups so fresh installs + * (which still run v1 → v2 → v3) end up at the same final schema as + * already-migrated DBs. + */ +export function sqliteQueueMigrations( + tableName: string, + prefixes: readonly PrefixColumn[] +): IMigration[] { + const component = `queue:sqlite:${tableName}`; + const prefixColumnsSql = buildPrefixColumnsSql(SqliteDialect, prefixes); + const prefixIndexPrefix = getPrefixIndexPrefix(prefixes); + const indexSuffix = getPrefixIndexSuffix(prefixes); + const postV3ColumnList = buildSqliteQueuePostV3ColumnList(prefixes); return [ { @@ -157,12 +174,9 @@ export function sqliteQueueMigrations( // metadata such as explicit NULL/COLLATE/CHECK clauses, so deriving // DDL from it would silently erase future schema details during this // rebuild step. - const newTable = `"${tableName}__new_v3"`; + const newTable = `${tableName}__new_v3`; db.exec(` - CREATE TABLE ${newTable} ( - id INTEGER PRIMARY KEY, - ${postV3ColumnSql} - ); + ${buildSqliteQueuePostV3TableSql(`${tableName}__new_v3`, prefixColumnsSql)}; INSERT INTO ${newTable} (${postV3ColumnList}) SELECT ${postV3ColumnList} FROM ${tableName}; DROP TABLE ${tableName}; ALTER TABLE ${newTable} RENAME TO ${tableName}; From 7eebbda6f7eaee8f2a17748bf8b72d7f657df814 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 19 May 2026 15:51:59 +0000 Subject: [PATCH 7/7] fix(sqlite): tighten schema helper typing Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com> --- .../queueMigrationsParity.integration.test.ts | 2 +- providers/sqlite/src/migrations/sqliteQueueMigrations.ts | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts index 3e1cad787..a804ef5ac 100644 --- a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts +++ b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts @@ -205,7 +205,7 @@ describe("sqlite queue migrations: canonical v3 schema", () => { await new SqliteMigrationRunner(sqlite).run(sqliteQueueMigrations("jobs", [])); const row = sqlite - .prepare<[{ readonly name: string }], { readonly sql: string | null }>( + .prepare<[string], { readonly sql: string | null }>( "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" ) .get("jobs"); diff --git a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts index 75c3c7bcc..79356697a 100644 --- a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts +++ b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts @@ -15,6 +15,9 @@ import { } from "@workglow/storage"; export function buildSqliteQueuePostV3ColumnSql(prefixColumnsSql: string): string { + // buildPrefixColumnsSql(...) returns either "" or an already-indented, + // comma-terminated prefix block, so it is safe to splice directly into + // the canonical column list here. return `${prefixColumnsSql}fingerprint TEXT NOT NULL, queue TEXT NOT NULL, job_run_id TEXT NOT NULL, @@ -164,7 +167,7 @@ export function sqliteQueueMigrations( // the correct default using SQLite's documented 12-step procedure // (https://www.sqlite.org/lang_altertable.html#otheralter). const tableSqlRow = db - .prepare<[{ readonly name: string }], { readonly sql: string | null }>( + .prepare<[string], { readonly sql: string | null }>( "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" ) .get(tableName);