Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions src/workflows/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
import time
import json
import logging
from importlib.metadata import version
from pathlib import Path
from typing import Any, AsyncGenerator, TypedDict
from datetime import datetime, timezone

import uvicorn
from starlette.applications import Starlette
Expand Down Expand Up @@ -55,9 +55,9 @@ class HandlerDict(TypedDict):
error: str | None
result: RunResultT | None
status: Status
started_at: float
updated_at: float | None
completed_at: float | None
started_at: str
updated_at: str | None
completed_at: str | None


class WorkflowServer:
Expand Down Expand Up @@ -241,9 +241,17 @@ def openapi_schema(self) -> dict:
"type": "string",
"enum": ["running", "completed", "failed"],
},
"started_at": {"type": "number"},
"updated_at": {"type": "number", "nullable": True},
"completed_at": {"type": "number", "nullable": True},
"started_at": {"type": "string", "format": "date-time"},
"updated_at": {
"type": "string",
"format": "date-time",
"nullable": True,
},
"completed_at": {
"type": "string",
"format": "date-time",
"nullable": True,
},
"error": {"type": "string", "nullable": True},
"result": {"description": "Workflow result value"},
},
Expand Down Expand Up @@ -696,10 +704,7 @@ async def _get_handlers(self, request: Request) -> JSONResponse:
schema:
$ref: '#/components/schemas/HandlersList'
"""
items: list[HandlerDict] = []
for wrapper in self._handlers.values():
items.append(wrapper.to_dict())

items = [wrapper.to_dict() for wrapper in self._handlers.values()]
return JSONResponse({"handlers": items})

async def _post_event(self, request: Request) -> JSONResponse:
Expand Down Expand Up @@ -915,11 +920,11 @@ async def checkpoint(status: Status) -> None:
continue
await checkpoint("running")

wrapper.updated_at = time.time()
wrapper.updated_at = datetime.now(timezone.utc)
queue.put_nowait(event)
# done when stream events are complete
status: Status = "completed"
wrapper.completed_at = time.time()
wrapper.completed_at = datetime.now(timezone.utc)
try:
await handler
except Exception as e:
Expand All @@ -941,8 +946,8 @@ async def checkpoint(status: Status) -> None:
task=task,
handler_id=handler_id,
workflow_name=workflow_name,
started_at=time.time(),
updated_at=time.time(),
started_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
completed_at=None,
)
self._handlers[handler_id] = wrapper
Expand All @@ -960,22 +965,24 @@ class _WorkflowHandler:
# metadata
handler_id: str
workflow_name: str
started_at: float
updated_at: float
completed_at: float | None
started_at: datetime
updated_at: datetime
completed_at: datetime | None

def to_dict(self) -> HandlerDict:
return {
"handler_id": self.handler_id,
"workflow_name": self.workflow_name,
"run_id": self.run_handler.run_id,
"status": self.status,
"started_at": self.started_at,
"updated_at": self.updated_at,
"completed_at": self.completed_at,
"error": self.error,
"result": self.result,
}
return HandlerDict(
handler_id=self.handler_id,
workflow_name=self.workflow_name,
run_id=self.run_handler.run_id,
status=self.status,
started_at=self.started_at.isoformat(),
updated_at=self.updated_at.isoformat(),
completed_at=self.completed_at.isoformat()
if self.completed_at is not None
else None,
error=self.error,
result=self.result,
)

@property
def status(self) -> Status:
Expand Down
53 changes: 52 additions & 1 deletion tests/server/test_server_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from httpx import ASGITransport, AsyncClient, Response

from tests.server.util import wait_for_passing
from workflows import Context
from workflows.server import WorkflowServer
from workflows.workflow import Workflow
from datetime import datetime

# Prepare the event to send
from workflows.context.serializers import JsonSerializer
Expand Down Expand Up @@ -696,3 +697,53 @@ async def test_post_event_missing_event_data(client: AsyncClient) -> None:
response = await client.post(f"/events/{handler_id}", json={})
assert response.status_code == 400
assert "Event data is required" in response.text


@pytest.mark.asyncio
async def test_handler_datetime_fields_progress(client: AsyncClient) -> None:
# Start interactive workflow which waits for an external event
response = await client.post("/workflows/interactive/run-nowait", json={})
assert response.status_code == 200
handler_id = response.json()["handler_id"]

# Snapshot initial times
resp = await client.get("/handlers")
assert resp.status_code == 200
handlers = resp.json()["handlers"]
item = next(h for h in handlers if h["handler_id"] == handler_id)
started_at_1 = datetime.fromisoformat(item["started_at"]) # ISO 8601
updated_at_1 = datetime.fromisoformat(item["updated_at"]) # ISO 8601
assert started_at_1 <= updated_at_1
assert item["completed_at"] is None

# Send an external event to progress the workflow and update timestamps
await asyncio.sleep(0.01)
serializer = JsonSerializer()
event = ExternalEvent(response="ts-check")
event_str = serializer.serialize(event)
send = await client.post(f"/events/{handler_id}", json={"event": event_str})
assert send.status_code == 200

# Check updated_at increased
resp2 = await client.get("/handlers")
assert resp2.status_code == 200
item2 = next(h for h in resp2.json()["handlers"] if h["handler_id"] == handler_id)
updated_at_2 = datetime.fromisoformat(item2["updated_at"]) # ISO 8601
assert updated_at_2 >= updated_at_1

# Wait for completion and check completed_at
async def _wait_done() -> Response:
r = await client.get(f"/results/{handler_id}")
if r.status_code == 200:
return r
raise AssertionError("not done")

await wait_for_passing(_wait_done)

resp3 = await client.get("/handlers")
item3 = next(h for h in resp3.json()["handlers"] if h["handler_id"] == handler_id)
assert item3["status"] in {"completed", "failed"}
if item3["status"] == "completed":
assert item3["completed_at"] is not None
completed_at = datetime.fromisoformat(item3["completed_at"]) # ISO 8601
assert completed_at >= updated_at_2
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.