diff --git a/src/workflows/server/server.py b/src/workflows/server/server.py index 74fbf9c..997a7bc 100644 --- a/src/workflows/server/server.py +++ b/src/workflows/server/server.py @@ -5,12 +5,12 @@ import asyncio from contextlib import asynccontextmanager from dataclasses import dataclass -import time import json import logging from importlib.metadata import version from pathlib import Path from typing import Any, AsyncGenerator, TypedDict +from datetime import datetime, timezone import uvicorn from starlette.applications import Starlette @@ -55,9 +55,9 @@ class HandlerDict(TypedDict): error: str | None result: RunResultT | None status: Status - started_at: float - updated_at: float | None - completed_at: float | None + started_at: str + updated_at: str | None + completed_at: str | None class WorkflowServer: @@ -241,9 +241,17 @@ def openapi_schema(self) -> dict: "type": "string", "enum": ["running", "completed", "failed"], }, - "started_at": {"type": "number"}, - "updated_at": {"type": "number", "nullable": True}, - "completed_at": {"type": "number", "nullable": True}, + "started_at": {"type": "string", "format": "date-time"}, + "updated_at": { + "type": "string", + "format": "date-time", + "nullable": True, + }, + "completed_at": { + "type": "string", + "format": "date-time", + "nullable": True, + }, "error": {"type": "string", "nullable": True}, "result": {"description": "Workflow result value"}, }, @@ -696,10 +704,7 @@ async def _get_handlers(self, request: Request) -> JSONResponse: schema: $ref: '#/components/schemas/HandlersList' """ - items: list[HandlerDict] = [] - for wrapper in self._handlers.values(): - items.append(wrapper.to_dict()) - + items = [wrapper.to_dict() for wrapper in self._handlers.values()] return JSONResponse({"handlers": items}) async def _post_event(self, request: Request) -> JSONResponse: @@ -915,11 +920,11 @@ async def checkpoint(status: Status) -> None: continue await checkpoint("running") - wrapper.updated_at = time.time() + wrapper.updated_at = datetime.now(timezone.utc) queue.put_nowait(event) # done when stream events are complete status: Status = "completed" - wrapper.completed_at = time.time() + wrapper.completed_at = datetime.now(timezone.utc) try: await handler except Exception as e: @@ -941,8 +946,8 @@ async def checkpoint(status: Status) -> None: task=task, handler_id=handler_id, workflow_name=workflow_name, - started_at=time.time(), - updated_at=time.time(), + started_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), completed_at=None, ) self._handlers[handler_id] = wrapper @@ -960,22 +965,24 @@ class _WorkflowHandler: # metadata handler_id: str workflow_name: str - started_at: float - updated_at: float - completed_at: float | None + started_at: datetime + updated_at: datetime + completed_at: datetime | None def to_dict(self) -> HandlerDict: - return { - "handler_id": self.handler_id, - "workflow_name": self.workflow_name, - "run_id": self.run_handler.run_id, - "status": self.status, - "started_at": self.started_at, - "updated_at": self.updated_at, - "completed_at": self.completed_at, - "error": self.error, - "result": self.result, - } + return HandlerDict( + handler_id=self.handler_id, + workflow_name=self.workflow_name, + run_id=self.run_handler.run_id, + status=self.status, + started_at=self.started_at.isoformat(), + updated_at=self.updated_at.isoformat(), + completed_at=self.completed_at.isoformat() + if self.completed_at is not None + else None, + error=self.error, + result=self.result, + ) @property def status(self) -> Status: diff --git a/tests/server/test_server_endpoints.py b/tests/server/test_server_endpoints.py index 8ff5987..4e4d1c2 100644 --- a/tests/server/test_server_endpoints.py +++ b/tests/server/test_server_endpoints.py @@ -9,12 +9,13 @@ import pytest import pytest_asyncio -from httpx import ASGITransport, AsyncClient +from httpx import ASGITransport, AsyncClient, Response from tests.server.util import wait_for_passing from workflows import Context from workflows.server import WorkflowServer from workflows.workflow import Workflow +from datetime import datetime # Prepare the event to send from workflows.context.serializers import JsonSerializer @@ -696,3 +697,53 @@ async def test_post_event_missing_event_data(client: AsyncClient) -> None: response = await client.post(f"/events/{handler_id}", json={}) assert response.status_code == 400 assert "Event data is required" in response.text + + +@pytest.mark.asyncio +async def test_handler_datetime_fields_progress(client: AsyncClient) -> None: + # Start interactive workflow which waits for an external event + response = await client.post("/workflows/interactive/run-nowait", json={}) + assert response.status_code == 200 + handler_id = response.json()["handler_id"] + + # Snapshot initial times + resp = await client.get("/handlers") + assert resp.status_code == 200 + handlers = resp.json()["handlers"] + item = next(h for h in handlers if h["handler_id"] == handler_id) + started_at_1 = datetime.fromisoformat(item["started_at"]) # ISO 8601 + updated_at_1 = datetime.fromisoformat(item["updated_at"]) # ISO 8601 + assert started_at_1 <= updated_at_1 + assert item["completed_at"] is None + + # Send an external event to progress the workflow and update timestamps + await asyncio.sleep(0.01) + serializer = JsonSerializer() + event = ExternalEvent(response="ts-check") + event_str = serializer.serialize(event) + send = await client.post(f"/events/{handler_id}", json={"event": event_str}) + assert send.status_code == 200 + + # Check updated_at increased + resp2 = await client.get("/handlers") + assert resp2.status_code == 200 + item2 = next(h for h in resp2.json()["handlers"] if h["handler_id"] == handler_id) + updated_at_2 = datetime.fromisoformat(item2["updated_at"]) # ISO 8601 + assert updated_at_2 >= updated_at_1 + + # Wait for completion and check completed_at + async def _wait_done() -> Response: + r = await client.get(f"/results/{handler_id}") + if r.status_code == 200: + return r + raise AssertionError("not done") + + await wait_for_passing(_wait_done) + + resp3 = await client.get("/handlers") + item3 = next(h for h in resp3.json()["handlers"] if h["handler_id"] == handler_id) + assert item3["status"] in {"completed", "failed"} + if item3["status"] == "completed": + assert item3["completed_at"] is not None + completed_at = datetime.fromisoformat(item3["completed_at"]) # ISO 8601 + assert completed_at >= updated_at_2 diff --git a/uv.lock b/uv.lock index 2c1052a..f9ec23b 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.9" resolution-markers = [ "python_full_version >= '3.10'", @@ -531,7 +531,7 @@ wheels = [ [[package]] name = "llama-index-workflows" -version = "2.2.2" +version = "2.3.0" source = { editable = "." } dependencies = [ { name = "eval-type-backport", marker = "python_full_version < '3.10'" },