Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
QueueStorageOptions,
QueueSubscribeOptions,
} from "@workglow/job-queue";
import { JobStatus } from "@workglow/job-queue";
import { JobStatus, validateLeaseMs } from "@workglow/job-queue";
import { HybridSubscriptionManager } from "@workglow/storage";
import { createServiceToken, deepEqual, makeFingerprint, uuid4 } from "@workglow/util";
import { IndexedDbMigrationRunner } from "../migrations/IndexedDbMigrationRunner";
Expand Down Expand Up @@ -266,11 +266,12 @@ export class IndexedDbQueueStorage<Input, Output> implements IQueueStorage<Input
workerId: string,
opts?: { leaseMs?: number }
): Promise<JobStorageFormat<Input, Output> | undefined> {
const leaseMs = opts?.leaseMs ?? 30000;
validateLeaseMs(leaseMs, "leaseMs");
const db = await this.getDb();
const tx = db.transaction(this.tableName, "readwrite");
const store = tx.objectStore(this.tableName);
const now = new Date().toISOString();
const leaseMs = opts?.leaseMs ?? 30000;
const leaseExpiry = new Date(Date.now() + leaseMs).toISOString();
const prefixKeyValues = this.getPrefixKeyValues();

Expand Down Expand Up @@ -421,6 +422,7 @@ export class IndexedDbQueueStorage<Input, Output> implements IQueueStorage<Input
* @param ms - Number of milliseconds to extend the lease by
*/
public async extendLease(id: unknown, workerId: string, ms: number): Promise<void> {
validateLeaseMs(ms, "ms");
const job = await this.get(id);
if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) {
throw new Error(
Expand Down Expand Up @@ -537,7 +539,10 @@ export class IndexedDbQueueStorage<Input, Output> implements IQueueStorage<Input
job.status = JobStatus.FAILED;
job.abort_requested_at = now;
job.completed_at = now;
await this.complete(job);
// Use put() (not complete()) so attempts is NOT bumped — the worker
// never actually attempted this job. Matches the cross-backend
// contract verified in InMemory/Postgres.
await this.put(job);
} else if (job.status === JobStatus.PROCESSING) {
job.abort_requested_at = now;
await this.put(job);
Expand Down
68 changes: 50 additions & 18 deletions packages/indexeddb/src/migrations/indexedDbQueueMigrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function indexedDbQueueMigrations(
component,
version: 2,
description:
"Rename queue_status_run_after → queue_status_visible_at; backfill run_after → visible_at",
"Rename queue_status_run_after → queue_status_visible_at; backfill five legacy field renames (run_after → visible_at, run_attempts → attempts, last_ran_at → last_attempted_at, max_retries → max_attempts, worker_id → lease_owner)",
up({ tx }) {
// IDB upgrade transactions auto-commit as soon as `up()` returns —
// we MUST NOT `await` anything between IDB operations. All cursor
Expand All @@ -73,30 +73,62 @@ export function indexedDbQueueMigrations(
store.deleteIndex("queue_status_run_after");
}

// Walk every row and copy `run_after` → `visible_at`. `openCursor()`
// returns a request whose `onsuccess` fires once per row plus a
// final time with `cursor === null`; the upgrade tx stays open as
// long as new requests are issued from the current callback.
// Walk every row and migrate all five legacy field renames in a
// single cursor pass. The storage layer reads the post-rename names
// (visible_at, attempts, last_attempted_at, max_attempts,
// lease_owner); without backfilling each one, existing
// browser-deployed queues silently lose retry budgets, last-attempt
// timestamps, and lease ownership on first migration.
//
// The createIndex call for queue_status_visible_at is deferred into
// the terminal `cursor === null` branch so the index is built off
// the post-migration rows rather than racing the cursor walk.
const cursorReq = store.openCursor();
cursorReq.onsuccess = () => {
const cursor = cursorReq.result;
if (!cursor) return;
if (!cursor) {
// Recreate the renamed index under the new key path. Skip if a
// previous partial run already created it — IDB forbids two
// indexes sharing a name and would throw mid-upgrade.
if (!Array.from(store.indexNames).includes("queue_status_visible_at")) {
store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), {
unique: false,
});
}
return;
}
const row = cursor.value as Record<string, unknown> | null;
if (row && row.visible_at === undefined && row.run_after !== undefined) {
row.visible_at = row.run_after;
cursor.update(row);
if (row) {
let dirty = false;
if (row.run_after !== undefined && row.visible_at === undefined) {
row.visible_at = row.run_after;
delete row.run_after;
dirty = true;
}
if (row.run_attempts !== undefined && row.attempts === undefined) {
row.attempts = row.run_attempts;
delete row.run_attempts;
dirty = true;
}
if (row.last_ran_at !== undefined && row.last_attempted_at === undefined) {
row.last_attempted_at = row.last_ran_at;
delete row.last_ran_at;
dirty = true;
}
if (row.max_retries !== undefined && row.max_attempts === undefined) {
row.max_attempts = row.max_retries;
delete row.max_retries;
dirty = true;
}
if (row.worker_id !== undefined && row.lease_owner === undefined) {
row.lease_owner = row.worker_id;
delete row.worker_id;
dirty = true;
}
if (dirty) cursor.update(row);
}
cursor.continue();
};

// Recreate the index under the new name keyed on `visible_at`.
// Skip if a previous partial run already created it (defensive — IDB
// forbids two indexes sharing a name and would throw mid-upgrade).
if (!Array.from(store.indexNames).includes("queue_status_visible_at")) {
store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), {
unique: false,
});
}
},
},
];
Expand Down
1 change: 1 addition & 0 deletions packages/job-queue/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export * from "./queue-storage/InMemoryMessageQueue";
export * from "./queue-storage/InMemoryQueueStorage";
export * from "./queue-storage/TelemetryQueueStorage";
export * from "./queue-storage/createInMemoryQueue";
export * from "./queue-storage/validateLeaseMs";
export * from "./queue-storage/wrapQueueStorage";

