refactor: remove pre-v1 backward-compat code paths#523
Merged
Conversation
Drop @deprecated markers, legacy shims, and dual old-way/new-way code paths across the job-queue, storage, and task-graph packages. Squash all job-queue DB migrations (IndexedDB / Postgres / SQLite) into a single v1 with the final schema since pre-v1 has no deployed users to migrate. Job queue: - Remove `saveResult`/`saveError` from IJobStore and all implementations. Workers now call `claim.ack(result)` / `claim.fail(opts)` directly, with `completeWithResult` / `failWithError` as the no-active-claim fallback. - Drop the transient pending-write buffer plumbing (`PendingWrite`, `Pending*Write`) from wrapQueueStorage and the InMemory / IndexedDB / SQLite / Postgres / Supabase claim+jobstore+messagequeue trios. - Drop the legacy `storage` option on JobQueueServer / Client / Worker; they now require the paired `messageQueue` + `jobStore`. Callers wrap a raw IQueueStorage via `wrapQueueStorage()` themselves. - Drop the `core` re-export from createInMemoryQueue / createIndexedDbQueue / createSqliteQueue / createPostgresQueue / createSupabaseQueue. - Drop the deprecated `includeWorkerId` parameter on `storageToClass()`. - Promote `IClaim.disable()` from optional to required; SQS + Cloudflare claims implement it via `saveStatus(DISABLED)` plus message.ack(). Migrations (final-version-only): - IndexedDB: collapse v1 (legacy run_after/etc.) + v2 (rename backfill) → v1 - Postgres: collapse v1–v5 (legacy columns, renames, fingerprint index convergence) → v1 - SQLite: collapse v1–v6 (legacy columns, renames, max_attempts rebuild, fingerprint index convergence) → v1 - Supabase: drop the in-place RENAME COLUMN block from createSchema(). Task-graph / storage / util: - Drop the `cacheable = false` one-time deprecation warning from Task.getCachePolicy. - Drop `@deprecated` from QueryOptions.offset and getOffsetPage (still valid features, no longer flagged as legacy). - Drop the "exists for backwards compat" comment from collectStream. - Promote IDisposeStrategy.onRunStart from optional to required. Tests: update to new IJobStore / IClaim shape, delete obsolete deprecation and migration-parity tests.
@workglow/cli
@workglow/ai
@workglow/browser-control
@workglow/indexeddb
@workglow/javascript
@workglow/job-queue
@workglow/knowledge-base
@workglow/mcp
@workglow/storage
@workglow/task-graph
@workglow/tasks
@workglow/util
workglow
@workglow/anthropic
@workglow/bun-webview
@workglow/chrome-ai
@workglow/electron
@workglow/google-gemini
@workglow/huggingface-inference
@workglow/huggingface-transformers
@workglow/node-llama-cpp
@workglow/ollama
@workglow/openai
@workglow/playwright
@workglow/postgres
@workglow/sqlite
@workglow/supabase
@workglow/tf-mediapipe
commit: |
Contributor
There was a problem hiding this comment.
Pull request overview
This PR removes pre-v1 backward-compatibility paths across the job-queue, storage, task-graph, util, and provider adapters, and collapses queue migrations to a single “final schema” v1 for each backend.
Changes:
- Refactors job-queue to remove legacy
storagewiring and theIJobStore.saveResult/saveErrorbuffering path; workers now settle jobs viaclaim.ack(result)/claim.fail(opts)and use job-store terminal helpers as fallback. - Collapses IndexedDB/Postgres/SQLite (and Supabase schema setup) to single-version “final schema” migrations and removes legacy rename/backfill paths.
- Updates util resource disposal strategies and tests to match new required interface hooks and new job-queue construction patterns.
Reviewed changes
Copilot reviewed 62 out of 62 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| providers/supabase/src/job-queue/SupabaseQueueStorage.ts | Removes legacy rename/alter schema steps; keeps idempotent create + indexes. |
| providers/supabase/src/job-queue/SupabaseMessageQueue.ts | Removes pending-write buffer; claim acks/fails persist directly. |
| providers/supabase/src/job-queue/SupabaseJobStore.ts | Removes deprecated saveResult/saveError buffer plumbing. |
| providers/supabase/src/job-queue/createSupabaseQueue.ts | Simplifies queue factory to return only messageQueue + jobStore. |
| providers/sqlite/src/migrations/sqliteQueueMigrations.ts | Collapses SQLite queue migrations into v1 canonical schema. |
| providers/sqlite/src/job-queue/SqliteMessageQueue.ts | Removes pending-write buffer; claim acks/fails persist directly. |
| providers/sqlite/src/job-queue/SqliteJobStore.ts | Removes deprecated saveResult/saveError buffer plumbing. |
| providers/sqlite/src/job-queue/createSqliteQueue.ts | Simplifies queue factory to return only messageQueue + jobStore. |
| providers/postgres/src/migrations/postgresQueueMigrations.ts | Collapses Postgres queue migrations into v1 canonical schema + unique partial index. |
| providers/postgres/src/job-queue/PostgresMessageQueue.ts | Removes pending-write buffer; claim acks/fails persist directly. |
| providers/postgres/src/job-queue/PostgresJobStore.ts | Removes deprecated saveResult/saveError buffer plumbing. |
| providers/postgres/src/job-queue/createPostgresQueue.ts | Simplifies queue factory to return only messageQueue + jobStore. |
| providers/cloudflare/src/job-queue/CloudflareClaim.ts | Implements required disable() on Cloudflare claim. |
| providers/aws/src/job-queue/SqsClaim.ts | Implements required disable() on SQS claim. |
| packages/util/src/resource/strategies/RunCompletionStrategy.ts | Adds required onRunStart hook implementation. |
| packages/util/src/resource/strategies/NeverDisposeStrategy.ts | Adds required onRunStart hook implementation. |
| packages/util/src/resource/ResourceScope.ts | Makes runStart() always call strategy onRunStart. |
| packages/util/src/resource/DisposeStrategy.ts | Makes IDisposeStrategy.onRunStart required (non-optional). |
| packages/test/src/test/task/FetchTask.test.ts | Updates tests to use wrapQueueStorage / createInMemoryQueue. |
| packages/test/src/test/task-graph-job-queue/SqliteTaskGraphJobQueue.integration.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/task-graph-job-queue/InMemoryTaskGraphJobQueue.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/task-graph-job-queue/IndexedDbTaskGraphJobQueue.integration.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/task-graph-cache/CachePolicy.test.ts | Removes coverage for removed legacy cacheable=false shim. |
| packages/test/src/test/task-graph-cache/CacheableDeprecation.test.ts | Deletes legacy cacheable=false deprecation tests. |
| packages/test/src/test/storage-migrations/queueMigrationsParity.integration.test.ts | Deletes migration-parity and convergence tests removed by v1-only migrations. |
| packages/test/src/test/storage-migrations/IndexedDbQueueMigrations.integration.test.ts | Deletes IndexedDB v1→v2 migration regression tests (no longer applicable). |
| packages/test/src/test/job-queue/SupabaseJobStoreExtensions.integration.test.ts | Updates migration invocation to use queue components rather than core. |
| packages/test/src/test/job-queue/PostgresJobStoreExtensions.integration.test.ts | Updates migration invocation to use queue components rather than core. |
| packages/test/src/test/job-queue/JobStoreFingerprintRace.test.ts | Updates migration-shape assertions and queue setup for v1-only migrations. |
| packages/test/src/test/job-queue/JobStoreExtensions.test.ts | Removes deprecated error prepopulation path; updates SQLite backend setup. |
| packages/test/src/test/job-queue/JobQueueWorker.test.ts | Updates worker tests to use wrapped storage and new client/server options. |
| packages/test/src/test/job-queue/JobErrorRegistry.test.ts | Updates client construction to use createInMemoryQueue; wraps storage for server/client. |
| packages/test/src/test/job-queue/InMemoryJobQueue.test.ts | Updates tests to use wrapped storage for server/client construction. |
| packages/test/src/test/job-queue/genericJobQueueTests.ts | Updates shared test suite to use wrapQueueStorage throughout. |
| packages/test/src/test/cloudflare/job-queue/handleQueueBatch.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/cloudflare/job-queue/handleQueueBatch.terminalRedelivery.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/cloudflare/job-queue/CloudflareMessageQueue.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/cloudflare/job-queue/CloudflareMessageQueue.sendBatch.transient.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/cloudflare/job-queue/CloudflareGenericQueue.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/cloudflare/job-queue/CloudflareClaim.test.ts | Updates in-memory store construction after pending buffer removal. |
| packages/test/src/test/ai/StreamingAiTaskPhases.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/ai-provider/StreamingProvider.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/ai-provider/AiProviderRegistry.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/test/src/test/ai-provider-hft/ZeroShotTasks.integration.test.ts | Updates server/client setup to use wrapped queue storage. |
| packages/task-graph/src/task/Task.ts | Removes one-time legacy cacheable=false warning/shim path. |
| packages/task-graph/src/task/JobQueueFactory.ts | Updates factory to wrap storage and pass messageQueue/jobStore explicitly. |
| packages/storage/src/tabular/ITabularStorage.ts | Removes deprecated annotations/comments from offset paging APIs. |
| packages/job-queue/src/queue-storage/wrapQueueStorage.ts | Removes transient pending-write buffer and legacy saveResult/saveError support. |
| packages/job-queue/src/queue-storage/InMemoryMessageQueue.ts | Removes pending-write buffer; claim acks/fails persist directly. |
| packages/job-queue/src/queue-storage/InMemoryJobStore.ts | Removes deprecated saveResult/saveError buffer plumbing. |
| packages/job-queue/src/queue-storage/IJobStore.ts | Removes saveResult/saveError from the job store interface. |
| packages/job-queue/src/queue-storage/IClaim.ts | Makes disable() required and updates fail/disable docs. |
| packages/job-queue/src/queue-storage/createInMemoryQueue.ts | Simplifies queue factory to return only messageQueue + jobStore. |
| packages/job-queue/src/job/JobStorageConverters.ts | Removes legacy includeWorkerId option; always includes leaseOwner. |
| packages/job-queue/src/job/JobQueueWorker.ts | Removes legacy saveResult/saveError fallback usage; uses terminal helpers when no claim. |
| packages/job-queue/src/job/JobQueueServer.ts | Removes legacy storage option and server.getStorage() API. |
| packages/job-queue/src/job/JobQueueClient.ts | Removes legacy storage option and updates storageToClass call signature. |
| packages/indexeddb/src/migrations/indexedDbQueueMigrations.ts | Collapses IndexedDB migrations into v1 final schema only. |
| packages/indexeddb/src/job-queue/IndexedDbMessageQueue.ts | Removes pending-write buffer; claim acks/fails persist directly. |
| packages/indexeddb/src/job-queue/IndexedDbJobStore.ts | Removes deprecated saveResult/saveError buffer plumbing. |
| packages/indexeddb/src/job-queue/createIndexedDbQueue.ts | Simplifies queue factory to return only messageQueue + jobStore. |
| packages/ai/src/capability/collectStream.ts | Removes backwards-compat framing from doc comment. |
Comments suppressed due to low confidence (1)
providers/supabase/src/job-queue/SupabaseQueueStorage.ts:235
migrate()now only runsCREATE TABLE IF NOT EXISTSand then creates indexes that referencevisible_at,attempts,max_attempts, etc. If an existing Supabase table still has the legacy column names (e.g.run_after,run_attempts,max_retries,worker_id), index creation and subsequent queries will fail with "column does not exist" errors. If dropping legacy support is intentional, consider detecting the legacy schema early and throwing a clear, actionable error (or keeping a one-time rename step) so failures aren’t deferred to later index/query statements.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+34
to
+37
| const enumLiteral = Object.values(JobStatus) | ||
| .map((v) => `'${v}'`) | ||
| .join(","); | ||
|
|
Comment on lines
+83
to
+86
| async disable(): Promise<void> { | ||
| this.message.ack(); | ||
| await this.jobStore.saveStatus(this.id, JobStatus.DISABLED); | ||
| } |
Comment on lines
+123
to
+131
| async disable(): Promise<void> { | ||
| await this.sqs.send( | ||
| new DeleteMessageCommand({ | ||
| QueueUrl: this.queueUrl, | ||
| ReceiptHandle: this.receiptHandle, | ||
| }) | ||
| ); | ||
| await this.jobStore.saveStatus(this.id, JobStatus.DISABLED); | ||
| } |
Comment on lines
31
to
47
| return [ | ||
| { | ||
| component, | ||
| version: 1, | ||
| description: "Create queue object store + indexes", | ||
| up({ db }) { | ||
| if (!db.objectStoreNames.contains(tableName)) { | ||
| const store = db.createObjectStore(tableName, { keyPath: "id" }); | ||
| store.createIndex("queue_status", k(["queue", "status"]), { unique: false }); | ||
| store.createIndex("queue_status_run_after", k(["queue", "status", "run_after"]), { | ||
| store.createIndex("queue_status_visible_at", k(["queue", "status", "visible_at"]), { | ||
| unique: false, | ||
| }); | ||
| store.createIndex("queue_job_run_id", k(["queue", "job_run_id"]), { unique: false }); | ||
| store.createIndex("queue_fingerprint_status", k(["queue", "fingerprint", "status"]), { | ||
| unique: false, | ||
| }); | ||
| } |
Comment on lines
888
to
896
| job.error = error.message; | ||
| job.errorCode = jobErrorPersistedCode(error); | ||
|
|
||
| // H2 atomic fail: hand error/errorCode/abortRequested directly to | ||
| // Atomic fail: hand error/errorCode/abortRequested directly to | ||
| // claim.fail() so they land in a single storage write together with | ||
| // status=FAILED. Eliminates the saveError-then-fail two-write window | ||
| // where a crash could leave the row PROCESSING with an `error` | ||
| // already written. | ||
| // status=FAILED. | ||
| const abortRequested = error instanceof AbortSignalJobError; | ||
| const persistedCode = jobErrorPersistedCode(error); | ||
| const claim = this.getClaim(job.id); |
Comment on lines
223
to
228
| } | ||
| } | ||
|
|
||
| // Add new columns to existing tables and rename old columns (idempotent) | ||
| const alterSqls = [ | ||
| `ALTER TABLE ${this.tableName} ADD COLUMN IF NOT EXISTS abort_requested_at timestamp with time zone`, | ||
| `ALTER TABLE ${this.tableName} ADD COLUMN IF NOT EXISTS lease_expires_at timestamp with time zone`, | ||
| `ALTER TABLE ${this.tableName} RENAME COLUMN run_after TO visible_at`, | ||
| `ALTER TABLE ${this.tableName} RENAME COLUMN last_ran_at TO last_attempted_at`, | ||
| `ALTER TABLE ${this.tableName} RENAME COLUMN run_attempts TO attempts`, | ||
| `ALTER TABLE ${this.tableName} RENAME COLUMN max_retries TO max_attempts`, | ||
| `ALTER TABLE ${this.tableName} RENAME COLUMN worker_id TO lease_owner`, | ||
| ]; | ||
| for (const sql of alterSqls) { | ||
| const { error } = await this.client.rpc("exec_sql", { query: sql }); | ||
| // 42703 = undefined_column: expected on re-run after a RENAME COLUMN has | ||
| // already applied (the old column no longer exists). All other errors | ||
| // (permission denied, wrong table name, etc.) are re-thrown. | ||
| if (error && error.code !== "42703") { | ||
| throw new Error(`Failed to rename column: ${error.message}`); | ||
| } | ||
| } | ||
|
|
||
| // Create indexes with prefix columns prepended | ||
| const indexes = [ | ||
| `CREATE INDEX IF NOT EXISTS job_fetcher${indexSuffix}_idx ON ${this.tableName} (${prefixIndexPrefix}id, status, visible_at)`, |
Address PR #523 review feedback: Cloudflare's and SQS's IClaim.disable() were calling jobStore.saveStatus(DISABLED), which only updates the status column — not the full atomic terminal write the IClaim.disable contract describes (status + lease_owner=null + progress reset + completed_at). Other backends (InMemory/IndexedDB/SQLite/Postgres/Supabase) already had this via their wrapped core.finalize call, but cloud adapters don't share a wrapped IQueueStorage with their store. Add markDisabled() to IJobStore and implement it across every backend (InMemory/IndexedDb/SQLite/Postgres/Supabase + the wrapQueueStorage shim + the per-backend QueueStorage cores that already expose completeWithResult/failWithError). Cloudflare and SQS disable() now call jobStore.markDisabled(id) instead of saveStatus, restoring contract parity across all backends. Also fix the duplicate jobErrorPersistedCode(error) call in JobQueueWorker.failJob (computed once and reused).
Code-review caught a regression introduced by the prior markDisabled commit and a Supabase divergence: - JobQueueWorker.disableJob no-claim fallback was still calling jobStore.saveStatus(DISABLED), which writes status only — the exact divergence markDisabled was added to fix. The fallback now calls jobStore.markDisabled(id), so the no-claim path produces the same row state (lease_owner=null, progress=0, completed_at set) as claim.disable(). - SupabaseQueueStorage.markDisabled was unconditionally overwriting completed_at on every call, while all other backends COALESCE on the existing value. PostgREST has no COALESCE on .update(), so we read-then-write (mirrors the failWithError pattern) to preserve a previously-stamped completed_at. - Add direct test coverage for jobStore.markDisabled in genericJobQueueTests so every backend (InMemory, IndexedDB, SQLite, Postgres, Supabase) exercises the new code path. Tests assert the full disable contract (status=DISABLED, lease_owner=null, progress cleared, completed_at stamped, error fields untouched) and verify that an existing completed_at is preserved. - Drop two stale `@deprecated` JSDoc markers that survived the prior back-compat sweep (ITask.cacheable, TaskRunner.outputCache). The fields themselves stay (cache cleanup was explicitly skipped) but the @deprecated framing contradicts the PR's "no @deprecated" rule.
…t microtask ordering
The earlier promotion of `onRunStart` from optional to required broke
5 task-abort tests (SingleTask, DelayTask, TaskSubGraphRunner) that all
share the pattern:
const promise = task.run();
task.abort();
await promise;
Even with a no-op `onRunStart() {}` on RunCompletionStrategy / NeverDisposeStrategy,
`ResourceScope.runStart()` now contained an `await this.strategy.onRunStart(this)`
which adds a microtask hop. The hop is enough for the synchronous `task.abort()`
to fire its abort listener — and clear `this.currentCtx` inside `handleAbort` —
before `handleStart` resumes past its `await this.resourceScope.runStart()`.
On resume, `run()` does `const ctx = this.currentCtx!` and reads undefined,
then crashes at `ctx.abortController.signal.aborted` with TypeError.
Revert to the guarded shape so strategies with no pre-run work keep
`runStart()` synchronous (zero internal awaits), preserving the
microtask shape the runner's abort-during-handleStart path was written
against. Drop the no-op `onRunStart` overrides added in the prior commit
on the two default strategies; InactivityStrategy keeps its real one.
The reverted change was a code-review cosmetic ("optional shape is
backward-compat"), not a contract change anyone depended on — restoring
optional preserves the in-tree contract and unblocks the abort tests.
Coverage Report
File Coverage
|
||||||||||||||||||||||||||||||||||||||
… instance state Removes the TOCTOU class of bugs where `this.currentCtx` could be nullified across an await by the abort listener firing inside `controller.abort()`. The original bug surfaced as a TypeError on `ctx.abortController` after `handleStart` resumed past a microtask-yielding strategy hook; the same shape would surface again any time something running during an await nullified the instance field. Three rules now hold: 1. `handleStart` returns the ctx. `run()` reads it once from the return value and never re-reads `this.currentCtx`. The instance field is kept purely as the *external* pointer used by no-arg public methods (`abort`, `disable`); internal flow uses the local exclusively. 2. Listeners and timers capture the ctx they were attached to. The abort listener registered in `handleStart` closes over the local `ctx`, and the timeout timer fires `ctx.abortController.abort()` directly rather than going through `this.abort()`. 3. Terminal handlers (`handleAbort` / `handleComplete` / `handleError` / `handleDisable`) are idempotent per-ctx via a new `ctx.terminated` flag set before any await. The instance-field clear is CAS-style — `if (this.currentCtx === ctx) this.currentCtx = undefined` — so a stale handler can't clobber a newer run's pointer. The guard moves off `task.status` (externally observable, can be mutated by adjacent runs) onto `ctx.terminated` (per-run, internal). Signatures changed: `handleStart` now returns `TaskRunContext`; `handleAbort` / `handleComplete` / `handleError` / `clearTimeoutTimer` take a `ctx` parameter. No external API change — `run()` / `abort()` / `disable()` are unchanged. Adds a regression test that exercises the exact sync run/abort pattern that broke before, asserting no TypeError leaks from the runner.
Comment on lines
178
to
182
| // ctx is threaded through locals from here; nothing inside run() re-reads | ||
| // this.currentCtx (which can be nulled by handleAbort firing on the abort | ||
| // listener during an interleaved abort). | ||
| const ctx = await this.handleStart(effectiveConfig); | ||
|
|
Comment on lines
381
to
386
| public getCachePolicy(_inputs: Input): CachePolicy { | ||
| const ctor = this.constructor as typeof Task; | ||
| const hasLegacyOverride = | ||
| Object.prototype.hasOwnProperty.call(ctor, "cacheable") && (ctor as any).cacheable === false; | ||
| const hasPolicyOverride = Object.prototype.hasOwnProperty.call(ctor, "cachePolicy"); | ||
|
|
||
| if (hasLegacyOverride && !hasPolicyOverride) { | ||
| if (!Task.__cacheableDeprecationWarned.has(ctor.type)) { | ||
| Task.__cacheableDeprecationWarned.add(ctor.type); | ||
| getLogger().warn( | ||
| `Task "${ctor.type}": static \`cacheable = false\` is deprecated. ` + | ||
| `Use \`static cachePolicy: CachePolicy = { kind: "none" }\` instead.` | ||
| ); | ||
| } | ||
| return { kind: "none" }; | ||
| } | ||
| if (this.runConfig?.cacheable === false || this.config?.cacheable === false) | ||
| return { kind: "none" }; // per-instance shim, no warning | ||
| return { kind: "none" }; | ||
| return ctor.cachePolicy ?? DEFAULT_CACHE_POLICY; | ||
| } |
Comment on lines
+256
to
+260
| async markDisabled(id: MessageId): Promise<void> { | ||
| const current = await this.storage.get(id); | ||
| const completedAt = current?.completed_at ?? new Date().toISOString(); | ||
| await this.storage.finalize(id, { | ||
| status: "DISABLED", |
Comment on lines
+981
to
+993
| /** | ||
| * Atomically writes status=DISABLED, releases the lease, clears progress | ||
| * fields, and stamps `completed_at`. Does NOT write error/error_code — | ||
| * DISABLED is not an error transition. | ||
| */ | ||
| public async markDisabled(id: unknown): Promise<void> { | ||
| // Fetch first so we can preserve an existing completed_at (parity with | ||
| // Postgres/SQLite COALESCE and the InMemory/IndexedDb/WrappedJobStore | ||
| // `current?.completed_at ?? now` pattern). PostgREST has no COALESCE on | ||
| // .update(), so we read-then-write. | ||
| const existing = await this.get(id); | ||
| const completedAt = existing?.completed_at ?? new Date().toISOString(); | ||
| let query = this.client |
Comment on lines
+123
to
+131
| async disable(): Promise<void> { | ||
| await this.sqs.send( | ||
| new DeleteMessageCommand({ | ||
| QueueUrl: this.queueUrl, | ||
| ReceiptHandle: this.receiptHandle, | ||
| }) | ||
| ); | ||
| await this.jobStore.markDisabled(this.id); | ||
| } |
Comment on lines
+83
to
+86
| async disable(): Promise<void> { | ||
| this.message.ack(); | ||
| await this.jobStore.markDisabled(this.id); | ||
| } |
Comment on lines
+34
to
+37
| const enumLiteral = Object.values(JobStatus) | ||
| .map((v) => `'${v}'`) | ||
| .join(","); | ||
|
|
Comment on lines
33
to
44
| export interface IDisposeStrategy { | ||
| onRegister(key: string, disposer: () => Promise<void>, scope: ResourceScope): () => Promise<void>; | ||
| /** | ||
| * Called by `ResourceScope.runStart()` before a run begins. Optional — | ||
| * external strategies written against the original four-method shape | ||
| * continue to work; `ResourceScope` no-ops when this is undefined. | ||
| * Called by `ResourceScope.runStart()` before a run begins. | ||
| * Optional — `ResourceScope.runStart` no-ops when the strategy does not | ||
| * implement this hook. Keeping it optional preserves the microtask shape | ||
| * of `runStart()` (zero internal awaits) for strategies that have no | ||
| * pre-run work; making it required adds an `await undefined` that changes | ||
| * abort-race timing in the task runner. | ||
| */ | ||
| onRunStart?(scope: ResourceScope): Promise<void> | void; | ||
| touch(key: string): void; |
Six issues from the Copilot review on the latest commits: **TaskRunner.run() now wraps handleStart in try/catch/finally** handleStart can throw at `resourceScope.runStart()` or telemetry init, leaving task.status=PROCESSING, currentCtx set, and the owned ResourceScope undisposed. Wrap the call in the outer try so the finally always runs scope cleanup, and add a catch arm that drives handleError on the partial ctx (or unwinds task.status manually if the throw happened before TaskRunContext was created). **Task.getCachePolicy honors static `cacheable = false` again** The prior back-compat sweep removed the static-honoring branch along with the deprecation warning, silently breaking subclasses that set `static cacheable = false`. Restore the mapping (without the warning): resolution order is now per-instance `cacheable=false` → static `cacheable=false` → static `cachePolicy` → DEFAULT_CACHE_POLICY. Static `cachePolicy` declared on the same class still wins to support the canonical migration path. **wrapQueueStorage.markDisabled delegates to core.markDisabled** Was doing get() + finalize() — two ops, racy on completed_at and the status/lease fields between them. `IQueueStorage.markDisabled` is now a required method on the interface; every core implements it as a single-op atomic write. IndexedDbQueueStorage gains a `markDisabled` helper (IDB is single-threaded per-origin so the read+write pair is observably atomic). TelemetryQueueStorage forwards with tracing. IndexedDbJobStore.markDisabled now delegates to `core.markDisabled` for parity with the other backends. **SupabaseQueueStorage.markDisabled uses exec_sql + COALESCE** PostgREST `.update()` cannot reference existing column values, so the previous read-then-write opened a race window where a concurrent update could clobber the disable transition (and the docstring claimed atomicity). Mirror the failWithError pattern: exec_sql with `COALESCE(completed_at, NOW())` in the SET clause. **SqsClaim.disable + CloudflareClaim.disable persist BEFORE acking** Previously: delete/ack the queue message, then markDisabled. If markDisabled threw, the message was gone but the row remained PROCESSING with no retry path. Reverse: markDisabled first, ack after — if ack/delete then fails, the redelivered message hits SqsMessageQueue/handleQueueBatch's terminal-status drop and is filtered. Added regression tests asserting the call-order invariant via `vi.fn().mock.invocationCallOrder` and an SDK-mock callback that probes row status mid-command. Regression tests added for the cacheable static flag and both claim ordering invariants. All 2364 tests pass.
Three conflict resolutions:
**Task.ts** — Kept HEAD's getCachePolicy (per-instance + static cacheable=false
→ {kind:"none"}). Dropped main's H7 reintroduction of the @deprecated warning
and the __cacheableDeprecationWarned tracking Set, since the whole PR thesis
is "no @deprecated, current version is the only version". The static
`cacheable` flag still works; it just no longer warns.
**IQueueStorage.ts** — Both sides added a new method: HEAD added
`markDisabled` (required), main added optional `findActiveByFingerprint`.
Kept both — orthogonal additions.
**wrapQueueStorage.ts** — Three regions:
1. Module-scope: kept main's MAX_FINGERPRINT_SCAN + bounded-scan warning
gate; dropped main's PendingWrite type (back-compat plumbing the PR
removed alongside saveResult/saveError on IJobStore).
2. WrappedClaim.ack: adopted main's "don't fall back to current.output"
semantics (commit 091cbbd/66ebc6e — prior-attempt output must not bleed
into the next ack), simplified to `result ?? null` since the `buf`
pending-write source no longer exists.
3. WrappedClaim.fail: same — `opts.error ?? null` and `opts.errorCode ?? null`,
no fallback to current.
Auto-merged SqsClaim.ts / CloudflareClaim.ts cleanly combine main's
persistWithRetry on ack/fail with this PR's reversed-order disable()
(markDisabled before ack/delete).
Test result: 2364 task/graph/queue/cloudflare/aws tests + 3615 unit
tests pass. No regressions from the merge.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Drop @deprecated markers, legacy shims, and dual old-way/new-way code paths
across the job-queue, storage, and task-graph packages. Squash all job-queue
DB migrations (IndexedDB / Postgres / SQLite) into a single v1 with the final
schema since pre-v1 has no deployed users to migrate.
Job queue:
saveResult/saveErrorfrom IJobStore and all implementations.Workers now call
claim.ack(result)/claim.fail(opts)directly, withcompleteWithResult/failWithErroras the no-active-claim fallback.PendingWrite,Pending*Write) from wrapQueueStorage and the InMemory / IndexedDB /SQLite / Postgres / Supabase claim+jobstore+messagequeue trios.
storageoption on JobQueueServer / Client / Worker;they now require the paired
messageQueue+jobStore. Callers wrap araw IQueueStorage via
wrapQueueStorage()themselves.corere-export from createInMemoryQueue / createIndexedDbQueue/ createSqliteQueue / createPostgresQueue / createSupabaseQueue.
includeWorkerIdparameter onstorageToClass().IClaim.disable()from optional to required; SQS + Cloudflareclaims implement it via
saveStatus(DISABLED)plus message.ack().Migrations (final-version-only):
convergence) → v1
fingerprint index convergence) → v1
Task-graph / storage / util:
cacheable = falseone-time deprecation warning fromTask.getCachePolicy.
@deprecatedfrom QueryOptions.offset and getOffsetPage (stillvalid features, no longer flagged as legacy).
Tests: update to new IJobStore / IClaim shape, delete obsolete deprecation
and migration-parity tests.