refactor(job-queue): drop legacy limiter methods, fix QueuedExecutionStrategy release, rename releaseClaim #511
Merged
@workglow/cli
@workglow/ai
@workglow/browser-control
@workglow/indexeddb
@workglow/javascript
@workglow/job-queue
@workglow/knowledge-base
@workglow/mcp
@workglow/storage
@workglow/task-graph
@workglow/tasks
@workglow/util
workglow
@workglow/anthropic
@workglow/bun-webview
@workglow/chrome-ai
@workglow/electron
@workglow/google-gemini
@workglow/huggingface-inference
@workglow/huggingface-transformers
@workglow/node-llama-cpp
@workglow/ollama
@workglow/openai
@workglow/playwright
@workglow/postgres
@workglow/sqlite
@workglow/supabase
@workglow/tf-mediapipe
Coverage Report
No changed files found.
Pull request overview
This PR refactors the job-queue subsystem toward a leased-claim model and a split “message queue + job store” facade, while removing legacy limiter APIs and the ABORTING job state. It updates all storage providers/migrations and adjusts tests/callers to the new naming (visible_at, attempts, max_attempts, lease_owner, etc.) and limiter semantics.
Changes:
- Introduces `IMessageQueue`, `IJobStore`, and `IClaim` (plus factories/wrappers) and updates `JobQueueClient`/`Server`/`Worker` to use the split abstractions (with a legacy `storage` wrapper).
- Replaces ABORTING with `abort_requested_at`, adds lease expiry + `extendLease`, and renames core queue columns (`run_after`→`visible_at`, `max_retries`→`max_attempts`, etc.) across providers + migrations.
- Removes legacy limiter methods (`canProceed`/`recordJobStart`/`recordJobCompletion`) in favor of `tryAcquire`/`release`/`complete`, and updates limiters/tests accordingly.
Reviewed changes
Copilot reviewed 62 out of 63 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| providers/supabase/src/job-queue/SupabaseQueueStorage.ts | Supabase queue schema rename + lease/abort support + claim release rename. |
| providers/supabase/src/job-queue/SupabaseMessageQueue.ts | New Supabase IMessageQueue wrapper with claims + buffered writes. |
| providers/supabase/src/job-queue/SupabaseJobStore.ts | New Supabase IJobStore wrapper with buffered result/error staging. |
| providers/supabase/src/job-queue/createSupabaseQueue.ts | Factory returning paired messageQueue/jobStore/core for Supabase. |
| providers/supabase/src/job-queue/common.ts | Re-exports new Supabase queue façade and factory. |
| providers/sqlite/src/migrations/sqliteQueueMigrations.ts | Updates SQLite queue migrations for renamed columns + lease/abort columns. |
| providers/sqlite/src/job-queue/SqliteQueueStorage.ts | SQLite queue storage updated for renamed columns + lease reclaim + extendLease + abort behavior. |
| providers/sqlite/src/job-queue/SqliteMessageQueue.ts | New SQLite IMessageQueue wrapper with claims + buffered writes. |
| providers/sqlite/src/job-queue/SqliteJobStore.ts | New SQLite IJobStore wrapper with buffered result/error staging. |
| providers/sqlite/src/job-queue/createSqliteQueue.ts | Factory returning paired messageQueue/jobStore/core for SQLite. |
| providers/sqlite/src/job-queue/common.ts | Re-exports new SQLite queue façade and factory. |
| providers/postgres/src/migrations/postgresQueueMigrations.ts | Updates Postgres queue migrations for renamed columns + lease/abort columns. |
| providers/postgres/src/job-queue/PostgresQueueStorage.ts | Postgres queue storage updated for renamed columns + lease reclaim + extendLease + abort behavior. |
| providers/postgres/src/job-queue/PostgresMessageQueue.ts | New Postgres IMessageQueue wrapper with claims + buffered writes. |
| providers/postgres/src/job-queue/PostgresJobStore.ts | New Postgres IJobStore wrapper with buffered result/error staging. |
| providers/postgres/src/job-queue/createPostgresQueue.ts | Factory returning paired messageQueue/jobStore/core for Postgres. |
| providers/postgres/src/job-queue/common.ts | Re-exports new Postgres queue façade and factory. |
| packages/test/src/test/task/FetchTask.test.ts | Updates test client API from submit to send. |
| packages/test/src/test/task-graph-job-queue/genericTaskGraphJobQueueTests.ts | Updates queue client options (maxRetries→maxAttempts, submit→send). |
| packages/test/src/test/job-queue/TelemetryQueueStorage.test.ts | Updates job shape field (run_after→visible_at). |
| packages/test/src/test/job-queue/RateLimiter.test.ts | Removes tests for deleted legacy limiter methods. |
| packages/test/src/test/job-queue/Limiters.test.ts | Updates limiter tests to tryAcquire/release/clear semantics. |
| packages/test/src/test/job-queue/InMemoryJobQueue.test.ts | Adds coverage for abort_requested_at + lease expiry + DLQ + (claimed) prefetch scenarios. |
| packages/test/src/test/job-queue/genericQueueStorageSubscriptionTests.ts | Updates job shape field (run_after→visible_at) throughout. |
| packages/test/src/test/job-queue/genericPrefixedQueueStorageTests.ts | Updates job shape field (run_after→visible_at) and related comments. |
| packages/test/src/test/job-queue/genericJobQueueTests.ts | Updates client API (send), retries naming, and ABORTING expectations. |
| packages/tasks/src/task/FetchUrlTask.ts | Updates queue client API to send and maxAttempts. |
| packages/job-queue/src/queue-storage/wrapQueueStorage.ts | New adapter that wraps legacy IQueueStorage into messageQueue/jobStore + claims. |
| packages/job-queue/src/queue-storage/TelemetryQueueStorage.ts | Traces new extendLease and renames release→releaseClaim; updates next signature. |
| packages/job-queue/src/queue-storage/IQueueStorage.ts | Removes ABORTING, renames core fields, adds lease/abort fields + extendLease + releaseClaim. |
| packages/job-queue/src/queue-storage/InMemoryQueueStorage.ts | Implements lease expiry reclaim, abort_requested_at behavior, extendLease, renamed fields. |
| packages/job-queue/src/queue-storage/InMemoryMessageQueue.ts | New in-memory IMessageQueue wrapper with claims + buffered writes. |
| packages/job-queue/src/queue-storage/InMemoryJobStore.ts | New in-memory IJobStore wrapper with buffered result/error staging. |
| packages/job-queue/src/queue-storage/IMessageQueue.ts | New IMessageQueue interface + send options. |
| packages/job-queue/src/queue-storage/IJobStore.ts | New IJobStore interface for read/mutation operations. |
| packages/job-queue/src/queue-storage/IClaim.ts | New claim abstraction with ack/retry/fail/extendLease. |
| packages/job-queue/src/queue-storage/createInMemoryQueue.ts | Factory returning paired in-memory messageQueue/jobStore/core. |
| packages/job-queue/src/limiter/RateLimiter.ts | Removes legacy APIs; adds complete() no-op; relies on storage atomic reserve. |
| packages/job-queue/src/limiter/NullLimiter.ts | Removes legacy APIs; adds complete() no-op. |
| packages/job-queue/src/limiter/ILimiter.ts | Removes legacy APIs; adds complete(token) semantics distinct from release(token). |
| packages/job-queue/src/limiter/EvenlySpacedRateLimiter.ts | Removes legacy start/completion hooks; adds complete() no-op; keeps atomic tryAcquire. |
| packages/job-queue/src/limiter/DelayLimiter.ts | Removes legacy APIs; adds complete() no-op; keeps rollback-capable release(token). |
| packages/job-queue/src/limiter/ConcurrencyLimiter.ts | Removes legacy APIs; implements complete() to decrement running count. |
| packages/job-queue/src/limiter/CompositeLimiter.ts | Removes legacy APIs; adds complete() propagation to children. |
| packages/job-queue/src/job/MessageQueueClient.ts | New producer-only client abstraction over IMessageQueue. |
| packages/job-queue/src/job/JobStorageConverters.ts | Renames fields in storage↔class conversions; maps worker_id→lease_owner; adds lease/abort fields. |
| packages/job-queue/src/job/JobQueueWorker.ts | Switches to messageQueue/jobStore, adds lease extension, DLQ support, prefetch loop, abort polling by abort_requested_at. |
| packages/job-queue/src/job/JobQueueServer.ts | Switches to messageQueue/jobStore with legacy wrapper; removes restart fixup in favor of lease reclaim; adds DLQ plumbing. |
| packages/job-queue/src/job/JobQueueEventListeners.ts | Renames retry event payload to visibleAt. |
| packages/job-queue/src/job/JobQueueClient.ts | Switches to messageQueue/jobStore with legacy wrapper; renames submit→send and runAfter→delaySeconds, maxRetries→maxAttempts. |
| packages/job-queue/src/job/Job.ts | Renames fields (visibleAt/attempts/maxAttempts/leaseOwner) and adds abort/lease timestamps. |
| packages/job-queue/src/job/DeadLetter.ts | Adds dead-letter payload type. |
| packages/job-queue/src/common.ts | Re-exports new queue abstractions, factories, and MessageQueueClient/DeadLetter. |
| packages/indexeddb/src/migrations/indexedDbQueueMigrations.ts | Renames IndexedDB index to visible_at. |
| packages/indexeddb/src/job-queue/IndexedDbQueueStorage.ts | Updates IndexedDB queue for renamed fields + lease reclaim + extendLease + abort semantics. |
| packages/indexeddb/src/job-queue/IndexedDbMessageQueue.ts | New IndexedDB IMessageQueue wrapper with claims + buffered writes. |
| packages/indexeddb/src/job-queue/IndexedDbJobStore.ts | New IndexedDB IJobStore wrapper with buffered result/error staging. |
| packages/indexeddb/src/job-queue/createIndexedDbQueue.ts | Factory returning paired IndexedDB messageQueue/jobStore/core. |
| packages/indexeddb/src/job-queue/common.ts | Re-exports new IndexedDB queue façade and factory. |
| packages/ai/src/job/AiJob.ts | Removes ABORTING checks; relies on AbortSignal only. |
| packages/ai/src/execution/QueuedExecutionStrategy.ts | Updates limiter acquisition to return token and release on exit. |
| examples/web/src/status/QueueStatus.tsx | Removes ABORTING status display. |
| .gitignore | Ignores .claude/worktrees/. |
Comments suppressed due to low confidence (1)
providers/supabase/src/job-queue/SupabaseQueueStorage.ts:430
- extendLease() interpolates ms into SQL via Number(ms) but does not validate ms is finite and non-negative. This can lead to invalid SQL or shortening leases unexpectedly. Consider validating ms similarly to id (Number.isFinite, >=0) before issuing the UPDATE.
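One way to act on this suggestion is a small guard that runs before any SQL is built. The sketch below is illustrative only: the name mirrors the `validateLeaseMs()` helper mentioned in the follow-up commit later in this thread, but its signature, its `label` parameter, and the message text are assumptions.

```typescript
// Hypothetical guard; the real helper's signature is not shown in this thread.
// Rejects NaN, +/-Infinity, negative values, and non-numbers before the value
// is interpolated into SQL or added to Date.now().
function validateLeaseMs(ms: unknown, label: string = "leaseMs"): number {
  if (typeof ms !== "number" || !Number.isFinite(ms) || ms < 0) {
    throw new RangeError(`${label} must be a finite, non-negative number; got ${String(ms)}`);
  }
  // 0 is allowed: it means the lease expires immediately.
  return ms;
}
```

Calling the guard at the top of both `next()` and `extendLease()` keeps the check in one place, so every caller sees the same exception type.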
Comment on lines +832 to +836

    const existing = await this.jobStore.get(job.id);
    const claim = this.requireClaim(job.id);
    if (claim) {
      await claim.fail();
    }

    await this.storage.complete(this.classToStorage(job));
    this.events.emit("job_retry", job.id, job.runAfter);
    const claim = this.requireClaim(job.id);
    const delaySeconds = Math.max(0, Math.floor((job.visibleAt.getTime() - Date.now()) / 1000));

Comment on lines +758 to +761

    /** Internal - resolve the active claim for a job id, throw if missing. */
    private requireClaim(jobId: unknown): IClaim<JobStorageFormat<Input, Output>> | undefined {
      return this.activeClaims.get(jobId);
    }

      );
    } finally {
      await limiter.recordJobCompletion();
      await limiter.release(token);

Comment on lines +379 to +383

    UPDATE ${this.tableName}
    SET status = '${JobStatus.PROCESSING}', last_ran_at = NOW() AT TIME ZONE 'UTC', worker_id = '${escapedWorkerId}'
    SET status = '${JobStatus.PROCESSING}',
        last_attempted_at = NOW() AT TIME ZONE 'UTC',
        lease_owner = '${escapedWorkerId}',
        lease_expires_at = NOW() AT TIME ZONE 'UTC' + (${Number(leaseMs)} * INTERVAL '1 millisecond')

Comment on lines 34 to 37

    options?: {
      /** @deprecated renamed to includeLeaseOwner; both names accepted */
      readonly includeWorkerId?: boolean;
    }

Comment on lines +219 to +223

    EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN run_after TO visible_at';
    EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN last_ran_at TO last_attempted_at';
    EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN run_attempts TO attempts';
    EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN max_retries TO max_attempts';
    EXECUTE 'ALTER TABLE ${tableName} RENAME COLUMN worker_id TO lease_owner';
Comment on lines +206 to +211

    // ---------------------------------------------------------------------------
    // Minimal in-memory IMessageQueue for DLQ testing
    // ---------------------------------------------------------------------------

    import type { IClaim, IMessageQueue, MessageId } from "@workglow/job-queue";

    job.status = JobStatus.FAILED;
    job.abort_requested_at = now;
    job.completed_at = now;
    await this.complete(job);

Comment on lines 622 to 628

    try {
      await this.validateJobState(job);
    } catch (validationErr) {
      // Validation failed before we ran any actual work - release THIS
      // limiter slot (by token, not by recency) so it doesn't count toward
      // the rate limit and we don't accidentally release another worker's
      // slot.
      try {
        await this.limiter.release(limiterToken);
        slotReleased = true;
      } catch {
        // best-effort
      }
      // Throw - the outer finally block's limiter.complete() will release
      // the slot. Do NOT call limiter.release() here too; that would
      // double-decrement the counter and admit one extra concurrent job.
      throw validationErr;

Comment on lines +238 to +242

      if (!current) return;
      // Re-use complete() but preserve attempts by subtracting 1 to offset the
      // mandatory increment in legacy storage backends.
      await this.storage.complete({ ...current, status, attempts: (current.attempts ?? 1) - 1 });
    }

Comment on lines +231 to +234

    `ALTER TABLE ${this.tableName} RENAME COLUMN last_ran_at TO last_attempted_at`,
    `ALTER TABLE ${this.tableName} RENAME COLUMN run_attempts TO attempts`,
    `ALTER TABLE ${this.tableName} RENAME COLUMN max_retries TO max_attempts`,
    `ALTER TABLE ${this.tableName} RENAME COLUMN worker_id TO lease_owner`,
…elease methods
Add `complete(token)` to `ILimiter` as the normal-path post-job release. Semantics differ from `release(token)`: release undoes a reservation as if the job never ran; complete finalises a reservation for a job that actually executed (allowing rate-limiters to record the slot as consumed rather than retracting it from the window).
Remove the legacy `acquire`, `recordJobStart`, and `waitUntilAvailable` methods that conflated check+record into a non-atomic sequence. Replace with the existing atomic `tryAcquire` + new `complete` pair.
All limiter implementations (`ConcurrencyLimiter`, `RateLimiter`, `DelayLimiter`, `EvenlySpacedRateLimiter`, `NullLimiter`, `CompositeLimiter`) and `QueuedExecutionStrategy` updated accordingly.
Merge the `RateLimiter.test.ts` suite into `Limiters.test.ts` and expand coverage of the new `complete` / `release` contracts.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
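The difference between `release(token)` and `complete(token)` is easiest to see side by side. The following is a minimal sketch, not the library's code: the `Sketch*` names are hypothetical, the methods are synchronous for brevity (the diff above awaits them), and the clock is passed in explicitly so the behaviour is deterministic.

```typescript
// Hypothetical, simplified contract for illustration only.
interface SketchLimiter {
  tryAcquire(now: number): number | null; // token, or null when no slot is free
  release(token: number): void;  // undo: treat the job as if it never ran
  complete(token: number): void; // finalise: the job actually executed
}

class SketchConcurrencyLimiter implements SketchLimiter {
  private readonly running = new Set<number>();
  private readonly max: number;
  private nextToken = 1;

  constructor(max: number) {
    this.max = max;
  }

  tryAcquire(_now: number): number | null {
    if (this.running.size >= this.max) return null;
    const token = this.nextToken++;
    this.running.add(token);
    return token;
  }

  // For a concurrency cap, both paths simply free the running slot.
  release(token: number): void {
    this.running.delete(token);
  }
  complete(token: number): void {
    this.running.delete(token);
  }
}

class SketchRateLimiter implements SketchLimiter {
  private readonly window = new Map<number, number>(); // token -> acquired-at (ms)
  private readonly maxPerWindow: number;
  private readonly windowMs: number;
  private nextToken = 1;

  constructor(maxPerWindow: number, windowMs: number) {
    this.maxPerWindow = maxPerWindow;
    this.windowMs = windowMs;
  }

  tryAcquire(now: number): number | null {
    // Drop reservations that have aged out of the window.
    this.window.forEach((acquiredAt, token) => {
      if (now - acquiredAt >= this.windowMs) this.window.delete(token);
    });
    if (this.window.size >= this.maxPerWindow) return null;
    const token = this.nextToken++;
    this.window.set(token, now);
    return token;
  }

  // release retracts the reservation from the window.
  release(token: number): void {
    this.window.delete(token);
  }

  // complete is a no-op: the slot stays consumed until it ages out.
  complete(_token: number): void {}
}
```

In this sketch a rate limiter that treated `complete` like `release` would let a caller exceed the configured rate simply by finishing jobs quickly, which is why the two operations are kept distinct.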
… lease expiry
Remove the ABORTING job status. Abort is now a two-phase signal:
- PENDING jobs are immediately marked FAILED with abort_requested_at set.
- PROCESSING jobs get abort_requested_at written; the running worker observes it via checkForAbortingJobs() and triggers its AbortController. The job transitions to FAILED when the worker's execute() rejects.
Add two new columns/fields across all backends (InMemory, SQLite, Postgres, Supabase, IndexedDB) and the JobStorageFormat type:
- abort_requested_at: timestamp set when abort() is called
- lease_expires_at: tracks when the current PROCESSING lease expires, enabling crash-recovery re-claim without a separate lease table
Add extendLease(id, workerId, ms) to IQueueStorage; implement across all backends with guard that lease_owner must match.
Add opt-in `extendLeaseWhileRunning` option to JobQueueWorker that sets a periodic setInterval to renew the lease at 50% of leaseMs while the job runs.
Guard the delaySeconds precision: use sub-second resolution (remove Math.floor) so short-interval tests do not re-claim before the lease floor.
New tests in InMemoryJobQueue.test.ts cover: PENDING→FAILED abort path, PROCESSING abort_requested_at, lease expiry re-claim, extendLease survival, and extendLease ownership guard.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
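The two-phase abort and the `extendLease` ownership guard can be sketched over a plain in-memory row. Everything here is an assumption made for illustration: the `Sketch*` types, the synchronous functions, the numeric timestamps, and the plain callback standing in for an `AbortController`.

```typescript
type SketchStatus = "PENDING" | "PROCESSING" | "FAILED";

// Hypothetical row shape; the field names follow the commit message above.
interface SketchJobRow {
  id: string;
  status: SketchStatus;
  lease_owner: string | null;
  lease_expires_at: number | null; // epoch ms
  abort_requested_at: number | null;
  completed_at: number | null;
}

// Phase 1: record the request. PENDING jobs fail immediately; PROCESSING jobs
// are only flagged so the owning worker can observe the request.
function requestAbort(job: SketchJobRow, now: number): void {
  if (job.status === "PENDING") {
    job.status = "FAILED";
    job.abort_requested_at = now;
    job.completed_at = now;
  } else if (job.status === "PROCESSING") {
    job.abort_requested_at = now;
  }
}

// Phase 2: the worker polls the rows for its running jobs and fires the
// matching abort callback. Returns the ids that were signalled.
function checkForAbortRequests(
  rows: Map<string, SketchJobRow>,
  running: Map<string, () => void>
): string[] {
  const signalled: string[] = [];
  running.forEach((abort, id) => {
    const row = rows.get(id);
    if (row && row.abort_requested_at !== null) {
      abort();
      signalled.push(id);
    }
  });
  return signalled;
}

// Only the current lease owner of a PROCESSING job may push the expiry out.
function extendLease(job: SketchJobRow, workerId: string, ms: number, now: number): boolean {
  if (job.status !== "PROCESSING" || job.lease_owner !== workerId) return false;
  job.lease_expires_at = now + ms;
  return true;
}
```

In this sketch the PROCESSING row keeps its status after `requestAbort`; the move to FAILED is left to the worker once its job function rejects, matching the description above.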
Rename storage columns across all backends (InMemory, SQLite, Postgres, Supabase, IndexedDB), JobStorageFormat, Job, and all test helpers:
  run_after → visible_at
  last_ran_at → last_attempted_at
  run_attempts → attempts
  max_retries → max_attempts (default lowered from 20 to 10)
  worker_id → lease_owner
Rename the producer API on JobQueueClient:
  enqueue() → send()
  enqueueBatch() → sendBatch()
Add idempotent v3 migration for Postgres that checks each column individually with IF EXISTS so a partially-applied migration doesn't skip all remaining renames. Also sets DEFAULT 10 when renaming max_retries.
Add idempotent migrations for SQLite with per-column existence guards.
Fix Postgres migration schema scope check to use current_schema().
Fix Supabase rename error handling to re-throw non-42703 errors per rename step rather than short-circuiting the loop.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
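The per-column guard described above can be illustrated with a small SQL generator. This is a sketch under assumptions, not the migration shipped in the PR: the function name, the identifier check, and the exact shape of the `DO` block are hypothetical; only the five renames and the use of `current_schema()` come from the commit message.

```typescript
// The five renames listed in the commit message above.
const COLUMN_RENAMES: ReadonlyArray<readonly [string, string]> = [
  ["run_after", "visible_at"],
  ["last_ran_at", "last_attempted_at"],
  ["run_attempts", "attempts"],
  ["max_retries", "max_attempts"],
  ["worker_id", "lease_owner"],
];

// Hypothetical helper: builds one Postgres DO block in which every rename has
// its own existence check, so a half-applied migration can be re-run safely.
function guardedRenameSql(tableName: string): string {
  // Only plain lowercase identifiers are accepted, because the name is
  // interpolated into the SQL text and compared against information_schema.
  if (!/^[a-z_][a-z0-9_]*$/.test(tableName)) {
    throw new Error(`unsafe table name: ${tableName}`);
  }
  const steps = COLUMN_RENAMES.map(
    ([from, to]) => `  IF EXISTS (
    SELECT 1 FROM information_schema.columns
    WHERE table_schema = current_schema()
      AND table_name = '${tableName}'
      AND column_name = '${from}'
  ) THEN
    ALTER TABLE ${tableName} RENAME COLUMN ${from} TO ${to};
  END IF;`
  );
  return ["DO $$", "BEGIN", ...steps, "END", "$$;"].join("\n");
}
```

Because each `IF EXISTS` stands alone, a table where only `run_after` was already renamed still gets the remaining four renames on the next run.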
…t InMemory; add wrapQueueStorage
Split the monolithic IQueueStorage into three focused interfaces:
IMessageQueue<Body> — send, sendBatch, receive, releaseClaim, and
optional subscribeToChanges. Pure transport.
IClaim<Body> — per-message handle: ack, fail, retry, extendLease.
IJobStore<I,O> — job-record CRUD: get, peek, size, getByRunId,
saveProgress, saveResult, saveError, saveStatus,
abort, delete, deleteAll, deleteByStatusAndAge.
Implement InMemoryMessageQueue and InMemoryJobStore backed by the existing
InMemoryQueueStorage core. Add createInMemoryQueue() factory that returns
{ messageQueue, jobStore }.
Add wrapQueueStorage(storage) adapter that wraps any IQueueStorage into
the split pair, enabling legacy backends to work with the new worker/server
while native implementations are built out. A PendingWrite buffer defers
saveResult/saveError until claim settlement so complete() is only called
once per job (avoiding double-incrementing attempts).
Add MessageQueueClient as a thin wrapper around IMessageQueue for callers
that only need to send messages without the full JobQueueClient machinery.
Refactor JobQueueServer, JobQueueWorker, and JobQueueClient to accept
{ messageQueue, jobStore } instead of a raw IQueueStorage. Legacy
IQueueStorage constructors are preserved via wrapQueueStorage so existing
code continues to work unchanged.
Add IQueueStorage.saveStatus?() as an optional method; wrapQueueStorage
delegates to it when present (avoiding the complete()+attempts-1 hack).
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
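The deferred-write idea behind the adapter can be sketched as follows. This is an illustration under assumptions rather than the real `wrapQueueStorage`: the `Sketch*` classes, the status strings, and the synchronous methods are all hypothetical. The legacy storage stands in for a backend whose `complete()` always charges one attempt.

```typescript
// Hypothetical shapes for illustration; not the real @workglow/job-queue types.
interface SketchStoredJob {
  id: string;
  status: "PROCESSING" | "COMPLETED" | "FAILED";
  output: unknown;
  error: string | null;
  attempts: number;
}

// Stand-in for a legacy backend: every complete() call bumps attempts by one.
class SketchLegacyStorage {
  readonly jobs = new Map<string, SketchStoredJob>();
  completeCalls = 0;

  complete(job: SketchStoredJob): void {
    this.completeCalls += 1;
    const stored = this.jobs.get(job.id);
    this.jobs.set(job.id, { ...job, attempts: (stored?.attempts ?? 0) + 1 });
  }
}

interface SketchPendingWrite {
  output?: unknown;
  error?: string;
}

// Buffers saveResult/saveError in memory and flushes them in the single
// complete() call made when the claim is settled.
class SketchWrappedStore {
  private readonly pending = new Map<string, SketchPendingWrite>();
  private readonly storage: SketchLegacyStorage;

  constructor(storage: SketchLegacyStorage) {
    this.storage = storage;
  }

  saveResult(id: string, output: unknown): void {
    this.pending.set(id, { ...this.pending.get(id), output });
  }

  saveError(id: string, error: string): void {
    this.pending.set(id, { ...this.pending.get(id), error });
  }

  ack(id: string): void {
    this.settle(id, "COMPLETED");
  }

  fail(id: string): void {
    this.settle(id, "FAILED");
  }

  private settle(id: string, status: "COMPLETED" | "FAILED"): void {
    const job = this.storage.jobs.get(id);
    if (!job) return;
    const write = this.pending.get(id) ?? {};
    this.pending.delete(id);
    this.storage.complete({
      ...job,
      status,
      output: write.output ?? job.output,
      error: write.error ?? job.error,
    });
  }
}
```

Writing the result eagerly and then settling would call `complete()` twice and charge two attempts for one run; buffering keeps it to one.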
…ackends
Add native IMessageQueue + IJobStore implementations for every backend,
each paired with a createXxxQueue() factory. Backends no longer require
the wrapQueueStorage adapter for first-class usage.
SQLite (providers/sqlite):
SqliteMessageQueue — receive() loops up to max claims; leaseMs forwarded
to next(); subscribeToChanges() via polling.
SqliteJobStore — delegates to SqliteQueueStorage core; saveStatus() via
direct UPDATE (no attempts bump).
createSqliteQueue(db, queueName, opts?) → { messageQueue, jobStore }
IndexedDB (packages/indexeddb):
IndexedDbMessageQueue — same receive() loop; BroadcastChannel notify.
IndexedDbJobStore — delegates to IndexedDbQueueStorage; saveStatus()
via IDB get+put transaction (preserves attempts).
createIndexedDbQueue(queueName, opts?) → { messageQueue, jobStore }
Postgres (providers/postgres):
PostgresMessageQueue — receive() loop; LISTEN/NOTIFY via pg Pool.
PostgresJobStore — delegates to PostgresQueueStorage; saveStatus() via
parameterized UPDATE.
createPostgresQueue(pool, queueName, opts?) → { messageQueue, jobStore }
Supabase (providers/supabase):
SupabaseMessageQueue — receive() loop; polling-based subscription.
SupabaseJobStore — delegates to SupabaseQueueStorage; saveStatus() via
Supabase client .update().
createSupabaseQueue(client, queueName, opts?) → { messageQueue, jobStore }
Add Number.isFinite guards before raw-string SQL interpolation in
leaseMs and extendLease ms parameters.
Add ALTER COLUMN max_attempts SET DEFAULT 10 in the migration path after
renaming max_retries so existing deployments match fresh installs.
All *MessageQueue and *JobStore types use provider-prefixed PendingWrite
aliases (SqlitePendingWrite, etc.) to avoid name collisions in the
@workglow/workglow meta-package barrel.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
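The "receive() loops up to max claims" behaviour shared by the native message queues can be pictured as a bounded loop over a single-claim primitive. The helper below is a hypothetical, synchronous sketch; the real `receive()` and `next()` signatures are not shown in this thread.

```typescript
// Hypothetical helper: keep claiming one job at a time until `max` jobs are
// held or the backend reports that nothing is currently claimable.
function receiveUpTo<T>(next: () => T | undefined, max: number): T[] {
  const claimed: T[] = [];
  while (claimed.length < max) {
    const job = next();
    if (job === undefined) break; // queue drained for now
    claimed.push(job);
  }
  return claimed;
}
```

A caller asking for three claims from a queue holding two jobs gets two back rather than blocking, so the worker can start on what is available.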
…ness fixes
Dead-letter queue (DLQ):
Add DeadLetter<Input> type. JobQueueServer accepts an optional
`deadLetter: IMessageQueue<DeadLetter<Input>> | "discard"` option.
When a job exhausts maxAttempts, the worker forwards the original input,
error, attempt count, queueName, and jobRunId to the DLQ before marking
the job FAILED. "discard" suppresses forwarding entirely. Any
IMessageQueue can serve as a DLQ including a second job queue.
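A minimal sketch of the forwarding rule follows. The type and function names are hypothetical, the field names of the dead-letter payload are guesses based on the list above, and treating an omitted `deadLetter` option as "no forwarding" is an assumption.

```typescript
// All names below are hypothetical; the real DeadLetter<Input> fields and
// IMessageQueue signatures are not shown in this thread.
interface SketchDeadLetter<Input> {
  input: Input;
  error: string;
  attempts: number;
  queueName: string;
  jobRunId: string | undefined;
}

interface SketchSendOnlyQueue<Body> {
  send(body: Body): void;
}

type SketchDeadLetterTarget<Input> = SketchSendOnlyQueue<SketchDeadLetter<Input>> | "discard";

interface SketchExhaustibleJob<Input> {
  input: Input;
  attempts: number;
  maxAttempts: number;
  jobRunId: string | undefined;
  status: "PENDING" | "PROCESSING" | "FAILED";
}

// Decide what happens after a failed run: retry while budget remains,
// otherwise forward to the DLQ (unless discarded) and then mark FAILED.
function settleFailure<Input>(
  job: SketchExhaustibleJob<Input>,
  error: string,
  queueName: string,
  target?: SketchDeadLetterTarget<Input>
): "retry" | "failed" {
  if (job.attempts < job.maxAttempts) {
    job.status = "PENDING";
    return "retry";
  }
  if (target !== undefined && target !== "discard") {
    target.send({
      input: job.input,
      error,
      attempts: job.attempts,
      queueName,
      jobRunId: job.jobRunId,
    });
  }
  job.status = "FAILED";
  return "failed";
}
```

Because the target only needs a `send` method in this sketch, a second job queue can be passed in as the DLQ and drained by its own workers.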
Prefetch batching:
Add `prefetch?: number` to JobQueueServerOptions (default 1). The server
passes it through to the worker's receive() call so each poll iteration
claims up to `prefetch` jobs in one round-trip instead of claiming one
and immediately polling for the next.
disableJob implementation:
Replace placeholder with real logic: fail the active claim (if any)
then call jobStore.saveStatus(id, DISABLED) to persist the status
without bumping attempts.
Correctness fixes absorbed from code review:
- ILimiter: validateJobState() failures now call limiter.release(token)
(not complete()) so RateLimiter window slots are not permanently
consumed for jobs that never ran. A limiterReleased flag prevents the
finally block from calling complete() a second time.
- IndexedDbQueueStorage.abort(PENDING): use put() not complete() so
aborting a pending job does not bump attempts.
- wrapQueueStorage.saveStatus: delegate to storage.saveStatus() when
present; throw for legacy storages that lack it. Removes the broken
complete()+attempts-1 workaround (backends that compute attempts+1
from stored state ignored the provided value).
- IQueueStorage: add optional saveStatus?() so the wrap adapter can
duck-type check without a cast.
- SqliteQueueStorage.saveStatus: fix signature to async/Promise<void>.
- JobStorageConverters: add includeLeaseOwner option alias alongside
the deprecated includeWorkerId.
- JobQueueWorker.getClaim: rename from requireClaim throughout.
- JobQueueWorker.releaseClaim fallback: document that adapters whose
claim.id differs from job.id (e.g. SQS receipt handles) should treat
the fallback releaseClaim(job.id) call as best-effort no-op.
- InMemoryJobQueue.test.ts: move mid-file `import type` to the
top-level import block.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
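The `limiterReleased` flag from the first correctness fix can be shown in isolation. This is a sketch, assuming a hypothetical synchronous token limiter and a hypothetical `runWithLimiter` wrapper; the real worker awaits these calls and treats the `release` as best-effort.

```typescript
// Hypothetical minimal limiter surface for this sketch.
interface SketchTokenLimiter {
  tryAcquire(): number | null;
  release(token: number): void;  // job never ran
  complete(token: number): void; // job ran (successfully or not)
}

function runWithLimiter<T>(
  limiter: SketchTokenLimiter,
  validate: () => void,
  execute: () => T
): T {
  const token = limiter.tryAcquire();
  if (token === null) throw new Error("no limiter slot available");

  let limiterReleased = false;
  try {
    try {
      validate();
    } catch (validationErr) {
      // No work was done: hand the slot back instead of consuming it.
      limiter.release(token);
      limiterReleased = true;
      throw validationErr;
    }
    return execute();
  } finally {
    // Exactly one of release/complete runs for each acquired token.
    if (!limiterReleased) limiter.complete(token);
  }
}
```

The flag is what prevents the double settle: without it, the `finally` block would call `complete` on a token that the validation path had already released.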
…model Split subscribeToChangesBlock into subscribeToChanges.eventDriven (strict commit order) and subscribeToChanges.polling (set equality + count). Promote usesPolling to required when supportsSubscriptions: true via a discriminated TabularStorageContractOpts union. Add a meta-test covering the failure modes each block is designed to catch. https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
Add optional onRunStart(scope) to IDisposeStrategy and ResourceScope.runStart(). Both TaskRunner.handleStart and TaskGraphRunner.handleStart await scope.runStart() before user code. InactivityStrategy.onRunStart clears all pending timers, closing the race where a timer armed at runComplete could fire mid-next-run and dispose a resource the new run was about to use. InactivityStrategy.onRegister additionally clears any pending timer for the re-registered key. https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
… lease-expiry attempt bump
Migrations (H3/C1):
Restore postgres and sqlite v1 migrations byte-for-byte to the pre-PR
shape (run_after / run_attempts / max_retries / last_ran_at / worker_id
and run_after-keyed indexes). v2 adds abort_requested_at / lease_expires_at.
v3 carries the renames and index swap (drops run_after-keyed indexes,
recreates them keyed on visible_at) guarded by IF EXISTS (postgres) /
PRAGMA table_info (sqlite) so fresh installs and upgraded installs land
on the same schema.
IndexedDB gets a v2 migration that drops the old queue_status_run_after
index, cursor-walks every row copying run_after → visible_at, and
recreates queue_status_visible_at. Synchronous inside the IDB upgrade
transaction (no awaits between requests).
Add a parity integration test that walks v1→v2→v3 and asserts the final
schema matches a fresh install for both postgres and sqlite.
Abort/retry correctness (H1/H4):
All backends (InMemory, SQLite, Postgres, Supabase, IndexedDB, and the
wrapQueueStorage adapter) now clear abort_requested_at on:
(a) complete() PENDING-retry branch
(b) releaseClaim()
(c) next() lease-expiry reclaim
The next() lease-expiry reclaim additionally increments attempts via
CASE WHEN status = 'PROCESSING' THEN attempts + 1 ELSE attempts END
so a crashed-worker lease leak terminates via MAX_ATTEMPTS_REACHED
instead of looping forever. PENDING claims do not bump.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
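The effect of charging an attempt on lease-expiry reclaim can be sketched in memory. The row shape, the `claimNext` name, and the choice to fail the row inside the claim loop once the budget is spent are assumptions made for illustration; the commit message only states that the bump lets a leaked lease terminate via MAX_ATTEMPTS_REACHED.

```typescript
// Hypothetical row shape using the post-rename column names.
interface SketchLeaseRow {
  id: string;
  status: "PENDING" | "PROCESSING" | "FAILED";
  attempts: number;
  max_attempts: number;
  visible_at: number; // epoch ms
  lease_owner: string | null;
  lease_expires_at: number | null; // epoch ms
  abort_requested_at: number | null;
  error_code: string | null;
}

function claimNext(
  rows: SketchLeaseRow[],
  workerId: string,
  leaseMs: number,
  now: number
): SketchLeaseRow | undefined {
  for (const row of rows) {
    const pendingReady = row.status === "PENDING" && row.visible_at <= now;
    const leaseExpired =
      row.status === "PROCESSING" &&
      row.lease_expires_at !== null &&
      row.lease_expires_at <= now;
    if (!pendingReady && !leaseExpired) continue;

    if (leaseExpired) {
      // Mirrors: CASE WHEN status = 'PROCESSING' THEN attempts + 1 ELSE attempts END
      row.attempts += 1;
      if (row.attempts >= row.max_attempts) {
        // Assumption: stop here instead of handing the job out again.
        row.status = "FAILED";
        row.error_code = "MAX_ATTEMPTS_REACHED";
        row.lease_owner = null;
        row.lease_expires_at = null;
        continue;
      }
    }

    row.status = "PROCESSING";
    row.lease_owner = workerId;
    row.lease_expires_at = now + leaseMs;
    row.abort_requested_at = null; // a stale abort must not hit the new owner
    return row;
  }
  return undefined;
}
```

With `max_attempts` set to 2 in this sketch, a job whose worker crashes twice is failed on the second reclaim instead of being handed out a third time.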
finalize() — non-incrementing terminal write (C2/M4):
Add IQueueStorage.finalize(id, fields) as a partial-overwrite that sets
output/error/error_code/status/completed_at/abort_requested_at without
touching attempts. Implement across all backends. WrappedClaim.ack and
WrappedClaim.fail now call finalize instead of complete, removing the
double-attempts-increment that occurred when a job settled after a
lease-expiry reclaim had already charged the attempt.
InMemoryQueueStorage.updateJobStatus renamed to saveStatus to align with
the IQueueStorage optional interface.
Atomic ack/fail with result/error args (H2):
IClaim.ack accepts optional result; IClaim.fail accepts
{ error, errorCode, abortRequested, permanent }. completeJob calls
claim.ack(output ?? null) and failJob calls claim.fail({...}) directly,
eliminating the two-write window between jobStore.saveResult and
claim.ack where a crash left a PROCESSING row with output written but
status not updated. IJobStore.saveResult/saveError marked @deprecated
and kept as buffered no-ops for one minor release.
Atomic disableJob (H5):
Add optional IClaim.disable() and extend finalize() to also write
lease_owner and progress fields. disableJob now performs a single
storage write setting status=DISABLED, releasing the lease, and
clearing progress — no error/error_code. Replaces the legacy two-write
path that briefly persisted FAILED before DISABLED and could emit a
spurious job_error event to subscribers.
https://claude.ai/code/session_01MVz87icx5fozgdfEPSQih3
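A non-incrementing terminal write, and an `ack` built on top of it, might look like the sketch below. The row shape, the `Map`-based store, and the function signatures are hypothetical; the point illustrated is that status and output land in one write and `attempts` is never touched.

```typescript
// Hypothetical row shape for illustration.
interface SketchFinalRow {
  id: string;
  status: "PROCESSING" | "COMPLETED" | "FAILED" | "DISABLED";
  output: unknown;
  error: string | null;
  error_code: string | null;
  completed_at: number | null;
  abort_requested_at: number | null;
  lease_owner: string | null;
  attempts: number;
}

// Only terminal fields may be supplied; attempts is not part of the type.
type SketchFinalizeFields = Partial<
  Pick<
    SketchFinalRow,
    "status" | "output" | "error" | "error_code" | "completed_at" | "abort_requested_at" | "lease_owner"
  >
>;

// Partial overwrite in a single store write. attempts is copied from the
// stored row explicitly so that even an untyped caller cannot change it.
function finalize(
  rows: Map<string, SketchFinalRow>,
  id: string,
  fields: SketchFinalizeFields
): boolean {
  const row = rows.get(id);
  if (!row) return false;
  rows.set(id, { ...row, ...fields, attempts: row.attempts });
  return true;
}

// ack carries the result with it, so there is no window in which the output
// is stored but the status still says PROCESSING.
function ack(rows: Map<string, SketchFinalRow>, id: string, result: unknown, now: number): boolean {
  return finalize(rows, id, {
    status: "COMPLETED",
    output: result ?? null,
    completed_at: now,
    lease_owner: null,
  });
}
```

If the process crashes before `ack` in this sketch, the row is still PROCESSING with no output and is simply reclaimed when the lease expires; if it crashes after, the row is fully COMPLETED.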
sroussey added a commit that referenced this pull request on May 19, 2026
* fix(job-queue): RateLimiter slot leak, disableJob dispatch, pre-execute abort check
Three follow-up fixes to PR #511 that the commit message claimed but that did not land in the code:
1. RateLimiter slot leak (CRITICAL): validateJobState() failures now call limiter.release() (not complete()) before rethrow, and the outer finally is gated on a limiterReleased flag so the slot is not double-decremented. Without this, every DEADLINE-EXCEEDED or pre-aborted job permanently consumed a RateLimiter window slot.
2. disableJob dispatch (HIGH): JobDisabledError now routes through disableJob() instead of falling into failJob(), so attempting to disable a job no longer clobbers the DISABLED status with FAILED. The H5 atomic-disable code path is now reachable.
3. Pre-execute abort check (HIGH): createAbortController(job.id) is moved before validateJobState() so the pre-execute abort flag check at the top of validateJobState (which reads activeJobAbortControllers) is reachable. Previously the controller was created after validation, making the branch dead code.
Adds JobQueueWorker.test.ts with regressions for all three.

* fix(job-queue,indexeddb): abort(PENDING) attempts bump + v2 migration backfill
Two follow-up fixes to PR #511:
1. IndexedDbQueueStorage.abort(PENDING) (CRITICAL): replaced `this.complete(job)` with a direct `put()` that sets status=FAILED, abort_requested_at, completed_at WITHOUT bumping attempts. Matches the cross-backend contract verified in InMemoryQueueStorage/PostgresQueueStorage and asserted in a new genericJobQueueTests case.
2. IndexedDB v2 migration (CRITICAL): v2 previously copied only run_after → visible_at, leaving run_attempts, last_ran_at, max_retries, worker_id orphaned on upgrade. The storage layer reads the post-rename names, so existing browser-deployed queues silently lost retry budgets, last-attempt timestamps, and lease ownership on first migration. Extend the cursor body to migrate all five renames idempotently. Move the queue_status_visible_at createIndex into the terminal cursor branch so the index is built off post-migration rows.
Adds IndexedDbQueueMigrations.integration test for the five-rename case and a cross-backend abort(PENDING) attempts-stability assertion.

* fix(sqlite): v3 max_attempts default = 10 to match Postgres parity
PR #511 lowered the default retry budget from 20 (Postgres) / 23 (SQLite v1) to 10. Postgres v3 explicitly applied `ALTER COLUMN max_attempts SET DEFAULT 10`; SQLite v3 renamed the column but did not adjust the default. Fresh SQLite installs ended up at default 23, Postgres at 10, so callers omitting maxAttempts got divergent retry behavior across backends.
SQLite has no `ALTER COLUMN ... SET DEFAULT` syntax, so the fix uses the documented 12-step table-rebuild procedure (https://www.sqlite.org/lang_altertable.html#otheralter): build the new CREATE TABLE statement from the post-rename PRAGMA table_info, swap max_attempts's default literal, copy rows, drop old, rename new, recreate indexes. The rebuild is gated on the current default not already being '10' so re-running v3 is idempotent.
Extend the migrations parity integration test to compare DEFAULTS, not just column names, so future drift is caught.

* fix(job-queue): validate leaseMs / extendLease ms inputs across all backends
PR #511 added Number.isFinite guards to Supabase only; Postgres, SQLite, InMemoryQueueStorage, and IndexedDB passed leaseMs / ms directly into new Date(Date.now() + ms).toISOString() (yields "Invalid Date" for NaN/Infinity, poisoning the row) or into parameterized SQL fragments (runtime error for non-finite). A negative leaseMs immediately re-expires the lease a worker just claimed.
Extract validation into a shared validateLeaseMs() helper in @workglow/job-queue; call it at the top of every next() and extendLease() across all 5 backends. Migrate Supabase from its inline Error throw to the shared RangeError so all backends report the same exception type. ms === 0 remains valid (instant expiry).
Adds a cross-backend contract test that runs against every backend via the shared genericJobQueueTests harness.

* fix(sqlite): harden v3 rebuild and tighten follow-up tests
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de
Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

* test(sqlite): share canonical v3 schema helper
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de
Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

* fix(sqlite): tighten schema helper typing
Agent-Logs-Url: https://github.com/workglow-dev/libs/sessions/a1392ebd-9668-440a-a244-8a70762ce7de
Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>

---------
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: sroussey <127349+sroussey@users.noreply.github.com>
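The five-rename backfill described for the IndexedDB v2 migration comes down to an idempotent per-row transform. The function below is a sketch of that transform only, with hypothetical names; the cursor walk, the upgrade transaction, and the index recreation are left out, and keeping an already-present new-name value over the legacy one is an assumption.

```typescript
// The five renames named in this thread.
const QUEUE_FIELD_RENAMES: ReadonlyArray<readonly [string, string]> = [
  ["run_after", "visible_at"],
  ["last_ran_at", "last_attempted_at"],
  ["run_attempts", "attempts"],
  ["max_retries", "max_attempts"],
  ["worker_id", "lease_owner"],
];

// Hypothetical per-row transform: returns a copy with legacy field names moved
// to their new names. Running it on an already-migrated row changes nothing.
function migrateQueueRow(row: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = { ...row };
  for (const [from, to] of QUEUE_FIELD_RENAMES) {
    if (from in out) {
      // Assumption: a value already stored under the new name wins.
      if (!(to in out)) out[to] = out[from];
      delete out[from];
    }
  }
  return out;
}
```

Inside a real upgrade handler the transformed row would be written back through the cursor; handling each field independently is what lets a row that was migrated only for `run_after` pick up the other four on the next pass.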