
Add temporalio.contrib.pubsub module #1423

Open
jssmith wants to merge 57 commits into main from contrib/pubsub

Conversation

jssmith (Contributor) commented Apr 7, 2026

What was changed

Adds temporalio.contrib.pubsub, a reusable pub/sub primitive for streaming data out of Temporal workflows.

Why?

Streaming incremental results from long-running workflows (e.g., AI agent token streams, progress updates) is a common need with no built-in solution. This module provides a correct, reusable implementation so users don't have to roll their own poll/signal/dedup logic.

Checklist

  1. Closes — N/A (new contrib module, no existing issue)

  2. How was this tested:

    • 24 pytest tests in tests/contrib/pubsub/test_pubsub.py covering batching, flush safety, CAN serialization, replay guards, dedup (TTL pruning, truncation), offset-based resumption, max_batch_size, drain, and error handling
    • Demo application
    • Shared with prospective users
  3. Any docs updates needed?

    • Module includes README.md with usage examples and API reference
    • Design docs: DESIGN-v2.md, and addenda covering CAN, dedup, and topic semantics
    • No docs.temporal.io updates yet — will add once the API stabilizes

jssmith and others added 15 commits April 5, 2026 21:33
A workflow mixin (PubSubMixin) that turns any workflow into a pub/sub
broker. Activities and starters publish via batched signals; external
clients subscribe via long-poll updates exposed as an async iterator.

Key design decisions:
- Payloads are opaque bytes for cross-language compatibility
- Topics are plain strings, no hierarchy or prefix matching
- Global monotonic offsets (not per-topic) for simple continuation
- Batching built into PubSubClient with Nagle-like timer + priority flush
- Structured concurrency: no fire-and-forget tasks, trio-compatible
- Continue-as-new support: drain_pubsub() + get_pubsub_state() + validator
  to cleanly drain polls, plus follow_continues on the subscriber side

Module layout:
  _types.py  — PubSubItem, PublishInput, PollInput, PollResult, PubSubState
  _mixin.py  — PubSubMixin (signal, update, query handlers)
  _client.py — PubSubClient (batcher, async iterator, CAN resilience)

9 E2E integration tests covering: activity publish + subscribe, topic
filtering, offset-based replay, interleaved workflow/activity publish,
priority flush, iterator cancellation, context manager flush, concurrent
subscribers, and mixin coexistence with application signals/queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
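The subscriber side described above (long-poll updates surfaced as an async iterator, with no fire-and-forget tasks) can be sketched as a toy, self-contained model. The `poll` callable stands in for the workflow's poll update; all names here are illustrative, not the module's actual API.

```python
import asyncio

async def subscribe(poll, from_offset=0, poll_cooldown=0.0):
    """Expose a long-poll loop as an async iterator (illustrative sketch)."""
    offset = from_offset
    while True:
        items = await poll(offset)
        if items is None:            # broker drained / stream closed
            return
        for item in items:
            yield item               # caller consumes incrementally
            offset += 1
        if not items:
            await asyncio.sleep(poll_cooldown)   # rate-limit empty polls

async def _demo():
    log = ["a", "b", "c"]

    async def poll(offset):
        # Stand-in for the workflow's poll handler: returns new items
        # past `offset`, or None once the stream is closed.
        return log[offset:] if offset < len(log) else None

    return [item async for item in subscribe(poll)]

received = asyncio.run(_demo())
assert received == ["a", "b", "c"]
```

Structured concurrency falls out naturally here: the iterator owns the polling loop, so cancelling the consumer cancels the poll with it.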
PubSubState is now a Pydantic model so it survives serialization through
Pydantic-based data converters when embedded in Any-typed fields. Without
this, continue-as-new would fail with "'dict' object has no attribute 'log'"
because Pydantic deserializes Any fields as plain dicts.

Added two CAN tests:
- test_continue_as_new_any_typed_fails: documents that Any-typed fields
  lose PubSubState type information (negative test)
- test_continue_as_new_properly_typed: verifies CAN works with properly
  typed PubSubState | None fields

Simplified subscribe() exception handling: removed the broad except
Exception clause that tried _follow_continue_as_new() on every error.
Now only catches WorkflowUpdateRPCTimeoutOrCancelledError for CAN follow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
README.md: usage-oriented documentation covering workflow mixin, activity
publishing, subscribing, continue-as-new, and cross-language protocol.

flush() safety: items are now removed from the buffer only after the
signal succeeds. Previously, buffer.clear() ran before the signal,
losing items on failure. Added test_flush_retains_items_on_signal_failure.

init_pubsub() guard: publish() and _pubsub_publish signal handler now
check for initialization and raise a clear RuntimeError instead of a
cryptic AttributeError.

PubSubClient.for_workflow() factory: preferred constructor that takes a
Client + workflow_id. Enables follow_continues in subscribe() without
accessing private WorkflowHandle._client. The handle-based constructor
remains for simple cases that don't need CAN following.

activity_pubsub_client() now uses for_workflow() internally with proper
keyword-only typed arguments instead of **kwargs: object.

CAN test timing: replaced asyncio.sleep(2) with assert_eq_eventually
polling for a different run_id, matching sdk-python test patterns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
_pubsub_poll and _pubsub_offset now call _check_initialized() for a
clear RuntimeError instead of cryptic AttributeError when init_pubsub()
is forgotten.

README CAN example now includes the required imports (@dataclass,
workflow) and the @workflow.init decorator.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The poll validator accesses _pubsub_draining, which would AttributeError
if init_pubsub() was never called. Added _check_initialized() guard.

Fixed PubSubState docstring: the field must be typed as PubSubState | None,
not Any. The old docstring incorrectly implied Any-typed fields would work.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
get_pubsub_state() and drain_pubsub() now call _check_initialized().
Previously drain_pubsub() could silently set _pubsub_draining on an
uninitialized instance, which init_pubsub() would then reset to False.

New tests:
- test_max_batch_size: verifies auto-flush when buffer reaches limit,
  using max_cached_workflows=0 to also test replay safety
- test_replay_safety: interleaved workflow/activity publish with
  max_cached_workflows=0, proving the mixin is determinism-safe

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review comments (#@agent: annotations) capture design questions on:
- Topic offset model and information leakage (resolved: global offsets
  with BFF-layer containment, per NATS JetStream model)
- Exactly-once publish delivery (resolved: publisher ID + sequence number
  dedup, per Kafka producer model)
- Flush concurrency (resolved: asyncio.Lock with buffer swap)
- CAN follow behavior, poll rate limiting, activity context detection,
  validator purpose, pyright errors, API ergonomics

DESIGN-ADDENDUM-TOPICS.md: full exploration of per-topic vs global offsets
with industry survey (Kafka, Redis, NATS, PubNub, Google Pub/Sub,
RabbitMQ). Concludes global offsets are correct for workflow-scoped
pub/sub; leakage contained at BFF trust boundary.

DESIGN-ADDENDUM-DEDUP.md: exactly-once delivery via publisher ID +
monotonic sequence number. Workflow dedup state is dict[str, int],
bounded by publisher count. Buffer swap pattern with sequence reuse
on failure. PubSubState carries publisher_sequences through CAN.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Types:
- Remove offset from PubSubItem (global offset is now derived)
- Add publisher_id + sequence to PublishInput for exactly-once dedup
- Add base_offset + publisher_sequences to PubSubState for CAN
- Use Field(default_factory=...) for Pydantic mutable defaults

Mixin:
- Add _pubsub_base_offset for future log truncation support
- Add _pubsub_publisher_sequences for signal deduplication
- Dedup in signal handler: reject if sequence <= last seen
- Poll uses base_offset arithmetic for offset translation
- Class-body type declarations for basedpyright compatibility
- Validator docstring explaining drain/CAN interaction
- Module docstring gives specific init_pubsub() guidance

Client:
- asyncio.Lock + buffer swap for flush concurrency safety
- Publisher ID (uuid) + monotonic sequence for exactly-once delivery
- Sequence advances on failure to prevent data loss when new items
  merge with retry batch (found via Codex review)
- Remove follow_continues param — always follow CAN via describe()
- Configurable poll_interval (default 0.1s) for rate limiting
- Merge activity_pubsub_client() into for_workflow() with auto-detect
- _follow_continue_as_new is async with describe() check

Tests:
- New test_dedup_rejects_duplicate_signal
- Updated flush failure test for new sequence semantics
- All activities use PubSubClient.for_workflow()
- Remove PubSubItem.offset assertions
- poll_interval=0 in test helper for speed

Docs:
- DESIGN-v2.md: consolidated design doc superseding original + addenda
- README.md: updated API reference
- DESIGN-ADDENDUM-DEDUP.md: corrected flush failure semantics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
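The signal-handler dedup rule above ("reject if sequence <= last seen") can be modeled in a few lines. This is a standalone sketch whose names (`Broker`, `on_publish`, `publisher_sequences`) echo the commit message but are not the module's real code.

```python
from dataclasses import dataclass, field

@dataclass
class Broker:
    log: list[str] = field(default_factory=list)
    publisher_sequences: dict[str, int] = field(default_factory=dict)

    def on_publish(self, publisher_id: str, sequence: int, items: list[str]) -> bool:
        # Reject any batch whose sequence is <= the last seen from this
        # publisher: a redelivered signal batch is silently ignored.
        last = self.publisher_sequences.get(publisher_id, 0)
        if sequence <= last:
            return False
        self.publisher_sequences[publisher_id] = sequence
        self.log.extend(items)
        return True

broker = Broker()
assert broker.on_publish("pub-1", 1, ["a"]) is True
assert broker.on_publish("pub-1", 1, ["a"]) is False  # duplicate delivery ignored
assert broker.on_publish("pub-1", 2, ["b"]) is True
assert broker.log == ["a", "b"]
```

The dedup state is bounded by the number of distinct publishers, matching the `dict[str, int]` design noted above.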
Rewrite the client-side dedup algorithm to match the formally verified
TLA+ protocol: failed flushes keep a separate _pending batch and retry
with the same sequence number. Only advance the confirmed sequence on
success. TLC proves NoDuplicates and OrderPreserved for the correct
algorithm, and finds duplicates in the old algorithm.

Add TTL-based pruning of publisher dedup entries during continue-as-new
(default 15 min). Add max_retry_duration (default 600s) to bound client
retries — must be less than publisher_ttl for safety. Both constraints
are formally verified in PubSubDedupTTL.tla.

Add truncate_pubsub() for explicit log prefix truncation. Add
publisher_last_seen timestamps for TTL tracking. Preserve legacy state
without timestamps during upgrade.

API changes: for_workflow→create, flush removed (use priority=True),
poll_interval→poll_cooldown, publisher ID shortened to 16 hex chars.

Includes TLA+ specs (correct, broken, inductive, multi-publisher TTL),
PROOF.md with per-action preservation arguments, scope and limitations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
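The verified client algorithm ("failed flushes keep a separate _pending batch and retry with the same sequence number; only advance the confirmed sequence on success") can be modeled without any Temporal dependency. The `send` callable stands in for delivering the batched signal; names are illustrative only.

```python
class Client:
    """Minimal model of the buffer-swap + pending-retry flush algorithm."""

    def __init__(self) -> None:
        self.buffer: list[str] = []   # new publishes accumulate here
        self.pending: list[str] | None = None  # batch awaiting confirmation
        self.pending_seq = 0
        self.sequence = 0             # last CONFIRMED sequence

    def flush(self, send) -> bool:
        if self.pending is None and self.buffer:
            # Buffer swap: freeze the batch under the next sequence number;
            # concurrent publishes go into a fresh buffer.
            self.pending = self.buffer
            self.buffer = []
            self.pending_seq = self.sequence + 1
        if self.pending is None:
            return True
        if send(self.pending_seq, self.pending):
            self.sequence = self.pending_seq   # advance only on success
            self.pending = None
            return True
        return False  # retry later with the SAME sequence number

calls: list[int] = []
def flaky(seq: int, batch: list[str]) -> bool:
    calls.append(seq)
    return len(calls) > 1   # fail the first delivery attempt

c = Client()
c.buffer = ["x"]
assert c.flush(flaky) is False
assert c.flush(flaky) is True
assert calls == [1, 1]      # retried under the same sequence number
assert c.sequence == 1
```

Retrying under the same sequence is what lets the workflow-side dedup drop a duplicate if the first delivery actually succeeded.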
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
New analysis document evaluates whether publishing should use signals
or updates, examining Temporal's native dedup (Update ID per-run,
request_id for RPCs) vs the application-level (publisher_id, sequence)
protocol. Conclusion: app-level dedup is permanent for signals but
could be dropped for updates once temporal/temporal#6375 is fixed.
Non-blocking flush keeps signals as the right choice for streaming.

Updates DESIGN-v2.md section 6 to be precise about the two Temporal
guarantees that signal ordering relies on: sequential send order and
history-order handler invocation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Analyzes deduplication through the end-to-end principle lens. Three
types of duplicates exist in the pipeline, each handled at the layer
that introduces them:

- Type A (duplicate LLM work): belongs at application layer — data
  escapes to consumers before the duplicate exists, so only the
  application can resolve it
- Type B (duplicate signal batches): belongs in pub/sub workflow —
  encapsulates transport details and is the only layer that can
  detect them correctly
- Type C (duplicate SSE delivery): belongs at BFF/browser layer

Concludes the (publisher_id, sequence) protocol is correctly placed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… design

Fill gaps identified during design review:
- Document why per-topic offsets were rejected (trust model, cursor
  portability, unjustified complexity) inline rather than only in historical
  addendum
- Expand BFF section with the four reconnection options considered and
  the decision to use SSE Last-Event-ID with BFF-assigned gapless IDs
- Add poll efficiency characteristics (O(new items) common case)
- Document BFF restart fallback (replay from turn start)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode
data as base64 strings for cross-language compatibility across all
Temporal SDKs. User-facing types (PubSubItem) use native bytes.

Conversion happens inside handlers:
- Signal handler decodes base64 → bytes on ingest
- Poll handler encodes bytes → base64 on response
- Client publish() accepts bytes, encodes for signal
- Client subscribe() decodes poll response, yields bytes

This means Go/Java/.NET ports get cross-language compat for free since
their JSON serializers encode byte[] as base64 by default.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
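The wire conversion itself is plain base64. A minimal sketch of the boundary, with hypothetical helper names (the real conversion lives inside the handlers):

```python
import base64

def encode_for_wire(data: bytes) -> str:
    # User-facing bytes -> base64 string for the JSON wire format.
    return base64.b64encode(data).decode("ascii")

def decode_from_wire(data: str) -> bytes:
    # Base64 string from the wire -> native bytes for the user.
    return base64.b64decode(data)

payload = b"\x00\xffhello"
assert decode_from_wire(encode_for_wire(payload)) == payload
```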
Comment thread temporalio/contrib/pubsub/_types.py Outdated
class PubSubState(BaseModel):
"""Serializable snapshot of pub/sub state for continue-as-new.

This is a Pydantic model (not a dataclass) so that Pydantic-based data
Contributor:

In this case it needs to somehow be clear that the pydantic data converter is required for this.

jssmith (Contributor Author):

Thanks for pointing this out. It turns out we can remove the Pydantic dependency.

jssmith and others added 14 commits April 7, 2026 20:10
Remove the bounded poll wait from PubSubMixin and trim trailing
whitespace from types. Update DESIGN-v2.md with streaming plugin
rationale (no fencing needed, UI handles repeat delivery).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add opt-in streaming code path to both agent framework plugins.
When enabled, the model activity calls the streaming LLM endpoint,
publishes TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via
PubSubClient as a side channel, and returns the complete response
for the workflow to process (unchanged interface).

OpenAI Agents SDK:
- ModelActivityParameters.enable_streaming flag
- New invoke_model_activity_streaming method on ModelActivity
- ModelResponse reconstructed from ResponseCompletedEvent
- Uses @_auto_heartbeater for periodic heartbeats
- Routing in _temporal_model_stub (rejects local activities)

Google ADK:
- TemporalModel(streaming=True) constructor parameter
- New invoke_model_streaming activity using stream=True
- Registered in GoogleAdkPlugin

Both use batch_interval=0.1s for near-real-time token delivery.
No pubsub module changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Pydantic BaseModel was introduced as a workaround for Any-typed fields
losing type information during continue-as-new serialization. The actual fix
is using concrete type annotations (PubSubState | None), which the default
data converter handles correctly for dataclasses — no Pydantic dependency
needed.

This removes the pydantic import from the pubsub contrib module entirely,
making it work out of the box with the default data converter. All 18 tests
pass, including both continue-as-new tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements DESIGN-ADDENDUM-ITEM-OFFSET.md. The poll handler now annotates
each item with its global offset (base_offset + position in log), enabling
subscribers to track fine-grained consumption progress for truncation.
This is needed for the voice-terminal agent where audio chunks must not be
truncated until actually played, not merely received.

- Add offset field to PubSubItem and _WireItem (default 0)
- Poll handler computes offset from base_offset + log_offset + enumerate index
- subscribe() passes wire_item.offset through to yielded PubSubItem
- Tests: per-item offsets, offsets with topic filtering, offsets after truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
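The offset arithmetic described above (global offset = base_offset + position in the in-memory log) can be sketched as a pure function. This is an illustrative model of the poll handler's annotation step, not the module's code.

```python
def poll(log: list[str], base_offset: int, from_offset: int) -> list[tuple[int, str]]:
    # base_offset counts items already truncated from the front of the log;
    # each surviving item keeps its stable global offset.
    start = max(from_offset - base_offset, 0)
    return [(base_offset + i, item) for i, item in enumerate(log) if i >= start]

assert poll(["a", "b", "c"], base_offset=0, from_offset=0) == [(0, "a"), (1, "b"), (2, "c")]
# After truncating the first two items, base_offset=2 and the surviving
# item keeps the same global offset it had before truncation.
assert poll(["c"], base_offset=2, from_offset=2) == [(2, "c")]
```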
Documents the motivation and design for adding offset fields to
PubSubItem and _WireItem, enabling subscribers to track consumption
at item granularity rather than batch boundaries. Driven by the
voice-terminal agent's need to truncate only after audio playback,
not just after receipt.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Three changes:

1. Poll handler: replace ValueError with ApplicationError(non_retryable=True)
   when requested offset has been truncated. This fails the UPDATE (client
   gets the error) without crashing the WORKFLOW TASK — avoids the poison
   pill during replay that caused permanent workflow failures.

2. Poll handler: treat from_offset=0 as "from the beginning of whatever
   exists" (i.e., from base_offset). This lets subscribers recover from
   truncation by resubscribing from 0 without knowing the current base.

3. PubSubClient.subscribe(): catch WorkflowUpdateFailedError with type
   TruncatedOffset and retry from offset 0, auto-recovering.

New tests:
- test_poll_truncated_offset_returns_application_error
- test_poll_offset_zero_after_truncation
- test_subscribe_recovers_from_truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
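The recovery semantics of changes 1 and 2 reduce to a small decision function: offsets below the truncation point fail the update, except offset 0, which means "from the start of whatever still exists". A sketch with hypothetical names:

```python
class TruncatedOffset(Exception):
    """Model of the non-retryable ApplicationError: fails the update
    (the client sees it) without crashing the workflow task."""

def resolve_start(from_offset: int, base_offset: int) -> int:
    if from_offset == 0:
        return base_offset        # recover without knowing the current base
    if from_offset < base_offset:
        raise TruncatedOffset     # requested data has been truncated
    return from_offset

assert resolve_start(0, base_offset=5) == 5
assert resolve_start(7, base_offset=5) == 7
try:
    resolve_start(3, base_offset=5)
    raise AssertionError("expected TruncatedOffset")
except TruncatedOffset:
    pass
```

Change 3 is then just the client catching this failure and re-polling from offset 0.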
Verify that PubSubClient can subscribe to events from a different
workflow (same namespace) and that Nexus operations can start pub/sub
broker workflows in a separate namespace with cross-namespace
subscription working end-to-end. No library changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Poll responses now estimate wire size (base64 data + topic) and stop
adding items once the response exceeds 1MB. The new `more_ready` flag
on PollResult tells the subscriber that more data is available, so it
skips the poll_cooldown sleep and immediately re-polls. This avoids
unnecessary latency during big reloads or catch-up scenarios while
keeping individual update payloads within Temporal's recommended limits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
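The size cap can be modeled as a loop that estimates each item's wire footprint (base64-expanded data plus topic) and stops once the running total exceeds the limit. Names and the exact estimate are illustrative.

```python
import base64

LIMIT = 1_000_000  # ~1 MB, per Temporal's recommended payload limits

def build_response(items: list[tuple[str, bytes]], limit: int = LIMIT):
    out: list[tuple[str, bytes]] = []
    size = 0
    for topic, data in items:
        size += len(base64.b64encode(data)) + len(topic)
        if out and size > limit:
            # more_ready=True: subscriber skips the cooldown and re-polls.
            return out, True
        out.append((topic, data))  # always include at least one item
    return out, False

batch, more = build_response([("t", b"x" * 600_000), ("t", b"y" * 600_000)])
assert len(batch) == 1 and more is True
batch2, more2 = build_response([("t", b"z")])
assert batch2 == [("t", b"z")] and more2 is False
```

Always admitting the first item guarantees forward progress even when a single item exceeds the limit.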
Codify the four wire evolution rules that have been followed implicitly
through four addenda: additive-only fields with defaults, immutable
handler names, forward-compatible PubSubState, and no application-level
version negotiation. Includes a precedent table showing all past changes
and reasoning for why version fields in payloads would cause silent data
loss on signals.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
After max_retry_duration expires, the client dropped the pending batch
without advancing _sequence. The next batch reused the same sequence
number, which could be silently deduplicated by the workflow if the
timed-out signal was actually delivered — causing permanent data loss
for those items.

The fix advances _sequence to _pending_seq before clearing _pending,
ensuring subsequent batches always get a fresh sequence number.

TLA+ verification:
- Added DropPendingBuggy/DropPendingFixed actions to PubSubDedup.tla
- Added SequenceFreshness invariant: (pending=<<>>) => (confirmed_seq >= wf_last_seq)
- BuggyDropSpec FAILS SequenceFreshness (confirmed_seq=0 < wf_last_seq=1)
- FixedDropSpec PASSES all invariants (489 distinct states)
- NoDuplicates passes for both — the bug causes data loss, not duplicates

Python test:
- test_retry_timeout_sequence_reuse_causes_data_loss demonstrates the
  end-to-end consequence: reused seq=1 is rejected, fresh seq=2 accepted

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
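The bug and fix can be reduced to the workflow's accept check plus the sequence chosen for the batch after a drop. A minimal model (names illustrative):

```python
def accepts(wf_last_seq: int, seq: int) -> bool:
    # Workflow-side dedup: only strictly increasing sequences are applied.
    return seq > wf_last_seq

# Scenario: the timed-out signal for seq=1 WAS delivered, so wf_last_seq=1.
wf_last_seq = 1

# Buggy drop: _sequence stays at 0, so the next batch is sent as seq=1
# again and is silently deduplicated -> permanent data loss.
assert accepts(wf_last_seq, 0 + 1) is False

# Fixed drop: _sequence advances to _pending_seq (1) before clearing, so
# the next batch is sent as seq=2 and accepted.
assert accepts(wf_last_seq, 1 + 1) is True
```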
# Conflicts:
#	temporalio/contrib/google_adk_agents/_model.py
This is a new release with no legacy to support. Changes:

- _mixin.py: Remove ts-is-None fallback that retained publishers without
  timestamps. All publishers always have timestamps, so this was dead code.
- _types.py: Clean up docstrings referencing addendum docs
- DESIGN-v2.md: Remove backward-compat framing, addendum references, and
  historical file listing. Keep the actual evolution rules.
- PROOF.md: "Legacy publisher_id" → "Empty publisher_id"
- README.md: Reference DESIGN-v2.md instead of deleted addendum
- Delete DESIGN.md and 4 DESIGN-ADDENDUM-*.md files (preserved in
  the top-level streaming-comparisons repo)
- Delete stale TLA+ trace .bin files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Simplify the README to focus on essential API patterns. Rename
for_workflow() to create() throughout, condense the topics section,
remove the exactly-once and type-warning sections (these details
belong in DESIGN-v2.md), and update the API reference table with
current parameter signatures. Also fix whitespace alignment in
DESIGN-v2.md diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…de pubsub state

The CAN example only showed pubsub_state being passed through, which could
mislead readers into thinking that's all that's needed. Updated to include
a representative application field (items_processed) to make it clear that
your own workflow state must also be carried across the CAN boundary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
jssmith and others added 2 commits April 20, 2026 11:54
…onale

Remove references to PubSubDedup.tla from code comments, test docstrings,
and the design doc — the TLA+ spec was moved out of the published module.

Add design rationale for opaque bytes vs typed payloads (decoupling,
layering, type hints). Document the JSON data converter requirement for
cross-language interop in both the design doc and README.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sushisource (Member) left a comment:

I just realized that some of the comments I wrote a while ago were never submitted, but we need to move this out of draft anyway to get more eyes.

Comment thread temporalio/contrib/pubsub/DESIGN-v2.md Outdated
Comment on lines +74 to +76
Call `init_pubsub()` in `__init__` for fresh workflows. When accepting
continue-as-new state, call it in `run()` with the `prior_state` argument
(see [Continue-as-New](#continue-as-new)).
Sushisource (Member):

This distinction seems like a footgun. Input is available in both init and run; IMO we should advise always doing it in the same place, and always passing prior_state, if they've defined it as part of their input. If it's not present, it's harmless.

In fact I would probably go so far as to say prior_state should be required, which will make sure people put it in their workflow input.

jssmith (Contributor Author):

Good call - updated.

Comment thread temporalio/contrib/pubsub/DESIGN-v2.md Outdated
| `get_offset()` | Query current global offset. |

Use as `async with` for batched publishing with automatic flush on exit.
There is no public `flush()` method — use `priority=True` on `publish()`
Sushisource (Member):

priority seems like it should instead be named force_flush

jssmith (Contributor Author):

Good suggestion.

Comment thread temporalio/contrib/pubsub/DESIGN-v2.md Outdated
Comment on lines +127 to +128
When called from within an activity, `client` and `workflow_id` can be
omitted from `create()` — they are inferred from the activity context:
Sushisource (Member):

I think there should be different create functions for these cases, mostly because it's too easy to fail to realize you need to pass workflow id/client in the not-inside-an-activity case.

jssmith (Contributor Author) Apr 23, 2026:

Good suggestion. I wasn't thrilled with re-using create / having different sets of args in different circumstances.

Comment thread temporalio/contrib/pubsub/DESIGN-v2.md Outdated
@dataclass
class PubSubItem:
topic: str # Topic string
data: bytes # Opaque payload
Sushisource (Member):

These should probably be our actual Payload types, so that users can run them through their data converters.

jssmith (Contributor Author):

Ok, I have implemented data as a Payload. It hadn't occurred to me that we could use the data converters at this layer.

Comment thread temporalio/contrib/pubsub/DESIGN-v2.md Outdated
Comment on lines +318 to +319
Truncation is deferred to a future iteration. Until then, the log grows
without bound within a run and is compacted only through continue-as-new.
Sushisource (Member):

Being able to save space in the workflow itself is maaaaaybe useful, but the limiting factor is, I think, almost always going to be workflow history size, which can't be truncated.

So not totally sure we'd ever use this, but, I suppose it doesn't hurt to have it.

jssmith (Contributor Author):

We support continue-as-new. Truncation was implemented.

Comment on lines +332 to +333
A previous design used a 5-minute timeout as a defensive "don't block
forever" mechanism. This was removed because:
Sushisource (Member):

Also just... the update caller can time out their RPC however they like.

jssmith (Contributor Author):

Yes.

`verification/PubSubDedup.tla` for the spec.

```python
async def _flush(self) -> None:
```
Sushisource (Member):

Looks like this doesn't match what's in the actual implementation code, would be easier to just refer to that. But I guess more importantly, is the TLA spec matching the latest iteration of the real implementation?

I think the idea of using TLA is cool, but, interpreting the spec is... difficult.

Sushisource (Member):

Algo wise, forgive me if this doesn't make sense since I haven't gone through it with a fine-tooth comb, but, wouldn't it make sense to automatically skip over any items whose seq < the confirmed sequence? I'm not sure if that situation just can't arise by construction but, if it can, it'd be more efficient to do it here than in the workflow.

jssmith and others added 12 commits April 22, 2026 13:10
Review pass over tests/contrib/pubsub/test_pubsub.py:

Delete redundant tests:
- test_poll_offset_zero_after_truncation and
  test_per_item_offsets_after_truncation were fully covered by
  test_truncate_pubsub and test_subscribe_recovers_from_truncation.
- test_small_response_more_ready_false was the trivial branch of the
  big-response test; fold a single more_ready=False assertion into
  test_poll_more_ready_when_response_exceeds_size_limit instead of
  standing up a separate workflow.
- test_subscribe_from_offset merged into test_per_item_offsets, renamed
  to test_subscribe_from_offset_and_per_item_offsets.
- test_retry_timeout_sequence_reuse_causes_data_loss was effectively a
  rename of test_dedup_rejects_duplicate_signal and asserted the BUG
  (silent dedup) rather than the FIX, so it would fail if the behavior
  became stricter.

Rewrite white-box tests to be behavioral:
- test_flush_keeps_pending_on_signal_failure and
  test_max_retry_duration_expiry asserted on private _buffer, _pending,
  _pending_seq, _sequence fields — any refactor of the retry state
  machine broke them even with preserved behavior. Replaced with
  test_flush_retry_preserves_items_after_failures and
  test_flush_raises_after_max_retry_duration, which use
  patch.object(handle, "signal", ...) to inject delivery failures
  against a real workflow and assert observable outcomes.
- test_continue_as_new_any_typed_fails used an absence-timeout
  assertion (len == 0 within 3s) that would flake on slow CI and pass
  for the wrong reason. Switched to assert_task_fail_eventually on the
  new run, which asserts the specific failure mode.

Remove sleep-as-barrier anti-pattern:
Drop ~10 asyncio.sleep(0.3-0.5) barriers after __pubsub_publish /
truncate signals. A subsequent query or update naturally waits for
prior signals to be processed by the worker, so the sleeps were both
redundant and brittle. Replace the while True: sleep(0.1) describe-
poll in the cross-namespace test with assert_eq_eventually.

Fix test_priority_flush to actually test priority:
The 0.5s sleep at the end of the publish_with_priority activity made
the test pass regardless — __aexit__ would always flush before the
10s external collect timeout elapsed. Extended the activity hold to
~10s and tightened the collect timeout to 5s so that a priority-
wakeup regression surfaces as a missing item instead of a pass via
exit-time flush. The hold is long enough that worker teardown
outraces activity completion, so tests still finish in sub-second
wall time.

Result: 30 → 25 tests, 1848 → ~1590 lines, all passing in 5s.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to the prior cleanup. The two remaining timing-based sleeps
are replaced with explicit coordination, and helper functions taking a
handle now carry proper type annotations.

test_iterator_cancellation: publish a seed item and wait for an
asyncio.Event set on first yield (bounded by asyncio.timeout), then
cancel. The iterator is provably active at cancel time, so the test
no longer races against an arbitrary sleep.

test_flush_raises_after_max_retry_duration: inject a controllable
clock via patch of temporalio.contrib.pubsub._client.time.monotonic.
Advance the clock between the failing flush and the retry check so
the timeout fires deterministically without depending on wall-clock
speed or clock resolution.

_is_different_run and collect_items now annotate their handle
parameters as WorkflowHandle[Any, Any] (WorkflowHandle is generic
over workflow class and return type; the helpers are polymorphic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
No external truncate API exists — truncation is a workflow-internal
decision (retention policy, consumer progress), so external callers
must define their own signal or update that invokes truncate_pubsub.

- Expand the TruncateSignalWorkflow docstring to call out that it's
  test scaffolding and to point to the integration pattern.
- Note the workflow-side-only nature in the README table row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signals are fire-and-forget. The truncate tests relied on "a subsequent
update acts as a barrier for prior signals" — true but implicit. An
update handler returns only after it completes, making the contract
explicit and removing a class of reader confusion.

Rename TruncateSignalWorkflow → TruncateWorkflow, change truncate from
@workflow.signal to @workflow.update, and switch the three call sites
from handle.signal("truncate", ...) to handle.execute_update(...).
Drop stale barrier comments now that completion is intrinsic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Every other workflow in this file mixes PubSubMixin with a user-defined
close signal (and custom init/run), so coexistence is proven implicitly
by the full suite. The only unique claim here was that an app query
coexists with the mixin's __pubsub_offset query — a vanishingly small
risk given that the Temporal SDK registers handlers by explicit name
and there
is no shared registry. If a future conflict did arise, dozens of tests
would fail, not just this one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The prior version started two subscribe tasks via asyncio.gather and
asserted each received its expected items. That passes even if
subscriber A fully drains its items before subscriber B's first poll
goes out — the test never observed interleaving, only topic filtering
under parallel calls.

Reshape the test as a ping-pong: publish A-0, wait (via asyncio.Event)
for A to receive it; publish B-0, wait for B to receive it. At that
point both subscribers are mid-subscription and polling for item 2,
so both __pubsub_poll updates are in flight simultaneously. Repeat
for item 2. A sequential execution cannot satisfy the publish order
because B's first item isn't published until after A has received
its first.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three related test-quality changes after a Codex challenge pass.

Delete test_continue_as_new_any_typed_fails (and its workflow/input
classes). It exercised the default Temporal data converter behavior
(Any-typed dataclass field deserializes as dict) rather than a pubsub
concern, and relied on a weak assert_task_fail_eventually that would
pass for any task failure. Replace with a doc note on init_pubsub()
warning about Any-typed pubsub_state fields, keeping the guidance
where a user looks when wiring up CAN.

Strengthen test_continue_as_new_properly_typed. Previously only
verified log contents and offsets survived CAN. Now also verifies
publisher dedup state survives: seeds publisher_id="pub" sequence=1,
CANs, and asserts on publisher_sequences directly via a new query
handler. Three assertions — after CAN, after a duplicate publish, and
after a fresh-sequence publish — bracket the dedup contract without
inferring it from log length. Inline the previously-shared _run_can_test
helper since only one caller remained.

Widen TTL test margins from (0.3s sleep, 0.1s TTL) to (1.0s sleep,
0.5s TTL). The tighter margin left ~100ms headroom on each side for
pub-old to prune and pub-new to survive — borderline on slow CI where
worker scheduling between publish and query can itself exceed 100ms.
The new margins tolerate multi-hundred-ms scheduling jitter in both
directions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four sets of function-local imports had no technical justification —
no circular imports, no optional dependencies, no heavy-module
deferral benefit for a test file. They were editorial drift from
incremental additions. Move them to the top of the file:

- WorkflowUpdateFailedError (was local in truncate-error test)
- unittest.mock.patch (was duplicated in two retry tests)
- temporalio.api.nexus.v1, temporalio.api.operatorservice.v1
  (was local in create_cross_namespace_endpoint helper)
- google.protobuf.duration_pb2, temporalio.api.workflowservice.v1
  (was local in cross-namespace Nexus test)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PubSubClient.__aexit__ could silently drop items on context-manager
exit. A single _flush() processes either pending OR buffer (if/elif),
so when the flusher task was cancelled mid-signal (pending set) while
the producer had added more items (buffer non-empty), the final flush
handled pending and left buffered items orphaned.

Real impact: agent streaming that publishes a last token and
immediately exits the context manager could silently drop trailing
tokens depending on timing. Fix by draining both in a loop until
pending and buffer are empty.
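A toy model of the fix (field and method names are illustrative): a single flush handles pending OR buffer, so the exit path must loop until both drain.

```python
import asyncio

class Flusher:
    def __init__(self):
        self.pending = ["in-flight"]               # mid-signal batch
        self.buffer = ["trailing-1", "trailing-2"]  # added by producer
        self.sent = []

    async def _flush(self):
        if self.pending:             # if/elif: one side per call
            self.sent += self.pending
            self.pending = []
        elif self.buffer:
            self.sent.append(self.buffer.pop(0))

    async def aclose(self):
        # the fix: drain in a loop instead of a single _flush()
        while self.pending or self.buffer:
            await self._flush()

f = Flusher()
asyncio.run(f.aclose())
```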

This bug was latent in test_max_batch_size because that test's activity
loop had no awaits — the flusher never ran during the loop, so pending
never accumulated concurrently with buffer. Strengthening the test
exposed it.

Test changes:

- test_max_batch_size: add an await asyncio.sleep(0) between publishes
  (matches real agent workloads that yield on every LLM token) and
  assert via publisher_sequences query that max_batch_size actually
  triggers >=2 mid-loop flushes, not a single exit flush. Without this
  the test passed even if max_batch_size were ignored entirely.

- test_replay_safety: assert the full ordered 7-item sequence and
  offsets rather than just endpoints. Endpoint-only checks would miss
  mid-stream replay corruption (reordering, duplication, drops).

- test_poll_truncated_offset_returns_application_error: add a comment
  explaining why pytest.raises(WorkflowUpdateFailedError) suffices to
  prove the handler raised ApplicationError — Temporal's update
  protocol completes with this error only for ApplicationError; other
  exceptions fail the workflow task instead, causing execute_update to
  hang rather than raise.
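The starvation point behind the first item can be demonstrated with a toy loop: without an await, a concurrently scheduled task never gets to run.

```python
import asyncio

async def main():
    flushes = []

    async def flusher():
        flushes.append("flush")

    asyncio.ensure_future(flusher())
    for _ in range(100):
        pass                         # no await: the flusher is starved
    starved = len(flushes) == 0
    await asyncio.sleep(0)           # one yield point, as in the fixed test
    return starved, len(flushes)

starved, after_yield = asyncio.run(main())
```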

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address a small set of stylistic issues flagged during review.

Fix stale docstring in PubSubState's PollResult: the field is
more_ready, not has_more. Readers following the docstring would have
looked for a non-existent attribute.

Add generic parameters to the WorkflowHandle annotation in
PubSubClient.__init__ (WorkflowHandle[Any, Any]). Matches the
treatment applied earlier in the tests; PubSubClient is polymorphic
over workflow types.

Rename the signal/update handler parameters in PubSubMixin from
`input` (which shadowed the builtin) to `payload`. The type names
(PublishInput, PollInput) already convey "input," so the parameter
name was redundant. Drop the now-unnecessary `# noqa: A002` on the
validator.

Clarify the PubSubClient.__init__ docstring about continue-as-new:
previously said "prefer create() when you need CAN following," now
explicitly notes that the direct-handle form does not follow CAN and
will stop yielding once the original run ends.

Run `ruff check --select I --fix` and `ruff format` to bring the
module and tests into line with project lint.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four changes responding to review comments on sdk-python PR #1423:

C1 (init_pubsub pattern). Docstrings, README, and DESIGN-v2.md now advise
a single call site from @workflow.init with prior_state threaded through
the workflow input, instead of the previous "call in __init__ for fresh,
in run() for CAN" split. The signature is unchanged (prior_state is still
optional and defaults to None) — the change is to the blessed pattern.

C2 (rename priority -> force_flush). PubSubClient.publish() renames the
kwarg to force_flush. The kwarg never implied ordering — it just forces
an immediate flush of the buffer — so the new name is accurate. Internal
test helpers, comments, and docs updated.

C3 (split create / from_activity). PubSubClient.create() now requires
explicit (client, workflow_id); the silent auto-detect path is gone. A
new PubSubClient.from_activity() classmethod pulls both from the current
activity context. This removes the failure mode where omitting args
outside an activity produced a confusing runtime error. Activity-side
test helpers migrated to from_activity().
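The shape of the split can be sketched with a ContextVar standing in for Temporal's activity context (class and variable names here are hypothetical):

```python
import contextvars

_activity_info = contextvars.ContextVar("activity_info", default=None)

class ClientSketch:
    def __init__(self, client, workflow_id):
        self.client, self.workflow_id = client, workflow_id

    @classmethod
    def create(cls, client, workflow_id):
        # explicit args only: no silent auto-detection
        return cls(client, workflow_id)

    @classmethod
    def from_activity(cls):
        info = _activity_info.get()
        if info is None:
            raise RuntimeError("from_activity() requires an activity context")
        return cls(*info)

try:
    ClientSketch.from_activity()     # outside an activity: clear error
    outside_ok = True
except RuntimeError:
    outside_ok = False

_activity_info.set(("client-conn", "wf-123"))
inside = ClientSketch.from_activity()
```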

C5 (truncation rationale). DESIGN-v2.md section 10 no longer describes
truncation as "deferred to a future iteration" — the feature is
implemented, and voice streaming workflows have shown it's needed in
practice. Because CAN is the standard pattern for long-running
workflows, workflow history size is not the binding constraint;
in-memory log growth between CAN boundaries is. The section now says so.

Tests pass (23/23, pytest tests/contrib/pubsub/).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addresses PR #1423 review comment C4: expose Temporal Payload at the
PubSubItem / PublishEntry boundary so subscribers can decode via
subscribe(result_type=T), matching execute_update(result_type=...).

API changes:
- PubSubMixin.publish(topic, value): value is any payload-convertible
  object or a pre-built Payload (zero-copy).
- PubSubClient.publish(topic, value, force_flush=False): same shape;
  defers conversion to flush time, batching cost amortized.
- PubSubClient.subscribe(topics, *, result_type=None, ...): yields
  PubSubItem whose data is a Payload by default, or the decoded
  result_type when one is supplied.
- PubSubItem.data is now Any (Payload | decoded value).

Wire format and codec decisions:
- PublishEntry.data / _WireItem.data are
  base64(Payload.SerializeToString()). Nested Payload inside a
  dataclass fails with "Object of type Payload is not JSON
  serializable" because the default JSON converter only special-cases
  top-level Payloads on signal/update args. The base64-of-serialized-
  proto wire format keeps the JSON envelope while preserving
  Payload.metadata end-to-end. Round-trip is guarded by the new
  test_payload_roundtrip_prototype.py tests.
- Per-item encoding uses the SYNC payload converter (workflow.payload_
  converter() on the mixin, client.data_converter.payload_converter on
  the client). The codec chain (encryption, PII-redaction, compression)
  is NOT invoked per item — Temporal already runs the user's
  DataConverter.encode on the __pubsub_publish signal envelope and the
  __pubsub_poll update response, so running the codec per item as well
  would double-encrypt/compress (and compressing already-encrypted
  bytes defeats the codec). The per-item Payload still carries
  encoding metadata ("encoding: json/plain", "messageType: ...") which
  is what the subscribe(result_type=T) decode path actually needs.
- Workflow-side and client-side are now codec-symmetric; the
  previously-feared asymmetry does not exist.
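The wire-format choice reduces to a base64 round trip inside a JSON envelope. A hedged sketch, where `serialized` stands in for `Payload.SerializeToString()` and the helper names mirror (but are not) the private `_encode_payload`/`_decode_payload` in pubsub._types:

```python
import base64
import json

def encode_for_wire(serialized: bytes) -> str:
    return base64.b64encode(serialized).decode("ascii")

def decode_from_wire(wire: str) -> bytes:
    return base64.b64decode(wire)

serialized = b'\x0a\x04json\x12\x04"hi"'   # pretend proto bytes
entry = {"topic": "tokens", "data": encode_for_wire(serialized)}
envelope = json.dumps(entry)               # JSON-safe: no raw bytes inside
round_tripped = decode_from_wire(json.loads(envelope)["data"])
```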

Tests:
- Existing pubsub tests updated: collect_items takes the Client (needed
  to reach the payload converter), subscribe calls pass result_type=bytes
  where they compare against raw bytes.
- Added test_structured_type_round_trip: workflow publishes dataclass
  values, subscriber decodes via result_type= — exercises the primary
  value-add of the migration.
- Added test_payload_roundtrip_prototype.py as a regression guard for
  the wire-format choice: one test asserts nested Payload in a dataclass
  fails, another asserts base64(proto(Payload)) round-trips.

All 26 pubsub tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jssmith jssmith marked this pull request as ready for review April 23, 2026 07:08
@jssmith jssmith requested a review from a team as a code owner April 23, 2026 07:08
jssmith and others added 4 commits April 23, 2026 00:17
The bridge's Cargo.toml requires temporalio-client = "0.2.0" (set in
68561ee), but commit c4ec6e7 ("Update pubsub README: rename
for_workflow → create") inadvertently reverted the sdk-core submodule
pointer to f188eb53, a commit that still had the client crate at
0.1.0. This left uv/maturin unable to build the Rust bridge on this
branch: Cargo resolves the requirement against the vendored crate and
rejects 0.1.0 for the "^0.2.0" spec.

Restore the pointer to b544f95d — the commit origin/main uses with
the same Cargo.toml, so the bridge and its sdk-core workspace are
consistent again. No Python code changes; purely a submodule pointer
fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reconciles DESIGN-v2.md with the "Streaming API Design Considerations"
Notion page so both track the authoritative Python implementation. The
Notion page had richer narrative (durable-streams framing, pull-vs-push
reasoning, one-way-door callouts, offset-options comparison table,
alternatives-considered list for wire evolution, end-to-end-principle
writeup). This change brings that into the in-repo doc.

Changes:
- New top-of-doc note establishing that the Python code in
  sdk-python/temporalio/contrib/pubsub/ is authoritative; both
  DESIGN-v2.md and the Notion page track it.
- New Decision #1 "Durable streams" explaining the durable-by-default
  choice vs ephemeral streams (simpler model, reliability,
  correctness). Existing decisions renumbered.
- Decision #4 (Global offsets) gains the 6-option ecosystem
  comparison table and a one-way-door callout flagging the
  wire-protocol commitment.
- Decision #9 (Subscription is poll-based) expanded with the
  pull-vs-push trade-off (back-pressure, subscriber-controlled read
  position, data-at-rest) and explicit "both layers are exposed"
  framing.
- New "Design Principles" section with the Saltzer/Reed/Clark
  end-to-end-dedup framing and the "retries remain in the log"
  contract, with a one-way-door callout on the append-only-of-attempts
  contract.
- Compatibility section gains a full alternatives-considered list
  (version field, versioned handler names, protocol negotiation, SDK
  version embedding, accepting silent incompatibility) and a two-part
  one-way-door callout on immutable handler names + no version field.
- New "Ecosystem analogs" section: a compact one-paragraph summary
  (NATS JetStream for offsets, Kafka for idempotent producers, Redis
  for blocking pull, Workflow SDK as the durable-execution peer) with
  a pointer to the Notion page for the full comparison tables.

The Notion page itself is still behind on the Payload migration
(Decision #5 "Opaque message payloads" needs rewriting, API
signatures still show priority= and data: bytes). That update is
deferred pending resolution of an open reviewer discussion on
activity-retry/dedup (discussion 34a8fc56-7738-808c-b29b-001c5066e9d2)
whose substance overlaps with the Decision #5 rewrite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts:
#	temporalio/contrib/google_adk_agents/_model.py
Follow-ups missed when the contrib/pubsub refactor renamed
PubSubClient.create(batch_interval=...) → PubSubClient.from_activity(...)
and publish(..., priority=True) → publish(..., force_flush=True).
Both plugin activities still called the old signatures and failed at
runtime with TypeError on the first publish.

Also update the streaming tests to pass result_type=bytes to
pubsub.subscribe(); after the bytes→Payload migration, item.data is a
raw Payload unless a result_type is specified, so json.loads(item.data)
was TypeErroring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jssmith and others added 7 commits April 23, 2026 19:43
Users no longer inherit a mixin class. Instead, they construct
`PubSub(prior_state=...)` from `@workflow.init`; the constructor
registers the `__pubsub_publish` signal, `__pubsub_poll` update (with
validator), and `__pubsub_offset` query handlers dynamically via
`workflow.set_signal_handler`, `set_update_handler`, and
`set_query_handler`. The pub/sub wire contract (handler names, payload
shapes, offset semantics) is unchanged.

This matches how other-language SDKs will express the same pattern —
imperative handler registration from inside the workflow body rather
than inheritance — and lets the workflow retain its normal single base
class.

The constructor raises RuntimeError in two misuse cases:
  1. Called twice on the same workflow — detected via
     `workflow.get_signal_handler("__pubsub_publish") is not None`.
  2. Called from anywhere other than `__init__` — detected by
     inspecting the immediate caller's frame. History-length based
     detection was tried first but has two false positives (pre-start
     signals inflate first-task history length beyond 3, and cache
     eviction legitimately re-runs `__init__` with a higher current
     history length), so frame inspection is the correct mechanism.

Method renames on the broker (no longer needed as `_pubsub_*` prefixes
now that they live on a dedicated object):

  init_pubsub(prior_state=None)   -> PubSub(prior_state=None)
  self.publish(topic, value)      -> self.pubsub.publish(topic, value)
  self.get_pubsub_state(...)      -> self.pubsub.get_state(...)
  self.drain_pubsub()             -> self.pubsub.drain()
  self.truncate_pubsub(up_to)     -> self.pubsub.truncate(up_to)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fan-out: add a subsection under Design Decision 9 explaining that each
__pubsub_poll is an independent update RPC with no shared delivery, so
items destined for N subscribers cross the wire N times. Spells out the
three concurrent-subscriber shapes (same topic/offset, different
offsets, disjoint topics) and the rationale for the per-poll model.

Future Work: new top-level section with three items — shared workflow
fan-out (optimization of the above), workflow-defined filters and
transforms, and a safe workflow-side subscribe() API. Each entry names
the relevant design questions left open rather than prescribing an
implementation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The streaming activity previously maintained a normalization layer:
~50 lines of if/elif mapping OpenAI event types (response.output_text.delta,
response.reasoning_summary_*, etc.) to custom app event names (TEXT_DELTA,
THINKING_*, LLM_CALL_START/COMPLETE), plus text-delta accumulation into a
synthesized TEXT_COMPLETE, plus a function-call filter on output_item.added.

That normalization made sense when a shared UI consumed events from multiple
providers, but each provider-plugin should expose its native event stream and
let consumers render idiomatically. The activity now publishes each yielded
OpenAI event as its Pydantic JSON and returns the ModelResponse built from
the final ResponseCompletedEvent — three lines inside the stream loop.

Also factored out three helpers shared between the streaming and
non-streaming activities (both paths were duplicating them verbatim):
  _build_tools_and_handoffs — tool/handoff reconstruction from dataclass form
  _build_tool             — single tool-by-type dispatch
  _raise_for_openai_status — APIStatusError -> retry-posture translation

The local-activity guard in _temporal_model_stub.py gains a comment
explaining the two reasons streaming can't use local activities (no
heartbeat channel, no pubsub signal context from the activity).

Tests: replaced the normalized-event assertions with raw-event assertions;
dropped the rich-dispatcher coverage test since there's no dispatcher left
to cover. 115 passing / 16 skipped.

Downstream impact: consumers that depend on the normalized event names
(temporal-streaming-agents-samples frontend, shared-frontend hooks) need
to switch on raw OpenAI event types instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- ruff format: apply formatter to auto-generated style changes.
- pyright: replace dict literals for Response.text/usage with the
  pydantic model types (ResponseTextConfig, ResponseUsage,
  InputTokensDetails, OutputTokensDetails).
- basedpyright: suppress reportUnusedFunction on the private
  _encode_payload/_decode_payload helpers in pubsub._types (they are
  used from sibling modules, which basedpyright does not credit) and
  reportUnusedParameter on the CAN workflow run() input arg.
- pydocstyle: add docstrings to PubSubClient.__aenter__/__aexit__.
- typing.Self requires 3.11; import from typing_extensions like the
  rest of the SDK does.
- asyncio.timeout requires 3.11; fall back to async_timeout.timeout
  on 3.10 (async_timeout is an aiohttp transitive dep there).
On Python 3.10 CI, the `if sys.version_info >= (3, 11):` branch is
what basedpyright flags as unreachable. The ignore needs to be on
both branches so it is silent under every Python version in the
matrix.
The previous attempt placed the pragma on the indented `timeout as
_async_timeout` line, but basedpyright reports reportUnreachable
against the outer `from ... import (` line (the block-opening
statement), so the pragma had no effect. Move the ignore up to the
import line and combine with reportMissingImports there.

Locally verified clean on Python 3.10, 3.11, and 3.14 via
`uv run --python <ver> poe lint`.
3 participants