Skip to content

feat(worker): Add worker-initiated requests support#138

Merged
taras merged 34 commits intomainfrom
feature/worker-initiated-requests
Feb 18, 2026
Merged

feat(worker): Add worker-initiated requests support#138
taras merged 34 commits intomainfrom
feature/worker-initiated-requests

Conversation

@taras
Copy link
Member

@taras taras commented Jan 31, 2026

Motivation

The current @effectionx/worker package only supports host-to-worker communication: the host can send messages to workers and receive responses. However, some use cases require the reverse direction - workers initiating requests to the host.

A concrete example is the sweatpants project's web worker transport for MCP tool sessions. In this architecture:

  • Tools run in web workers (the "Principal" that initiates requests)
  • The host responds to tool requests (the "Operative")

This is the opposite of the current pattern where the host is always the initiator.

What's New

Progress Streaming Support

This PR now includes bidirectional progress streaming - the host can send progress updates back to the worker during request processing:

Worker side (receive progress):

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    // Simple request/response (unchanged)
    const response = yield* send("hello");
    
    // With progress streaming
    const subscription = yield* send.stream<number>("process");
    let next = yield* subscription.next();
    while (!next.done) {
      console.log("Progress:", next.value);  // 25, 50, 75
      next = yield* subscription.next();
    }
    const response = next.value;  // "done"
  }
);

Host side (send progress):

yield* worker.forEach<string, string, number>(function* (request, ctx) {
  yield* ctx.progress(25);
  yield* ctx.progress(50);
  yield* ctx.progress(75);
  return "done";
});

Backpressure Semantics

The progress() method implements true backpressure:

  • ctx.progress() blocks until the worker calls subscription.next()
  • The host cannot send progress faster than the worker can receive it
  • If the worker does async work between next() calls, the host remains blocked

This ensures the worker is never overwhelmed with progress updates. The ACK is sent inside subscription.next(), so the host waits for the worker to be ready for the next value.

Key Features

  • True backpressure: Host blocks until worker is ready for next progress
  • Backwards compatible: Existing code without progress continues to work
  • Type-safe: Progress type is a separate type parameter

Approach

This PR adds symmetric bidirectional communication by introducing:

New Worker-Side API: send() function

Workers receive a send function in their workerMain options:

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    const response = yield* send("hello");  // Worker initiates request
    return `received: ${response}`;
  }
);

New Host-Side API: worker.forEach() method

Hosts can handle worker requests using forEach:

const worker = yield* useWorker<never, never, string, void>(workerUrl, opts);

const result = yield* worker.forEach<string, string>(function* (request) {
  return `echo: ${request}`;  // Host handles worker's request
});

Channel Primitives

Introduces reusable channel primitives for request-response over MessageChannel:

  • useChannelResponse<TResponse, TProgress>(options?) - Requester side

    • Creates MessageChannel, returns port to transfer and operation to await response
    • yield* response - waits for response, ignores progress
    • yield* response.progress - returns Subscription that yields progress then response
    • Races message vs close events (detects responder crash)
    • Optional timeout via @effectionx/timebox
    • Returns SerializedResult<T> for type-safe error handling
  • useChannelRequest<TResponse, TProgress>(port) - Responder side

    • Wraps received port with resolve(value), reject(error), and progress(data) operations
    • progress() sends update and waits for ACK (true backpressure)
    • Handles serialization internally (wraps in SerializedResult)
    • Races ACK vs close events (detects requester cancellation)

Wire Protocol

Messages sent over channel (Host → Worker):

| { type: "progress"; data: TProgress }
| { type: "response"; result: SerializedResult<TResponse> }

Acknowledgements (Worker → Host):

| { type: "ack" }           // After response
| { type: "progress_ack" }  // After each progress (sent inside next())