export * from "./rate-limiter-storage/IRateLimiterStorage";
Expand Down
34 changes: 27 additions & 7 deletions packages/job-queue/src/job/JobQueueWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,20 +615,31 @@ export class JobQueueWorker<
})
: undefined;

let limiterReleased = false;
try {
// The limiter slot was already atomically reserved by tryAcquire() in
// the main loop (or processNext), so we no longer call recordJobStart
// here — doing so would double-count.
// the main loop (or processNext). We register the abort controller
// BEFORE validateJobState so the controller is observable from inside
// validateJobState (which checks activeJobAbortControllers for a
// pre-execute abort flag). Previously the controller was created
// AFTER validation, making that abort-before-execute branch dead code.
const abortController = this.createAbortController(job.id);

try {
await this.validateJobState(job);
} catch (validationErr) {
// Throw — the outer finally block's limiter.complete() will release
// the slot. Do NOT call limiter.release() here too; that would
// double-decrement the counter and admit one extra concurrent job.
// RateLimiter.complete() is a no-op on the success/error paths
// (the slot only ages out of the window naturally) — so on a
// validation failure here we MUST call release() to actually free
// the slot. Without this, a DEADLINE-EXCEEDED or pre-aborted job
// permanently consumed a RateLimiter window slot. The
// limiterReleased flag gates the outer finally's complete() call so
// we don't double-handle the slot.
await this.limiter.release(limiterToken);
limiterReleased = true;
throw validationErr;
}

const abortController = this.createAbortController(job.id);
this.events.emit("job_start", job.id);

let leaseInterval: ReturnType<typeof setInterval> | undefined;
Expand Down Expand Up @@ -701,13 +712,22 @@ export class JobQueueWorker<
});
span?.setStatus(SpanStatusCode.UNSET);
}
} else if (error instanceof JobDisabledError) {
// Route through disableJob so the row transitions to DISABLED
// (not FAILED). Without this branch, attempting to disable a job
// mid-flight clobbered the DISABLED status with FAILED and the
// H5 atomic-disable code path was unreachable.
await this.disableJob(job);
span?.setStatus(SpanStatusCode.UNSET);
} else {
await this.failJob(job, error);
span?.setStatus(SpanStatusCode.ERROR, error.message);
}
span?.setAttributes({ "workglow.job.error": spanErrorMessage });
} finally {
await this.limiter.complete(limiterToken);
if (!limiterReleased) {
await this.limiter.complete(limiterToken);
}
span?.end();
// Guard against a concurrent processSingleJob for the same jobId (which
// can start before this finally block runs, e.g. after a reschedule).
Expand Down
3 changes: 3 additions & 0 deletions packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
QueueStorageOptions,
QueueSubscribeOptions,
} from "./IQueueStorage";
import { validateLeaseMs } from "./validateLeaseMs";

