Skip to content

Commit

Permalink
Merge pull request #5 from northpowered/3-extra-fields-in-execution-r…
Browse files Browse the repository at this point in the history
…equests

Extra fields in execution requests
  • Loading branch information
northpowered committed May 3, 2023
2 parents 242782c + 53319d9 commit 5d651bf
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 33 deletions.
36 changes: 25 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,35 @@ Args for endpoints:
**Activity execution**

```
*activity_name* - [string] - REQUIRED
*activity_task_queue* - [string] - REQUIRED
*args* - [ANY] - may be null
*start_to_close_timeout* - [int] - Default is 10
*execution_timeout* - [int] - Default is 10
*parent_workflow_id* - [string] - If null, UUID4 will be used
activity_name - [string] - REQUIRED
activity_task_queue - [string] - REQUIRED
args - [ANY] - may be null
start_to_close_timeout - [int] - Default is 10
schedule_to_start_timeout - [int] - may be null
heartbeat_timeout - [int] - may be null
schedule_to_close_timeout - [int] - may be null
retry_policy - [Object] - may be null
parent_workflow_id - [string] - If null, UUID4 will be used
parent_workflow_execution_timeout - [int] - Default is 10
parent_workflow_run_timeout - [int] - may be null
parent_workflow_task_timeout - [int] - may be null
# RetryPolicy object
initial_interval - [int] - Default is 1
backoff_coefficient - [float] - Default is 2
maximum_interval - [int] - may be null
maximum_attempts - [int] - Default is 0
```


**Workflow execution**
```
*workflow_name* - [string] - REQUIRED
*workflow_task_queue* - [string] - REQUIRED
*args* - [ANY] - may be null
*execution_timeout* - [int] - Default is 10
*workfloCODECOV_TOKENw_id* - [string] - If null, UUID4 will be used
workflow_name - [string] - REQUIRED
workflow_task_queue - [string] - REQUIRED
args - [ANY] - may be null
execution_timeout - [int] - Default is 10
workflow_id- [string] - If null, UUID4 will be used
```
### Run

Expand Down
10 changes: 10 additions & 0 deletions src/executor/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from datetime import timedelta


def schedule_timeout_from_request(value: int | str | None) -> timedelta | None:
if value is None:
return value
elif isinstance(value, int) or value.isdigit():
return timedelta(seconds=int(value))
else:
raise ValueError(value)
8 changes: 6 additions & 2 deletions src/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
PROMETHEUS_ENDPOINT_PORT
)
from contextlib import asynccontextmanager
from .router import execution_router
from .router import (
workflow_router,
activity_router
)
from rich.console import Console


Expand Down Expand Up @@ -63,4 +66,5 @@ async def lifespan(app: FastAPI): # pragma: no cover
)


app.include_router(execution_router)
app.include_router(workflow_router)
app.include_router(activity_router)
26 changes: 19 additions & 7 deletions src/executor/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ def __init__(self, version: APIVersion | None = None, **kwargs): # pragma: no c
self.prefix = f"/v{version}{self.prefix}"


