From 44ceff7de5056da870dff440c955349e9de9e78a Mon Sep 17 00:00:00 2001
From: Johann Schleier-Smith
Date: Tue, 28 Apr 2026 21:10:03 -0700
Subject: [PATCH 1/8] Add workflow_stream samples: order_workflow scenario

Initial samples directory for temporalio.contrib.workflow_stream, the
workflow-hosted durable event stream contrib (experimental, contrib/pubsub
branch of sdk-python).

The order_workflow scenario covers the basic publisher path: a workflow
constructs a WorkflowStream in @workflow.init, an activity publishes
events via WorkflowStreamClient.from_activity(), and a starter subscribes
with WorkflowStreamClient and prints events as they arrive.
---
 README.md                                   |  1 +
 workflow_stream/README.md                   | 51 +++++++++++
 workflow_stream/__init__.py                 |  0
 workflow_stream/activities/__init__.py      |  0
 .../activities/payment_activity.py          | 41 +++++++++
 workflow_stream/run_publisher.py            | 56 ++++++++++++
 workflow_stream/run_worker.py               | 27 ++++++
 workflow_stream/shared.py                   | 87 +++++++++++++++++++
 workflow_stream/workflows/__init__.py       |  0
 workflow_stream/workflows/order_workflow.py | 60 +++++++++++++
 10 files changed, 323 insertions(+)
 create mode 100644 workflow_stream/README.md
 create mode 100644 workflow_stream/__init__.py
 create mode 100644 workflow_stream/activities/__init__.py
 create mode 100644 workflow_stream/activities/payment_activity.py
 create mode 100644 workflow_stream/run_publisher.py
 create mode 100644 workflow_stream/run_worker.py
 create mode 100644 workflow_stream/shared.py
 create mode 100644 workflow_stream/workflows/__init__.py
 create mode 100644 workflow_stream/workflows/order_workflow.py

diff --git a/README.md b/README.md
index d4d6a61b..5a5c937f 100644
--- a/README.md
+++ b/README.md
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. 
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. +* [workflow_stream](workflow_stream) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/workflow_stream/README.md b/workflow_stream/README.md new file mode 100644 index 00000000..3a57a8d7 --- /dev/null +++ b/workflow_stream/README.md @@ -0,0 +1,51 @@ +# Workflow Streams + +> **Experimental.** These samples target the +> `temporalio.contrib.workflow_stream` module on the +> [`contrib/pubsub` branch of sdk-python][branch], which is not yet +> released. To run them locally, install sdk-python from that branch +> (e.g. `uv pip install -e ` after checking out the +> branch). + +[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub + +`temporalio.contrib.workflow_stream` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, BFFs) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, continue-as-new +hand-off — into a reusable stream. + +This directory has a minimal end-to-end example: + +* `workflows/order_workflow.py` — a workflow that hosts a + `WorkflowStream` and publishes status events as it processes an order. 
+* `activities/payment_activity.py` — an activity that publishes + intermediate progress to the stream via + `WorkflowStreamClient.from_activity()`. +* `run_worker.py` — registers the workflow and activity. +* `run_publisher.py` — starts the workflow, then prints subscribed + events as they arrive. + +## Run it + +```bash +# Terminal 1: worker +uv run workflow_stream/run_worker.py + +# Terminal 2: starter + subscriber +uv run workflow_stream/run_publisher.py +``` + +Expected output on the publisher side, with events streaming in as the +workflow progresses: + +``` +[status] received: order=order-1 +[progress] charging card... +[progress] card charged +[status] shipped: order=order-1 +[progress] charge id: charge-order-1 +[status] complete: order=order-1 +workflow result: charge-order-1 +``` diff --git a/workflow_stream/__init__.py b/workflow_stream/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/activities/__init__.py b/workflow_stream/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/activities/payment_activity.py b/workflow_stream/activities/payment_activity.py new file mode 100644 index 00000000..f69f8c8d --- /dev/null +++ b/workflow_stream/activities/payment_activity.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_activity()` reads the parent workflow id + and the Temporal client from the activity context, so this activity + can push events back without any wiring. 
+
+    Caveat: each call to ``from_activity()`` creates a fresh client with
+    a random ``publisher_id``, so dedup does not protect against an
+    activity retry republishing the same events. A stable
+    ``publisher_id`` derived from ``activity.info().activity_id``
+    (invariant across attempts) would allow exactly-once publishing,
+    but the current ``WorkflowStreamClient`` API does not yet expose
+    ``publisher_id`` on its constructors, so this sample accepts that
+    a retried activity may republish its events (at-least-once
+    delivery on the stream).
+    """
+    client = WorkflowStreamClient.from_activity(
+        batch_interval=timedelta(milliseconds=200)
+    )
+    async with client:
+        client.publish(TOPIC_PROGRESS, ProgressEvent(message="charging card..."))
+        await asyncio.sleep(1.0)
+        client.publish(
+            TOPIC_PROGRESS,
+            ProgressEvent(message="card charged"),
+            force_flush=True,
+        )
+    return f"charge-{order_id}"
diff --git a/workflow_stream/run_publisher.py b/workflow_stream/run_publisher.py
new file mode 100644
index 00000000..cdfb210d
--- /dev/null
+++ b/workflow_stream/run_publisher.py
@@ -0,0 +1,56 @@
+from __future__ import annotations
+
+import asyncio
+import uuid
+
+from temporalio.api.common.v1 import Payload
+from temporalio.client import Client
+from temporalio.contrib.workflow_stream import WorkflowStreamClient
+
+from workflow_stream.shared import (
+    TASK_QUEUE,
+    TOPIC_PROGRESS,
+    TOPIC_STATUS,
+    OrderInput,
+    ProgressEvent,
+    StatusEvent,
+    race_with_workflow,
+)
+from workflow_stream.workflows.order_workflow import OrderWorkflow
+
+
+async def main() -> None:
+    client = await Client.connect("localhost:7233")
+
+    workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}"
+    handle = await client.start_workflow(
+        OrderWorkflow.run,
+        OrderInput(order_id="order-1"),
+        id=workflow_id,
+        task_queue=TASK_QUEUE,
+    )
+
+    stream = WorkflowStreamClient.create(client, workflow_id)
+    converter = client.data_converter.payload_converter
+
+    async def consume() -> None:
+        # 
Single iterator over both topics — avoids a cancellation race + # between two concurrent subscribers. result_type is left unset + # so we can dispatch heterogeneous events on item.topic. + async for item in stream.subscribe([TOPIC_STATUS, TOPIC_PROGRESS]): + assert isinstance(item.data, Payload) + if item.topic == TOPIC_STATUS: + evt = converter.from_payload(item.data, StatusEvent) + print(f"[status] {evt.kind}: order={evt.order_id}") + if evt.kind == "complete": + return + elif item.topic == TOPIC_PROGRESS: + progress = converter.from_payload(item.data, ProgressEvent) + print(f"[progress] {progress.message}") + + result = await race_with_workflow(consume(), handle) + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py new file mode 100644 index 00000000..8fef1ab9 --- /dev/null +++ b/workflow_stream/run_worker.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_stream.activities.payment_activity import charge_card +from workflow_stream.shared import TASK_QUEUE +from workflow_stream.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[OrderWorkflow], + activities=[charge_card], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py new file mode 100644 index 00000000..ca1368c1 --- /dev/null +++ b/workflow_stream/shared.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Coroutine +from dataclasses import dataclass +from typing import Any, TypeVar + +from temporalio.client import WorkflowHandle +from 
temporalio.contrib.workflow_stream import WorkflowStreamState + +TASK_QUEUE = "workflow-stream-sample-task-queue" + +# Topics published by the workflow / activity. +TOPIC_STATUS = "status" +TOPIC_PROGRESS = "progress" + + +@dataclass +class OrderInput: + order_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StatusEvent: + kind: str + order_id: str + + +@dataclass +class ProgressEvent: + message: str + + +T = TypeVar("T") + + +async def race_with_workflow( + consumer: Coroutine[Any, Any, None], + handle: WorkflowHandle[Any, T], +) -> T: + """Run a subscriber concurrently with the workflow. + + If the workflow finishes before the subscriber sees its terminal + event, cancel the subscriber and surface the workflow's result + (raising on failure). If the subscriber finishes first, wait for + the workflow result. A non-cancellation failure in the subscriber + is propagated either way. + + Without this, a workflow that raises before publishing its terminal + event would leave the subscriber blocked on its next poll forever. + """ + consumer_task = asyncio.create_task(consumer) + result_task = asyncio.create_task(handle.result()) + we_cancelled_consumer = False + try: + await asyncio.wait( + [consumer_task, result_task], + return_when=asyncio.FIRST_COMPLETED, + ) + if not consumer_task.done(): + consumer_task.cancel() + we_cancelled_consumer = True + # gather(return_exceptions=True) drains both tasks. Only + # cancellation we initiated is expected — anything else + # propagates. 
+ consumer_outcome, workflow_outcome = await asyncio.gather( + consumer_task, result_task, return_exceptions=True + ) + if isinstance(consumer_outcome, asyncio.CancelledError): + if not we_cancelled_consumer: + raise consumer_outcome + elif isinstance(consumer_outcome, BaseException): + raise consumer_outcome + if isinstance(workflow_outcome, BaseException): + raise workflow_outcome + return workflow_outcome + finally: + for task in (consumer_task, result_task): + if not task.done(): + task.cancel() + for task in (consumer_task, result_task): + try: + await task + except BaseException: + pass diff --git a/workflow_stream/workflows/__init__.py b/workflow_stream/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_stream/workflows/order_workflow.py b/workflow_stream/workflows/order_workflow.py new file mode 100644 index 00000000..4b4f4a82 --- /dev/null +++ b/workflow_stream/workflows/order_workflow.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import ( + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) + +with workflow.unsafe.imports_passed_through(): + from workflow_stream.activities.payment_activity import charge_card + + +@workflow.defn +class OrderWorkflow: + """Process a fake order, publishing status and progress events. + + The workflow itself publishes status changes; an activity it runs + publishes finer-grained progress events using a + `WorkflowStreamClient`. A single stream carries both topics — + subscribers can filter on the topic(s) they care about. + """ + + @workflow.init + def __init__(self, input: OrderInput) -> None: + # Construct the stream from @workflow.init so it can register + # signal/update/query handlers before the workflow accepts any + # messages. 
Threading prior_state lets the workflow survive + # continue-as-new without losing buffered items. + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: OrderInput) -> str: + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="received", order_id=input.order_id) + ) + + charge_id = await workflow.execute_activity( + charge_card, + input.order_id, + start_to_close_timeout=timedelta(seconds=30), + ) + + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="shipped", order_id=input.order_id) + ) + self.stream.publish( + TOPIC_PROGRESS, + ProgressEvent(message=f"charge id: {charge_id}"), + ) + self.stream.publish( + TOPIC_STATUS, StatusEvent(kind="complete", order_id=input.order_id) + ) + return charge_id From faac49f9585a468330409e16e92c88886a966ba2 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 10:07:33 -0700 Subject: [PATCH 2/8] samples: workflow_stream: add reconnecting-subscriber scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a second scenario demonstrating the central Workflow Streams use case: a consumer disconnects mid-stream and resumes later via subscribe(from_offset=...), with no events lost or duplicated. The existing OrderWorkflow finishes too quickly to make the pattern visible, so this introduces a multi-stage PipelineWorkflow paced with workflow.sleep between stages. The runner reads a couple of events, persists item.offset + 1 to a temp file, sleeps "disconnected" while the workflow keeps publishing, then opens a fresh Client + WorkflowStreamClient and resumes from the persisted offset — the same shape that works across actual process restarts. 
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 workflow_stream/README.md                    |  48 ++++++--
 .../run_reconnecting_subscriber.py           | 107 ++++++++++++++++++
 workflow_stream/run_worker.py                |   3 +-
 workflow_stream/shared.py                    |  12 ++
 .../workflows/pipeline_workflow.py           |  43 +++++++
 5 files changed, 205 insertions(+), 8 deletions(-)
 create mode 100644 workflow_stream/run_reconnecting_subscriber.py
 create mode 100644 workflow_stream/workflows/pipeline_workflow.py

diff --git a/workflow_stream/README.md b/workflow_stream/README.md
index 3a57a8d7..dd4fcf49 100644
--- a/workflow_stream/README.md
+++ b/workflow_stream/README.md
@@ -16,16 +16,31 @@ signals and subscribe via long-poll updates. This packages the
 boilerplate — batching, offset tracking, topic filtering, continue-as-new
 hand-off — into a reusable stream.
 
-This directory has a minimal end-to-end example:
+The scenarios in this directory share one Worker.
+
+**Scenario 1 — basic publish/subscribe with heterogeneous topics:**
 
 * `workflows/order_workflow.py` — a workflow that hosts a
   `WorkflowStream` and publishes status events as it processes an order.
 * `activities/payment_activity.py` — an activity that publishes
   intermediate progress to the stream via
   `WorkflowStreamClient.from_activity()`.
-* `run_worker.py` — registers the workflow and activity.
-* `run_publisher.py` — starts the workflow, then prints subscribed
-  events as they arrive.
+* `run_publisher.py` — starts the workflow, subscribes to both topics,
+  decodes each by `item.topic`, and prints events as they arrive.
+
+**Scenario 2 — reconnecting subscriber:**
+
+* `workflows/pipeline_workflow.py` — a multi-stage pipeline that
+  publishes stage transitions over ~10 seconds, leaving room for a
+  consumer to disconnect and reconnect mid-run.
+* `run_reconnecting_subscriber.py` — connects, reads a couple of
+  events, persists `item.offset + 1` to disk, "disconnects," then
+  reopens a fresh client and resumes via `subscribe(from_offset=...)`. 
+ This is the central Workflow Streams use case: a consumer can + disappear (page refresh, server restart, laptop closed) and resume + later without missing events or seeing duplicates. + +`run_worker.py` registers both workflows and the activity. ## Run it @@ -33,12 +48,13 @@ This directory has a minimal end-to-end example: # Terminal 1: worker uv run workflow_stream/run_worker.py -# Terminal 2: starter + subscriber +# Terminal 2: pick a scenario uv run workflow_stream/run_publisher.py +# or +uv run workflow_stream/run_reconnecting_subscriber.py ``` -Expected output on the publisher side, with events streaming in as the -workflow progresses: +Expected output on the basic publisher side: ``` [status] received: order=order-1 @@ -49,3 +65,21 @@ workflow progresses: [status] complete: order=order-1 workflow result: charge-order-1 ``` + +Expected output on the reconnecting subscriber side (note the offsets +are continuous across the disconnect — no events lost, none duplicated): + +``` +[phase 1] connecting and reading first few events + offset= 0 stage=validating + offset= 1 stage=loading data +[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting + +[phase 2] reconnecting and resuming from persisted offset + offset= 2 stage=transforming + offset= 3 stage=writing output + offset= 4 stage=verifying + offset= 5 stage=complete + +workflow result: pipeline workflow-stream-pipeline-... done +``` diff --git a/workflow_stream/run_reconnecting_subscriber.py b/workflow_stream/run_reconnecting_subscriber.py new file mode 100644 index 00000000..3a5eee11 --- /dev/null +++ b/workflow_stream/run_reconnecting_subscriber.py @@ -0,0 +1,107 @@ +"""Reconnecting subscriber: persist offset, disconnect, resume. + +Demonstrates the central Workflow Streams use case: a consumer can +disappear mid-stream — page refresh, server restart, laptop closed — +and resume later without missing events or seeing duplicates. 
The +event log lives in the Workflow, so the consumer just remembers where +it stopped. + +The script runs the pattern in two phases inside one process to keep +the demo short. The same code shape works across actual process +restarts because the resume offset is persisted to disk between phases. + +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_reconnecting_subscriber.py +""" + +from __future__ import annotations + +import asyncio +import tempfile +import uuid +from pathlib import Path + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_STATUS, + PipelineInput, + StageEvent, +) +from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow + +# Number of events read in phase 1 before simulating a disconnect. +# Picked small enough that the workflow is still running after. +PHASE_1_EVENTS = 2 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-pipeline-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + PipelineWorkflow.run, + PipelineInput(pipeline_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + # Where the consumer remembers its position. In a real BFF or UI + # backend this would be a database row keyed by (user_id, run_id); + # a temp file keeps the sample self-contained. + offset_path = Path(tempfile.gettempdir()) / f"{workflow_id}.offset" + + # ---- Phase 1: connect, read a couple of events, persist offset, disconnect. + print("[phase 1] connecting and reading first few events") + stream = WorkflowStreamClient.create(client, workflow_id) + seen = 0 + next_offset = 0 + async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Persist *one past* the offset just consumed. 
On resume we want + # the *next* unseen event, not the one we already showed. + next_offset = item.offset + 1 + offset_path.write_text(str(next_offset)) + seen += 1 + if seen >= PHASE_1_EVENTS: + break + + print( + f"[phase 1] persisted resume offset={next_offset} -> {offset_path}; disconnecting\n" + ) + # The async for loop exits the subscribe() iterator. Any background + # poll Update is cancelled. The workflow keeps running in the + # background, accumulating events into its log. + await asyncio.sleep(3) # let the workflow publish more in our absence + + # ---- Phase 2: reconnect, read persisted offset, resume from there. + print("[phase 2] reconnecting and resuming from persisted offset") + resume_from = int(offset_path.read_text()) + # A brand-new client and stream object — same shape as a different + # process picking up where the first one left off. + client2 = await Client.connect("localhost:7233") + stream2 = WorkflowStreamClient.create(client2, workflow_id) + async for item in stream2.subscribe( + [TOPIC_STATUS], + from_offset=resume_from, + result_type=StageEvent, + ): + print(f" offset={item.offset:2d} stage={item.data.stage}") + # Continue persisting after each event so a second crash here + # would also resume cleanly. + offset_path.write_text(str(item.offset + 1)) + if item.data.stage == "complete": + break + + result = await handle.result() + print(f"\nworkflow result: {result}") + # Clean up the offset file; in a real consumer you'd retain it as + # long as the user might reconnect. 
+ offset_path.unlink(missing_ok=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index 8fef1ab9..4b9a4ed5 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -9,6 +9,7 @@ from workflow_stream.activities.payment_activity import charge_card from workflow_stream.shared import TASK_QUEUE from workflow_stream.workflows.order_workflow import OrderWorkflow +from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow async def main() -> None: @@ -17,7 +18,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[OrderWorkflow], + workflows=[OrderWorkflow, PipelineWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index ca1368c1..652c8fa5 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -33,6 +33,18 @@ class ProgressEvent: message: str +@dataclass +class PipelineInput: + pipeline_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StageEvent: + stage: str + + T = TypeVar("T") diff --git a/workflow_stream/workflows/pipeline_workflow.py b/workflow_stream/workflows/pipeline_workflow.py new file mode 100644 index 00000000..5f53c1bf --- /dev/null +++ b/workflow_stream/workflows/pipeline_workflow.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_stream import WorkflowStream + +from workflow_stream.shared import ( + TOPIC_STATUS, + PipelineInput, + StageEvent, +) + + +@workflow.defn +class PipelineWorkflow: + """Multi-stage pipeline that publishes stage transitions over time. 
+ + Stages are spaced out with ``workflow.sleep`` so a subscriber can + realistically disconnect partway through and reconnect without the + pipeline finishing in the meantime — the shape needed to demo the + "show up late and still see what happened" pattern. + """ + + @workflow.init + def __init__(self, input: PipelineInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: PipelineInput) -> str: + stages = [ + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete", + ] + for stage in stages: + self.stream.publish(TOPIC_STATUS, StageEvent(stage=stage)) + if stage != "complete": + await workflow.sleep(timedelta(seconds=2)) + return f"pipeline {input.pipeline_id} done" From b607117f788fac2a0f1224d8f800f92a5a47ab29 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 10:10:10 -0700 Subject: [PATCH 3/8] samples: workflow_stream: add external-publisher scenario MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a third scenario covering the third publisher shape: a backend service or scheduled job pushing events into a workflow it didn't itself start. The earlier scenarios publish either from inside the workflow or from one of its activities; this one uses WorkflowStreamClient.create() externally. HubWorkflow is a passive stream host — it does no work of its own and just waits to be told to close, fitting the event-bus pattern. The runner publishes a series of news headlines, runs a subscriber task alongside, signals close, and exits when both tasks complete. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- workflow_stream/README.md | 18 ++++- workflow_stream/run_external_publisher.py | 91 +++++++++++++++++++++++ workflow_stream/run_worker.py | 3 +- workflow_stream/shared.py | 13 ++++ workflow_stream/workflows/hub_workflow.py | 36 +++++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 workflow_stream/run_external_publisher.py create mode 100644 workflow_stream/workflows/hub_workflow.py diff --git a/workflow_stream/README.md b/workflow_stream/README.md index dd4fcf49..7b43ce08 100644 --- a/workflow_stream/README.md +++ b/workflow_stream/README.md @@ -40,7 +40,21 @@ This directory has two scenarios sharing one Worker. disappear (page refresh, server restart, laptop closed) and resume later without missing events or seeing duplicates. -`run_worker.py` registers both workflows and the activity. +**Scenario 3 — external (non-Activity) publisher:** + +* `workflows/hub_workflow.py` — a passive workflow that does no work + of its own; it exists only to host a `WorkflowStream` and shut down + when signaled. +* `run_external_publisher.py` — starts the hub, then publishes events + into it from a plain Python coroutine using + `WorkflowStreamClient.create(client, workflow_id)`. A subscriber + task runs alongside; when the publisher is done it signals + `HubWorkflow.close`, the workflow's run finishes, and the + subscriber's iterator exits normally. This is the shape that fits a + backend service or scheduled job pushing events into a workflow it + didn't itself start. + +`run_worker.py` registers all three workflows and the activity. 
## Run it @@ -52,6 +66,8 @@ uv run workflow_stream/run_worker.py uv run workflow_stream/run_publisher.py # or uv run workflow_stream/run_reconnecting_subscriber.py +# or +uv run workflow_stream/run_external_publisher.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/run_external_publisher.py b/workflow_stream/run_external_publisher.py new file mode 100644 index 00000000..5ef7e27e --- /dev/null +++ b/workflow_stream/run_external_publisher.py @@ -0,0 +1,91 @@ +"""External publisher: a non-Activity process pushes events into a workflow. + +The two earlier scenarios publish from inside the workflow itself +(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs +(``charge_card``). This scenario shows the third shape: a backend +service, scheduled job, or anything else with a Temporal ``Client`` +publishing into a *running* workflow it didn't start. Same factory as +the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for +publishing instead. + +The script starts a ``HubWorkflow`` (which does no work of its own — +it exists only to host the stream), then runs a publisher and a +subscriber concurrently. When the publisher is done it signals +``HubWorkflow.close``, the workflow's run finishes, and the +subscriber's iterator exits normally. 
+ +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_external_publisher.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_NEWS, + HubInput, + NewsEvent, +) +from workflow_stream.workflows.hub_workflow import HubWorkflow + + +HEADLINES = [ + "rates held", + "merger announced", + "outage resolved", + "earnings beat", + "regulator opens probe", +] + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + HubWorkflow.run, + HubInput(hub_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + async def publish_news() -> None: + # WorkflowStreamClient.create takes a Temporal client and a + # workflow id — the same factory used elsewhere for subscribing. + # The async context manager batches publishes and flushes on + # exit; we additionally call flush() before signaling close so + # we know the events landed before the workflow shuts down. + producer = WorkflowStreamClient.create(client, workflow_id) + async with producer: + for headline in HEADLINES: + producer.publish(TOPIC_NEWS, NewsEvent(headline=headline)) + print(f"[publisher] sent: {headline}") + await asyncio.sleep(0.5) + await producer.flush() + # Tell the hub it can stop. The workflow's run() returns, and + # any in-flight subscribers see their async-for loop exit. 
+ await handle.signal(HubWorkflow.close) + print("[publisher] signaled close") + + async def consume_news() -> None: + consumer = WorkflowStreamClient.create(client, workflow_id) + async for item in consumer.subscribe( + [TOPIC_NEWS], result_type=NewsEvent + ): + print(f"[subscriber] offset={item.offset}: {item.data.headline}") + + await asyncio.gather(publish_news(), consume_news()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index 4b9a4ed5..b118c22f 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -8,6 +8,7 @@ from workflow_stream.activities.payment_activity import charge_card from workflow_stream.shared import TASK_QUEUE +from workflow_stream.workflows.hub_workflow import HubWorkflow from workflow_stream.workflows.order_workflow import OrderWorkflow from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow @@ -18,7 +19,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[OrderWorkflow, PipelineWorkflow], + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index 652c8fa5..42e94015 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -13,6 +13,7 @@ # Topics published by the workflow / activity. TOPIC_STATUS = "status" TOPIC_PROGRESS = "progress" +TOPIC_NEWS = "news" @dataclass @@ -45,6 +46,18 @@ class StageEvent: stage: str +@dataclass +class HubInput: + hub_id: str + # Carries stream state across continue-as-new. None on a fresh start. 
+    stream_state: WorkflowStreamState | None = None
+
+
+@dataclass
+class NewsEvent:
+    headline: str
+
+
 T = TypeVar("T")
diff --git a/workflow_stream/workflows/hub_workflow.py b/workflow_stream/workflows/hub_workflow.py
new file mode 100644
index 00000000..eb686963
--- /dev/null
+++ b/workflow_stream/workflows/hub_workflow.py
@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from temporalio import workflow
+from temporalio.contrib.workflow_stream import WorkflowStream
+
+from workflow_stream.shared import HubInput
+
+
+@workflow.defn
+class HubWorkflow:
+    """Passive stream host: starts up, waits, closes when told.
+
+    Unlike OrderWorkflow or PipelineWorkflow, this workflow does no
+    work of its own — it exists only to host a ``WorkflowStream`` that
+    external publishers push events into and external subscribers read
+    from. This is the shape that fits a backend service or "event
+    bus" pattern, where the workflow owns durable state but the
+    events come from outside.
+    """
+
+    @workflow.init
+    def __init__(self, input: HubInput) -> None:
+        self.stream = WorkflowStream(prior_state=input.stream_state)
+        self._closed = False
+
+    @workflow.run
+    async def run(self, input: HubInput) -> str:
+        await workflow.wait_condition(lambda: self._closed)
+        return f"hub {input.hub_id} closed"
+
+    @workflow.signal
+    def close(self) -> None:
+        # Custom signal handler that does not read stream state, so the
+        # synchronous-handler race documented in the README does not
+        # apply.
+        self._closed = True

From 91233b0dbb4b84599b09b9c3f02aeab5d834d78e Mon Sep 17 00:00:00 2001
From: Johann Schleier-Smith
Date: Wed, 29 Apr 2026 10:12:56 -0700
Subject: [PATCH 4/8] samples: workflow_stream: add truncating-ticker scenario
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds a fourth scenario for long-running workflows that need to bound
their event log: the workflow publishes events at a fixed cadence and
calls self.stream.truncate(...)
periodically to keep only the most recent entries. The runner subscribes twice — fast and slow — to make the trade visible: the fast subscriber sees every offset in order; the slow one falls behind a truncation, has its iterator transparently jump forward to the new base offset, and shows the offset gap that intermediate events fell into. This is the model for high-volume long-running streams: bounded log size, slow consumers may miss intermediate events but always see the most recent state. Co-Authored-By: Claude Opus 4.7 (1M context) --- workflow_stream/README.md | 17 +++- workflow_stream/run_truncating_ticker.py | 83 ++++++++++++++++++++ workflow_stream/run_worker.py | 3 +- workflow_stream/shared.py | 16 ++++ workflow_stream/workflows/ticker_workflow.py | 59 ++++++++++++++ 5 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 workflow_stream/run_truncating_ticker.py create mode 100644 workflow_stream/workflows/ticker_workflow.py diff --git a/workflow_stream/README.md b/workflow_stream/README.md index 7b43ce08..a452e24b 100644 --- a/workflow_stream/README.md +++ b/workflow_stream/README.md @@ -54,7 +54,20 @@ This directory has two scenarios sharing one Worker. backend service or scheduled job pushing events into a workflow it didn't itself start. -`run_worker.py` registers all three workflows and the activity. +**Scenario 4 — bounded log via `truncate()`:** + +* `workflows/ticker_workflow.py` — a long-running workflow that + publishes events at a fixed cadence and calls + `self.stream.truncate(...)` periodically to bound log growth, keeping + only the most recent N entries. +* `run_truncating_ticker.py` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every offset + in order; the slow one sleeps between iterations, falls behind a + truncation, and silently jumps forward to the new base offset. 
The + output makes the trade visible: bounded log size in exchange for + intermediate events being invisible to slow consumers. + +`run_worker.py` registers all four workflows and the activity. ## Run it @@ -68,6 +81,8 @@ uv run workflow_stream/run_publisher.py uv run workflow_stream/run_reconnecting_subscriber.py # or uv run workflow_stream/run_external_publisher.py +# or +uv run workflow_stream/run_truncating_ticker.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/run_truncating_ticker.py b/workflow_stream/run_truncating_ticker.py new file mode 100644 index 00000000..069ab4e4 --- /dev/null +++ b/workflow_stream/run_truncating_ticker.py @@ -0,0 +1,83 @@ +"""Truncating ticker: bounded log + slow vs. fast subscribers. + +The ``TickerWorkflow`` publishes ``count`` events at a fixed interval, +calling ``self.stream.truncate(...)`` periodically to bound log +growth. This script subscribes twice — once fast, once slow — and +prints both side-by-side so the trade is visible: + +* The fast subscriber keeps up and sees every published offset in + order. +* The slow subscriber sleeps between iterations. When a truncation + runs past its position, the iterator silently jumps forward to the + new base offset — the slow subscriber's offsets jump too, and + intermediate events are not visible to it. + +This is the bounded-log model: log size is capped, slow consumers may +miss intermediate events, but they always see the most recent state. +For long-running workflows pushing high event volumes this is usually +the right trade — pair with set-semantic events where each event +carries enough state to make missing the prior ones recoverable. 
+ +Run the worker first (``uv run workflow_stream/run_worker.py``), then:: + + uv run workflow_stream/run_truncating_ticker.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_stream import WorkflowStreamClient + +from workflow_stream.shared import ( + TASK_QUEUE, + TOPIC_TICK, + TickerInput, + TickEvent, +) +from workflow_stream.workflows.ticker_workflow import TickerWorkflow + + +SLOW_SUBSCRIBER_DELAY_S = 1.5 + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + TickerWorkflow.run, + TickerInput( + count=20, + keep_last=3, + truncate_every=5, + interval_ms=400, + ), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + + async def fast_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[fast] offset={item.offset:3d} n={item.data.n}") + + async def slow_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + print(f"[SLOW] offset={item.offset:3d} n={item.data.n}") + await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) + + # Both iterators exit normally when the workflow completes. No + # terminal sentinel is needed — see the doc's "When the Workflow + # run completes" note. 
+ await asyncio.gather(fast_subscriber(), slow_subscriber()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_stream/run_worker.py b/workflow_stream/run_worker.py index b118c22f..9e21982e 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_stream/run_worker.py @@ -11,6 +11,7 @@ from workflow_stream.workflows.hub_workflow import HubWorkflow from workflow_stream.workflows.order_workflow import OrderWorkflow from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow +from workflow_stream.workflows.ticker_workflow import TickerWorkflow async def main() -> None: @@ -19,7 +20,7 @@ async def main() -> None: worker = Worker( client, task_queue=TASK_QUEUE, - workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow], + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow], activities=[charge_card], ) await worker.run() diff --git a/workflow_stream/shared.py b/workflow_stream/shared.py index 42e94015..fd97a6a8 100644 --- a/workflow_stream/shared.py +++ b/workflow_stream/shared.py @@ -14,6 +14,7 @@ TOPIC_STATUS = "status" TOPIC_PROGRESS = "progress" TOPIC_NEWS = "news" +TOPIC_TICK = "tick" @dataclass @@ -58,6 +59,21 @@ class NewsEvent: headline: str +@dataclass +class TickerInput: + count: int = 20 + keep_last: int = 3 + truncate_every: int = 5 + interval_ms: int = 400 + # Carries stream state across continue-as-new. None on a fresh start. 
+    stream_state: WorkflowStreamState | None = None
+
+
+@dataclass
+class TickEvent:
+    n: int
+
+
 T = TypeVar("T")
diff --git a/workflow_stream/workflows/ticker_workflow.py b/workflow_stream/workflows/ticker_workflow.py
new file mode 100644
index 00000000..61f895a2
--- /dev/null
+++ b/workflow_stream/workflows/ticker_workflow.py
@@ -0,0 +1,59 @@
+from __future__ import annotations
+
+from datetime import timedelta
+
+from temporalio import workflow
+from temporalio.contrib.workflow_stream import WorkflowStream
+
+from workflow_stream.shared import (
+    TOPIC_TICK,
+    TickEvent,
+    TickerInput,
+)
+
+
+@workflow.defn
+class TickerWorkflow:
+    """Long-running ticker that bounds its event log via ``truncate``.
+
+    Long-running workflows that publish high volumes of events would
+    otherwise grow their event log unboundedly. This workflow shows
+    the truncation pattern: every ``truncate_every`` events, drop
+    everything except the last ``keep_last`` entries by calling
+    ``self.stream.truncate(safe_offset)``.
+
+    Subscribers that fall behind a truncation jump forward to the new
+    base offset transparently (the iterator handles the
+    ``TruncatedOffset`` error internally), so consumers stay live but
+    may not see every intermediate event. That is the trade: bounded
+    log size in exchange for best-effort delivery to slow
+    consumers.
+
+    To compute the truncation offset the workflow tracks its own
+    published count. ``WorkflowStream`` does not expose a workflow-side
+    head-offset accessor, but the running count plus the carried
+    ``base_offset`` (in continue-as-new chains) is sufficient.
+    """
+
+    @workflow.init
+    def __init__(self, input: TickerInput) -> None:
+        self.stream = WorkflowStream(prior_state=input.stream_state)
+        self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent)
+        # Running count of events published by THIS run. To compute a
+        # global offset, add the prior_state's base_offset (omitted
+        # here — this sample doesn't continue-as-new).
+ self._published = 0 + + @workflow.run + async def run(self, input: TickerInput) -> str: + for n in range(input.count): + self.stream.publish(TOPIC_TICK, TickEvent(n=n)) + self._published += 1 + await workflow.sleep(timedelta(milliseconds=input.interval_ms)) + if ( + self._published % input.truncate_every == 0 + and self._published > input.keep_last + ): + # Drop everything except the last `keep_last` entries. + truncate_to = self._published - input.keep_last + self.stream.truncate(truncate_to) + return f"ticker emitted {self._published} events" From 78062b4dbdef2988a97fec9a057f19b28915319b Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 17:29:13 -0700 Subject: [PATCH 5/8] =?UTF-8?q?samples:=20rename=20workflow=5Fstream=20?= =?UTF-8?q?=E2=86=92=20workflow=5Fstreams;=20migrate=20to=20topic=20handle?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Directory and module path renamed to plural to match sdk-python `temporalio.contrib.workflow_streams` rename. - Workflow-side: bind a typed topic handle in `@workflow.init` and call `topic.publish(value)` — the removed `WorkflowStream.publish` form is gone. Same change applied to the activity and external-publisher. - Activity: `WorkflowStreamClient.from_activity()` → `from_within_activity()`. 
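The handle shape can be sketched outside Temporal with a toy stand-in
(a sketch only; `ToyStream` and `TopicHandle` are illustrative names,
not part of the SDK): binding a topic once fixes its name and element
type, so every publish site takes just the value and gets back the
offset it landed at.

```python
from dataclasses import dataclass
from typing import Generic, List, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass
class TickEvent:
    n: int


class TopicHandle(Generic[T]):
    """Typed handle bound to one topic; publish() takes only the value."""

    def __init__(self, stream: "ToyStream", name: str, type_: Type[T]) -> None:
        self._stream, self._name, self._type = stream, name, type_

    def publish(self, value: T) -> int:
        # Reject values of the wrong type up front, then append and
        # return the offset the event landed at.
        if not isinstance(value, self._type):
            raise TypeError(f"expected {self._type.__name__}")
        return self._stream.append(self._name, value)


class ToyStream:
    """Offset-addressed append-only log, standing in for WorkflowStream."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, object]] = []

    def topic(self, name: str, type: Type[T]) -> TopicHandle[T]:
        return TopicHandle(self, name, type)

    def append(self, name: str, value: object) -> int:
        self.log.append((name, value))
        return len(self.log) - 1


stream = ToyStream()
tick = stream.topic("tick", type=TickEvent)  # bind once, as in @workflow.init
offsets = [tick.publish(TickEvent(n=n)) for n in range(3)]
print(offsets)  # [0, 1, 2]
```

In the real samples the same shape appears as
`self.stream.topic(TOPIC_TICK, type=TickEvent)` bound in
`@workflow.init`, with `topic.publish(value)` at each publish site.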
Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 2 +- .../activities/payment_activity.py | 41 ------------------- .../README.md | 16 ++++---- .../__init__.py | 0 .../activities/__init__.py | 0 .../activities/payment_activity.py | 41 +++++++++++++++++++ .../run_external_publisher.py | 13 +++--- .../run_publisher.py | 6 +-- .../run_reconnecting_subscriber.py | 10 ++--- .../run_truncating_ticker.py | 10 ++--- .../run_worker.py | 12 +++--- .../shared.py | 2 +- .../workflows/__init__.py | 0 .../workflows/hub_workflow.py | 4 +- .../workflows/order_workflow.py | 25 ++++------- .../workflows/pipeline_workflow.py | 7 ++-- .../workflows/ticker_workflow.py | 7 ++-- 17 files changed, 96 insertions(+), 100 deletions(-) delete mode 100644 workflow_stream/activities/payment_activity.py rename {workflow_stream => workflow_streams}/README.md (91%) rename {workflow_stream => workflow_streams}/__init__.py (100%) rename {workflow_stream => workflow_streams}/activities/__init__.py (100%) create mode 100644 workflow_streams/activities/payment_activity.py rename {workflow_stream => workflow_streams}/run_external_publisher.py (86%) rename {workflow_stream => workflow_streams}/run_publisher.py (90%) rename {workflow_stream => workflow_streams}/run_reconnecting_subscriber.py (92%) rename {workflow_stream => workflow_streams}/run_truncating_ticker.py (89%) rename {workflow_stream => workflow_streams}/run_worker.py (57%) rename {workflow_stream => workflow_streams}/shared.py (98%) rename {workflow_stream => workflow_streams}/workflows/__init__.py (100%) rename {workflow_stream => workflow_streams}/workflows/hub_workflow.py (91%) rename {workflow_stream => workflow_streams}/workflows/order_workflow.py (66%) rename {workflow_stream => workflow_streams}/workflows/pipeline_workflow.py (83%) rename {workflow_stream => workflow_streams}/workflows/ticker_workflow.py (91%) diff --git a/README.md b/README.md index 5a5c937f..80cda649 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ 
Some examples require extra dependencies. See each sample's directory for specif * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. -* [workflow_stream](workflow_stream) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** +* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/workflow_stream/activities/payment_activity.py b/workflow_stream/activities/payment_activity.py deleted file mode 100644 index f69f8c8d..00000000 --- a/workflow_stream/activities/payment_activity.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import annotations - -import asyncio -from datetime import timedelta - -from temporalio import activity -from temporalio.contrib.workflow_stream import WorkflowStreamClient - -from workflow_stream.shared import TOPIC_PROGRESS, ProgressEvent - - -@activity.defn -async def charge_card(order_id: str) -> str: - """Pretend to charge a card, publishing progress to the parent workflow. - - `WorkflowStreamClient.from_activity()` reads the parent workflow id - and the Temporal client from the activity context, so this activity - can push events back without any wiring. 
- - Caveat: each call to ``from_activity()`` creates a fresh client with - a random ``publisher_id``, so dedup does not protect against an - activity retry republishing the same events. For activities that - must be exactly-once on the stream side, derive a stable - ``publisher_id`` from ``activity.info().activity_id`` (this is - invariant across attempts of the same scheduled activity). The - current ``WorkflowStreamClient`` API does not yet expose - ``publisher_id`` on its constructors; this sample accepts - at-most-once-per-attempt semantics. - """ - client = WorkflowStreamClient.from_activity( - batch_interval=timedelta(milliseconds=200) - ) - async with client: - client.publish(TOPIC_PROGRESS, ProgressEvent(message="charging card...")) - await asyncio.sleep(1.0) - client.publish( - TOPIC_PROGRESS, - ProgressEvent(message="card charged"), - force_flush=True, - ) - return f"charge-{order_id}" diff --git a/workflow_stream/README.md b/workflow_streams/README.md similarity index 91% rename from workflow_stream/README.md rename to workflow_streams/README.md index a452e24b..1d6f167c 100644 --- a/workflow_stream/README.md +++ b/workflow_streams/README.md @@ -1,7 +1,7 @@ # Workflow Streams > **Experimental.** These samples target the -> `temporalio.contrib.workflow_stream` module on the +> `temporalio.contrib.workflow_streams` module on the > [`contrib/pubsub` branch of sdk-python][branch], which is not yet > released. To run them locally, install sdk-python from that branch > (e.g. `uv pip install -e ` after checking out the @@ -9,7 +9,7 @@ [branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub -`temporalio.contrib.workflow_stream` lets a workflow host a durable, +`temporalio.contrib.workflow_streams` lets a workflow host a durable, offset-addressed event channel. The workflow holds an append-only log; external clients (activities, starters, BFFs) publish to topics via signals and subscribe via long-poll updates. 
This packages the @@ -24,7 +24,7 @@ This directory has two scenarios sharing one Worker. `WorkflowStream` and publishes status events as it processes an order. * `activities/payment_activity.py` — an activity that publishes intermediate progress to the stream via - `WorkflowStreamClient.from_activity()`. + `WorkflowStreamClient.from_within_activity()`. * `run_publisher.py` — starts the workflow, subscribes to both topics, decodes each by `item.topic`, and prints events as they arrive. @@ -73,16 +73,16 @@ This directory has two scenarios sharing one Worker. ```bash # Terminal 1: worker -uv run workflow_stream/run_worker.py +uv run workflow_streams/run_worker.py # Terminal 2: pick a scenario -uv run workflow_stream/run_publisher.py +uv run workflow_streams/run_publisher.py # or -uv run workflow_stream/run_reconnecting_subscriber.py +uv run workflow_streams/run_reconnecting_subscriber.py # or -uv run workflow_stream/run_external_publisher.py +uv run workflow_streams/run_external_publisher.py # or -uv run workflow_stream/run_truncating_ticker.py +uv run workflow_streams/run_truncating_ticker.py ``` Expected output on the basic publisher side: diff --git a/workflow_stream/__init__.py b/workflow_streams/__init__.py similarity index 100% rename from workflow_stream/__init__.py rename to workflow_streams/__init__.py diff --git a/workflow_stream/activities/__init__.py b/workflow_streams/activities/__init__.py similarity index 100% rename from workflow_stream/activities/__init__.py rename to workflow_streams/activities/__init__.py diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py new file mode 100644 index 00000000..d94a071b --- /dev/null +++ b/workflow_streams/activities/payment_activity.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from 
workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_within_activity()` reads the parent + workflow id and the Temporal client from the activity context, so + this activity can push events back without any wiring. + + Caveat: each call to ``from_within_activity()`` creates a fresh + client with a random ``publisher_id``, so dedup does not protect + against an activity retry republishing the same events. For + activities that must be exactly-once on the stream side, derive a + stable ``publisher_id`` from ``activity.info().activity_id`` (this + is invariant across attempts of the same scheduled activity). The + current ``WorkflowStreamClient`` API does not yet expose + ``publisher_id`` on its constructors; this sample accepts + at-most-once-per-attempt semantics. + """ + client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent) + progress.publish(ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + progress.publish( + ProgressEvent(message="card charged"), + force_flush=True, + ) + return f"charge-{order_id}" diff --git a/workflow_stream/run_external_publisher.py b/workflow_streams/run_external_publisher.py similarity index 86% rename from workflow_stream/run_external_publisher.py rename to workflow_streams/run_external_publisher.py index 5ef7e27e..1663ed31 100644 --- a/workflow_stream/run_external_publisher.py +++ b/workflow_streams/run_external_publisher.py @@ -14,9 +14,9 @@ ``HubWorkflow.close``, the workflow's run finishes, and the subscriber's iterator exits normally. 
-Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_external_publisher.py + uv run workflow_streams/run_external_publisher.py """ from __future__ import annotations @@ -25,15 +25,15 @@ import uuid from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_NEWS, HubInput, NewsEvent, ) -from workflow_stream.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.hub_workflow import HubWorkflow HEADLINES = [ @@ -64,8 +64,9 @@ async def publish_news() -> None: # we know the events landed before the workflow shuts down. producer = WorkflowStreamClient.create(client, workflow_id) async with producer: + news = producer.topic(TOPIC_NEWS, type=NewsEvent) for headline in HEADLINES: - producer.publish(TOPIC_NEWS, NewsEvent(headline=headline)) + news.publish(NewsEvent(headline=headline)) print(f"[publisher] sent: {headline}") await asyncio.sleep(0.5) await producer.flush() diff --git a/workflow_stream/run_publisher.py b/workflow_streams/run_publisher.py similarity index 90% rename from workflow_stream/run_publisher.py rename to workflow_streams/run_publisher.py index cdfb210d..85c967ee 100644 --- a/workflow_stream/run_publisher.py +++ b/workflow_streams/run_publisher.py @@ -5,9 +5,9 @@ from temporalio.api.common.v1 import Payload from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_PROGRESS, TOPIC_STATUS, @@ -16,7 +16,7 @@ StatusEvent, race_with_workflow, ) -from workflow_stream.workflows.order_workflow import 
OrderWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow async def main() -> None: diff --git a/workflow_stream/run_reconnecting_subscriber.py b/workflow_streams/run_reconnecting_subscriber.py similarity index 92% rename from workflow_stream/run_reconnecting_subscriber.py rename to workflow_streams/run_reconnecting_subscriber.py index 3a5eee11..3aae76c6 100644 --- a/workflow_stream/run_reconnecting_subscriber.py +++ b/workflow_streams/run_reconnecting_subscriber.py @@ -10,9 +10,9 @@ the demo short. The same code shape works across actual process restarts because the resume offset is persisted to disk between phases. -Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_reconnecting_subscriber.py + uv run workflow_streams/run_reconnecting_subscriber.py """ from __future__ import annotations @@ -23,15 +23,15 @@ from pathlib import Path from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_STATUS, PipelineInput, StageEvent, ) -from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow # Number of events read in phase 1 before simulating a disconnect. # Picked small enough that the workflow is still running after. 
diff --git a/workflow_stream/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py similarity index 89% rename from workflow_stream/run_truncating_ticker.py rename to workflow_streams/run_truncating_ticker.py index 069ab4e4..50876a0d 100644 --- a/workflow_stream/run_truncating_ticker.py +++ b/workflow_streams/run_truncating_ticker.py @@ -18,9 +18,9 @@ the right trade — pair with set-semantic events where each event carries enough state to make missing the prior ones recoverable. -Run the worker first (``uv run workflow_stream/run_worker.py``), then:: +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: - uv run workflow_stream/run_truncating_ticker.py + uv run workflow_streams/run_truncating_ticker.py """ from __future__ import annotations @@ -29,15 +29,15 @@ import uuid from temporalio.client import Client -from temporalio.contrib.workflow_stream import WorkflowStreamClient +from temporalio.contrib.workflow_streams import WorkflowStreamClient -from workflow_stream.shared import ( +from workflow_streams.shared import ( TASK_QUEUE, TOPIC_TICK, TickerInput, TickEvent, ) -from workflow_stream.workflows.ticker_workflow import TickerWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow SLOW_SUBSCRIBER_DELAY_S = 1.5 diff --git a/workflow_stream/run_worker.py b/workflow_streams/run_worker.py similarity index 57% rename from workflow_stream/run_worker.py rename to workflow_streams/run_worker.py index 9e21982e..8aa12edc 100644 --- a/workflow_stream/run_worker.py +++ b/workflow_streams/run_worker.py @@ -6,12 +6,12 @@ from temporalio.client import Client from temporalio.worker import Worker -from workflow_stream.activities.payment_activity import charge_card -from workflow_stream.shared import TASK_QUEUE -from workflow_stream.workflows.hub_workflow import HubWorkflow -from workflow_stream.workflows.order_workflow import OrderWorkflow -from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow -from 
workflow_stream.workflows.ticker_workflow import TickerWorkflow +from workflow_streams.activities.payment_activity import charge_card +from workflow_streams.shared import TASK_QUEUE +from workflow_streams.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow async def main() -> None: diff --git a/workflow_stream/shared.py b/workflow_streams/shared.py similarity index 98% rename from workflow_stream/shared.py rename to workflow_streams/shared.py index fd97a6a8..746ee73d 100644 --- a/workflow_stream/shared.py +++ b/workflow_streams/shared.py @@ -6,7 +6,7 @@ from typing import Any, TypeVar from temporalio.client import WorkflowHandle -from temporalio.contrib.workflow_stream import WorkflowStreamState +from temporalio.contrib.workflow_streams import WorkflowStreamState TASK_QUEUE = "workflow-stream-sample-task-queue" diff --git a/workflow_stream/workflows/__init__.py b/workflow_streams/workflows/__init__.py similarity index 100% rename from workflow_stream/workflows/__init__.py rename to workflow_streams/workflows/__init__.py diff --git a/workflow_stream/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py similarity index 91% rename from workflow_stream/workflows/hub_workflow.py rename to workflow_streams/workflows/hub_workflow.py index eb686963..fdf7da56 100644 --- a/workflow_stream/workflows/hub_workflow.py +++ b/workflow_streams/workflows/hub_workflow.py @@ -1,9 +1,9 @@ from __future__ import annotations from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import HubInput +from workflow_streams.shared import HubInput @workflow.defn diff --git a/workflow_stream/workflows/order_workflow.py 
b/workflow_streams/workflows/order_workflow.py similarity index 66% rename from workflow_stream/workflows/order_workflow.py rename to workflow_streams/workflows/order_workflow.py index 4b4f4a82..8b944508 100644 --- a/workflow_stream/workflows/order_workflow.py +++ b/workflow_streams/workflows/order_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_PROGRESS, TOPIC_STATUS, OrderInput, @@ -14,7 +14,7 @@ ) with workflow.unsafe.imports_passed_through(): - from workflow_stream.activities.payment_activity import charge_card + from workflow_streams.activities.payment_activity import charge_card @workflow.defn @@ -34,12 +34,12 @@ def __init__(self, input: OrderInput) -> None: # messages. Threading prior_state lets the workflow survive # continue-as-new without losing buffered items. 
self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent) + self.progress = self.stream.topic(TOPIC_PROGRESS, type=ProgressEvent) @workflow.run async def run(self, input: OrderInput) -> str: - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="received", order_id=input.order_id) - ) + self.status.publish(StatusEvent(kind="received", order_id=input.order_id)) charge_id = await workflow.execute_activity( charge_card, @@ -47,14 +47,7 @@ async def run(self, input: OrderInput) -> str: start_to_close_timeout=timedelta(seconds=30), ) - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="shipped", order_id=input.order_id) - ) - self.stream.publish( - TOPIC_PROGRESS, - ProgressEvent(message=f"charge id: {charge_id}"), - ) - self.stream.publish( - TOPIC_STATUS, StatusEvent(kind="complete", order_id=input.order_id) - ) + self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) + self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) + self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) return charge_id diff --git a/workflow_stream/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py similarity index 83% rename from workflow_stream/workflows/pipeline_workflow.py rename to workflow_streams/workflows/pipeline_workflow.py index 5f53c1bf..a2d96d95 100644 --- a/workflow_stream/workflows/pipeline_workflow.py +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_STATUS, PipelineInput, StageEvent, @@ -25,6 +25,7 @@ class PipelineWorkflow: @workflow.init def __init__(self, input: PipelineInput) -> None: self.stream = 
WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StageEvent) @workflow.run async def run(self, input: PipelineInput) -> str: @@ -37,7 +38,7 @@ async def run(self, input: PipelineInput) -> str: "complete", ] for stage in stages: - self.stream.publish(TOPIC_STATUS, StageEvent(stage=stage)) + self.status.publish(StageEvent(stage=stage)) if stage != "complete": await workflow.sleep(timedelta(seconds=2)) return f"pipeline {input.pipeline_id} done" diff --git a/workflow_stream/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py similarity index 91% rename from workflow_stream/workflows/ticker_workflow.py rename to workflow_streams/workflows/ticker_workflow.py index 61f895a2..e11616b4 100644 --- a/workflow_stream/workflows/ticker_workflow.py +++ b/workflow_streams/workflows/ticker_workflow.py @@ -3,9 +3,9 @@ from datetime import timedelta from temporalio import workflow -from temporalio.contrib.workflow_stream import WorkflowStream +from temporalio.contrib.workflow_streams import WorkflowStream -from workflow_stream.shared import ( +from workflow_streams.shared import ( TOPIC_TICK, TickEvent, TickerInput, @@ -38,6 +38,7 @@ class TickerWorkflow: @workflow.init def __init__(self, input: TickerInput) -> None: self.stream = WorkflowStream(prior_state=input.stream_state) + self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent) # Running count of events published by THIS run. To compute a # global offset, add the prior_state's base_offset (omitted # here — this sample doesn't continue-as-new). 
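The ticker's per-run offset counting feeds its truncation arithmetic (truncate to the last `keep_last` entries every `truncate_every` publishes, per the truncating-ticker scenario). That arithmetic can be checked with a standalone toy. This is an illustrative model, not the contrib API; it assumes truncation fires whenever the published count is a multiple of `truncate_every`, and `surviving_offsets` is a name invented here.

```python
def surviving_offsets(count: int, keep_last: int, truncate_every: int) -> list:
    """Toy model of TickerWorkflow's bounded log: publish `count` ticks
    (tick n lands at offset n), truncating to the last `keep_last`
    entries after every `truncate_every` publishes. Returns the offsets
    still retained at the end."""
    base = 0  # oldest retained offset
    for published in range(1, count + 1):
        if published % truncate_every == 0:
            # Drop everything except the last `keep_last` entries.
            base = max(base, published - keep_last)
    return list(range(base, count))
```

With the truncating-ticker scenario's parameters (`count=20`, `keep_last=3`, `truncate_every=5`) this leaves offsets 17 through 19, so the final tick survives the last truncation and even a slow subscriber can still reach it.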
@@ -46,7 +47,7 @@ def __init__(self, input: TickerInput) -> None: @workflow.run async def run(self, input: TickerInput) -> str: for n in range(input.count): - self.stream.publish(TOPIC_TICK, TickEvent(n=n)) + self.tick.publish(TickEvent(n=n)) self._published += 1 await workflow.sleep(timedelta(milliseconds=input.interval_ms)) if ( From 5d67b9ec73e34ac34b2f44feb53ee4b243a01585 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Wed, 29 Apr 2026 20:49:45 -0700 Subject: [PATCH 6/8] samples: workflow_streams review polish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - README: fix scenario count (two -> four), document subscriber start position and continue-as-new semantics for stream_state - hub_workflow: drop stale comment referencing a README race note that does not exist in this sample - payment_activity: trim long publisher_id/dedup caveat — moved out of the first sample's docstring to keep it approachable --- workflow_streams/README.md | 19 ++++++++++++++++++- .../activities/payment_activity.py | 10 ---------- workflow_streams/workflows/hub_workflow.py | 3 --- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/workflow_streams/README.md b/workflow_streams/README.md index 1d6f167c..ef983740 100644 --- a/workflow_streams/README.md +++ b/workflow_streams/README.md @@ -16,7 +16,7 @@ signals and subscribe via long-poll updates. This packages the boilerplate — batching, offset tracking, topic filtering, continue-as-new hand-off — into a reusable stream. -This directory has two scenarios sharing one Worker. +This directory has four scenarios sharing one Worker. **Scenario 1 — basic publish/subscribe with heterogeneous topics:** @@ -114,3 +114,20 @@ are continuous across the disconnect — no events lost, none duplicated): workflow result: pipeline workflow-stream-pipeline-... 
done ``` + +## Notes + +* **Subscriber start position.** `subscribe(...)` without `from_offset` + starts at the stream's current base offset and follows live — events + that have already been truncated away (offsets below the base) are + not replayed. Pass `from_offset=N` to resume + from a known position (see `run_reconnecting_subscriber.py`); the + iterator skips forward to the current base if `N` has been + truncated. +* **Continue-as-new.** Every `*Input` dataclass carries + `stream_state: WorkflowStreamState | None = None`. To survive + continue-as-new without losing buffered items, capture the workflow's + stream state and pass it to the next run via + `WorkflowStream(prior_state=...)` in `@workflow.init`. The samples + declare the field for completeness; none of them actually trigger + continue-as-new. diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py index d94a071b..2ccd708b 100644 --- a/workflow_streams/activities/payment_activity.py +++ b/workflow_streams/activities/payment_activity.py @@ -16,16 +16,6 @@ async def charge_card(order_id: str) -> str: `WorkflowStreamClient.from_within_activity()` reads the parent workflow id and the Temporal client from the activity context, so this activity can push events back without any wiring. - - Caveat: each call to ``from_within_activity()`` creates a fresh - client with a random ``publisher_id``, so dedup does not protect - against an activity retry republishing the same events. For - activities that must be exactly-once on the stream side, derive a - stable ``publisher_id`` from ``activity.info().activity_id`` (this - is invariant across attempts of the same scheduled activity). The - current ``WorkflowStreamClient`` API does not yet expose - ``publisher_id`` on its constructors; this sample accepts - at-most-once-per-attempt semantics.
""" client = WorkflowStreamClient.from_within_activity( batch_interval=timedelta(milliseconds=200) diff --git a/workflow_streams/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py index fdf7da56..1903b20a 100644 --- a/workflow_streams/workflows/hub_workflow.py +++ b/workflow_streams/workflows/hub_workflow.py @@ -30,7 +30,4 @@ async def run(self, input: HubInput) -> str: @workflow.signal def close(self) -> None: - # Custom signal handler that does not read stream state, so the - # synchronous-handler race documented in the README does not - # apply. self._closed = True From 62946911b388de3fa4b98efa72ef1d6757d7c316 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Thu, 30 Apr 2026 07:09:04 +0000 Subject: [PATCH 7/8] workflow_streams: deliver terminal events + fix run_publisher subscribe shape MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit End-to-end runs of the four workflow_streams scenarios surfaced two sample-side issues, both fixed here. run_publisher's consumer asserted ``isinstance(item.data, Payload)`` and called ``payload_converter.from_payload(item.data, T)``. The contrib's ``subscribe()`` defaults to converter-decoded data, not raw payloads, so this assertion fired on the first run. Switch to ``result_type=RawValue`` (the documented escape hatch for heterogeneous topics) and read ``item.data.payload``. Items published in the same workflow task that returns from ``@workflow.run`` were not delivered to subscribers — the in-memory log dies with the workflow and the next subscriber poll lands on a completed workflow. 
Fix: each scenario now uses an in-band terminator that subscribers break on, and each workflow holds the run open with ``await workflow.sleep(timedelta(milliseconds=500))`` so that final publish is fetched before the workflow exits: - OrderWorkflow / PipelineWorkflow: the workflow's own ``StatusEvent(kind="complete")`` / ``StageEvent(stage="complete")`` is the terminator (consumers already broke on it). - HubWorkflow: the *publisher* in run_external_publisher emits a sentinel ``NewsEvent(headline="__done__")`` immediately before signaling close; the consumer breaks on the sentinel. - TickerWorkflow: the final tick (n == count - 1) is the terminator; ``keep_last`` guarantees that offset survives the last truncation, so even slow consumers reach it. Because subscribers stop polling on the terminator, by the time ``workflow.run`` returns there are no in-flight poll handlers — no ``UnfinishedUpdateHandlersWarning`` from the SDK and no need for ``detach_pollers()`` / ``wait_condition(all_handlers_finished)`` in the workflow exit path. Two consecutive end-to-end runs of all four scenarios pass cleanly against ``temporal server start-dev --headless``. --- workflow_streams/run_external_publisher.py | 14 ++++++++++++-- workflow_streams/run_publisher.py | 16 +++++++++------- workflow_streams/run_truncating_ticker.py | 14 ++++++++++---- workflow_streams/workflows/hub_workflow.py | 7 +++++++ workflow_streams/workflows/order_workflow.py | 5 +++++ workflow_streams/workflows/pipeline_workflow.py | 4 ++++ workflow_streams/workflows/ticker_workflow.py | 6 ++++++ 7 files changed, 53 insertions(+), 13 deletions(-) diff --git a/workflow_streams/run_external_publisher.py b/workflow_streams/run_external_publisher.py index 1663ed31..bf7d98e6 100644 --- a/workflow_streams/run_external_publisher.py +++ b/workflow_streams/run_external_publisher.py @@ -44,6 +44,13 @@ "regulator opens probe", ] +# In-band terminator the publisher emits before signaling close. 
The +# subscriber recognizes this value and stops polling — without an +# explicit terminator the consumer would have to rely on the workflow +# returning to break the iterator, which means racing the last item +# delivery against workflow completion. +DONE_HEADLINE = "__done__" + async def main() -> None: client = await Client.connect("localhost:7233") @@ -69,9 +76,10 @@ async def publish_news() -> None: news.publish(NewsEvent(headline=headline)) print(f"[publisher] sent: {headline}") await asyncio.sleep(0.5) + news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True) await producer.flush() - # Tell the hub it can stop. The workflow's run() returns, and - # any in-flight subscribers see their async-for loop exit. + # Tell the hub it can stop. The subscriber has already broken + # out of its async-for loop on the sentinel above. await handle.signal(HubWorkflow.close) print("[publisher] signaled close") @@ -80,6 +88,8 @@ async def consume_news() -> None: async for item in consumer.subscribe( [TOPIC_NEWS], result_type=NewsEvent ): + if item.data.headline == DONE_HEADLINE: + return print(f"[subscriber] offset={item.offset}: {item.data.headline}") await asyncio.gather(publish_news(), consume_news()) diff --git a/workflow_streams/run_publisher.py b/workflow_streams/run_publisher.py index 85c967ee..2e5ddb8d 100644 --- a/workflow_streams/run_publisher.py +++ b/workflow_streams/run_publisher.py @@ -3,8 +3,8 @@ import asyncio import uuid -from temporalio.api.common.v1 import Payload from temporalio.client import Client +from temporalio.common import RawValue from temporalio.contrib.workflow_streams import WorkflowStreamClient from workflow_streams.shared import ( @@ -35,17 +35,19 @@ async def main() -> None: async def consume() -> None: # Single iterator over both topics — avoids a cancellation race - # between two concurrent subscribers. result_type is left unset - # so we can dispatch heterogeneous events on item.topic. 
- async for item in stream.subscribe([TOPIC_STATUS, TOPIC_PROGRESS]): - assert isinstance(item.data, Payload) + # between two concurrent subscribers. result_type=RawValue + # delivers the underlying Payload so we can dispatch + # heterogeneous events on item.topic. + async for item in stream.subscribe( + [TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue + ): if item.topic == TOPIC_STATUS: - evt = converter.from_payload(item.data, StatusEvent) + evt = converter.from_payload(item.data.payload, StatusEvent) print(f"[status] {evt.kind}: order={evt.order_id}") if evt.kind == "complete": return elif item.topic == TOPIC_PROGRESS: - progress = converter.from_payload(item.data, ProgressEvent) + progress = converter.from_payload(item.data.payload, ProgressEvent) print(f"[progress] {progress.message}") result = await race_with_workflow(consume(), handle) diff --git a/workflow_streams/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py index 50876a0d..65f8740e 100644 --- a/workflow_streams/run_truncating_ticker.py +++ b/workflow_streams/run_truncating_ticker.py @@ -41,6 +41,7 @@ SLOW_SUBSCRIBER_DELAY_S = 1.5 +TICKER_COUNT = 20 async def main() -> None: @@ -50,7 +51,7 @@ async def main() -> None: handle = await client.start_workflow( TickerWorkflow.run, TickerInput( - count=20, + count=TICKER_COUNT, keep_last=3, truncate_every=5, interval_ms=400, @@ -60,19 +61,24 @@ async def main() -> None: ) stream = WorkflowStreamClient.create(client, workflow_id) + last_n = TICKER_COUNT - 1 + # Both subscribers break on the final tick (n == last_n). ``keep_last`` + # ensures that offset survives the last truncation so even the slow + # consumer reaches it. 
async def fast_subscriber() -> None: async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): print(f"[fast] offset={item.offset:3d} n={item.data.n}") + if item.data.n == last_n: + return async def slow_subscriber() -> None: async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): print(f"[SLOW] offset={item.offset:3d} n={item.data.n}") + if item.data.n == last_n: + return await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) - # Both iterators exit normally when the workflow completes. No - # terminal sentinel is needed — see the doc's "When the Workflow - # run completes" note. await asyncio.gather(fast_subscriber(), slow_subscriber()) result = await handle.result() diff --git a/workflow_streams/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py index 1903b20a..5dcc3c5f 100644 --- a/workflow_streams/workflows/hub_workflow.py +++ b/workflow_streams/workflows/hub_workflow.py @@ -1,5 +1,7 @@ from __future__ import annotations +from datetime import timedelta + from temporalio import workflow from temporalio.contrib.workflow_streams import WorkflowStream @@ -26,6 +28,11 @@ def __init__(self, input: HubInput) -> None: @workflow.run async def run(self, input: HubInput) -> str: await workflow.wait_condition(lambda: self._closed) + # The publisher publishes its own terminator into the stream + # before signaling close (see run_external_publisher.py). + # Hold the run open briefly so subscribers' final poll + # delivers any items still in the log. 
+ await workflow.sleep(timedelta(milliseconds=500)) return f"hub {input.hub_id} closed" @workflow.signal diff --git a/workflow_streams/workflows/order_workflow.py b/workflow_streams/workflows/order_workflow.py index 8b944508..099634cd 100644 --- a/workflow_streams/workflows/order_workflow.py +++ b/workflow_streams/workflows/order_workflow.py @@ -50,4 +50,9 @@ async def run(self, input: OrderInput) -> str: self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) + # The "complete" status event above is the in-band terminator + # subscribers break on (see run_publisher.py). Hold the run + # open briefly so subscribers' next poll delivers it before + # this task returns and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) return charge_id diff --git a/workflow_streams/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py index a2d96d95..83336905 100644 --- a/workflow_streams/workflows/pipeline_workflow.py +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -41,4 +41,8 @@ async def run(self, input: PipelineInput) -> str: self.status.publish(StageEvent(stage=stage)) if stage != "complete": await workflow.sleep(timedelta(seconds=2)) + # The "complete" stage above is the in-band terminator + # subscribers break on. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) return f"pipeline {input.pipeline_id} done" diff --git a/workflow_streams/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py index e11616b4..566b98f1 100644 --- a/workflow_streams/workflows/ticker_workflow.py +++ b/workflow_streams/workflows/ticker_workflow.py @@ -57,4 +57,10 @@ async def run(self, input: TickerInput) -> str: # Drop everything except the last `keep_last` entries. 
truncate_to = self._published - input.keep_last self.stream.truncate(truncate_to) + # The final tick (n == count - 1) is the in-band terminator + # subscribers break on. ``keep_last`` guarantees that final + # offset survives the last truncation so even slow consumers + # eventually see it. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) return f"ticker emitted {self._published} events" From bfbb2ed19f984cb4840be3247e5eedeca84abc4e Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Thu, 30 Apr 2026 07:11:25 +0000 Subject: [PATCH 8/8] workflow_streams README: document the stream-end pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribers don't exit on their own when the host workflow completes — they need an in-band terminator, and the workflow needs to hold open briefly so the final publish is fetched before run() returns. Both pieces show up in every scenario here, so document them in one place and update scenario 3's description to mention the sentinel headline the publisher emits. --- workflow_streams/README.md | 47 ++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/workflow_streams/README.md b/workflow_streams/README.md index ef983740..bf2466fb 100644 --- a/workflow_streams/README.md +++ b/workflow_streams/README.md @@ -48,11 +48,12 @@ This directory has four scenarios sharing one Worker. * `run_external_publisher.py` — starts the hub, then publishes events into it from a plain Python coroutine using `WorkflowStreamClient.create(client, workflow_id)`. A subscriber - task runs alongside; when the publisher is done it signals - `HubWorkflow.close`, the workflow's run finishes, and the - subscriber's iterator exits normally. This is the shape that fits a - backend service or scheduled job pushing events into a workflow it - didn't itself start. 
+ task runs alongside; when the publisher is done it emits an in-band + sentinel headline (`__done__`) into the stream, then signals + `HubWorkflow.close`. The subscriber breaks on the sentinel and + exits its `async for`. This is the shape that fits a backend + service or scheduled job pushing events into a workflow it didn't + itself start. **Scenario 4 — bounded log via `truncate()`:** @@ -69,6 +70,42 @@ This directory has four scenarios sharing one Worker. `run_worker.py` registers all four workflows and the activity. +## Ending the stream + +`WorkflowStreamClient.subscribe()` is a long-poll loop — it does not +exit on its own when the host workflow completes. Two things have to +happen at the end of a streamed workflow for clean shutdown: + +1. **An in-band terminator that subscribers recognize.** Each scenario + here sends one before the workflow exits: + - `OrderWorkflow` and `PipelineWorkflow` publish a "complete" + status / stage event; consumers break on it. + - `run_external_publisher.py` publishes a sentinel + `NewsEvent(headline="__done__")` immediately before signaling + `HubWorkflow.close`; the consumer breaks on the sentinel. + - `TickerWorkflow`'s final tick (`n == count - 1`) is the + terminator; subscribers break when they see it. `keep_last` + guarantees that final offset survives the last truncation, so + even slow consumers reach it. + +2. **A short hold-open in the workflow before returning** so that the + final publish gets fetched. Items published in the same workflow + task that returns from `@workflow.run` are abandoned: the + in-memory log dies with the workflow, and the next subscriber + poll lands on a completed workflow. Each workflow here ends with + + ```python + await workflow.sleep(timedelta(milliseconds=500)) + return ... + ``` + + which gives subscribers in their `poll_cooldown` interval time to + issue one more poll. 
With both pieces in place, subscribers + receive the terminator, break out of their `async for`, and stop + polling — by the time the workflow exits there are no in-flight + poll handlers, so the SDK does not warn about unfinished + handlers. + ## Run it ```bash