From 586b94fb93b4e9781163990952df3a2ba2ea5d38 Mon Sep 17 00:00:00 2001 From: Terry Zhao Date: Tue, 23 Sep 2025 15:22:05 -0700 Subject: [PATCH 1/4] server: return ISO 8601 timestamps (Z) for started_at/updated_at/completed_at and update OpenAPI to date-time format --- src/workflows/server/server.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/workflows/server/server.py b/src/workflows/server/server.py index 74fbf9c..fa72c61 100644 --- a/src/workflows/server/server.py +++ b/src/workflows/server/server.py @@ -11,6 +11,7 @@ from importlib.metadata import version from pathlib import Path from typing import Any, AsyncGenerator, TypedDict +from datetime import datetime, timezone import uvicorn from starlette.applications import Starlette @@ -241,9 +242,9 @@ def openapi_schema(self) -> dict: "type": "string", "enum": ["running", "completed", "failed"], }, - "started_at": {"type": "number"}, - "updated_at": {"type": "number", "nullable": True}, - "completed_at": {"type": "number", "nullable": True}, + "started_at": {"type": "string", "format": "date-time"}, + "updated_at": {"type": "string", "format": "date-time", "nullable": True}, + "completed_at": {"type": "string", "format": "date-time", "nullable": True}, "error": {"type": "string", "nullable": True}, "result": {"description": "Workflow result value"}, }, @@ -970,9 +971,9 @@ def to_dict(self) -> HandlerDict: "workflow_name": self.workflow_name, "run_id": self.run_handler.run_id, "status": self.status, - "started_at": self.started_at, - "updated_at": self.updated_at, - "completed_at": self.completed_at, + "started_at": _ts_to_iso(self.started_at), + "updated_at": _ts_to_iso(self.updated_at), + "completed_at": _ts_to_iso(self.completed_at) if self.completed_at is not None else None, "error": self.error, "result": self.result, } @@ -1026,6 +1027,11 @@ async def iter_events(self) -> AsyncGenerator[Event, None]: break +def _ts_to_iso(ts: float) -> str: + dt = datetime.fromtimestamp(ts, tz=timezone.utc) + return dt.isoformat().replace("+00:00", "Z") + + @dataclass class _NamedWorkflow: name: str From 23e7b4ba6ba99a72e70101697ecba52deb40c171 Mon Sep 17 00:00:00 2001 From: Terry Zhao Date: Tue, 23 Sep 2025 15:25:14 -0700 Subject: [PATCH 2/4] server: format timestamps with strftime('%Y-%m-%dT%H:%M:%SZ') for canonical ISO-8601Z output --- src/workflows/server/server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/workflows/server/server.py b/src/workflows/server/server.py index fa72c61..80ad1d0 100644 --- a/src/workflows/server/server.py +++ b/src/workflows/server/server.py @@ -1028,8 +1028,7 @@ async def iter_events(self) -> AsyncGenerator[Event, None]: def _ts_to_iso(ts: float) -> str: - dt = datetime.fromtimestamp(ts, tz=timezone.utc) - return dt.isoformat().replace("+00:00", "Z") + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') @dataclass From e5123107fdfbb6d9c6a6020d008e585ce32f9c69 Mon Sep 17 00:00:00 2001 From: Terry Zhao Date: Tue, 23 Sep 2025 15:38:27 -0700 Subject: [PATCH 3/4] tests(server): add datetime progression test for handler started_at/updated_at/completed_at --- src/workflows/server/server.py | 53 +++++++++++---------------- tests/server/test_server_endpoints.py | 51 ++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 31 deletions(-) diff --git a/src/workflows/server/server.py b/src/workflows/server/server.py index 80ad1d0..916fc4f 100644 --- a/src/workflows/server/server.py +++ b/src/workflows/server/server.py @@ -5,7 +5,6 @@ import asyncio from contextlib import asynccontextmanager from dataclasses import dataclass -import time import json import logging from importlib.metadata import version @@ -56,9 +55,9 @@ class HandlerDict(TypedDict): error: str | None result: RunResultT | None status: Status - started_at: float - updated_at: float | None - completed_at: float | None + started_at: str + updated_at: str | None + completed_at: str | None class WorkflowServer: @@ -697,10 +696,7 @@ async def _get_handlers(self, request: Request) -> JSONResponse: schema: $ref: '#/components/schemas/HandlersList' """ - items: list[HandlerDict] = [] - for wrapper in self._handlers.values(): - items.append(wrapper.to_dict()) - + items = [wrapper.to_dict() for wrapper in self._handlers.values()] return JSONResponse({"handlers": items}) async def _post_event(self, request: Request) -> JSONResponse: @@ -916,11 +912,11 @@ async def checkpoint(status: Status) -> None: continue await checkpoint("running") - wrapper.updated_at = time.time() + wrapper.updated_at = datetime.now(timezone.utc) queue.put_nowait(event) # done when stream events are complete status: Status = "completed" - wrapper.completed_at = time.time() + wrapper.completed_at = datetime.now(timezone.utc) try: await handler except Exception as e: @@ -942,8 +938,8 @@ async def checkpoint(status: Status) -> None: task=task, handler_id=handler_id, workflow_name=workflow_name, - started_at=time.time(), - updated_at=time.time(), + started_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), completed_at=None, ) self._handlers[handler_id] = wrapper @@ -961,22 +957,22 @@ class _WorkflowHandler: # metadata handler_id: str workflow_name: str - started_at: float - updated_at: float - completed_at: float | None + started_at: datetime + updated_at: datetime + completed_at: datetime | None def to_dict(self) -> HandlerDict: - return { - "handler_id": self.handler_id, - "workflow_name": self.workflow_name, - "run_id": self.run_handler.run_id, - "status": self.status, - "started_at": _ts_to_iso(self.started_at), - "updated_at": _ts_to_iso(self.updated_at), - "completed_at": _ts_to_iso(self.completed_at) if self.completed_at is not None else None, - "error": self.error, - "result": self.result, - } + return HandlerDict( + handler_id=self.handler_id, + workflow_name=self.workflow_name, + run_id=self.run_handler.run_id, + status=self.status, + started_at=self.started_at.isoformat(), + updated_at=self.updated_at.isoformat(), + completed_at=self.completed_at.isoformat() if self.completed_at is not None else None, + error=self.error, + result=self.result, + ) @property def status(self) -> Status: @@ -1026,11 +1022,6 @@ async def iter_events(self) -> AsyncGenerator[Event, None]: queue_get_task.cancel() break - -def _ts_to_iso(ts: float) -> str: - return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') - - @dataclass class _NamedWorkflow: name: str diff --git a/tests/server/test_server_endpoints.py b/tests/server/test_server_endpoints.py index 8ff5987..4d9b4af 100644 --- a/tests/server/test_server_endpoints.py +++ b/tests/server/test_server_endpoints.py @@ -15,6 +15,7 @@ from workflows import Context from workflows.server import WorkflowServer from workflows.workflow import Workflow +from datetime import datetime # Prepare the event to send from workflows.context.serializers import JsonSerializer @@ -696,3 +697,53 @@ async def test_post_event_missing_event_data(client: AsyncClient) -> None: response = await client.post(f"/events/{handler_id}", json={}) assert response.status_code == 400 assert "Event data is required" in response.text + + +@pytest.mark.asyncio +async def test_handler_datetime_fields_progress(client: AsyncClient) -> None: + # Start interactive workflow which waits for an external event + response = await client.post("/workflows/interactive/run-nowait", json={}) + assert response.status_code == 200 + handler_id = response.json()["handler_id"] + + # Snapshot initial times + resp = await client.get("/handlers") + assert resp.status_code == 200 + handlers = resp.json()["handlers"] + item = next(h for h in handlers if h["handler_id"] == handler_id) + started_at_1 = datetime.fromisoformat(item["started_at"]) # ISO 8601 + updated_at_1 = datetime.fromisoformat(item["updated_at"]) # ISO 8601 + assert started_at_1 <= updated_at_1 + assert item["completed_at"] is None + + # Send an external event to progress the workflow and update timestamps + await asyncio.sleep(0.01) + serializer = JsonSerializer() + event = ExternalEvent(response="ts-check") + event_str = serializer.serialize(event) + send = await client.post(f"/events/{handler_id}", json={"event": event_str}) + assert send.status_code == 200 + + # Check updated_at increased + resp2 = await client.get("/handlers") + assert resp2.status_code == 200 + item2 = next(h for h in resp2.json()["handlers"] if h["handler_id"] == handler_id) + updated_at_2 = datetime.fromisoformat(item2["updated_at"]) # ISO 8601 + assert updated_at_2 >= updated_at_1 + + # Wait for completion and check completed_at + async def _wait_done(): + r = await client.get(f"/results/{handler_id}") + if r.status_code == 200: + return r + raise AssertionError("not done") + + await wait_for_passing(_wait_done) + + resp3 = await client.get("/handlers") + item3 = next(h for h in resp3.json()["handlers"] if h["handler_id"] == handler_id) + assert item3["status"] in {"completed", "failed"} + if item3["status"] == "completed": + assert item3["completed_at"] is not None + completed_at = datetime.fromisoformat(item3["completed_at"]) # ISO 8601 + assert completed_at >= updated_at_2 From 5366d41acc68292f4fee7ae0dd45fb59223a83c7 Mon Sep 17 00:00:00 2001 From: Terry Zhao Date: Tue, 23 Sep 2025 15:44:39 -0700 Subject: [PATCH 4/4] fix lint --- src/workflows/server/server.py | 17 ++++++++++++++--- tests/server/test_server_endpoints.py | 4 ++-- uv.lock | 4 ++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/workflows/server/server.py b/src/workflows/server/server.py index 916fc4f..997a7bc 100644 --- a/src/workflows/server/server.py +++ b/src/workflows/server/server.py @@ -242,8 +242,16 @@ def openapi_schema(self) -> dict: "enum": ["running", "completed", "failed"], }, "started_at": {"type": "string", "format": "date-time"}, - "updated_at": {"type": "string", "format": "date-time", "nullable": True}, - "completed_at": {"type": "string", "format": "date-time", "nullable": True}, + "updated_at": { + "type": "string", + "format": "date-time", + "nullable": True, + }, + "completed_at": { + "type": "string", + "format": "date-time", + "nullable": True, + }, "error": {"type": "string", "nullable": True}, "result": {"description": "Workflow result value"}, }, @@ -969,7 +977,9 @@ def to_dict(self) -> HandlerDict: status=self.status, started_at=self.started_at.isoformat(), updated_at=self.updated_at.isoformat(), - completed_at=self.completed_at.isoformat() if self.completed_at is not None else None, + completed_at=self.completed_at.isoformat() + if self.completed_at is not None + else None, error=self.error, result=self.result, ) @@ -1022,6 +1032,7 @@ async def iter_events(self) -> AsyncGenerator[Event, None]: queue_get_task.cancel() break + @dataclass class _NamedWorkflow: name: str diff --git a/tests/server/test_server_endpoints.py b/tests/server/test_server_endpoints.py index 4d9b4af..4e4d1c2 100644 --- a/tests/server/test_server_endpoints.py +++ b/tests/server/test_server_endpoints.py @@ -9,7 +9,7 @@ import pytest import pytest_asyncio -from httpx import ASGITransport, AsyncClient +from httpx import ASGITransport, AsyncClient, Response from tests.server.util import wait_for_passing from workflows import Context @@ -732,7 +732,7 @@ async def test_handler_datetime_fields_progress(client: AsyncClient) -> None: assert updated_at_2 >= updated_at_1 # Wait for completion and check completed_at - async def _wait_done(): + async def _wait_done() -> Response: r = await client.get(f"/results/{handler_id}") if r.status_code == 200: return r diff --git a/uv.lock b/uv.lock index 2c1052a..f9ec23b 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.9" resolution-markers = [ "python_full_version >= '3.10'", @@ -531,7 +531,7 @@ wheels = [ [[package]] name = "llama-index-workflows" -version = "2.2.2" +version = "2.3.0" source = { editable = "." } dependencies = [ { name = "eval-type-backport", marker = "python_full_version < '3.10'" },