Capability-based dispatch + Promise+emit run-fn migration#494
Merged
Conversation
@workglow/cli
@workglow/ai
@workglow/browser-control
@workglow/indexeddb
@workglow/javascript
@workglow/job-queue
@workglow/knowledge-base
@workglow/mcp
@workglow/storage
@workglow/task-graph
@workglow/tasks
@workglow/util
workglow
@workglow/anthropic
@workglow/bun-webview
@workglow/chrome-ai
@workglow/electron
@workglow/google-gemini
@workglow/huggingface-inference
@workglow/huggingface-transformers
@workglow/node-llama-cpp
@workglow/ollama
@workglow/openai
@workglow/playwright
@workglow/postgres
@workglow/sqlite
@workglow/supabase
@workglow/tf-mediapipe
commit: |
3 tasks
Replaces the per-task-type provider registry with capability-based dispatch.
Each provider registers `{ serves: Capability[], runFn }` entries; the registry
picks the run-fn whose `serves` is the smallest superset of the task's
`requires`, with strict gating (`model.capabilities ⊇ task.requires`).
- Capability registry foundation with strict gating + most-specific-superset
selection (ties broken by registration order)
- Rename ModelConfig.tasks → ModelConfig.capabilities
- Static readonly `requires` field on AiTask base classes and all 46 concrete
task subclasses; intermediate bases drop redundant overrides
- Extract registerAiTasks to its own file; hoist strict gating to streaming
path; fix AiChatTask getJobInput
- Correct collectStream object-delta and multi-port text semantics
…tion[] (Phase 5)
Migrates every vendor provider to the capability-set registration list shape:
each provider's constructor now declares `AiProviderRunFnRegistration[]` instead
of per-task-type registrations. Dispatch happens via the capability registry
introduced in Phase 3.
- OpenAI (5a), Anthropic (5b), Google Gemini (5c), Ollama (5d)
- HuggingFace Transformers (5e), HuggingFace Inference (5f)
- node-llama-cpp (5g), TF MediaPipe (5h), Chrome AI (5i)
- Skip legacy contract test type-checking pending Phase 9 rewrite (5j)
- Tighten Anthropic 3.5/3.7 family regex to cover claude-3-5-haiku
- Include all 9 vendor provider packages in pkg-pr-new preview publish so
downstream consumers can pin to per-PR previews
Addresses Copilot review on PR #479 and unblocks CI for the post-Phase-5 state. - Declare `requires` on AiChatWithKbTask and KbSearchTask - Skip model-capability gate for lifecycle tasks (download/dispose) - Unblock bun test discovery for legacy contract tests - Skip whole AiProvider conformance suite pending Phase 9 rewrite - Phase 9 publish-preview workflow + drop dead Anthropic_Chat_Stream alias - Update todo and dependabot config
…test harness Adds infrastructure for streaming progress through tasks and for running the large-model integration tests reliably under CI memory constraints. - bridgeProgress utility + unit tests - Integration-test scaffolding for large-model scenarios - Restore RAG test parallelism with 15-minute job timeouts - GitHub Actions workflow updates
Cascade of fixes for OOMs in the RAG integration suite caused by ONNX/WASM
runtime instances leaking across tests.
- hft: dispose ONNX pipelines on clearPipelineCache and after RAG tests
- ai: unregister existing provider before re-registering to prevent run-fn
accumulation across test setup
- ai: null bridgeProgress captures in finally to release tensor refs
- ai/hft: release WASM via awaited dispose + macrotask yield per AiJob
- ci: force vitest single-file-at-a-time for RAG suite; bump per-test timeout
to 600s; restore RAG parallel=1 with 25-min job timeout
- Includes diagnostic instrumentation (now removed) used to locate the leak,
plus a revert of one speculative dispose path that proved unnecessary
Introduces the building blocks for the new run-fn shape: a single
`Promise<void>` that emits events through an injected `AiEmit` callback,
replacing the previous AsyncIterable<AiStreamEvent> return shape. The
consumer (StreamingAiTask / TaskRunner) is now solely responsible for
accumulating deltas; providers become stateless.
- AiEmit type + noopEmit helper
- createEmitQueue: single-consumer push-queue utility
- StreamEventAccumulator: factored from collectStream
- accumulatingEmit factory for terminal-consumer materialization
- AiProviderRunFn shape + registerLegacyStreamFn adapter for old run-fns
- util/worker: registerRunFunction + handleRunCall + callWorkerRunFunction
+ worker proxy threading AiEmit across the worker boundary
Migrates the AI execution path from AsyncIterable returns to the new
Promise+emit shape end-to-end. Also fixes abort leaks uncovered during the
rewrite.
- AiJob.execute collapses to a single Promise<void> + emit
- Strategy interface collapses to single execute(emit) Promise<void>;
DirectExecutionStrategy rewritten
- QueuedExecutionStrategy uses the rate limiter only; storage-queue path
dropped
- AiTask.execute uses accumulatingEmit at the terminal-consumer boundary
- StreamingAiTask + AiChatTask + AiChatWithKbTask bridge to the new shape
- AiProvider base routes legacy run-fns through registerLegacyStreamFn so
not-yet-migrated providers keep working
Brings tests in line with the new run-fn shape and adds RSS-aware reporting to make memory regressions visible. - Align ai-provider + contract test fixtures with Promise+emit shape - RSS-bounded stress test for AiJob run-fn dispatch - Remove obsolete AiProviderRegistry tests - timing: memory usage reporting utility (reportMem + baseline snap) - Use the new memory utility across multiple test suites - Prettier format pass
Final cleanup once all callers and providers are on the Promise+emit shape.
- Drop type annotations from static `requires` declarations (TS infers)
- Transition every remaining caller from legacy stream function to
Promise+emit run-fn
- Remove the bridgeProgress utility (its responsibilities are now covered
by accumulatingEmit / the consumer-side accumulator) and update affected
components
- Rename model download/unload tasks to align with the dispose vocabulary
- Streamline streaming text handling across AiProvider implementations
Removes the postmortem markdown from the working tree and the test comment that referenced it. The leak investigation is captured in the commit history; the .md file doesn't need to live in main. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Matches the capability key (`model.dispose`) that the lookup queries. The legacy `unload` vocabulary was renamed to `dispose` earlier in this branch; this catches a stray local. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WorkerServerBase.handleAbort previously dropped abort messages whose ids had no AbortController yet, so an `abort` racing ahead of its `call` over the message port would be silently lost and the run-fn would execute with an un-aborted signal. Track pending aborts in a bounded set and consume them when handleCall/handleStreamCall/handleRunCall constructs its AbortController, posting the error response immediately when the abort arrives first. Bounded to 1000 entries with LRU-style eviction to match the existing completed-requests cap.
Shared bridge used by StreamingAiTask, AiChatTask, and AiChatWithKbTask:
dispatches a strategy.execute(...) into an emit-queue and yields events
through an `AsyncIterable`, while ensuring that an early `break` /
`return` from the consumer (or an abort on the parent context) cancels
the underlying run-fn.
Mechanism:
- A `localAbort` is linked from `context.signal` (and propagates back to
the parent: if `localAbort` aborts first we leave the parent alone).
- The strategy is invoked with a context whose `signal` is the
`localAbort.signal`. Providers therefore see the abort whenever the
consumer stops iterating.
- A `finally` around the consumer for-await calls `localAbort.abort()`
and `queue.fail(...)`, then awaits `runPromise` (swallowing the
expected abort error) so we never leak a dangling Promise.
…nWithIterable Before: consumer `break` / `return` only closed the local queue; the strategy kept running with the parent `context.signal` (which never fires on its own) and emitted into a closed queue until it eventually finished. A caller-side abort was not visible to the provider stream. Use the new `runWithIterable` helper which wraps a `localAbort` controller into the context's `signal` before handing it to the strategy. On any consumer exit the local signal aborts, the provider tears down, and the run promise is awaited so we never leak it.
Same fix as StreamingAiTask: a consumer that breaks out of the `for await` over a chat turn previously left the provider stream running and the underlying run-promise dangling. Use `runWithIterable` so the local abort fires and the inner-turn run-fn is cancelled when the consumer exits.
Mirror the AiChatTask fix so KB-grounded chat also propagates consumer abort to the inner provider stream. Replaces the inline `createEmitQueue` + `runPromise` pattern with a `runWithIterable` invocation that wires a local AbortController into the strategy's context.
Drives runWithIterable directly via a fake strategy whose execute() never resolves until aborted. The consumer breaks out of the iterator after the first event; the test asserts: - the strategy's signal flips to `aborted` - the run promise settles cleanly - no further events are yielded to the consumer Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… lookup
The resourceScope disposer in AiTask was renamed from looking up
`model.unload` to `model.dispose` during the rename-and-cleanup pass,
intentionally distinguishing in-memory eviction from on-disk removal
(`model.download-remove`). Two places were left out of sync:
- The disposer's stale doc comment still referred to
`model.download-remove`; clarified to call out the dispose / remove
distinction.
- `AiChatWithKbTask.test.ts` still registered a fake run-fn under
`model.download-remove` for the disposer hook, so the disposer
never resolved and `unloadCalls` stayed 0. Register under
`model.dispose` instead.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…main Rebased onto main (#488 added the prettier-organize-imports plugin + husky hook). With -X theirs winning conflicts in our favor, three `export * from` barrels lost main's `// organize-imports-ignore` header. Re-add the comment so the plugin doesn't alphabetize the barrel export order (which would break runtime init). - packages/ai/src/common.ts (existed pre-PR, comment lost in rebase) - packages/ai/src/task/index.ts (existed pre-PR, comment lost in rebase) - packages/ai/src/capability/index.ts (new barrel introduced by this PR) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
6fb6f56 to
20d801a
Compare
d240225 to
ecb45ba
Compare
- Introduced new test files for AiChatTask and AiChatWithKbTask, covering schema validation, input/output handling, and streaming behavior. - Updated runAiProviderConformance to use describe.skipIf for conditional skipping of tests based on options. - Enhanced ImageGenerationPreviewChain and SessionCaching tests to ensure proper functionality and error handling. - Added structured tests for session management in AiProviderRunFn, ensuring sessionId is correctly passed and handled. These changes improve test coverage and reliability for AI task implementations.
… bun tests from CI - Updated various dependencies in package.json and bun.lock, including: - Incremented versions for @types/bun, @types/node, @typescript-eslint/eslint-plugin, @typescript-eslint/parser, @vitest/coverage-v8, @vitest/ui, pkg-pr-new, vitest, ink, react-resizable-panels, vite, better-sqlite3, playwright, and @anthropic-ai/sdk. - Adjusted devDependencies to ensure compatibility with the latest versions. - Commented out test jobs in GitHub Actions workflow for future reference.
ecb45ba to
f4ead3b
Compare
sroussey
pushed a commit
that referenced
this pull request
May 15, 2026
CreateStandardKbStrategyFirstStage.test was cherry-picked from PR #496, where it was authored against the pre-capabilities API. Since then PR #494 (capabilities-squash) landed in main and changed: - registerRunFn(provider, taskType, fn) → registerRunFn(provider, { serves, runFn }) - run-fn return value → emit({ type: "finish", data }) - ModelRecord.tasks → ModelRecord.capabilities Update the test to the post-capabilities shape, mirroring the pattern already used by KnowledgeBaseStandardStrategy.test. Also drop the `as never` casts on the spies — those defeated the type system and caused mockResolvedValue to fail typecheck. https://claude.ai/code/session_01Ya54WFZhpDFzAqRh1qG8Ex
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR squashes 80 commits from
capabilities-eventinto 9 logical commits againstmain. The work introduces capability-based provider dispatch (replacing the per-task-type registry) and migrates the AI execution path to a new Promise+emit run-fn shape.The final tree is identical to
capabilities-event; this branch only restructures history for review.Commits in order
feat(ai): introduce capability-based dispatch (Phases 0-4)— capability registry foundation, renameModelConfig.tasks→capabilities, staticrequireson every AI task class, replace per-task-type registry with capability-set dispatch.refactor(providers): migrate all providers to AiProviderRunFnRegistration[] (Phase 5)— every vendor provider (OpenAI, Anthropic, Gemini, Ollama, HFT, HFI, node-llama-cpp, tf-mediapipe, chrome-ai) migrated to the new registration list shape; pkg-pr-new wired for all 9 vendor packages.fix(ai,providers,test): Phase 5 review feedback and CI/test fixes— addresses Copilot review on PR Add capability system and collectStream utility for AI tasks #479, declaresrequireson remaining tasks, unblocks bun test discovery and conformance suite.feat(ai,test,ci): bridgeProgress utility and large-model integration test harness— streaming progress utility + integration-test scaffolding + CI workflow updates.fix(ai,hft,test,ci): resolve RAG WASM/ONNX memory leaks— cascade of fixes for ONNX/WASM runtime instances leaking across RAG tests (pipeline disposal, provider re-registration, tensor refs, CI parallelism + timeouts).feat(ai,util/worker): Promise+emit run-fn shape foundation—AiEmit,createEmitQueue,StreamEventAccumulator,accumulatingEmit,AiProviderRunFn+ legacy adapter, and worker-sideregisterRunFunction/handleRunCall/callWorkerRunFunction.refactor(ai): migrate execution path to Promise+emit shape—AiJob.execute, both execution strategies,AiTask.execute,StreamingAiTask/AiChatTask/AiChatWithKbTask, andAiProviderbase all collapse to Promise+emit; abort leaks fixed.test(ai,timing): align fixtures and add memory tooling for Promise+emit— fixtures updated; RSS-bounded stress test;reportMem/baselinesnaputility; obsoleteAiProviderRegistrytests removed.refactor(ai): finalize Promise+emit migration and cleanup— drops legacy stream shape, removesbridgeProgressutility, renames model download/dispose tasks, streamlines streaming text handling, drops redundant type annotations.Test plan
bun run build:packagesandbun run build:typessucceedbun scripts/test.ts vitestpasses for the AI + providers + task-graph suitesmain)