/**
* Event listeners for queue storage events
Expand Down Expand Up @@ -147,6 +148,7 @@ export class InMemoryQueueStorage<Input, Output> implements IQueueStorage<Input,
): Promise<JobStorageFormat<Input, Output> | undefined> {
await sleep(0);
const leaseMs = opts?.leaseMs ?? 30000;
validateLeaseMs(leaseMs, "leaseMs");
const now = new Date().toISOString();
const leaseExpiry = new Date(Date.now() + leaseMs).toISOString();

Expand Down Expand Up @@ -192,6 +194,7 @@ export class InMemoryQueueStorage<Input, Output> implements IQueueStorage<Input,
* @param ms - Number of milliseconds to extend the lease by
*/
public async extendLease(id: unknown, workerId: string, ms: number): Promise<void> {
validateLeaseMs(ms, "ms");
await sleep(0);
const job = this.jobQueue.find((j) => j.id === id && this.matchesPrefixes(j));
if (!job || job.status !== JobStatus.PROCESSING || job.lease_owner !== workerId) {
Expand Down
17 changes: 17 additions & 0 deletions packages/job-queue/src/queue-storage/validateLeaseMs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

/**
 * Largest absolute time value (milliseconds since the epoch) that an
 * ECMAScript `Date` can represent. `new Date(t)` with `|t|` beyond this
 * is an Invalid Date.
 */
const MAX_DATE_TIME_VALUE_MS = 8.64e15;

/**
 * Validates `leaseMs` / `extendLease ms` inputs across all job-queue
 * backends.
 *
 * Rejected values:
 * - `NaN` / `±Infinity` / non-numbers: `new Date(Date.now() + ms)` is an
 *   Invalid Date, so no usable `lease_expires_at` can be derived from it
 *   (`toISOString()` throws on an Invalid Date).
 * - Negative values: the lease would already be expired at the moment a
 *   worker claims it.
 * - Finite values so large that `Date.now() + ms` exceeds the `Date`
 *   range (±8.64e15 ms), e.g. `Number.MAX_VALUE`: these pass a plain
 *   finiteness check but still yield an Invalid Date.
 *
 * `0` is permitted (instant expiry).
 *
 * @param ms - Lease duration (or extension) in milliseconds.
 * @param fieldName - Name of the argument being validated; only used to
 *   build the error message.
 * @throws RangeError if `ms` is not a non-negative finite number, or if
 *   adding it to the current time would overflow the representable
 *   `Date` range.
 */
export function validateLeaseMs(ms: number, fieldName = "leaseMs"): void {
  if (!Number.isFinite(ms) || ms < 0) {
    throw new RangeError(`${fieldName} must be a non-negative finite number; got ${ms}`);
  }
  // Finite is not enough: the expiry is computed relative to "now", and the
  // sum must still be a representable Date time value.
  if (Date.now() + ms > MAX_DATE_TIME_VALUE_MS) {
    throw new RangeError(
      `${fieldName} is too large to produce a valid lease expiry date; got ${ms}`
    );
  }
}
Loading
Loading