Key Design Decisions

  1. MessageChannel per request - Each worker request creates a MessageChannel for correlation (no IDs needed)
  2. SerializedResult type - Type-safe { ok: true, value } or { ok: false, error: SerializedError } for cross-boundary communication
  3. Error serialization - Errors are serialized as { name, message, stack } and wrapped with cause on the receiving side
  4. ACK mechanism - Responses require acknowledgment before cleanup, guaranteeing delivery
  5. Close detection - Both sides detect if the other crashes/cancels via MessagePort close events
  6. Timeout support - Optional timeout parameter on useChannelResponse using @effectionx/timebox
  7. Request queueing - Requests sent before forEach() is called are queued (unbounded)
  8. Concurrent forEach guard - Only one forEach() call allowed at a time
  9. Message loop aligned with resource lifecycle - Uses a pre-attached subscription, resolves the initial "open" via withResolvers, and ensures outcome settles during teardown if the close message arrives late
  10. True backpressure - Progress ACK is sent inside next(), so host waits for worker readiness

Type Parameter Naming

  • TSend/TRecv - Host-initiated communication (host sends, worker receives/responds)
  • WRequest/WResponse - Worker-initiated communication (worker sends, host receives/responds)
  • WProgress - Progress updates from host to worker

Tests

All 54 worker tests pass (4 skipped):

Channel primitive tests (32 tests):

  • Basic channel creation and round-trip
  • ACK verification
  • Close detection (responder crashes/exits)
  • Timeout behavior (slow responder, fast responder)
  • Cancellation handling (requester cancels, responder exits)
  • Progress streaming tests (15 tests):
    • Receives multiple progress then response
    • yield* response ignores progress
    • Error response after progress
    • Port close during progress
    • Backpressure verification (host blocks until worker ready)
    • Order preservation
    • Complex progress data
    • Zero progress updates
    • Requester cancellation during progress
    • Timeout applies to entire exchange

Worker-initiated request tests (15 tests):

  • Single request handling
  • Multiple sequential requests
  • Concurrent requests from worker
  • Error propagation with cause chain
  • Bidirectional communication
  • Request queueing before forEach
  • Concurrent forEach guard
  • Nested send() inside messages.forEach handler

Policy Compliance

Tests comply with the No-Sleep Test Synchronization Policy:

  • Uses withResolvers() for callback synchronization
  • Uses sleep(0) only for yielding control
  • sleep(ms) only inside spawned tasks to trigger conditions

Related Issues

Summary by CodeRabbit

  • New Features

    • Bidirectional worker↔host requests with worker-initiated send (including stream variant) and host-side forEach handling.
    • Real-time progress streaming with backpressure and optional timeouts.
  • Documentation

    • Expanded worker docs with usage examples for worker-initiated requests, progress streaming, and backpressure.
  • Tests

    • Large new test suite and numerous test assets covering channels, progress streaming, timeouts, cancellation, sequencing, and error propagation.
  • Chores

    • Package bumped to 0.5.0 and minor ignore entry added.

@pkg-pr-new
Copy link

pkg-pr-new bot commented Jan 31, 2026

Open in StackBlitz

npm i https://pkg.pr.new/thefrontside/effectionx/@effectionx/worker@138

commit: 6bc2592

@taras taras force-pushed the feature/worker-initiated-requests branch from 5157da1 to 2ae43a1 Compare January 31, 2026 19:19
@taras taras requested a review from cowboyd January 31, 2026 20:05
@taras taras force-pushed the feature/worker-initiated-requests branch 2 times, most recently from 3ba18eb to f7e2e67 Compare February 1, 2026 15:36
});
next = yield* requestSubscription.next();
}
return yield* outcome.operation;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think forEach should not resolve with the worker return value.

For example, a user might assume that the return type of worker.forEach(function*() { return 'some' }) is a string meanwhile the worker actually returns a value of a different type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be a misunderstanding of the type parameters - let me clarify:

  • WResponse is the per-request response that gets sent back to the worker. When the worker calls send('hello'), your handler returns WResponse, and the worker receives it.

  • TReturn (from useWorker<TSend, TRecv, TReturn, TData>) is the worker's final return value - what the worker's main function returns after all its work is done.

These are intentionally different:

// Worker-side
await workerMain(function* ({ send }) {
  const r1 = yield* send('req1');  // Gets back WResponse
  const r2 = yield* send('req2');  // Gets back WResponse  
  return 'all done';               // This is TReturn
});

// Host-side
const worker = yield* useWorker<TSend, TRecv, TReturn>(...);
const result = yield* worker.forEach<WRequest, WResponse>(function* (req) {
  return 'response';  // This is WResponse, sent to worker
});
// result is TReturn ('all done'), not WResponse

