diff --git a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts index 14e1da9b1..eb112af09 100644 --- a/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts +++ b/packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import { HybridSubscriptionManager } from "@workglow/storage"; import { createServiceToken, deepEqual, makeFingerprint, uuid4 } from "@workglow/util"; import { IndexedDbMigrationRunner } from "../migrations/IndexedDbMigrationRunner"; @@ -266,11 +266,12 @@ export class IndexedDbQueueStorage implements IQueueStorage | undefined> { + const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const db = await this.getDb(); const tx = db.transaction(this.tableName, "readwrite"); const store = tx.objectStore(this.tableName); const now = new Date().toISOString(); - const leaseMs = opts?.leaseMs ?? 30000; const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); const prefixKeyValues = this.getPrefixKeyValues(); @@ -421,6 +422,7 @@ export class IndexedDbQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const job = await this.get(id); if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) { throw new Error( @@ -537,7 +539,10 @@ export class IndexedDbQueueStorage implements IQueueStorage { const cursor = cursorReq.result; - if (!cursor) return; + if (!cursor) { + // Recreate the renamed index under the new key path. Skip if a + // previous partial run already created it — IDB forbids two + // indexes sharing a name and would throw mid-upgrade. + if (!Array.from(store.indexNames).includes("queue_status_visible_at")) { + store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), { + unique: false, + }); + } + return; + } const row = cursor.value as Record | null; - if (row && row.visible_at === undefined && row.run_after !== undefined) { - row.visible_at = row.run_after; - cursor.update(row); + if (row) { + let dirty = false; + if (row.run_after !== undefined && row.visible_at === undefined) { + row.visible_at = row.run_after; + delete row.run_after; + dirty = true; + } + if (row.run_attempts !== undefined && row.attempts === undefined) { + row.attempts = row.run_attempts; + delete row.run_attempts; + dirty = true; + } + if (row.last_ran_at !== undefined && row.last_attempted_at === undefined) { + row.last_attempted_at = row.last_ran_at; + delete row.last_ran_at; + dirty = true; + } + if (row.max_retries !== undefined && row.max_attempts === undefined) { + row.max_attempts = row.max_retries; + delete row.max_retries; + dirty = true; + } + if (row.worker_id !== undefined && row.lease_owner === undefined) { + row.lease_owner = row.worker_id; + delete row.worker_id; + dirty = true; + } + if (dirty) cursor.update(row); } cursor.continue(); }; - - // Recreate the index under the new name keyed on `visible_at`. - // Skip if a previous partial run already created it (defensive — IDB - // forbids two indexes sharing a name and would throw mid-upgrade). - if (!Array.from(store.indexNames).includes("queue_status_visible_at")) { - store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), { - unique: false, - }); - } }, }, ]; diff --git a/packages/job-queue/src/common.ts b/packages/job-queue/src/common.ts index 57031b177..83b6e1c75 100644 --- a/packages/job-queue/src/common.ts +++ b/packages/job-queue/src/common.ts @@ -33,6 +33,7 @@ export * from "./queue-storage/InMemoryMessageQueue"; export * from "./queue-storage/InMemoryQueueStorage"; export * from "./queue-storage/TelemetryQueueStorage"; export * from "./queue-storage/createInMemoryQueue"; +export * from "./queue-storage/validateLeaseMs"; export * from "./queue-storage/wrapQueueStorage"; export * from "./rate-limiter-storage/IRateLimiterStorage"; diff --git a/packages/job-queue/src/job/JobQueueWorker.ts b/packages/job-queue/src/job/JobQueueWorker.ts index f4bbb5d92..cf04441fc 100644 --- a/packages/job-queue/src/job/JobQueueWorker.ts +++ b/packages/job-queue/src/job/JobQueueWorker.ts @@ -615,20 +615,31 @@ export class JobQueueWorker< }) : undefined; + let limiterReleased = false; try { // The limiter slot was already atomically reserved by tryAcquire() in - // the main loop (or processNext), so we no longer call recordJobStart - // here — doing so would double-count. + // the main loop (or processNext). We register the abort controller + // BEFORE validateJobState so the controller is observable from inside + // validateJobState (which checks activeJobAbortControllers for a + // pre-execute abort flag). Previously the controller was created + // AFTER validation, making that abort-before-execute branch dead code. + const abortController = this.createAbortController(job.id); + try { await this.validateJobState(job); } catch (validationErr) { - // Throw — the outer finally block's limiter.complete() will release - // the slot. Do NOT call limiter.release() here too; that would - // double-decrement the counter and admit one extra concurrent job. + // RateLimiter.complete() is a no-op on the success/error paths + // (the slot only ages out of the window naturally) — so on a + // validation failure here we MUST call release() to actually free + // the slot. Without this, a DEADLINE-EXCEEDED or pre-aborted job + // permanently consumed a RateLimiter window slot. The + // limiterReleased flag gates the outer finally's complete() call so + // we don't double-handle the slot. + await this.limiter.release(limiterToken); + limiterReleased = true; throw validationErr; } - const abortController = this.createAbortController(job.id); this.events.emit("job_start", job.id); let leaseInterval: ReturnType | undefined; @@ -701,13 +712,22 @@ export class JobQueueWorker< }); span?.setStatus(SpanStatusCode.UNSET); } + } else if (error instanceof JobDisabledError) { + // Route through disableJob so the row transitions to DISABLED + // (not FAILED). Without this branch, attempting to disable a job + // mid-flight clobbered the DISABLED status with FAILED and the + // H5 atomic-disable code path was unreachable. + await this.disableJob(job); + span?.setStatus(SpanStatusCode.UNSET); } else { await this.failJob(job, error); span?.setStatus(SpanStatusCode.ERROR, error.message); } span?.setAttributes({ "workglow.job.error": spanErrorMessage }); } finally { - await this.limiter.complete(limiterToken); + if (!limiterReleased) { + await this.limiter.complete(limiterToken); + } span?.end(); // Guard against a concurrent processSingleJob for the same jobId (which // can start before this finally block runs, e.g. after a reschedule). diff --git a/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts b/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts index 6a3b87320..abf00ce0a 100644 --- a/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts +++ b/packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts @@ -20,6 +20,7 @@ import { QueueStorageOptions, QueueSubscribeOptions, } from "./IQueueStorage"; +import { validateLeaseMs } from "./validateLeaseMs"; /** * Event listeners for queue storage events @@ -147,6 +148,7 @@ export class InMemoryQueueStorage implements IQueueStorage | undefined> { await sleep(0); const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const now = new Date().toISOString(); const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); @@ -192,6 +194,7 @@ export class InMemoryQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); await sleep(0); const job = this.jobQueue.find((j) => j.id === id && this.matchesPrefixes(j)); if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) { diff --git a/packages/job-queue/src/queue-storage/validateLeaseMs.ts b/packages/job-queue/src/queue-storage/validateLeaseMs.ts new file mode 100644 index 000000000..31c7a869e --- /dev/null +++ b/packages/job-queue/src/queue-storage/validateLeaseMs.ts @@ -0,0 +1,17 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Validates `leaseMs` / `extendLease ms` inputs across all job-queue + * backends. NaN/Infinity produce "Invalid Date" ISO strings that + * poison `lease_expires_at`; negative values immediately re-expire + * the lease a worker just claimed. `0` is permitted (instant expiry). + */ +export function validateLeaseMs(ms: number, fieldName = "leaseMs"): void { + if (!Number.isFinite(ms) || ms < 0) { + throw new RangeError(`${fieldName} must be a non-negative finite number; got ${ms}`); + } +} diff --git a/packages/test/src/test/job-queue/JobQueueWorker.test.ts b/packages/test/src/test/job-queue/JobQueueWorker.test.ts new file mode 100644 index 000000000..980becd1f --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueWorker.test.ts @@ -0,0 +1,258 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { IJobExecuteContext } from "@workglow/job-queue"; +import { + InMemoryQueueStorage, + InMemoryRateLimiterStorage, + Job, + JobDisabledError, + JobQueueClient, + JobQueueServer, + JobQueueWorker, + JobStatus, + RateLimiter, +} from "@workglow/job-queue"; +import { setLogger, sleep, uuid4 } from "@workglow/util"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { getTestingLogger } from "../../binding/TestingLogger"; + +interface TI { + readonly taskType?: string; + readonly data?: string; + readonly [key: string]: unknown; +} +interface TO { + readonly result?: string; + readonly [key: string]: unknown; +} + +/** + * Test job that supports a few input flavours. + * - `long_running` — hangs on a promise that only resolves when its signal fires. + * - `disable_on_abort` — throws JobDisabledError when its signal fires (used to + * exercise the JobDisabledError dispatch branch in processSingleJob's catch). + * - anything else — resolves immediately. + */ +class TJob extends Job { + public static executeCalls = 0; + + public override async execute(input: TI, context: IJobExecuteContext): Promise { + TJob.executeCalls += 1; + if (input.taskType === "disable_on_abort") { + return new Promise((_, reject) => { + context.signal.addEventListener( + "abort", + () => reject(new JobDisabledError(`Job ${String(input.data)} was disabled`)), + { once: true } + ); + }); + } + if (input.taskType === "long_running") { + return new Promise((_, reject) => { + context.signal.addEventListener("abort", () => reject(new Error("Aborted")), { + once: true, + }); + }); + } + return { result: "done" }; + } +} + +async function waitUntil( + predicate: () => boolean | Promise, + ceilingMs = 1000, + stepMs = 5 +): Promise { + const deadline = Date.now() + ceilingMs; + while (Date.now() < deadline) { + if (await predicate()) return true; + await sleep(stepMs); + } + return false; +} + +describe("JobQueueWorker — PR #511 follow-up regressions", () => { + setLogger(getTestingLogger()); + let storage: InMemoryQueueStorage; + let queueName: string; + + beforeEach(async () => { + TJob.executeCalls = 0; + queueName = `worker-followup-${uuid4()}`; + storage = new InMemoryQueueStorage(queueName); + await storage.migrate(); + }); + + afterEach(async () => { + await storage.deleteAll(); + }); + + it("DEADLINE-EXCEEDED job releases its RateLimiter slot", async () => { + // Two-slot rate limiter. Without the fix, a job whose deadline_at is in + // the past consumes one slot at tryAcquire and never returns it (the + // outer finally calls limiter.complete(), which is a no-op for + // RateLimiter). The fix calls limiter.release() before rethrowing + // the validation error. + const limiter = new RateLimiter(new InMemoryRateLimiterStorage(), queueName, { + maxExecutions: 2, + windowSizeInSeconds: 60, + }); + + const server = new JobQueueServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + limiter, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + // Insert a job whose deadline is already in the past so validateJobState + // rejects with PermanentJobError ("has exceeded its deadline"). We use + // storage.add directly because client.send only accepts a positive + // timeoutSeconds. + const pastDeadline = new Date(Date.now() - 60_000).toISOString(); + const id = await storage.add({ + input: { taskType: "deadline-exceeded", data: "x" }, + visible_at: null, + completed_at: null, + deadline_at: pastDeadline, + } as any); + + await server.start(); + + // Wait for the job to leave PENDING (it should fail validation and reach + // FAILED very quickly). + const left = await waitUntil(async () => { + const j = await storage.get(id); + return !!j && j.status !== JobStatus.PENDING; + }); + expect(left).toBe(true); + + // Give the finally block a tick to run. + await sleep(20); + + // Both slots must still be free — the failed validation must not have + // burned a slot. + const t1 = await limiter.tryAcquire(); + const t2 = await limiter.tryAcquire(); + expect(t1).not.toBeNull(); + expect(t2).not.toBeNull(); + + await limiter.release(t1); + await limiter.release(t2); + + await server.stop(); + }); + + it("disabling a job mid-flight transitions it to DISABLED, not FAILED", async () => { + // The user task throws JobDisabledError when its abort signal fires — + // the realistic shape of consumer code that detects disablement on + // re-checking state. Without the JobDisabledError dispatch branch in + // processSingleJob's catch, this error fell into the generic failJob + // branch and clobbered status to FAILED. + const server = new JobQueueServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + await server.start(); + + const handle = await client.send({ taskType: "disable_on_abort", data: "disable-me" }); + + // Wait for PROCESSING. + const reached = await waitUntil(async () => { + const j = await storage.get(handle.id); + return j?.status === JobStatus.PROCESSING; + }); + expect(reached).toBe(true); + + // Flag the row as DISABLED in storage and signal the worker to abort it. + await storage.saveStatus(handle.id, JobStatus.DISABLED); + // Restore PROCESSING so the worker's checkForAbortingJobs (which peeks + // PROCESSING rows) observes abort_requested_at and fires the abort + // controller — the user task then throws JobDisabledError. + await storage.saveStatus(handle.id, JobStatus.PROCESSING); + await storage.abort(handle.id); + + // Wait for terminal state — DISABLED. + const disabled = await waitUntil(async () => { + const j = await storage.get(handle.id); + return j?.status === JobStatus.DISABLED; + }); + + await server.stop(); + expect(disabled).toBe(true); + + const final = await storage.get(handle.id); + expect(final?.status).toBe(JobStatus.DISABLED); + }); + + it("pre-execute abort flag is observed during validateJobState", async () => { + class PreAbortedWorker extends JobQueueWorker { + protected override createAbortController(jobId: unknown): AbortController { + const controller = super.createAbortController(jobId); + controller.abort(); + return controller; + } + } + + class PreAbortedServer extends JobQueueServer { + protected override createWorker(): JobQueueWorker { + return new PreAbortedWorker(this.jobClass, { + messageQueue: this.messageQueue, + jobStore: this.jobStore, + queueName: this.queueName, + limiter: this.limiter, + pollIntervalMs: this.pollIntervalMs, + stopTimeoutMs: this.stopTimeoutMs, + deadLetter: this.deadLetter, + prefetch: this.prefetch, + }); + } + } + + // Before the fix, createAbortController() ran AFTER validateJobState, so + // aborting the controller here would have been too late to trip the + // activeJobAbortControllers.get(...).signal.aborted branch. The worker + // would enter execute() instead of failing during validation. + const server = new PreAbortedServer(TJob, { + storage: storage as any, + queueName, + pollIntervalMs: 5, + stopTimeoutMs: 0, + }); + const client = new JobQueueClient({ storage: storage as any, queueName }); + client.attach(server); + + const handle = await client.send({ taskType: "long_running", data: "pre-abort" }); + + await server.start(); + + const reached = await waitUntil(async () => { + const j = await storage.get(handle.id); + return ( + j?.status === JobStatus.FAILED || + j?.status === JobStatus.COMPLETED || + j?.status === JobStatus.DISABLED + ); + }); + expect(reached).toBe(true); + + await server.stop(); + + const final = await storage.get(handle.id); + expect(final?.status).toBe(JobStatus.FAILED); + expect(final?.abort_requested_at).toBeTruthy(); + expect(TJob.executeCalls).toBe(0); + }); +}); diff --git a/packages/test/src/test/job-queue/genericJobQueueTests.ts b/packages/test/src/test/job-queue/genericJobQueueTests.ts index e27d37be9..e4605abb2 100644 --- a/packages/test/src/test/job-queue/genericJobQueueTests.ts +++ b/packages/test/src/test/job-queue/genericJobQueueTests.ts @@ -606,6 +606,59 @@ export function runGenericJobQueueTests( }); }); + describe("leaseMs / extendLease input validation (PR #511 follow-up)", () => { + // PR #511 added Number.isFinite guards to Supabase only; the other + // backends silently produced "Invalid Date" ISO strings (poisoning + // lease_expires_at) or runtime SQL errors. These tests assert the + // unified RangeError contract across every backend exercised by this + // generic suite. + it("next(): negative leaseMs rejects with RangeError", async () => { + await expect(storage.next("rangeerr-worker", { leaseMs: -1 })).rejects.toThrow(RangeError); + }); + + it("next(): NaN leaseMs rejects with RangeError", async () => { + await expect(storage.next("rangeerr-worker", { leaseMs: Number.NaN })).rejects.toThrow( + RangeError + ); + }); + + it("next(): Infinity leaseMs rejects with RangeError", async () => { + await expect( + storage.next("rangeerr-worker", { leaseMs: Number.POSITIVE_INFINITY }) + ).rejects.toThrow(RangeError); + }); + + it("next(): leaseMs === 0 is accepted (instant expiry)", async () => { + // 0 is permitted — it means "lease expires immediately", which is a + // valid (if unusual) configuration. The call must not throw a + // RangeError; it may legitimately return a job or undefined depending + // on what's enqueued. + let threw: unknown = null; + try { + await storage.next("zerolease-worker", { leaseMs: 0 }); + } catch (e) { + threw = e; + } + expect(threw).toBeNull(); + }); + + it("extendLease(): negative ms rejects with RangeError", async () => { + await expect(storage.extendLease("any-id", "any-worker", -1)).rejects.toThrow(RangeError); + }); + + it("extendLease(): NaN ms rejects with RangeError", async () => { + await expect(storage.extendLease("any-id", "any-worker", Number.NaN)).rejects.toThrow( + RangeError + ); + }); + + it("extendLease(): Infinity ms rejects with RangeError", async () => { + await expect( + storage.extendLease("any-id", "any-worker", Number.POSITIVE_INFINITY) + ).rejects.toThrow(RangeError); + }); + }); + describe("Same-process optimizations", () => { const itFastWake = skipFastWakeTests ? it.skip : it; @@ -1343,5 +1396,26 @@ export function runGenericJobQueueTests( expect(second?.attempts).toBe(attemptsBeforeReclaim + 1); expect(second?.abort_requested_at ?? null).toBe(null); }); + + it("abort(PENDING) does not bump attempts (cross-backend contract)", async () => { + // Regression for PR #511 follow-up: IndexedDbQueueStorage.abort(PENDING) + // previously routed through complete() which bumps attempts. The + // cross-backend contract (InMemory/Postgres) is that aborting a row + // the worker never claimed must NOT consume retry budget — the worker + // never actually attempted execution. + const handle = await client.send({ taskType: "task1", data: "abort-pending-no-bump" }); + const id = handle.id; + const before = await storage.get(id); + expect(before?.status).toBe(JobStatus.PENDING); + expect(before?.attempts ?? 0).toBe(0); + + await storage.abort(id); + + const after = await storage.get(id); + expect(after?.status).toBe(JobStatus.FAILED); + expect(after?.abort_requested_at).toBeTruthy(); + expect(after?.completed_at).toBeTruthy(); + expect(after?.attempts ?? 0).toBe(0); + }); }); } diff --git a/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts b/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts index 56611db31..e845bd85a 100644 --- a/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts +++ b/packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts @@ -152,4 +152,153 @@ describe("indexedDB queue migrations: v1 → v2 rename + backfill", () => { }; }); }); + + it("v2 migrates all five legacy field renames", async () => { + // Regression for PR #511 follow-up: v2 previously only backfilled + // run_after → visible_at. The other four renames (run_attempts → attempts, + // last_ran_at → last_attempted_at, max_retries → max_attempts, + // worker_id → lease_owner) were silently dropped, so existing + // browser-deployed queues lost retry budgets, last-attempt timestamps, + // and lease ownership on first migration. + const dbName = `wglw_idb_qm5_${Math.random().toString(36).slice(2)}`; + const tableName = "jobs"; + + // ── (1) Hand-build a v1 DB with a row that uses all five legacy names. + await new Promise((resolve, reject) => { + const req = indexedDB.open(dbName, 2); + req.onerror = () => reject(req.error); + req.onupgradeneeded = () => { + const db = req.result; + if (!db.objectStoreNames.contains("_storage_migrations")) { + db.createObjectStore("_storage_migrations", { keyPath: ["component", "version"] }); + } + if (!db.objectStoreNames.contains(tableName)) { + const store = db.createObjectStore(tableName, { keyPath: "id" }); + store.createIndex("queue_status", ["queue", "status"], { unique: false }); + store.createIndex("queue_status_run_after", ["queue", "status", "run_after"], { + unique: false, + }); + store.createIndex("queue_job_run_id", ["queue", "job_run_id"], { unique: false }); + store.createIndex("queue_fingerprint_status", ["queue", "fingerprint", "status"], { + unique: false, + }); + } + }; + req.onsuccess = () => { + const db = req.result; + const tx = db.transaction(["_storage_migrations", tableName], "readwrite"); + tx.oncomplete = () => { + db.close(); + resolve(); + }; + tx.onerror = () => { + db.close(); + reject(tx.error); + }; + tx.objectStore("_storage_migrations").put({ + component: `queue:indexeddb:${tableName}`, + version: 1, + description: "Create queue object store + indexes", + appliedAt: new Date().toISOString(), + }); + // Seed a row with ALL five legacy field names populated, NO post-rename + // names. The migration must move every value and delete every old key. + tx.objectStore(tableName).put({ + id: 42, + queue: "test", + fingerprint: "fp42", + job_run_id: "jr42", + status: "PENDING", + input: "{}", + run_after: "2026-02-01T00:00:00.000Z", + run_attempts: 3, + last_ran_at: "2026-01-31T12:00:00.000Z", + max_retries: 7, + worker_id: "worker-legacy-1", + }); + }; + }); + + // ── (2) Run the migration chain. + const runner = new IndexedDbMigrationRunner(dbName); + await runner.run(indexedDbQueueMigrations(tableName, [])); + + // ── (3) Verify all five renames landed: new keys carry the values, + // legacy keys are deleted, and the queue_status_visible_at + // index can serve a range query keyed on the migrated + // [queue, status, visible_at] tuple. + await new Promise((resolve, reject) => { + const req = indexedDB.open(dbName); + req.onerror = () => reject(req.error); + req.onsuccess = () => { + const db = req.result; + try { + const tx = db.transaction([tableName], "readonly"); + const store = tx.objectStore(tableName); + + const getReq = store.get(42); + getReq.onsuccess = () => { + try { + const row = getReq.result as Record | undefined; + expect(row).toBeDefined(); + + // All five new keys carry the migrated values. + expect(row!.visible_at).toBe("2026-02-01T00:00:00.000Z"); + expect(row!.attempts).toBe(3); + expect(row!.last_attempted_at).toBe("2026-01-31T12:00:00.000Z"); + expect(row!.max_attempts).toBe(7); + expect(row!.lease_owner).toBe("worker-legacy-1"); + + // All five legacy keys are gone. + expect(row!.run_after).toBeUndefined(); + expect(row!.run_attempts).toBeUndefined(); + expect(row!.last_ran_at).toBeUndefined(); + expect(row!.max_retries).toBeUndefined(); + expect(row!.worker_id).toBeUndefined(); + } catch (e) { + reject(e); + return; + } + + // Index range query on the migrated tuple returns the row. + try { + const idx = store.index("queue_status_visible_at"); + const range = IDBKeyRange.bound( + ["test", "PENDING", "2026-01-01T00:00:00.000Z"], + ["test", "PENDING", "2026-12-31T00:00:00.000Z"] + ); + const cReq = idx.openCursor(range); + cReq.onsuccess = () => { + const cursor = cReq.result; + try { + expect(cursor).not.toBeNull(); + expect((cursor!.value as { id: number }).id).toBe(42); + } catch (e) { + reject(e); + } + }; + tx.oncomplete = () => { + db.close(); + resolve(); + }; + tx.onerror = () => { + db.close(); + reject(tx.error); + }; + } catch (e) { + db.close(); + reject(e); + } + }; + getReq.onerror = () => { + db.close(); + reject(getReq.error); + }; + } catch (e) { + db.close(); + reject(e); + } + }; + }); + }); }); diff --git a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts index ff565787d..a804ef5ac 100644 --- a/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts +++ b/packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts @@ -8,7 +8,7 @@ import { PGlite } from "@electric-sql/pglite"; import { postgresQueueMigrations } from "@workglow/postgres/job-queue"; import type { Pool } from "@workglow/postgres/storage"; import { PostgresMigrationRunner } from "@workglow/postgres/storage"; -import { sqliteQueueMigrations } from "@workglow/sqlite/job-queue"; +import { buildSqliteQueuePostV3TableSql, sqliteQueueMigrations } from "@workglow/sqlite/job-queue"; import { Sqlite, SqliteMigrationRunner } from "@workglow/sqlite/storage"; import { describe, expect, it } from "vitest"; @@ -131,3 +131,90 @@ describe("sqlite queue migrations: v1→v2→v3 schema parity", () => { } }); }); + +/** + * PR #511 follow-up: SQLite v3 originally renamed `max_retries → max_attempts` + * but did not adjust the column default, leaving SQLite fresh installs at + * `DEFAULT 23` while Postgres landed on `DEFAULT 10`. Callers omitting + * `maxAttempts` got divergent retry behavior across backends. This block + * compares the post-migration column DEFAULTS (not just the column names) + * so future drift is caught. + */ +describe("queue migrations: cross-backend default parity", () => { + it("max_attempts default is 10 on both Postgres and SQLite after v3", async () => { + // ── (1) Fresh PGlite + run all Postgres queue migrations. + const pg = new PGlite(); + // ── (2) Fresh SQLite :memory: + run all SQLite queue migrations. + await Sqlite.init(); + const sqlite = new Sqlite.Database(":memory:"); + try { + await new PostgresMigrationRunner(pg as unknown as Pool).run( + postgresQueueMigrations("jobs", []) + ); + await new SqliteMigrationRunner(sqlite).run(sqliteQueueMigrations("jobs", [])); + + // ── (3) Read Postgres defaults via information_schema.columns. + const pgRows = ( + await (pg as unknown as Pool).query<{ + column_name: string; + column_default: string | null; + }>( + `SELECT column_name, column_default FROM information_schema.columns + WHERE table_name = $1 AND table_schema = current_schema() + AND column_name IN ('max_attempts', 'attempts')`, + ["jobs"] + ) + ).rows; + const pgMaxAttempts = pgRows.find((r) => r.column_name === "max_attempts"); + const pgAttempts = pgRows.find((r) => r.column_name === "attempts"); + expect(pgMaxAttempts).toBeDefined(); + expect(pgAttempts).toBeDefined(); + + // ── (4) Read SQLite defaults via PRAGMA table_info(jobs). + type SqliteCol = { name: string; dflt_value: string | null }; + const sqliteRows = sqlite + .prepare<[], SqliteCol>(`PRAGMA table_info(jobs)`) + .all() + .filter((r: SqliteCol) => r.name === "max_attempts" || r.name === "attempts"); + const sqliteMaxAttempts = sqliteRows.find((r) => r.name === "max_attempts"); + const sqliteAttempts = sqliteRows.find((r) => r.name === "attempts"); + expect(sqliteMaxAttempts).toBeDefined(); + expect(sqliteAttempts).toBeDefined(); + + // ── (5) Both backends MUST agree on the integer values of the + // defaults. Number(...) coerces the SQL-literal string forms + // ("10", "0") into JS numbers so the comparison is back-end-agnostic. + expect(Number(pgMaxAttempts!.column_default)).toBe(10); + expect(Number(sqliteMaxAttempts!.dflt_value)).toBe(10); + + // Sanity check: `attempts` defaults to 0 on both backends. + expect(Number(pgAttempts!.column_default)).toBe(0); + expect(Number(sqliteAttempts!.dflt_value)).toBe(0); + } finally { + await pg.close(); + sqlite.close(); + } + }); +}); + +describe("sqlite queue migrations: canonical v3 schema", () => { + it("rebuilds the table from the explicit post-v3 CREATE TABLE statement", async () => { + await Sqlite.init(); + const sqlite = new Sqlite.Database(":memory:"); + try { + await new SqliteMigrationRunner(sqlite).run(sqliteQueueMigrations("jobs", [])); + + const row = sqlite + .prepare<[string], { readonly sql: string | null }>( + "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" + ) + .get("jobs"); + expect(row?.sql).toBeDefined(); + + const normalizeSql = (sql: string): string => sql.replace(/\s+/g, " ").trim(); + expect(normalizeSql(row!.sql!)).toBe(normalizeSql(buildSqliteQueuePostV3TableSql("jobs", ""))); + } finally { + sqlite.close(); + } + }); +}); diff --git a/providers/postgres/src/job-queue/PostgresQueueStorage.ts b/providers/postgres/src/job-queue/PostgresQueueStorage.ts index c382ae812..4b2271675 100644 --- a/providers/postgres/src/job-queue/PostgresQueueStorage.ts +++ b/providers/postgres/src/job-queue/PostgresQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import type { Pool } from "@workglow/postgres/storage"; import { assertPrefixesSafe, @@ -238,6 +238,7 @@ export class PostgresQueueStorage implements IQueueStorage | undefined> { const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); // Parameters: $1=PROCESSING, $2=now+leaseMs interval, $3=queue, $4=workerId, $5=PENDING, $6=PROCESSING, $7+=prefix params const { conditions: prefixConditions, params: prefixParams } = this.buildPrefixWhereClause(7); const result = await this.db.query< @@ -296,6 +297,7 @@ export class PostgresQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const { conditions: prefixConditions, params: prefixParams } = this.buildPrefixWhereClause(5); const result = await this.db.query( `UPDATE ${this.tableName} diff --git a/providers/sqlite/src/job-queue/SqliteQueueStorage.ts b/providers/sqlite/src/job-queue/SqliteQueueStorage.ts index 20345efee..1e9a3cf3e 100644 --- a/providers/sqlite/src/job-queue/SqliteQueueStorage.ts +++ b/providers/sqlite/src/job-queue/SqliteQueueStorage.ts @@ -12,7 +12,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import type { Sqlite } from "@workglow/sqlite/storage"; import { assertPrefixesSafe, @@ -358,6 +358,7 @@ export class SqliteQueueStorage implements IQueueStorage | undefined> { const now = new Date().toISOString(); const leaseMs = opts?.leaseMs ?? 30000; + validateLeaseMs(leaseMs, "leaseMs"); const leaseExpiry = new Date(Date.now() + leaseMs).toISOString(); const prefixConditions = this.buildPrefixWhereClause(); const prefixParams = this.getPrefixParamValues(); @@ -429,6 +430,7 @@ export class SqliteQueueStorage implements IQueueStorage { + validateLeaseMs(ms, "ms"); const leaseExpiry = new Date(Date.now() + ms).toISOString(); const prefixConditions = this.buildPrefixWhereClause(); const prefixParams = this.getPrefixParamValues(); diff --git a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts index 78e174851..79356697a 100644 --- a/providers/sqlite/src/migrations/sqliteQueueMigrations.ts +++ b/providers/sqlite/src/migrations/sqliteQueueMigrations.ts @@ -14,6 +14,49 @@ import { SqliteDialect, } from "@workglow/storage"; +export function buildSqliteQueuePostV3ColumnSql(prefixColumnsSql: string): string { + // buildPrefixColumnsSql(...) returns either "" or an already-indented, + // comma-terminated prefix block, so it is safe to splice directly into + // the canonical column list here. + return `${prefixColumnsSql}fingerprint TEXT NOT NULL, + queue TEXT NOT NULL, + job_run_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'PENDING', + input TEXT NOT NULL, + output TEXT, + attempts INTEGER DEFAULT 0, + max_attempts INTEGER DEFAULT 10, + visible_at TEXT NOT NULL, + last_attempted_at TEXT, + created_at TEXT NOT NULL, + completed_at TEXT, + deadline_at TEXT, + error TEXT, + error_code TEXT, + progress REAL DEFAULT 0, + progress_message TEXT DEFAULT '', + progress_details TEXT NULL, + lease_owner TEXT, + abort_requested_at TEXT, + lease_expires_at TEXT`; +} + +export function buildSqliteQueuePostV3ColumnList(prefixes: readonly PrefixColumn[]): string { + return `${prefixes.map((p) => p.name).join(", ")}${ + prefixes.length > 0 ? ", " : "" + }fingerprint, queue, job_run_id, status, input, output, attempts, max_attempts, visible_at, last_attempted_at, created_at, completed_at, deadline_at, error, error_code, progress, progress_message, progress_details, lease_owner, abort_requested_at, lease_expires_at`; +} + +export function buildSqliteQueuePostV3TableSql( + tableName: string, + prefixColumnsSql: string +): string { + return `CREATE TABLE "${tableName}" ( + id INTEGER PRIMARY KEY, + ${buildSqliteQueuePostV3ColumnSql(prefixColumnsSql)} + )`; +} + /** * Initial migration set for the SQLite queue table identified by `tableName`. * @@ -32,6 +75,7 @@ export function sqliteQueueMigrations( const prefixColumnsSql = buildPrefixColumnsSql(SqliteDialect, prefixes); const prefixIndexPrefix = getPrefixIndexPrefix(prefixes); const indexSuffix = getPrefixIndexSuffix(prefixes); + const postV3ColumnList = buildSqliteQueuePostV3ColumnList(prefixes); return [ { @@ -84,15 +128,14 @@ export function sqliteQueueMigrations( component, version: 3, description: - "Rename run_after→visible_at, last_ran_at→last_attempted_at, run_attempts→attempts, max_retries→max_attempts, worker_id→lease_owner; drop run_after-keyed index and recreate visible_at-keyed", + "Rename run_after→visible_at, last_ran_at→last_attempted_at, run_attempts→attempts, max_retries→max_attempts, worker_id→lease_owner; drop run_after-keyed index and recreate visible_at-keyed; lower max_attempts DEFAULT 23→10 to match Postgres parity", up(db: Sqlite.Database) { // PRAGMA table_info guards each rename so fresh installs (which // arrive at v3 having just created the v1 schema in this same // migration run) are no-ops here. - const cols: string[] = db - .prepare<[], { name: string }>(`PRAGMA table_info(${tableName})`) - .all() - .map((r) => r.name); + type ColInfo = { readonly name: string }; + const colInfos: ColInfo[] = db.prepare<[], ColInfo>(`PRAGMA table_info(${tableName})`).all(); + const cols: string[] = colInfos.map((r) => r.name); const renames: [string, string][] = [ ["run_after", "visible_at"], ["last_ran_at", "last_attempted_at"], @@ -114,6 +157,38 @@ export function sqliteQueueMigrations( DROP INDEX IF EXISTS job_queue_fetcher${indexSuffix}_idx; CREATE INDEX IF NOT EXISTS job_queue_fetcher${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, status, visible_at); `); + + // Postgres v3 explicitly applied `ALTER COLUMN max_attempts SET + // DEFAULT 10`; SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, + // so the original v3 migration left the SQLite default at 23 from + // v1's CREATE TABLE. Fresh SQLite installs ended up at default 23 + // while Postgres was 10 — callers omitting `maxAttempts` got + // divergent retry behavior across backends. Rebuild the table with + // the correct default using SQLite's documented 12-step procedure + // (https://www.sqlite.org/lang_altertable.html#otheralter). + const tableSqlRow = db + .prepare<[string], { readonly sql: string | null }>( + "SELECT sql FROM sqlite_schema WHERE type = 'table' AND name = ?" + ) + .get(tableName); + if (tableSqlRow?.sql && !tableSqlRow.sql.includes("max_attempts INTEGER DEFAULT 10")) { + // Rebuild from the canonical post-v3 CREATE TABLE statement instead + // of reconstructing from PRAGMA table_info. table_info drops + // metadata such as explicit NULL/COLLATE/CHECK clauses, so deriving + // DDL from it would silently erase future schema details during this + // rebuild step. + const newTable = `${tableName}__new_v3`; + db.exec(` + ${buildSqliteQueuePostV3TableSql(`${tableName}__new_v3`, prefixColumnsSql)}; + INSERT INTO ${newTable} (${postV3ColumnList}) SELECT ${postV3ColumnList} FROM ${tableName}; + DROP TABLE ${tableName}; + ALTER TABLE ${newTable} RENAME TO ${tableName}; + + CREATE INDEX IF NOT EXISTS job_queue_fetcher${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, status, visible_at); + CREATE INDEX IF NOT EXISTS job_queue_fingerprint${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, fingerprint, status); + CREATE INDEX IF NOT EXISTS job_queue_job_run_id${indexSuffix}_idx ON ${tableName} (${prefixIndexPrefix}queue, job_run_id); + `); + } }, }, ]; diff --git a/providers/supabase/src/job-queue/SupabaseQueueStorage.ts b/providers/supabase/src/job-queue/SupabaseQueueStorage.ts index 7aad321a6..5856278e9 100644 --- a/providers/supabase/src/job-queue/SupabaseQueueStorage.ts +++ b/providers/supabase/src/job-queue/SupabaseQueueStorage.ts @@ -14,7 +14,7 @@ import type { QueueStorageOptions, QueueSubscribeOptions, } from "@workglow/job-queue"; -import { JobStatus } from "@workglow/job-queue"; +import { JobStatus, validateLeaseMs } from "@workglow/job-queue"; import { PollingSubscriptionManager } from "@workglow/storage"; import { createServiceToken, deepEqual, makeFingerprint, uuid4 } from "@workglow/util"; @@ -369,9 +369,7 @@ export class SupabaseQueueStorage implements IQueueStorage | undefined> { const leaseMs = opts?.leaseMs ?? 30000; - if (!Number.isFinite(leaseMs) || leaseMs < 0) { - throw new Error(`Invalid leaseMs: ${leaseMs}`); - } + validateLeaseMs(leaseMs, "leaseMs"); const prefixConditions = this.buildPrefixWhereSql(); const validatedQueueName = this.validateSqlValue(this.queueName, "queueName"); const validatedWorkerId = this.validateSqlValue(workerId, "workerId"); @@ -425,15 +423,15 @@ export class SupabaseQueueStorage implements IQueueStorage { + // Validate lease arg FIRST so callers get a consistent RangeError across + // backends regardless of whether the id happens to be invalid too. + validateLeaseMs(ms, "ms"); const validatedWorkerId = this.validateSqlValue(workerId, "workerId"); const escapedWorkerId = this.escapeSqlString(validatedWorkerId); const numericId = Number(id); if (!Number.isFinite(numericId)) { throw new Error(`Invalid job id: ${id}`); } - if (!Number.isFinite(ms) || ms < 0) { - throw new Error(`Invalid lease extension ms: ${ms}`); - } const prefixConditions = this.buildPrefixWhereSql();