execution_router = APIRouter(
prefix="/execution",
tags=["Temporal execution"],
activity_router = APIRouter(
prefix="/activity",
tags=["Activities"],
responses={
404: {"description": "URL not found"},
400: {"description": "Bad request"},
Expand All @@ -32,15 +32,27 @@ def __init__(self, version: APIVersion | None = None, **kwargs): # pragma: no c
)


execution_router.add_api_route(
"/activity",
workflow_router = APIRouter(
prefix="/workflow",
tags=["Workflows"],
responses={
404: {"description": "URL not found"},
400: {"description": "Bad request"},
},
version=APIVersion(1),
)


activity_router.add_api_route(
"/execute",
activity_execution,
summary="Execute any activity in the namespace",
methods=["post"],
)

execution_router.add_api_route(
"/workflow",

workflow_router.add_api_route(
"/execute",
workflow_execution,
summary="Execute any workflow in the namespace",
methods=["post"],
Expand Down
36 changes: 35 additions & 1 deletion src/executor/schemas.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,54 @@
from pydantic import BaseModel
from typing import Any
from dataclasses import dataclass
from temporalio.common import RetryPolicy
from .common import schedule_timeout_from_request


# We use dataclasses instead of Pydantic for workflow args
# to avoid issues with workflow-unsafe Pydantic features


@dataclass
class RetryPolicyInput:
initial_interval: int = 1
backoff_coefficient: float = 2
maximum_interval: int | None = None
maximum_attempts: int = 0

def get_policy(self) -> RetryPolicy:
return RetryPolicy(
initial_interval=schedule_timeout_from_request(
self.initial_interval
),
backoff_coefficient=self.backoff_coefficient,
maximum_interval=schedule_timeout_from_request(
self.maximum_interval
),
maximum_attempts=self.maximum_attempts,
)


@dataclass
class ActivityExecutionInput:
activity_name: str
activity_task_queue: str
args: Any | None = None
start_to_close_timeout: int = 10
execution_timeout: int = 10
parent_workflow_id: str | None = None
schedule_to_start_timeout: int | None = None
heartbeat_timeout: int | None = None
schedule_to_close_timeout: int | None = None
retry_policy: RetryPolicyInput | None = None
parent_workflow_execution_timeout: int = 10
parent_workflow_run_timeout: int | None = None
parent_workflow_task_timeout: int | None = None

def get_policy(self) -> RetryPolicy | None:
if self.retry_policy:
return self.retry_policy.get_policy()
else:
return None


@dataclass
Expand Down
39 changes: 34 additions & 5 deletions src/executor/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import opentelemetry.context
from temporalio.contrib.opentelemetry import TracingInterceptor
from .telemetry import runtime
from .common import schedule_timeout_from_request


async def temporal_client(): # pragma: no cover
Expand All @@ -42,9 +43,19 @@ async def run(self, payload: ActivityExecutionInput) -> InternalOutput:
result = await workflow.execute_activity(
activity=payload.activity_name,
task_queue=payload.activity_task_queue,
start_to_close_timeout=timedelta(
seconds=payload.start_to_close_timeout
start_to_close_timeout=schedule_timeout_from_request(
payload.start_to_close_timeout
),
schedule_to_start_timeout=schedule_timeout_from_request(
payload.schedule_to_start_timeout
),
heartbeat_timeout=schedule_timeout_from_request(
payload.heartbeat_timeout
),
schedule_to_close_timeout=schedule_timeout_from_request(
payload.schedule_to_close_timeout
),
retry_policy=payload.get_policy(),
)
else:
args = payload.args
Expand All @@ -55,9 +66,19 @@ async def run(self, payload: ActivityExecutionInput) -> InternalOutput:
result = await workflow.execute_activity(
activity=payload.activity_name,
task_queue=payload.activity_task_queue,
start_to_close_timeout=timedelta(
seconds=payload.start_to_close_timeout
start_to_close_timeout=schedule_timeout_from_request(
payload.start_to_close_timeout
),
schedule_to_start_timeout=schedule_timeout_from_request(
payload.schedule_to_start_timeout
),
heartbeat_timeout=schedule_timeout_from_request(
payload.heartbeat_timeout
),
schedule_to_close_timeout=schedule_timeout_from_request(
payload.schedule_to_close_timeout
),
retry_policy=payload.get_policy(),
args=args,
)
except Exception as ex:
Expand Down Expand Up @@ -91,7 +112,15 @@ async def internal_workflow_execution(
payload,
id=workflow_id,
task_queue=TEMPORAL_INTERNAL_TASK_QUEUE,
execution_timeout=timedelta(seconds=payload.execution_timeout),
execution_timeout=schedule_timeout_from_request(
payload.parent_workflow_execution_timeout
),
run_timeout=schedule_timeout_from_request(
payload.parent_workflow_execution_timeout
),
task_timeout=schedule_timeout_from_request(
payload.parent_workflow_execution_timeout
),
)
assert result.success, result.data
response.data = result.data
Expand Down
13 changes: 13 additions & 0 deletions src/tests/misc_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from executor.common import schedule_timeout_from_request
from datetime import timedelta


def test_schedule_timeout_builder():

assert schedule_timeout_from_request(None) is None
assert schedule_timeout_from_request(3) == timedelta(seconds=3)
assert schedule_timeout_from_request("3") == timedelta(seconds=3)
try:
schedule_timeout_from_request("foo")
except Exception as ex:
assert isinstance(ex, ValueError)
2 changes: 1 addition & 1 deletion src/tests/neg_temporal_endpoint_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def test_unknown_activity(t_client: Client):
args={"str_1": "q", "str_2": "w"},
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=10,
parent_workflow_execution_timeout=10,
)
resp: ExecutionResult = await activity_execution(
client=t_client, payload=payload
Expand Down
37 changes: 33 additions & 4 deletions src/tests/temporal_a_execution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ActivityExecutionInput,
ExecutionResult,
)
from executor.schemas import RetryPolicyInput


@pytest.mark.asyncio
Expand All @@ -27,7 +28,7 @@ async def test_a_dataclass(t_client: Client):
args={"str_1": "q", "str_2": "w"},
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await internal_workflow_execution(
client=t_client, payload=payload
Expand All @@ -50,7 +51,7 @@ async def test_a_single_arg(t_client: Client):
args="spam",
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await internal_workflow_execution(
client=t_client, payload=payload
Expand All @@ -73,7 +74,7 @@ async def test_a_no_arg(t_client: Client):
args=None,
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await internal_workflow_execution(
client=t_client, payload=payload
Expand All @@ -96,10 +97,38 @@ async def test_a_seq_arg(t_client: Client):
args=["spam", "eggs"],
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await internal_workflow_execution(
client=t_client, payload=payload
)
assert resp.success, resp
assert resp.data == "spameggs"


@pytest.mark.asyncio
async def test_a_retry_policy(t_client: Client):
async with Worker(
t_client,
task_queue=MOCK_QUEUE_NAME,
workflows=test_workflows,
activities=test_activities,
):
payload: ActivityExecutionInput = ActivityExecutionInput(
activity_name="retry_activity",
activity_task_queue=MOCK_QUEUE_NAME,
args="spam",
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
parent_workflow_execution_timeout=60,
retry_policy=RetryPolicyInput(
initial_interval=2,
backoff_coefficient=7,
maximum_attempts=5
)
)
resp: ExecutionResult = await internal_workflow_execution(
client=t_client, payload=payload
)
assert resp.success, resp
assert resp.data == "spamspam"
4 changes: 2 additions & 2 deletions src/tests/temporal_endpoint_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def test_a_endpoint(t_client: Client):
args={"str_1": "q", "str_2": "w"},
start_to_close_timeout=10,
parent_workflow_id="pytest-parent-workflow",
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await activity_execution(
client=t_client, payload=payload
Expand All @@ -51,7 +51,7 @@ async def test_a_endpoint_rand_id(t_client: Client):
args={"str_1": "q", "str_2": "w"},
start_to_close_timeout=10,
parent_workflow_id=None,
execution_timeout=4,
parent_workflow_execution_timeout=4,
)
resp: ExecutionResult = await activity_execution(
client=t_client, payload=payload
Expand Down
7 changes: 7 additions & 0 deletions src/tests/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ async def seq_arg_activity(str_1: str, str_2: str) -> str:
return str_1 + str_2


@activity.defn()
async def retry_activity(input: str) -> str:
# await asyncio.sleep(6)
return input + input


@workflow.defn(sandboxed=False)
class DataclassArgWorkflow:
@workflow.run
Expand Down Expand Up @@ -91,6 +97,7 @@ async def run(self, args: Sequence) -> dict:
single_arg_activity,
no_arg_activity,
seq_arg_activity,
retry_activity
]


Expand Down

0 comments on commit 5d651bf

Please sign in to comment.