diff --git a/README.md b/README.md index 2b8a7bb..a88c267 100644 --- a/README.md +++ b/README.md @@ -16,21 +16,35 @@ Args for endpoints: **Activity execution** ``` -*activity_name* - [string] - REQUIRED -*activity_task_queue* - [string] - REQUIRED -*args* - [ANY] - may be null -*start_to_close_timeout* - [int] - Default is 10 -*execution_timeout* - [int] - Default is 10 -*parent_workflow_id* - [string] - If null, UUID4 will be used +activity_name - [string] - REQUIRED +activity_task_queue - [string] - REQUIRED +args - [ANY] - may be null +start_to_close_timeout - [int] - Default is 10 +schedule_to_start_timeout - [int] - may be null +heartbeat_timeout - [int] - may be null +schedule_to_close_timeout - [int] - may be null +retry_policy - [Object] - may be null +parent_workflow_id - [string] - If null, UUID4 will be used +parent_workflow_execution_timeout - [int] - Default is 10 +parent_workflow_run_timeout - [int] - may be null +parent_workflow_task_timeout - [int] - may be null + +# RetryPolicy object + +initial_interval - [int] - Default is 1 +backoff_coefficient - [float] - Default is 2 +maximum_interval - [int] - may be null +maximum_attempts - [int] - Default is 0 ``` + **Workflow execution** ``` -*workflow_name* - [string] - REQUIRED -*workflow_task_queue* - [string] - REQUIRED -*args* - [ANY] - may be null -*execution_timeout* - [int] - Default is 10 -*workfloCODECOV_TOKENw_id* - [string] - If null, UUID4 will be used +workflow_name - [string] - REQUIRED +workflow_task_queue - [string] - REQUIRED +args - [ANY] - may be null +execution_timeout - [int] - Default is 10 +workflow_id- [string] - If null, UUID4 will be used ``` ### Run diff --git a/src/executor/common.py b/src/executor/common.py new file mode 100644 index 0000000..b661f96 --- /dev/null +++ b/src/executor/common.py @@ -0,0 +1,10 @@ +from datetime import timedelta + + +def schedule_timeout_from_request(value: int | str | None) -> timedelta | None: + if value is None: + return value + elif isinstance(value, int) or value.isdigit(): + return timedelta(seconds=int(value)) + else: + raise ValueError(value) diff --git a/src/executor/main.py b/src/executor/main.py index 4c079e7..09dfc95 100644 --- a/src/executor/main.py +++ b/src/executor/main.py @@ -12,7 +12,10 @@ PROMETHEUS_ENDPOINT_PORT ) from contextlib import asynccontextmanager -from .router import execution_router +from .router import ( + workflow_router, + activity_router +) from rich.console import Console @@ -63,4 +66,5 @@ async def lifespan(app: FastAPI): # pragma: no cover ) -app.include_router(execution_router) +app.include_router(workflow_router) +app.include_router(activity_router) diff --git a/src/executor/router.py b/src/executor/router.py index d8db9b1..86fa400 100644 --- a/src/executor/router.py +++ b/src/executor/router.py @@ -21,9 +21,9 @@ def __init__(self, version: APIVersion | None = None, **kwargs): # pragma: no c self.prefix = f"/v{version}{self.prefix}" -execution_router = APIRouter( - prefix="/execution", - tags=["Temporal execution"], +activity_router = APIRouter( + prefix="/activity", + tags=["Activities"], responses={ 404: {"description": "URL not found"}, 400: {"description": "Bad request"}, @@ -32,15 +32,27 @@ def __init__(self, version: APIVersion | None = None, **kwargs): # pragma: no c ) -execution_router.add_api_route( - "/activity", +workflow_router = APIRouter( + prefix="/workflow", + tags=["Workflows"], + responses={ + 404: {"description": "URL not found"}, + 400: {"description": "Bad request"}, + }, + version=APIVersion(1), +) + + +activity_router.add_api_route( + "/execute", activity_execution, summary="Execute any activity in the namespace", methods=["post"], ) -execution_router.add_api_route( - "/workflow", + +workflow_router.add_api_route( + "/execute", workflow_execution, summary="Execute any workflow in the namespace", methods=["post"], diff --git a/src/executor/schemas.py b/src/executor/schemas.py index bd558ab..0501543 100644 --- a/src/executor/schemas.py +++ b/src/executor/schemas.py @@ -1,20 +1,54 @@ from pydantic import BaseModel from typing import Any from dataclasses import dataclass +from temporalio.common import RetryPolicy +from .common import schedule_timeout_from_request # We use dataclasses instead of Pydantic for workflow args # to avoid issues with workflow-unsafe Pydantic features +@dataclass +class RetryPolicyInput: + initial_interval: int = 1 + backoff_coefficient: float = 2 + maximum_interval: int | None = None + maximum_attempts: int = 0 + + def get_policy(self) -> RetryPolicy: + return RetryPolicy( + initial_interval=schedule_timeout_from_request( + self.initial_interval + ), + backoff_coefficient=self.backoff_coefficient, + maximum_interval=schedule_timeout_from_request( + self.maximum_interval + ), + maximum_attempts=self.maximum_attempts, + ) + + @dataclass class ActivityExecutionInput: activity_name: str activity_task_queue: str args: Any | None = None start_to_close_timeout: int = 10 - execution_timeout: int = 10 parent_workflow_id: str | None = None + schedule_to_start_timeout: int | None = None + heartbeat_timeout: int | None = None + schedule_to_close_timeout: int | None = None + retry_policy: RetryPolicyInput | None = None + parent_workflow_execution_timeout: int = 10 + parent_workflow_run_timeout: int | None = None + parent_workflow_task_timeout: int | None = None + + def get_policy(self) -> RetryPolicy | None: + if self.retry_policy: + return self.retry_policy.get_policy() + else: + return None @dataclass diff --git a/src/executor/temporal.py b/src/executor/temporal.py index ae1cda5..73178c2 100644 --- a/src/executor/temporal.py +++ b/src/executor/temporal.py @@ -18,6 +18,7 @@ import opentelemetry.context from temporalio.contrib.opentelemetry import TracingInterceptor from .telemetry import runtime +from .common import schedule_timeout_from_request async def temporal_client(): # pragma: no cover @@ -42,9 +43,19 @@ async def run(self, payload: ActivityExecutionInput) -> InternalOutput: result = await workflow.execute_activity( activity=payload.activity_name, task_queue=payload.activity_task_queue, - start_to_close_timeout=timedelta( - seconds=payload.start_to_close_timeout + start_to_close_timeout=schedule_timeout_from_request( + payload.start_to_close_timeout ), + schedule_to_start_timeout=schedule_timeout_from_request( + payload.schedule_to_start_timeout + ), + heartbeat_timeout=schedule_timeout_from_request( + payload.heartbeat_timeout + ), + schedule_to_close_timeout=schedule_timeout_from_request( + payload.schedule_to_close_timeout + ), + retry_policy=payload.get_policy(), ) else: args = payload.args @@ -55,9 +66,19 @@ async def run(self, payload: ActivityExecutionInput) -> InternalOutput: result = await workflow.execute_activity( activity=payload.activity_name, task_queue=payload.activity_task_queue, - start_to_close_timeout=timedelta( - seconds=payload.start_to_close_timeout + start_to_close_timeout=schedule_timeout_from_request( + payload.start_to_close_timeout + ), + schedule_to_start_timeout=schedule_timeout_from_request( + payload.schedule_to_start_timeout + ), + heartbeat_timeout=schedule_timeout_from_request( + payload.heartbeat_timeout + ), + schedule_to_close_timeout=schedule_timeout_from_request( + payload.schedule_to_close_timeout ), + retry_policy=payload.get_policy(), args=args, ) except Exception as ex: @@ -91,7 +112,15 @@ async def internal_workflow_execution( payload, id=workflow_id, task_queue=TEMPORAL_INTERNAL_TASK_QUEUE, - execution_timeout=timedelta(seconds=payload.execution_timeout), + execution_timeout=schedule_timeout_from_request( + payload.parent_workflow_execution_timeout + ), + run_timeout=schedule_timeout_from_request( + payload.parent_workflow_execution_timeout + ), + task_timeout=schedule_timeout_from_request( + payload.parent_workflow_execution_timeout + ), ) assert result.success, result.data response.data = result.data diff --git a/src/tests/misc_test.py b/src/tests/misc_test.py new file mode 100644 index 0000000..28af5fd --- /dev/null +++ b/src/tests/misc_test.py @@ -0,0 +1,13 @@ +from executor.common import schedule_timeout_from_request +from datetime import timedelta + + +def test_schedule_timeout_builder(): + + assert schedule_timeout_from_request(None) is None + assert schedule_timeout_from_request(3) == timedelta(seconds=3) + assert schedule_timeout_from_request("3") == timedelta(seconds=3) + try: + schedule_timeout_from_request("foo") + except Exception as ex: + assert isinstance(ex, ValueError) diff --git a/src/tests/neg_temporal_endpoint_test.py b/src/tests/neg_temporal_endpoint_test.py index aba43ef..e996924 100644 --- a/src/tests/neg_temporal_endpoint_test.py +++ b/src/tests/neg_temporal_endpoint_test.py @@ -28,7 +28,7 @@ async def test_unknown_activity(t_client: Client): args={"str_1": "q", "str_2": "w"}, start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=10, + parent_workflow_execution_timeout=10, ) resp: ExecutionResult = await activity_execution( client=t_client, payload=payload diff --git a/src/tests/temporal_a_execution_test.py b/src/tests/temporal_a_execution_test.py index 8330f69..ccfdcc9 100644 --- a/src/tests/temporal_a_execution_test.py +++ b/src/tests/temporal_a_execution_test.py @@ -11,6 +11,7 @@ ActivityExecutionInput, ExecutionResult, ) +from executor.schemas import RetryPolicyInput @pytest.mark.asyncio @@ -27,7 +28,7 @@ async def test_a_dataclass(t_client: Client): args={"str_1": "q", "str_2": "w"}, start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await internal_workflow_execution( client=t_client, payload=payload @@ -50,7 +51,7 @@ async def test_a_single_arg(t_client: Client): args="spam", start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await internal_workflow_execution( client=t_client, payload=payload @@ -73,7 +74,7 @@ async def test_a_no_arg(t_client: Client): args=None, start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await internal_workflow_execution( client=t_client, payload=payload @@ -96,10 +97,38 @@ async def test_a_seq_arg(t_client: Client): args=["spam", "eggs"], start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await internal_workflow_execution( client=t_client, payload=payload ) assert resp.success, resp assert resp.data == "spameggs" + + +@pytest.mark.asyncio +async def test_a_retry_policy(t_client: Client): + async with Worker( + t_client, + task_queue=MOCK_QUEUE_NAME, + workflows=test_workflows, + activities=test_activities, + ): + payload: ActivityExecutionInput = ActivityExecutionInput( + activity_name="retry_activity", + activity_task_queue=MOCK_QUEUE_NAME, + args="spam", + start_to_close_timeout=10, + parent_workflow_id="pytest-parent-workflow", + parent_workflow_execution_timeout=60, + retry_policy=RetryPolicyInput( + initial_interval=2, + backoff_coefficient=7, + maximum_attempts=5 + ) + ) + resp: ExecutionResult = await internal_workflow_execution( + client=t_client, payload=payload + ) + assert resp.success, resp + assert resp.data == "spamspam" diff --git a/src/tests/temporal_endpoint_test.py b/src/tests/temporal_endpoint_test.py index 78397b2..7a8937d 100644 --- a/src/tests/temporal_endpoint_test.py +++ b/src/tests/temporal_endpoint_test.py @@ -28,7 +28,7 @@ async def test_a_endpoint(t_client: Client): args={"str_1": "q", "str_2": "w"}, start_to_close_timeout=10, parent_workflow_id="pytest-parent-workflow", - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await activity_execution( client=t_client, payload=payload @@ -51,7 +51,7 @@ async def test_a_endpoint_rand_id(t_client: Client): args={"str_1": "q", "str_2": "w"}, start_to_close_timeout=10, parent_workflow_id=None, - execution_timeout=4, + parent_workflow_execution_timeout=4, ) resp: ExecutionResult = await activity_execution( client=t_client, payload=payload diff --git a/src/tests/worker.py b/src/tests/worker.py index c1d2ec3..6505e40 100644 --- a/src/tests/worker.py +++ b/src/tests/worker.py @@ -37,6 +37,12 @@ async def seq_arg_activity(str_1: str, str_2: str) -> str: return str_1 + str_2 +@activity.defn() +async def retry_activity(input: str) -> str: + # await asyncio.sleep(6) + return input + input + + @workflow.defn(sandboxed=False) class DataclassArgWorkflow: @workflow.run @@ -91,6 +97,7 @@ async def run(self, args: Sequence) -> dict: single_arg_activity, no_arg_activity, seq_arg_activity, + retry_activity ]