forEach serves two purposes:

  1. Handle each request from the worker (returning WResponse)
  2. Block until the worker completes (returning TReturn)

The alternative would be separate methods like worker.handleRequests() + worker.result(), but combining them in forEach felt natural since you typically want to wait for the worker to finish anyway.

Does this clarify the design? Happy to discuss if you think separate methods would be clearer!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yield* worker gives the return value. Why not keep just that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two ways of getting return values are meant for different use cases.

forEach is meant to be used when you expect worker to send you messages. In this case, it seems logical that forEach would give you each message and return value would give you the close value (this is how forEach helper works in stream-helpers)

yield* worker used when you just want to get return value.

They both would effective give you same thing, but with different workers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. That makes sense

Copy link
Collaborator

@joshamaju joshamaju left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've dropped some of my observations.

@coderabbitai
Copy link

coderabbitai bot commented Feb 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds bidirectional, MessagePort channel-based worker-host communication: worker-initiated requests with optional progress streaming and backpressure, ACKs and timeouts, serialized errors/results crossing the boundary, new types and APIs (send/send.stream, forEach), many test assets and comprehensive tests, package/tsconfig updates, and removal of a legacy helper.

Changes

Cohort / File(s) Summary
Channel primitives & tests
worker/channel.ts, worker/channel.test.ts
New channel-based request/response primitives: useChannelResponse/useChannelRequest, ChannelMessage/ACK protocol, progress streaming with per-next ACK/backpressure, timeouts, cancellation and port-close handling; large test suite covering round-trips, ACKs, timeouts, backpressure, cancellation, ordering, and error serialization.
Worker runtime & types
worker/types.ts, worker/worker-main.ts, worker/worker.ts
Extends worker model with cross-boundary types (WorkerToHost, SerializedResult, SerializedError, ChannelMessage, ChannelAck), error (de)serialization helpers, WorkerSend and ForEachContext types; workerMain signature expanded to support send/send.stream; host WorkerResource gains send and forEach with single in-flight forEach, buffered request handling, lifecycle and error propagation changes.
Test assets
worker/test-assets/*.ts
Adds many worker test assets exercising single/concurrent/sequential requests, error handling (cause propagation), progress/slow flows, no-requests scenarios, bidirectional and send-inside-foreach patterns.
Tests
worker/worker.test.ts
Adds extensive tests for worker-initiated requests, concurrency, error propagation, bidirectional behavior, progress streaming, and related sequencing; adjusts existing assertions to string containment where appropriate.
Docs, packaging & config
worker/README.md, worker/package.json, worker/tsconfig.json, .gitignore
Docs expanded with usage for worker-initiated requests and progress streaming (including backpressure notes); package bumped to v0.5.0 and adds @effectionx/timebox dependency; tsconfig references timebox; .gitignore now ignores .opencode/plans/.
Removed legacy helper
worker/message-channel.ts
Removes previous useMessageChannel() helper (file deleted), replaced by channel primitives in worker/channel.ts.

Sequence Diagrams

sequenceDiagram
    participant Host
    participant WorkerThread
    participant MessagePort as MessagePort

    Host->>WorkerThread: spawn via useWorker()
    WorkerThread->>Host: post { type: "open" }
    rect rgba(100, 150, 200, 0.5)
        Note over Host,WorkerThread: Worker-initiated request (with response port)
        WorkerThread->>MessagePort: send WorkerToHost { type: "request", value, response: port }
        Host->>MessagePort: receive on port1
        Host->>Host: forEach handler invoked
        Host->>MessagePort: send progress / final response (ChannelMessage)
        WorkerThread->>MessagePort: receive progress/response
        WorkerThread->>MessagePort: send ack / progress_ack
    end
Loading
sequenceDiagram
    participant Worker as WorkerThread
    participant Port as MessagePort
    participant Host

    rect rgba(150, 100, 200, 0.5)
        Note over Worker,Host: Progress streaming (Worker → Host) with backpressure
        Worker->>Port: send progress message
        Host->>Port: receive progress
        Host->>Host: ctx.progress(...) processes progress
        Host->>Port: send progress_ack
        Worker->>Port: await progress_ack
    end

    rect rgba(200, 150, 100, 0.5)
        Note over Worker,Host: Final response and ACK handshake
        Worker->>Port: send final SerializedResult
        Host->>Port: receive final result
        Host->>Port: send ack
        Worker->>Port: receive ack and complete
    end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title 'feat(worker): Add worker-initiated requests support' directly and accurately describes the main feature being introduced—bidirectional communication allowing workers to initiate requests to the host.
Description check ✅ Passed The PR description comprehensively addresses both required sections: Motivation explains the problem (reverse direction communication for sweatpants project), and Approach provides detailed summary of changes including new APIs, channel primitives, wire protocol, design decisions, and test coverage.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Merge Conflict Detection ✅ Passed ✅ No merge conflicts detected when merging into main
Policy Compliance ✅ Passed Pull request complies with Package.json Metadata, Version Bump, and No-Sleep Test Synchronization policies.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/worker-initiated-requests

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@worker/channel.ts`:
- Around line 164-211: The current Symbol.iterator implementation (inside
waitForResponse) relies on once(channel.port1, "close") which doesn't exist in
browsers; update the code so that if the MessagePort does not support a "close"
event you either (A) require/validate options.timeout is provided and throw a
clear error when omitted, or (B) implement an explicit liveness protocol: send
periodic "ping"/"keepalive" messages from the caller and expect
"pong"/"progress" or dedicated keepalive ACKs from the responder and treat
missed pings as a crash—modify waitForResponse and the surrounding iterator to
feature-detect support for port close (e.g., check for onclose/event support or
that once(channel.port1,"close") would never resolve) and branch to the
timeout-or-heartbeat strategy accordingly (update related logic in timebox,
options.timeout handling, and the channel.port1 ACK/send paths).

In `@worker/worker.test.ts`:
- Around line 249-251: The test uses time-based sleeps inside the worker.forEach
handler (the yield* sleep(...) call) which makes ordering flaky; replace those
sleeps with a deterministic synchronization primitive (e.g., withResolvers, an
explicit latch/promise pair, or a handshake from the worker asset) so you can
block/unblock the handler deterministically during the test; update the affected
test functions that call worker.forEach<number, number> (and any other handlers
around the mentioned locations) to await or resolve the latch at the precise
points where you currently sleep, ensuring the test controls request timing
without timeouts.
🧹 Nitpick comments (2)
worker/types.ts (1)

84-90: Consider preserving error.cause in serialization.

The serializeError function serializes name, message, and stack, but does not preserve error.cause. If the original error has a cause chain, that context is lost. This may be intentional (since cause could contain non-serializable data), but worth noting.

Optional: Recursively serialize cause if present
 export function serializeError(error: Error): SerializedError {
+  const cause = error.cause instanceof Error 
+    ? serializeError(error.cause) 
+    : undefined;
   return {
     name: error.name,
     message: error.message,
     stack: error.stack,
+    cause,
   };
 }

This would require adding cause?: SerializedError to the SerializedError interface.

worker/worker.ts (1)

16-21: Separate type imports from value imports.

Per coding guidelines, prefer type imports for type-only imports.

♻️ Suggested refactor
 import { useChannelRequest, useChannelResponse } from "./channel.ts";
-import {
-  type ForEachContext,
-  type SerializedError,
-  errorFromSerialized,
-} from "./types.ts";
+import type { ForEachContext, SerializedError } from "./types.ts";
+import { errorFromSerialized } from "./types.ts";

@taras taras requested a review from joshamaju February 16, 2026 19:31
taras added a commit that referenced this pull request Feb 16, 2026
…rEach test

Replace sleep(100) with withResolvers latch for deterministic test
synchronization per No-Sleep Test Sync policy.

Addresses CodeRabbit feedback on PR #138.
@taras
Copy link
Member Author

taras commented Feb 16, 2026

Addressed CodeRabbit Feedback

1. Browser MessagePort close event (Major)

Created issue #161 to track this. The browser MessagePort doesn't have a close event - this is a Node.js-only feature. Will address in a separate PR with options:

  • Document that browser usage requires timeout
  • Implement heartbeat/keepalive protocol
  • Feature detection for different environments

2. Sleep-based test synchronization (Minor)

Fixed in c66624d - replaced sleep(100) with deterministic withResolvers latch in the "throws error when forEach is called concurrently" test.

Note: Other sleep usages are acceptable per the No-Sleep Test Sync policy:

  • sleep(0) for yielding control (explicitly allowed)
  • sleep(request * 10) simulates processing time, not synchronization

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
worker/worker.test.ts (1)

249-252: Consider deterministic ordering instead of timing-based delays.

The sleep(request * 10) simulates different processing times to test concurrent request handling. While this isn't strictly test synchronization, timing-based ordering can be flaky under heavy system load. Consider using latches or signals to control completion order deterministically if flakiness is observed.

taras added a commit that referenced this pull request Feb 16, 2026
…rEach test

Replace sleep(100) with withResolvers latch for deterministic test
synchronization per No-Sleep Test Sync policy.

Addresses CodeRabbit feedback on PR #138.
@taras taras force-pushed the feature/worker-initiated-requests branch from c66624d to 8383206 Compare February 16, 2026 19:44
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
Before applying any fix, first verify the finding against the current code and
decide whether a code change is actually needed. If the finding is not valid or
no change is required, do not modify code for that item and briefly explain why
it was skipped.

In `@worker/channel.test.ts`:
- Around line 8-11: The test helper sleepThenTimeout and the race-based,
time-sleep waits should be replaced with deterministic synchronization using the
existing helpers: remove sleepThenTimeout and stop relying on long sleep(ms) in
race patterns; instead use withResolvers to create controllable resolvers, wait
for the channel's close event (e.g., subscribe to the channel close or use the
channel.close promise) and use when/is with a short timeout wrapper to assert
ordering, using sleep(0) only if you need to yield the event loop; update
occurrences that call sleepThenTimeout or perform time-based races (including
the race patterns around close detection) to resolve via withResolvers +
close-event + when(timeout) so tests are deterministic.

In `@worker/channel.ts`:
- Around line 326-378: Extract the duplicated ACK-waiting logic from the
generator methods *resolve and *reject into a small shared helper (e.g.,
waitForAck or awaitAck) that accepts the MessagePort (port) and returns after
validating an incoming ChannelAck or returns early on port "close"; use the same
race([ once(port,"message"), once(port,"close") ]) logic, cast the event to
Event/MessageEvent, check for close and for ack?.type === "ack" and throw the
same error if validation fails, and keep port cleanup in the existing finally
blocks; then replace the duplicated blocks in *resolve and *reject with a single
call to this helper (referencing ChannelMessage<TResponse,TProgress>, ChannelAck
and serializeError from the current scope).

In `@worker/types.ts`:
- Around line 84-90: serializeError currently omits nested causes; update the
SerializedError type to include an optional "cause?: SerializedError | string |
undefined" and modify the serializeError function to capture error.cause — if
cause is an Error, recursively call serializeError(cause); if cause is a string
or other primitive, store it directly (or convert to string); if absent, leave
undefined. Ensure you reference the serializeError function and the
SerializedError type so the change is propagated where serialized errors are
created and consumed.

In `@worker/worker-main.ts`:
- Around line 106-116: The package version for the worker package was not bumped
for this new feature; update the package manifest by changing the "version"
field in package.json for the worker package to 0.5.0 (semantic minor bump for a
new feature), ensure any lockfile or workspace manifest (e.g.,
package-lock.json, pnpm-lock.yaml, or root package.json workspaces) is
updated/committed to reflect the new version, and add a short changelog entry
noting the feature if one exists; refer to the workerMain symbol in
worker-main.ts to locate the package that needs the version bump.

taras added a commit that referenced this pull request Feb 16, 2026
Extract duplicate ACK-waiting code from resolve(), reject(), and progress()
methods into a shared waitForAck() helper function. This reduces code
duplication and makes the ACK handling more maintainable.

Addresses CodeRabbit nitpick feedback on PR #138.
taras added a commit that referenced this pull request Feb 16, 2026
Add recursive serialization of error.cause in serializeError() so
nested error chains are preserved when errors cross the worker boundary.
This provides better debugging context when errors have underlying causes.

Addresses CodeRabbit nitpick feedback on PR #138.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
Verify each finding against the current code and only fix it if needed.


In `@worker/channel.test.ts`:
- Around line 723-731: The timing assertions for progressDurations are flaky on
slow CI; update the test that checks progressDurations[0] and
progressDurations[1] (which compares against processingTime) to avoid tight
millisecond bounds—either increase the tolerances (e.g., use larger thresholds
than 20ms and processingTime - 10) or replace the timing checks with
ordering/signal-based checks (resolve a Promise or set flags when the first
progress completes and assert the second progress happens after that signal) so
the test verifies backpressure/order without relying on brittle exact timing.

In `@worker/channel.ts`:
- Around line 260-269: The timeout applied in the progress subscription is
per-call (each invocation of waitForNext via next()), not cumulative for the
whole exchange; update the code around the timebox(timeout, waitForNext) block
(symbols: timebox, waitForNext, timeout, next()) to add a concise comment
stating this per-call semantics and, if you want cumulative behavior, implement
tracking of elapsed time (record start time, subtract elapsed from timeout for
subsequent timebox calls) or mention that alternative in the comment so callers
understand how to change it.

Add 14 new test cases and 9 test asset workers for the upcoming
worker-initiated requests feature. This allows workers to send
requests to the host, enabling bidirectional communication.

Tests are written first (TDD) and will fail until implementation
is complete. The PLAN-worker-requests.md contains the detailed
implementation plan with all design decisions documented.
Implements the worker→host request/response pattern allowing workers
to send requests to the host and receive responses. This enables use
cases like tool sessions where workers need to call back to the host.

Changes:
- Add send() function to workerMain options for worker→host requests
- Add forEach() method to WorkerResource for host to handle requests
- Implement error serialization with cause chain for cross-boundary errors
- Use race() pattern so forEach exits cleanly when worker closes
- Add concurrent forEach guard with try/finally cleanup
- Fix bidirectional worker test to spawn messages.forEach in background

All 14 new tests pass along with existing tests.
Verifies that workers can call send() to the host while handling
a message from the host inside messages.forEach. This tests the
nested bidirectional communication pattern.
Clarifies that these type parameters are for worker-initiated requests,
distinguishing them from the host-initiated TSend/TRecv types.

Also replaces race() with spawn() for the request processor loop to
improve compatibility with effection 3.0.0.
Ensures compatibility with effection 3.0.0 on macOS by explicitly
halting the request processor task when the outcome resolves, rather
than relying on implicit scope cleanup.
Replace signal/each pattern with a callback-based approach that avoids
the race condition in effection v3. Instead of spawning a processor
task that blocks on a signal iterator, we now:

1. Queue requests that arrive before forEach is called
2. Set a handler callback when forEach is active
3. Dispatch requests via scope.run() from the message handler
4. Clear the handler when forEach exits

This eliminates the need to halt a blocking iterator, which had
inconsistent behavior between effection v3 and v4.
Add two complementary channel primitives for request-response communication:

- useChannelResponse: Requester side - creates MessageChannel, returns port
  to transfer and operation to await response (with automatic ACK)
- useChannelRequest: Responder side - wraps received port, provides
  resolve/reject operations that wait for ACK before closing

Features:
- ACK mechanism guarantees response delivery before port cleanup
- Simple validation throws on unexpected messages
- Proper resource cleanup with finally blocks
- Full test coverage (9 tests)
Plan for integrating useChannelResponse/useChannelRequest into worker
implementation. Includes prerequisite fix for cancellation handling:
- Race ACK wait against port close event
- Prevents responder from hanging when requester is cancelled
taras added 25 commits February 18, 2026 08:52
- Add SerializedResult<T> type to fix error typing mismatch
- Clarify that ACK is sent for both Ok and Err responses
- Update all code examples to use SerializedOk/SerializedErr
- Note that resolve() is used for both success/error (reject is for Operation errors)
Explicitly document that useChannelResponse and useChannelRequest
handle port.start() and port.close() internally, so removing these
calls from worker code is intentional, not a regression.
Add tests to plan for:
- ACK sent/received on error path (not just success)
- Port closes if responder exits without calling resolve/reject
- Port closes if responder throws before responding
…internally

- resolve(value) wraps in { ok: true, value } internally
- reject(error) serializes and wraps in { ok: false, error } internally
- Callers use natural resolve/reject semantics
- Channel primitive handles SerializedResult wrapping
- Remove SerializedOk/SerializedErr helper functions from exports
- ChannelResponse.operation returns SerializedResult<T>
- Use withResolvers for synchronization instead of sleep
- Add tests for requester cancellation while waiting
- Add tests for requester scope exit without calling operation
- All tests use explicit signals for coordination
- Add SerializedResult<T> type for type-safe cross-boundary error handling
- Update useChannelResponse with close detection and optional timeout
- Update useChannelRequest to wrap responses in SerializedResult internally
- Replace manual MessageChannel usage in worker.ts and worker-main.ts
- Add @effectionx/timebox dependency for timeout support
- Add comprehensive tests for close detection, timeout, and cancellation
- Add .opencode/plans/ to .gitignore
- Remove redundant port.close() from resolve/reject (finally handles it)
- Document reject() as application-level error, not transport error
- Document close event runtime behavior across Node.js/browser/Deno
- Refactor fragile cancellation test to use raw postMessage for precise signaling
Make ChannelResponse<T> extend Operation<SerializedResult<T>> so callers
can yield the response object directly instead of using a separate
.operation property.

Before:
  const { port, operation } = yield* useChannelResponse<T>();
  const result = yield* operation;

After:
  const response = yield* useChannelResponse<T>();
  const result = yield* response;
Extend channel primitives to support bidirectional progress streaming:

- useChannelResponse: add 'progress' property returning Subscription
- useChannelRequest: add 'progress()' method with backpressure (ACK)
- forEach: pass ForEachContext with progress() to handler
- send.stream<TProgress>(): receive progress updates from host

Wire protocol extended with progress/progress_ack message types.
All tests comply with no-sleep-test-sync policy using withResolvers()
for deterministic synchronization.

Tests: 53 pass (14 new progress streaming tests)
- Add progress streaming section to README with examples
- Document true backpressure: host blocks until worker calls next()
- Add test proving backpressure behavior (host waits for worker readiness)
Address reviewer feedback: errors in forEach handler should crash the host,
not be silently swallowed. The error is still forwarded to the worker so
it knows the request failed, but then re-thrown so the host doesn't
continue running after a handler failure.

This change also serializes request handling (one at a time) which makes
the error semantics cleaner - if a handler fails, we stop immediately.
…rEach test

Replace sleep(100) with withResolvers latch for deterministic test
synchronization per No-Sleep Test Sync policy.

Addresses CodeRabbit feedback on PR #138.
This release adds worker-initiated requests with progress streaming,
bidirectional communication, and channel-based messaging primitives.
Use @effectionx/timebox for timeout handling instead of custom
sleepThenTimeout helper. This provides cleaner semantics with
the Timeboxed<T> discriminated union type.

Addresses CodeRabbit feedback on sleep-based test synchronization.
Extract duplicate ACK-waiting code from resolve(), reject(), and progress()
methods into a shared waitForAck() helper function. This reduces code
duplication and makes the ACK handling more maintainable.

Addresses CodeRabbit nitpick feedback on PR #138.
Add recursive serialization of error.cause in serializeError() so
nested error chains are preserved when errors cross the worker boundary.
This provides better debugging context when errors have underlying causes.

Addresses CodeRabbit nitpick feedback on PR #138.
Increase tolerance bounds in backpressure timing test to avoid
flakiness on slow CI systems:
- First progress: 20ms -> 50ms
- Second progress: processingTime - 10 -> processingTime - 20

Addresses CodeRabbit nitpick on timing-based assertions.
…tion

Add comment explaining that timeout is applied per next() call,
not cumulatively for the entire exchange. This helps callers
understand the behavior when dealing with many progress updates.

Addresses CodeRabbit nitpick on timeout semantics documentation.
@taras taras force-pushed the feature/worker-initiated-requests branch from add0c15 to 6bc2592 Compare February 18, 2026 13:53
@taras taras merged commit a43c89f into main Feb 18, 2026
7 checks passed
@taras taras deleted the feature/worker-initiated-requests branch February 18, 2026 13:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants