diff --git a/docs/docs/api_reference/workflow/context.md b/docs/docs/api_reference/workflow/context.md index 7c5d3bf6..3a4d5b8f 100644 --- a/docs/docs/api_reference/workflow/context.md +++ b/docs/docs/api_reference/workflow/context.md @@ -1,5 +1,29 @@ -::: workflows.context +::: workflows.context.Context options: + show_root_heading: true + show_root_full_path: false members: - - Context - filters: ["!^_", "^__init__$", "^collect_events$"] + - __init__ + - collect_events + - from_dict + - get_result + - is_running + - send_event + - store + - to_dict + - wait_for_event + - write_event_to_stream + + +::: workflows.context.state_store + options: + members: + - DictState + - InMemoryStateStore + +::: workflows.context.serializers + options: + members: + - BaseSerializer + - JsonSerializer + - PickleSerializer diff --git a/docs/docs/api_reference/workflow/errors.md b/docs/docs/api_reference/workflow/errors.md new file mode 100644 index 00000000..a9983237 --- /dev/null +++ b/docs/docs/api_reference/workflow/errors.md @@ -0,0 +1 @@ +::: workflows.errors diff --git a/docs/docs/api_reference/workflow/events.md b/docs/docs/api_reference/workflow/events.md index a963b295..51d7f2eb 100644 --- a/docs/docs/api_reference/workflow/events.md +++ b/docs/docs/api_reference/workflow/events.md @@ -2,5 +2,7 @@ options: members: - Event + - InputRequiredEvent + - HumanResponseEvent - StartEvent - StopEvent diff --git a/docs/docs/api_reference/workflow/handler.md b/docs/docs/api_reference/workflow/handler.md new file mode 100644 index 00000000..93b9267b --- /dev/null +++ b/docs/docs/api_reference/workflow/handler.md @@ -0,0 +1 @@ +::: workflows.handler diff --git a/docs/docs/api_reference/workflow/resource.md b/docs/docs/api_reference/workflow/resource.md new file mode 100644 index 00000000..12338507 --- /dev/null +++ b/docs/docs/api_reference/workflow/resource.md @@ -0,0 +1 @@ +::: workflows.resource.Resource diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index e75e48f4..a804df85 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -22,7 +22,7 @@ nav: - module_guides/workflow - Understanding: - Workflows: - - understanding/workflow + - understanding/workflows plugins: - search - include_dir_to_nav diff --git a/src/workflows/context/context.py b/src/workflows/context/context.py index eee7a9fa..13de86b8 100644 --- a/src/workflows/context/context.py +++ b/src/workflows/context/context.py @@ -52,14 +52,58 @@ class UnserializableKeyWarning(Warning): class Context(Generic[MODEL_T]): """ - A global object representing a context for a given workflow run. - - The Context object can be used to store data that needs to be available across iterations during a workflow - execution, and across multiple workflow runs. - Every context instance offers two type of data storage: a global one, that's shared among all the steps within a - workflow, and private one, that's only accessible from a single step. - - Both `set` and `get` operations on global data are governed by a lock, and considered coroutine-safe. + Global, per-run context and event broker for a `Workflow`. + + The `Context` coordinates event delivery between steps, tracks in-flight work, + exposes a global state store, and provides utilities for streaming and + synchronization. It is created by a `Workflow` at run time and can be + persisted and restored. + + Args: + workflow (Workflow): The owning workflow instance. Used to infer + step configuration and instrumentation. + + Attributes: + is_running (bool): Whether the workflow is currently running. 
+ store (InMemoryStateStore[MODEL_T]): Type-safe, async state store shared + across steps. See also + [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore]. + + Examples: + Basic usage inside a step: + + ```python + from workflows import step + from workflows.events import StartEvent, StopEvent + + @step + async def start(self, ctx: Context, ev: StartEvent) -> StopEvent: + await ctx.store.set("query", ev.topic) + ctx.write_event_to_stream(ev) # surface progress to UI + return StopEvent(result="ok") + ``` + + Persisting the state of a workflow across runs: + + ```python + from workflows import Context + + # Create a context and run the workflow with the same context + ctx = Context(my_workflow) + result_1 = await my_workflow.run(..., ctx=ctx) + result_2 = await my_workflow.run(..., ctx=ctx) + + # Serialize the context and restore it + ctx_dict = ctx.to_dict() + restored_ctx = Context.from_dict(my_workflow, ctx_dict) + result_3 = await my_workflow.run(..., ctx=restored_ctx) + ``` + + + See Also: + - [Workflow][workflows.Workflow] + - [Event][workflows.events.Event] + - [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore] """ # These keys are set by pre-built workflows and @@ -101,6 +145,14 @@ async def _init_state_store(self, state_class: MODEL_T) -> None: @property def store(self) -> InMemoryStateStore[MODEL_T]: + """Typed, process-local state store shared across steps. + + If no state was initialized yet, a default + [DictState][workflows.context.state_store.DictState] store is created. + + Returns: + InMemoryStateStore[MODEL_T]: The state store instance. + """ # Default to DictState if no state manager is initialized if self._state_store is None: # DictState is designed to be compatible with any MODEL_T as the default fallback @@ -158,6 +210,35 @@ def _deserialize_queue( return queue def to_dict(self, serializer: BaseSerializer | None = None) -> dict[str, Any]: + """Serialize the context to a JSON-serializable dict. + + Persists the global state store, event queues, buffers, accepted events, + broker log, and running flag. This payload can be fed to + [from_dict][workflows.context.context.Context.from_dict] to resume a run + or carry state across runs. + + Args: + serializer (BaseSerializer | None): Value serializer used for state + and event payloads. Defaults to + [JsonSerializer][workflows.context.serializers.JsonSerializer]. + + Returns: + dict[str, Any]: A dict suitable for JSON encoding and later + restoration via `from_dict`. + + See Also: + - [InMemoryStateStore.to_dict][workflows.context.state_store.InMemoryStateStore.to_dict] + + Examples: + ```python + ctx_dict = ctx.to_dict() + my_db.set("key", json.dumps(ctx_dict)) + + ctx_dict = my_db.get("key") + restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict)) + result = await my_workflow.run(..., ctx=restored_ctx) + ``` + """ serializer = serializer or JsonSerializer() # Serialize state using the state manager's method @@ -194,6 +275,34 @@ def from_dict( data: dict[str, Any], serializer: BaseSerializer | None = None, ) -> "Context[MODEL_T]": + """Reconstruct a `Context` from a serialized payload. + + Args: + workflow (Workflow): The workflow instance that will own this + context. + data (dict[str, Any]): Payload produced by + [to_dict][workflows.context.context.Context.to_dict]. + serializer (BaseSerializer | None): Serializer used to decode state + and events. Defaults to JSON. + + Returns: + Context[MODEL_T]: A context instance initialized with the persisted + state and queues. 
+ + Raises: + ContextSerdeError: If the payload is missing required fields or is + in an incompatible format. + + Examples: + ```python + ctx_dict = ctx.to_dict() + my_db.set("key", json.dumps(ctx_dict)) + + ctx_dict = my_db.get("key") + restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict)) + result = await my_workflow.run(..., ctx=restored_ctx) + ``` + """ serializer = serializer or JsonSerializer() try: @@ -273,6 +382,11 @@ async def remove_running_step(self, name: str) -> None: del self._currently_running_steps[name] async def running_steps(self) -> list[str]: + """Return the list of currently running step names. + + Returns: + list[str]: Names of steps that have at least one active worker. + """ async with self.lock: return list(self._currently_running_steps) @@ -304,23 +418,38 @@ def collect_events( self, ev: Event, expected: list[Type[Event]], buffer_id: str | None = None ) -> list[Event] | None: """ - Collects events for buffering in workflows. + Buffer events until all expected types are available, then return them. - This method adds the current event to the internal buffer and attempts to collect all - expected event types. If all expected events are found, they will be returned in order. - Otherwise, it returns None and restores any collected events back to the buffer. + This utility is helpful when a step can receive multiple event types + and needs to proceed only when it has a full set. The returned list is + ordered according to `expected`. Args: - ev (Event): The current event to add to the buffer. - expected (list[Type[Event]]): list of expected event types to collect. - buffer_id (str): A unique identifier for the events collected. Ideally this should be - the step name, so to avoid any interference between different steps. If not provided, - a stable identifier will be created using the list of expected events. + ev (Event): The incoming event to add to the buffer. + expected (list[Type[Event]]): Event types to collect, in order. + buffer_id (str | None): Optional stable key to isolate buffers across + steps or workers. Defaults to an internal key derived from the + task name or expected types. Returns: - list[Event] | None: list of collected events in the order of expected types if all - expected events are found; otherwise None. - + list[Event] | None: The events in the requested order when complete, + otherwise `None`. + + Examples: + ```python + @step + async def synthesize( + self, ctx: Context, ev: QueryEvent | RetrieveEvent + ) -> StopEvent | None: + events = ctx.collect_events(ev, [QueryEvent, RetrieveEvent]) + if events is None: + return None + query_ev, retrieve_ev = events + # ... proceed with both inputs present ... + ``` + + See Also: + - [Event][workflows.events.Event] """ buffer_id = buffer_id or self._get_event_buffer_id(expected) @@ -352,11 +481,42 @@ def collect_events( return None def send_event(self, message: Event, step: str | None = None) -> None: - """ - Sends an event to a specific step in the workflow. + """Dispatch an event to one or all workflow steps. + + If `step` is omitted, the event is broadcast to all step queues and + non-matching steps will ignore it. When `step` is provided, the target + step must accept the event type or a + [WorkflowRuntimeError][workflows.errors.WorkflowRuntimeError] is raised. + + Args: + message (Event): The event to enqueue. + step (str | None): Optional step name to target. - If step is None, the event is sent to all the receivers and we let - them discard events they don't want. 
+ Raises: + WorkflowRuntimeError: If the target step does not exist or does not + accept the event type. + + Examples: + It's common to use this method to fan-out events: + + ```python + @step + async def my_step(self, ctx: Context, ev: StartEvent) -> WorkerEvent | GatherEvent: + for i in range(10): + ctx.send_event(WorkerEvent(msg=i)) + return GatherEvent() + ``` + + You also see this method used from the caller side to send events into the workflow: + + ```python + handler = my_workflow.run(...) + async for ev in handler.stream_events(): + if isinstance(ev, SomeEvent): + handler.ctx.send_event(SomeOtherEvent(msg="Hello!")) + + result = await handler + ``` """ if step is None: for queue in self._queues.values(): @@ -383,24 +543,41 @@ async def wait_for_event( requirements: dict[str, Any] | None = None, timeout: float | None = 2000, ) -> T: - """ - Asynchronously wait for a specific event type to be received. + """Wait for the next matching event of type `event_type`. - If provided, `waiter_event` will be written to the event stream to let the caller know that we're waiting for a response. + Optionally emits a `waiter_event` to the event stream once per `waiter_id` to + inform callers that the workflow is waiting for external input. + This helps to prevent duplicate waiter events from being sent to the event stream. Args: - event_type: The type of event to wait for - waiter_event: The event to emit to the event stream to let the caller know that we're waiting for a response - waiter_id: A unique identifier for this specific wait call. It helps ensure that we only send one `waiter_event` for each `waiter_id`. - requirements: Optional dict of requirements the event must match - timeout: Optional timeout in seconds. Defaults to 2000s. + event_type (type[T]): Concrete event class to wait for. + waiter_event (Event | None): Optional event to write to the stream + once when the wait begins. + waiter_id (str | None): Stable identifier to avoid emitting multiple + waiter events for the same logical wait. + requirements (dict[str, Any] | None): Key/value filters that must be + satisfied by the event via `event.get(key) == value`. + timeout (float | None): Max seconds to wait. `None` means no + timeout. Defaults to 2000 seconds. Returns: - The event type that was requested. + T: The received event instance of the requested type. Raises: - asyncio.TimeoutError: If the timeout is reached before receiving matching event - + asyncio.TimeoutError: If the timeout elapses. + + Examples: + ```python + @step + async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent: + response = await ctx.wait_for_event( + HumanResponseEvent, + waiter_event=InputRequiredEvent(msg="What's your name?"), + waiter_id="user_name", + timeout=60, + ) + return StopEvent(result=response.response) + ``` """ requirements = requirements or {} @@ -435,23 +612,47 @@ async def wait_for_event( await self.store.set(waiter_id, False) def write_event_to_stream(self, ev: Event | None) -> None: + """Enqueue an event for streaming to [WorkflowHandler]](workflows.handler.WorkflowHandler). + + Args: + ev (Event | None): The event to stream. `None` can be used as a + sentinel in some streaming modes. 
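+
+        Any custom `Event` subclass can be streamed. A hedged sketch of the
+        typical pattern (the `ProgressEvent` class and its `msg` field are
+        illustrative, not part of the library):
+
+        ```python
+        class ProgressEvent(Event):
+            msg: str
+
+        # inside a step: surface progress to the caller
+        ctx.write_event_to_stream(ProgressEvent(msg="retrieving documents"))
+
+        # on the caller side, read streamed events back from the handler
+        handler = my_workflow.run(...)
+        async for ev in handler.stream_events():
+            if isinstance(ev, ProgressEvent):
+                print(ev.msg)
+        ```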
+ + Examples: + ```python + @step + async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent: + ctx.write_event_to_stream(ev) + return StopEvent(result="ok") + ``` + """ self._streaming_queue.put_nowait(ev) def get_result(self) -> RunResultT: - """Returns the result of the workflow.""" + """Return the final result of the workflow run. + + Examples: + ```python + result = await my_workflow.run(..., ctx=ctx) + result_agent = ctx.get_result() + ``` + + Returns: + RunResultT: The value provided via a `StopEvent`. + """ return self._retval @property def streaming_queue(self) -> asyncio.Queue: + """The internal queue used for streaming events to callers.""" return self._streaming_queue async def shutdown(self) -> None: - """ - To be called when a workflow ends. + """Shut down the workflow run and clean up background tasks. - We clear all the tasks and set the is_running flag. Note that we - don't clear _globals or _queues so that the context can be still - used after the shutdown to fetch data or consume leftover events. + Cancels all outstanding workers, waits for them to finish, and marks the + context as not running. Queues and state remain available so callers can + inspect or drain leftover events. """ self.is_running = False # Cancel all running tasks @@ -470,6 +671,16 @@ def add_step_worker( run_id: str, resource_manager: ResourceManager, ) -> None: + """Spawn a background worker task to process events for a step. + + Args: + name (str): Step name. + step (Callable): Step function (sync or async). + config (StepConfig): Resolved configuration for the step. + verbose (bool): If True, print step activity. + run_id (str): Run identifier for instrumentation. + resource_manager (ResourceManager): Resource injector for the step. + """ self._tasks.add( asyncio.create_task( self._step_worker( @@ -608,6 +819,14 @@ async def _step_worker( self.send_event(new_ev) def add_cancel_worker(self) -> None: + """Install a worker that turns a cancel flag into an exception. + + When the cancel flag is set, a `WorkflowCancelledByUser` will be raised + internally to abort the run. + + See Also: + - [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser] + """ self._tasks.add(asyncio.create_task(self._cancel_worker())) async def _cancel_worker(self) -> None: diff --git a/src/workflows/context/serializers.py b/src/workflows/context/serializers.py index 409bf2c8..a613d9a2 100644 --- a/src/workflows/context/serializers.py +++ b/src/workflows/context/serializers.py @@ -15,6 +15,17 @@ class BaseSerializer(ABC): + """ + Interface for value serialization used by the workflow context and state store. + + Implementations must encode arbitrary Python values into a string and be able + to reconstruct the original values from that string. + + See Also: + - [JsonSerializer][workflows.context.serializers.JsonSerializer] + - [PickleSerializer][workflows.context.serializers.PickleSerializer] + """ + @abstractmethod def serialize(self, value: Any) -> str: ... @@ -23,6 +34,32 @@ def deserialize(self, value: str) -> Any: ... class JsonSerializer(BaseSerializer): + """ + JSON-first serializer that understands Pydantic models and LlamaIndex components. + + Behavior: + - Pydantic models are encoded as JSON with their qualified class name so they + can be faithfully reconstructed. + - LlamaIndex components (objects exposing `class_name` and `to_dict`) are + serialized to their dict form alongside the qualified class name. + - Dicts and lists are handled recursively. 
+ + Fallback for unsupported objects is to attempt JSON encoding directly; if it + fails, a `ValueError` is raised. + + Examples: + ```python + s = JsonSerializer() + payload = s.serialize({"x": 1, "y": [2, 3]}) + data = s.deserialize(payload) + assert data == {"x": 1, "y": [2, 3]} + ``` + + See Also: + - [BaseSerializer][workflows.context.serializers.BaseSerializer] + - [PickleSerializer][workflows.context.serializers.PickleSerializer] + """ + def _serialize_value(self, value: Any) -> Any: """Helper to serialize a single value.""" # Note: to avoid circular dependencies we cannot import BaseComponent from llama_index.core @@ -52,6 +89,17 @@ def _serialize_value(self, value: Any) -> Any: return value def serialize(self, value: Any) -> str: + """Serialize an arbitrary value to a JSON string. + + Args: + value (Any): The value to encode. + + Returns: + str: JSON string. + + Raises: + ValueError: If the value cannot be encoded to JSON. + """ try: serialized_value = self._serialize_value(value) return json.dumps(serialized_value) @@ -73,23 +121,69 @@ def _deserialize_value(self, data: Any) -> Any: return data def deserialize(self, value: str) -> Any: + """Deserialize a JSON string into Python objects. + + Args: + value (str): JSON string. + + Returns: + Any: The reconstructed value. + """ data = json.loads(value) return self._deserialize_value(data) class PickleSerializer(JsonSerializer): + """ + Hybrid serializer: JSON when possible, Pickle as a safe fallback. + + This serializer attempts JSON first for readability and portability, and + transparently falls back to Pickle for objects that cannot be represented in + JSON. Deserialization prioritizes Pickle and falls back to JSON. + + Warning: + Pickle can execute arbitrary code during deserialization. Only + deserialize trusted payloads. + + Note: Used to be called `JsonPickleSerializer` but it was renamed to `PickleSerializer`. + + Examples: + ```python + s = PickleSerializer() + class Foo: + def __init__(self, x): + self.x = x + payload = s.serialize(Foo(1)) # will likely use Pickle + obj = s.deserialize(payload) + assert isinstance(obj, Foo) + ``` + """ + def serialize(self, value: Any) -> str: - """Serialize while prioritizing JSON, falling back to Pickle.""" + """Serialize with JSON preference and Pickle fallback. + + Args: + value (Any): The value to encode. + + Returns: + str: Encoded string (JSON or base64-encoded Pickle bytes). + """ try: return super().serialize(value) except Exception: return base64.b64encode(pickle.dumps(value)).decode("utf-8") def deserialize(self, value: str) -> Any: - """ - Deserialize while prioritizing Pickle, falling back to JSON. - To avoid malicious exploits of the deserialization, deserialize objects - only when you deem it safe to do so. + """Deserialize with Pickle preference and JSON fallback. + + Args: + value (str): Encoded string. + + Returns: + Any: The reconstructed value. + + Notes: + Use only with trusted payloads due to Pickle security implications. """ try: return pickle.loads(base64.b64decode(value)) diff --git a/src/workflows/context/state_store.py b/src/workflows/context/state_store.py index 0a2aa679..31b3f9ee 100644 --- a/src/workflows/context/state_store.py +++ b/src/workflows/context/state_store.py @@ -23,10 +23,22 @@ class UnserializableKeyWarning(Warning): class DictState(DictLikeModel): """ - A dynamic state class that behaves like a dictionary. + Dynamic, dict-like Pydantic model for workflow state. - This is used as the default state type when no specific state class is provided. 
- It allows storing arbitrary key-value pairs while still being a Pydantic model. + Used as the default state model when no typed state is provided. Behaves + like a mapping while retaining Pydantic validation and serialization. + + Examples: + ```python + from workflows.context.state_store import DictState + + state = DictState() + state["foo"] = 1 + state.bar = 2 # attribute-style access works for nested structures + ``` + + See Also: + - [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore] """ def __init__(self, **params: Any): @@ -39,55 +51,44 @@ def __init__(self, **params: Any): class InMemoryStateStore(Generic[MODEL_T]): """ - State manager for a workflow that provides type-safe state management. - - By using Context[StateType] as the parameter type annotation, the state manager - is automatically initialized with the correct type, providing full type safety - and IDE autocompletion. - - When no state type is specified (just Context), it defaults to DictState which - behaves like a regular dictionary. - - Example with typed state: - ```python - from pydantic import BaseModel - from workflows import Workflow, Context, step - from workflows.events import StartEvent, StopEvent - - class MyState(BaseModel): - name: str = "Unknown" - age: int = 0 - - class MyWorkflow(Workflow): - @step - async def step_1(self, ctx: Context[MyState], ev: StartEvent) -> StopEvent: - # ctx._state.get() is now properly typed as MyState - state = await ctx._state.get() - state.name = "John" # Type-safe: IDE knows this is a string - state.age = 30 # Type-safe: IDE knows this is an int - await ctx._state.set(state) - return StopEvent() - ``` - - Example with untyped dict-like state: - ```python - class MyWorkflow(Workflow): - @step - async def step_1(self, ctx: Context, ev: StartEvent) -> StopEvent: - # ctx._state behaves like a dict - state = await ctx._state.get() - state.name = "John" # Works like a dict - state.age = 30 # Dynamic assignment - await ctx._state.set(state) - return StopEvent() - ``` - - The state manager provides: - - Type-safe access to state properties with full IDE support (when typed) - - Dict-like behavior for dynamic state management (when untyped) - - Automatic state initialization based on the generic type parameter - - Thread-safe state access with async locking - - Deep path-based state access and modification + Async, in-memory, type-safe state manager for workflows. + + This store holds a single Pydantic model instance representing global + workflow state. When the generic parameter is omitted, it defaults to + [DictState][workflows.context.state_store.DictState] for flexible, + dictionary-like usage. + + Thread-safety is ensured with an internal `asyncio.Lock`. Consumers can + either perform atomic reads/writes via `get_state` and `set_state`, or make + in-place, transactional edits via the `edit_state` context manager. 
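+
+    For whole-model access, `get_state` returns a copy and `set_state` replaces
+    the stored model. A read-modify-write done that way is not atomic, so prefer
+    `edit_state` when concurrent steps may update state. A minimal sketch,
+    assuming the typed `MyState` store from the example below:
+
+    ```python
+    state = await store.get_state()  # copy of the current model
+    state.count += 1
+    await store.set_state(state)     # replace the stored model
+    ```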
+ + Examples: + Typed state model: + + ```python + from pydantic import BaseModel + from workflows.context.state_store import InMemoryStateStore + + class MyState(BaseModel): + count: int = 0 + + store = InMemoryStateStore(MyState()) + async with store.edit_state() as state: + state.count += 1 + ``` + + Dynamic state with `DictState`: + + ```python + from workflows.context.state_store import InMemoryStateStore, DictState + + store = InMemoryStateStore(DictState()) + await store.set("user.profile.name", "Ada") + name = await store.get("user.profile.name") + ``` + + See Also: + - [Context.store][workflows.context.context.Context.store] """ # These keys are set by pre-built workflows and @@ -99,11 +100,22 @@ def __init__(self, initial_state: MODEL_T): self._lock = asyncio.Lock() async def get_state(self) -> MODEL_T: - """Get a copy of the current state.""" + """Return a shallow copy of the current state model. + + Returns: + MODEL_T: A `.model_copy()` of the internal Pydantic model. + """ return self._state.model_copy() async def set_state(self, state: MODEL_T) -> None: - """Set the current state.""" + """Replace the current state model. + + Args: + state (MODEL_T): New state of the same type as the existing model. + + Raises: + ValueError: If the type differs from the existing state type. + """ if not isinstance(state, type(self._state)): raise ValueError(f"State must be of type {type(self._state)}") @@ -111,11 +123,19 @@ async def set_state(self, state: MODEL_T) -> None: self._state = state def to_dict(self, serializer: "BaseSerializer") -> dict[str, Any]: - """ - Serialize the state manager's state. + """Serialize the state and model metadata for persistence. + + For `DictState`, each individual item is serialized using the provided + serializer since values can be arbitrary Python objects. For other + Pydantic models, defers to the serializer (e.g. JSON) which can leverage + model-aware encoding. - For DictState, uses the BaseSerializer for individual items since they can be arbitrary types. - For other Pydantic models, leverages Pydantic's serialization but uses BaseSerializer for complex types. + Args: + serializer (BaseSerializer): Strategy used to encode values. + + Returns: + dict[str, Any]: A payload suitable for + [from_dict][workflows.context.state_store.InMemoryStateStore.from_dict]. """ # Special handling for DictState - serialize each item in _data if isinstance(self._state, DictState): @@ -154,8 +174,15 @@ def to_dict(self, serializer: "BaseSerializer") -> dict[str, Any]: def from_dict( cls, serialized_state: dict[str, Any], serializer: "BaseSerializer" ) -> "InMemoryStateStore[MODEL_T]": - """ - Deserialize and restore a state manager. + """Restore a state store from a serialized payload. + + Args: + serialized_state (dict[str, Any]): The payload produced by + [to_dict][workflows.context.state_store.InMemoryStateStore.to_dict]. + serializer (BaseSerializer): Strategy to decode stored values. + + Returns: + InMemoryStateStore[MODEL_T]: A store with the reconstructed model. """ if not serialized_state: # Return a default DictState manager @@ -185,17 +212,13 @@ def from_dict( @asynccontextmanager async def edit_state(self) -> AsyncGenerator[MODEL_T, None]: - """ - A context manager for editing the state. - The state will be locked while the context manager is active. - Any changes made to the state will be saved when the context manager is exited. + """Edit state transactionally under a lock. 
- Example: - ```python - async with ctx.store.edit_state() as state: - state.name = "John" - state.age = 30 - ``` + Yields the mutable model and writes it back on exit. This pattern avoids + read-modify-write races and keeps updates atomic. + + Yields: + MODEL_T: The current state model for in-place mutation. """ async with self._lock: state = self._state @@ -205,9 +228,22 @@ async def edit_state(self) -> AsyncGenerator[MODEL_T, None]: self._state = state async def get(self, path: str, default: Optional[Any] = Ellipsis) -> Any: - """ - Return a value from *path*, where path is a dot-separated string. - Example: await sm.get("user.profile.name") + """Get a nested value using dot-separated paths. + + Supports dict keys, list indices, and attribute access transparently at + each segment. + + Args: + path (str): Dot-separated path, e.g. "user.profile.name". + default (Any): If provided, return this when the path does not + exist; otherwise, raise `ValueError`. + + Returns: + Any: The resolved value. + + Raises: + ValueError: If the path is invalid and no default is provided or if + the path depth exceeds limits. """ segments = path.split(".") if path else [] if len(segments) > MAX_DEPTH: @@ -228,7 +264,18 @@ async def get(self, path: str, default: Optional[Any] = Ellipsis) -> Any: return value async def set(self, path: str, value: Any) -> None: - """Set *value* at the location designated by *path* (dot-separated).""" + """Set a nested value using dot-separated paths. + + Intermediate containers are created as needed. Dicts, lists, tuples, and + Pydantic models are supported where appropriate. + + Args: + path (str): Dot-separated path to write. + value (Any): Value to assign. + + Raises: + ValueError: If the path is empty or exceeds the maximum depth. + """ if not path: raise ValueError("Path cannot be empty") @@ -253,7 +300,12 @@ async def set(self, path: str, value: Any) -> None: self._assign_step(current, segments[-1], value) async def clear(self) -> None: - """Clear the state.""" + """Reset the state to its type defaults. + + Raises: + ValueError: If the model type cannot be instantiated from defaults + (i.e., fields missing default values). + """ try: await self.set_state(self._state.__class__()) except ValidationError: diff --git a/src/workflows/decorators.py b/src/workflows/decorators.py index 89f67dda..aaf99b3d 100644 --- a/src/workflows/decorators.py +++ b/src/workflows/decorators.py @@ -40,20 +40,47 @@ def step( retry_policy: RetryPolicy | None = None, ) -> Callable: """ - Decorator used to mark methods and functions as workflow steps. + Decorate a callable to declare it as a workflow step. - Decorators are evaluated at import time, but we need to wait for - starting the communication channels until runtime. For this reason, - we temporarily store the list of events that will be consumed by this - step in the function object itself. + The decorator inspects the function signature to infer the accepted event + type, return event types, optional `Context` parameter (optionally with a + typed state model), and any resource injections via `typing.Annotated`. + + When applied to free functions, provide the workflow class via + `workflow=MyWorkflow`. For instance methods, the association is automatic. Args: - workflow: Workflow class to which the decorated step will be added. Only needed when using the - decorator on free functions instead of class methods. - num_workers: The number of workers that will process events for the decorated step. The default - value works most of the times. 
- retry_policy: The policy used to retry a step that encountered an error while running. + workflow (type[Workflow] | None): Workflow class to attach the free + function step to. Not required for methods. + num_workers (int): Number of workers for this step. Defaults to 4. + retry_policy (RetryPolicy | None): Optional retry policy for failures. + + Returns: + Callable: The original function, annotated with internal step metadata. + + Raises: + WorkflowValidationError: If signature validation fails or when decorating + a free function without specifying `workflow`. + + Examples: + Method step: + + ```python + class MyFlow(Workflow): + @step + async def start(self, ev: StartEvent) -> StopEvent: + return StopEvent(result="done") + ``` + + Free function step: + + ```python + class MyWorkflow(Workflow): + pass + @step(workflow=MyWorkflow) + async def generate(ev: StartEvent) -> NextEvent: ... + ``` """ def decorator(func: Callable) -> Callable: diff --git a/src/workflows/errors.py b/src/workflows/errors.py index c8641abe..f5e95e55 100644 --- a/src/workflows/errors.py +++ b/src/workflows/errors.py @@ -3,32 +3,32 @@ class WorkflowValidationError(Exception): - pass + """Raised when the workflow configuration or step signatures are invalid.""" class WorkflowTimeoutError(Exception): - pass + """Raised when a workflow run exceeds the configured timeout.""" class WorkflowRuntimeError(Exception): - pass + """Raised for runtime errors during step execution or event routing.""" class WorkflowDone(Exception): - pass + """Internal control-flow exception used to terminate workers at run end.""" class WorkflowCancelledByUser(Exception): - pass + """Raised when a run is cancelled via the handler or programmatically.""" class WorkflowStepDoesNotExistError(Exception): - pass + """Raised when addressing a step that does not exist in the workflow.""" class WorkflowConfigurationError(Exception): - pass + """Raised when a logical configuration error is detected pre-run.""" class ContextSerdeError(Exception): - pass + """Raised when serializing/deserializing a `Context` fails.""" diff --git a/src/workflows/events.py b/src/workflows/events.py index 0b125b76..30e1851c 100644 --- a/src/workflows/events.py +++ b/src/workflows/events.py @@ -16,10 +16,15 @@ class DictLikeModel(BaseModel): """ - Base Pydantic model class that mimics dict interface. + Base Pydantic model class that mimics a dict interface for dynamic fields. + + Known, typed fields behave like regular Pydantic attributes. Any extra + keyword arguments are stored in an internal dict and can be accessed through + both attribute and mapping semantics. This hybrid model enables flexible + event payloads while preserving validation for declared fields. PrivateAttr: - _data (dict[str, Any]): Underlying Python dict. + _data (dict[str, Any]): Underlying Python dict for dynamic fields. """ model_config = ConfigDict(arbitrary_types_allowed=True) @@ -112,42 +117,29 @@ def custom_model_dump(self, handler: Any) -> dict[str, Any]: class Event(DictLikeModel): """ - Base class for event types that mimics dict interface. + Base class for all workflow events. + + Events are light-weight, serializable payloads passed between steps. + They support both attribute and mapping access to dynamic fields. 
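+
+    Dynamic fields accept both attribute and mapping syntax. A minimal sketch:
+
+    ```python
+    ev = Event(a=1, b=2)
+    assert ev.a == 1
+    assert ev["b"] == 2
+    ev.a = 3  # dynamic fields can be reassigned
+    ```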
Examples: - Basic example usage + Subclassing with typed fields: ```python - from llama_index.core.workflows.events import Event - - evt = Event(a=1, b=2) + from pydantic import Field - # can use dot access to get values of `a` and `b` - print((evt.a, evt.b)) + class CustomEv(Event): + score: int = Field(ge=0) - # can also set the attrs - evt.a = 2 + e = CustomEv(score=10) + print(e.score) ``` - Custom event with additional Fields/PrivateAttr - - ```python - from llama_index.core.workflows.events import Event - from pydantic import Field, PrivateAttr - - class CustomEvent(Event): - field_1: int = Field(description="my custom field") - _private_attr_1: int = PrivateAttr() - - evt = CustomEvent(a=1, b=2, field_1=3, _private_attr_1=4) - - # `field_1` and `_private_attr_1` get set as they do with Pydantic BaseModel - print(evt.field_1) - print(evt._private_attr_1) - - # `a` and `b` get set in the underlying dict, namely `evt._data` - print((evt.a, evt.b)) - ``` + See Also: + - [StartEvent][workflows.events.StartEvent] + - [StopEvent][workflows.events.StopEvent] + - [InputRequiredEvent][workflows.events.InputRequiredEvent] + - [HumanResponseEvent][workflows.events.HumanResponseEvent] """ def __init__(self, **params: Any): @@ -155,11 +147,32 @@ def __init__(self, **params: Any): class StartEvent(Event): - """StartEvent is implicitly sent when a workflow runs.""" + """Implicit entry event sent to kick off a `Workflow.run()`.""" class StopEvent(Event): - """EndEvent signals the workflow to stop.""" + """Terminal event that signals the workflow has completed. + + The `result` property contains the return value of the workflow run. When a + custom stop event subclass is used, the workflow result is that event + instance itself. + + Examples: + ```python + # default stop event: result holds the value + return StopEvent(result={"answer": 42}) + ``` + + Subclassing to provide a custom result: + + ```python + class MyStopEv(StopEvent): + pass + + @step + async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv: + return MyStopEv(result={"answer": 42}) + """ _result: Any = PrivateAttr(default=None) @@ -177,11 +190,53 @@ def result(self) -> Any: class InputRequiredEvent(Event): - """InputRequiredEvent is sent when an input is required for a step.""" + """Emitted when human input is required to proceed. + + Automatically written to the event stream if returned from a step. + + If returned from a step, it does not need to be consumed by other steps and will pass validation. + It's expected that the caller will respond to this event and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent]. + + Use this directly or subclass it. + + Typical flow: a step returns `InputRequiredEvent`, callers consume it from + the stream and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent]. + + Examples: + ```python + from workflows.events import InputRequiredEvent, HumanResponseEvent + + class HITLWorkflow(Workflow): + @step + async def my_step(self, ev: StartEvent) -> InputRequiredEvent: + return InputRequiredEvent(prefix="What's your name? ") + + @step + async def my_step(self, ev: HumanResponseEvent) -> StopEvent: + return StopEvent(result=ev.response) + ``` + """ class HumanResponseEvent(Event): - """HumanResponseEvent is sent when a human response is required for a step.""" + """Carries a human's response for a prior input request. + + If consumed by a step and not returned by another, it will still pass validation. 
+ + Examples: + ```python + from workflows.events import InputRequiredEvent, HumanResponseEvent + + class HITLWorkflow(Workflow): + @step + async def my_step(self, ev: StartEvent) -> InputRequiredEvent: + return InputRequiredEvent(prefix="What's your name? ") + + @step + async def my_step(self, ev: HumanResponseEvent) -> StopEvent: + return StopEvent(result=ev.response) + ``` + """ EventType = Type[Event] diff --git a/src/workflows/handler.py b/src/workflows/handler.py index 41933c87..b71dedd1 100644 --- a/src/workflows/handler.py +++ b/src/workflows/handler.py @@ -13,6 +13,18 @@ class WorkflowHandler(asyncio.Future[RunResultT]): + """ + Handle a running workflow: await results, stream events, access context, or cancel. + + Instances are returned by [Workflow.run][workflows.workflow.Workflow.run]. + They can be awaited for the final result and support streaming intermediate + events via [stream_events][workflows.handler.WorkflowHandler.stream_events]. + + See Also: + - [Context][workflows.context.context.Context] + - [StopEvent][workflows.events.StopEvent] + """ + def __init__( self, *args: Any, @@ -27,12 +39,14 @@ def __init__( @property def ctx(self) -> Context | None: + """The workflow [Context][workflows.context.context.Context] for this run.""" return self._ctx def __str__(self) -> str: return str(self.result()) def is_done(self) -> bool: + """Return True when the workflow has completed.""" return self.done() async def stream_events(self) -> AsyncGenerator[Event, None]: @@ -44,8 +58,8 @@ async def stream_events(self) -> AsyncGenerator[Event, None]: Events are yielded in the order they are generated by the workflow. The stream includes all events written to the context's streaming queue, - and terminates when a StopEvent is encountered, indicating the workflow - has completed. + and terminates when a [StopEvent][workflows.events.StopEvent] is + encountered, indicating the workflow has completed. Returns: AsyncGenerator[Event, None]: An async generator that yields Event objects @@ -54,7 +68,7 @@ async def stream_events(self) -> AsyncGenerator[Event, None]: Raises: ValueError: If the context is not set on the handler. WorkflowRuntimeError: If all events have already been consumed by a - previous call to stream_events() on the same handler instance. + previous call to `stream_events()` on the same handler instance. Examples: ```python @@ -73,7 +87,7 @@ async def stream_events(self) -> AsyncGenerator[Event, None]: Note: Events can only be streamed once per handler instance. Subsequent - calls to stream_events() will raise a WorkflowRuntimeError. + calls to `stream_events()` will raise a WorkflowRuntimeError. """ if self.ctx is None: raise ValueError("Context is not set!") @@ -93,7 +107,18 @@ async def stream_events(self) -> AsyncGenerator[Event, None]: break async def cancel_run(self) -> None: - """Method to cancel a Workflow execution.""" + """Cancel the running workflow. + + Signals the underlying context to raise + [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser], + which will be caught by the workflow and gracefully end the run. + + Examples: + ```python + handler = workflow.run() + await handler.cancel_run() + ``` + """ if self.ctx: self.ctx._cancel_flag.set() await asyncio.sleep(0) diff --git a/src/workflows/resource.py b/src/workflows/resource.py index 2c5cb3b9..edc3362c 100644 --- a/src/workflows/resource.py +++ b/src/workflows/resource.py @@ -22,6 +22,12 @@ class _Resource(Generic[T]): + """Internal wrapper for resource factories. 
+ + Wraps sync/async factories and records metadata such as the qualified name + and cache behavior. + """ + def __init__(self, factory: Callable[..., T | Awaitable[T]], cache: bool) -> None: self._factory = factory self._is_async = inspect.iscoroutinefunction(factory) @@ -29,6 +35,7 @@ def __init__(self, factory: Callable[..., T | Awaitable[T]], cache: bool) -> Non self.cache = cache async def call(self) -> T: + """Invoke the underlying factory, awaiting if necessary.""" if self._is_async: result = await cast(Callable[..., Awaitable[T]], self._factory)() else: @@ -37,23 +44,68 @@ async def call(self) -> T: class ResourceDefinition(BaseModel): + """Definition for a resource injection requested by a step signature. + + Attributes: + name (str): Parameter name in the step function. + resource (_Resource): Factory wrapper used by the manager to produce the dependency. + """ + model_config = ConfigDict(arbitrary_types_allowed=True) name: str resource: _Resource def Resource(factory: Callable[..., T], cache: bool = True) -> _Resource[T]: + """Declare a resource to inject into step functions. + + Args: + factory (Callable[..., T]): Function returning the resource instance. May be async. + cache (bool): If True, reuse the produced resource across steps. Defaults to True. + + Returns: + _Resource[T]: A resource descriptor to be used in `typing.Annotated`. + + Examples: + ```python + from typing import Annotated + from workflows.resource import Resource + + def get_memory(**kwargs) -> Memory: + return Memory.from_defaults("user123", token_limit=60000) + + class MyWorkflow(Workflow): + @step + async def first( + self, + ev: StartEvent, + memory: Annotated[Memory, Resource(get_memory)], + ) -> StopEvent: + await memory.aput(...) + return StopEvent(result="ok") + ``` + """ return _Resource(factory, cache) class ResourceManager: + """Manage resource lifecycles and caching across workflow steps. + + Methods: + set: Manually set a resource by name. + get: Produce or retrieve a resource via its descriptor. + get_all: Return the internal name->resource map. + """ + def __init__(self) -> None: self.resources: dict[str, Any] = {} async def set(self, name: str, val: Any) -> None: + """Register a resource instance under a name.""" self.resources.update({name: val}) async def get(self, resource: _Resource) -> Any: + """Return a resource instance, honoring cache settings.""" if not resource.cache: val = await resource.call() elif resource.cache and not self.resources.get(resource.name, None): @@ -64,4 +116,5 @@ async def get(self, resource: _Resource) -> Any: return val def get_all(self) -> dict[str, Any]: + """Return all materialized resources.""" return self.resources diff --git a/src/workflows/retry_policy.py b/src/workflows/retry_policy.py index bd17defc..8f8cc7ed 100644 --- a/src/workflows/retry_policy.py +++ b/src/workflows/retry_policy.py @@ -8,34 +8,51 @@ @runtime_checkable class RetryPolicy(Protocol): + """ + Policy interface to control step retries after failures. + + Implementations decide whether to retry and how long to wait before the next + attempt based on elapsed time, number of attempts, and the last error. + + See Also: + - [ConstantDelayRetryPolicy][workflows.retry_policy.ConstantDelayRetryPolicy] + - [step][workflows.decorators.step] + """ + def next( self, elapsed_time: float, attempts: int, error: Exception ) -> float | None: """ - Decides if we should make another retry, returning the number of seconds to wait before the next run. 
+ Decide if another retry should occur and the delay before it. Args: - elapsed_time: Time in seconds that passed since the last attempt. - attempts: The number of attempts done so far. - error: The last error occurred. + elapsed_time (float): Seconds since the first failure. + attempts (int): Number of attempts made so far. + error (Exception): The last exception encountered. Returns: - The amount of seconds to wait before the next attempt, or None if we stop retrying. - + float | None: Seconds to wait before retrying, or `None` to stop. """ class ConstantDelayRetryPolicy: - """A simple policy that retries a step at regular intervals for a number of times.""" + """Retry at a fixed interval up to a maximum number of attempts. + + Examples: + ```python + @step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10)) + async def flaky(self, ev: StartEvent) -> StopEvent: + ... + ``` + """ def __init__(self, maximum_attempts: int = 3, delay: float = 5) -> None: """ - Creates a ConstantDelayRetryPolicy instance. + Initialize the policy. Args: - maximum_attempts: How many consecutive times the workflow should try to run the step in case of an error. - delay: how much time in seconds must pass before another attempt. - + maximum_attempts (int): Maximum consecutive attempts. Defaults to 3. + delay (float): Seconds to wait between attempts. Defaults to 5. """ self.maximum_attempts = maximum_attempts self.delay = delay @@ -43,6 +60,7 @@ def __init__(self, maximum_attempts: int = 3, delay: float = 5) -> None: def next( self, elapsed_time: float, attempts: int, error: Exception ) -> float | None: + """Return the fixed delay while attempts remain; otherwise `None`.""" if attempts >= self.maximum_attempts: return None diff --git a/src/workflows/types.py b/src/workflows/types.py index 6377daa0..e0261d6c 100644 --- a/src/workflows/types.py +++ b/src/workflows/types.py @@ -16,3 +16,11 @@ # TODO: When releasing 1.0, remove support for Any # and enforce usage of StopEventT RunResultT = Union[StopEventT, Any] +""" +Type aliases for workflow results. + +- `StopEventT`: Generic bound to [StopEvent][workflows.events.StopEvent] +- `RunResultT`: Result type returned by a workflow run. Today it allows either + a `StopEventT` subclass or `Any` for backward compatibility; future versions + may restrict this to `StopEventT` only. +""" diff --git a/src/workflows/workflow.py b/src/workflows/workflow.py index aecba6aa..9bcf9fea 100644 --- a/src/workflows/workflow.py +++ b/src/workflows/workflow.py @@ -48,17 +48,50 @@ def __init__(cls, name: str, bases: Tuple[type, ...], dct: dict[str, Any]) -> No class Workflow(metaclass=WorkflowMeta): """ - An event-driven abstraction used to orchestrate the execution of different components called "steps". - - Each step is responsible for handling certain event types and possibly emitting new events. Steps can be "bound" - when they are defined as methods of the `Workflow` class itself, or "unbound" when they are defined as free - functions. To define a step, the method or function must be decorated with the `@step` decorator. - - Workflows provide basic validation to catch potential runtime errors as soon as possible. Validation happens once, - when the workflow starts, and does not produce much overhead. It can be disabled in any case. - - Use an instance of a `Workflow` class to run a workflow and stream events produced during execution. Workflows - can be run step-by-step, by calling the `run_step` function multiple times until completion. 
+ Event-driven orchestrator to define and run application flows using typed steps. + + A `Workflow` is composed of `@step`-decorated callables that accept and emit + typed [Event][workflows.events.Event]s. Steps can be declared as instance + methods or as free functions registered via the decorator. + + Key features: + - Validation of step signatures and event graph before running + - Typed start/stop events + - Streaming of intermediate events + - Optional human-in-the-loop events + - Retry policies per step + - Resource injection + + Examples: + Basic usage: + + ```python + from workflows import Workflow, step + from workflows.events import StartEvent, StopEvent + + class MyFlow(Workflow): + @step + async def start(self, ev: StartEvent) -> StopEvent: + return StopEvent(result="done") + + result = await MyFlow(timeout=60).run(topic="Pirates") + ``` + + Custom start/stop events and streaming: + + ```python + handler = MyFlow().run() + async for ev in handler.stream_events(): + ... + result = await handler + ``` + + See Also: + - [step][workflows.decorators.step] + - [Event][workflows.events.Event] + - [Context][workflows.context.context.Context] + - [WorkflowHandler][workflows.handler.WorkflowHandler] + - [RetryPolicy][workflows.retry_policy.RetryPolicy] """ def __init__( @@ -70,22 +103,17 @@ def __init__( num_concurrent_runs: int | None = None, ) -> None: """ - Create an instance of the workflow. + Initialize a workflow instance. Args: - timeout: - Number of seconds after the workflow execution will be halted, raising a `WorkflowTimeoutError` - exception. If set to `None`, the timeout will be disabled. - disable_validation: - Whether or not the workflow should be validated before running. In case the workflow is - misconfigured, a call to `run` will raise a `WorkflowValidationError` exception explaining the details - of the problem. - verbose: - Whether or not the workflow should print additional informative messages during execution. - num_concurrent_runs: - maximum number of .run() executions occurring simultaneously. If set to `None`, there - is no limit to this number. - + timeout (float | None): Max seconds to wait for completion. `None` + disables the timeout. + disable_validation (bool): Skip pre-run validation of the event graph + (not recommended). + verbose (bool): If True, print step activity. + resource_manager (ResourceManager | None): Custom resource manager + for dependency injection. + num_concurrent_runs (int | None): Limit on concurrent `run()` calls. """ # Configuration self._timeout = timeout @@ -129,7 +157,10 @@ def _ensure_start_event_class(self) -> type[StartEvent]: @property def start_event_class(self) -> type[StartEvent]: - """Returns the StartEvent type used in this workflow.""" + """The `StartEvent` subclass accepted by this workflow. + + Determined by inspecting step input types. + """ return self._start_event_class def _ensure_stop_event_class(self) -> type[RunResultT]: @@ -157,7 +188,10 @@ def _ensure_stop_event_class(self) -> type[RunResultT]: @property def stop_event_class(self) -> type[RunResultT]: - """Returns the StopEvent type used in this workflow.""" + """The `StopEvent` subclass produced by this workflow. + + Determined by inspecting step return annotations. + """ return self._stop_event_class @classmethod @@ -268,7 +302,52 @@ def run( start_event: StartEvent | None = None, **kwargs: Any, ) -> WorkflowHandler: - """Runs the workflow until completion.""" + """Run the workflow and return a handler for results and streaming. 
+ + This schedules the workflow execution in the background and returns a + [WorkflowHandler][workflows.handler.WorkflowHandler] that can be awaited + for the final result or used to stream intermediate events. + + You may pass either a concrete `start_event` instance or keyword + arguments that will be used to construct the inferred + [StartEvent][workflows.events.StartEvent] subclass. + + Args: + ctx (Context | None): Optional context to resume or share state + across runs. If omitted, a fresh context is created. + start_event (StartEvent | None): Optional explicit start event. + **kwargs (Any): Keyword args to initialize the start event when + `start_event` is not provided. + + Returns: + WorkflowHandler: A future-like object to await the final result and + stream events. + + Raises: + WorkflowValidationError: If validation fails and validation is + enabled. + WorkflowRuntimeError: If the start event cannot be created from kwargs. + WorkflowTimeoutError: If execution exceeds the configured timeout. + + Examples: + ```python + # Create and run with kwargs + handler = MyFlow().run(topic="Pirates") + + # Stream events + async for ev in handler.stream_events(): + ... + + # Await final result + result = await handler + ``` + + If you subclassed the start event, you can also directly pass it in: + + ```python + result = await my_workflow.run(start_event=MyStartEvent(topic="Pirates")) + ``` + """ # Validate the workflow self._validate()