From 765825680132677ea0351f2a9410f472ee754906 Mon Sep 17 00:00:00 2001 From: Eugenia Guerrero Quintana Date: Mon, 1 Sep 2025 18:25:36 -0600 Subject: [PATCH 1/2] feat(templates): add custom activity timeout guidance for temporal agents - Add activities.py.j2 template with 10-minute timeout examples - Auto-detect custom activities in run_worker.py.j2 - Update documentation with timeout best practices --- .../lib/cli/templates/temporal/README.md.j2 | 17 +++- .../temporal/project/activities.py.j2 | 77 +++++++++++++++++++ .../temporal/project/run_worker.py.j2 | 33 +++++++- 3 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 src/agentex/lib/cli/templates/temporal/project/activities.py.j2 diff --git a/src/agentex/lib/cli/templates/temporal/README.md.j2 b/src/agentex/lib/cli/templates/temporal/README.md.j2 index f6531978..1f306ecf 100644 --- a/src/agentex/lib/cli/templates/temporal/README.md.j2 +++ b/src/agentex/lib/cli/templates/temporal/README.md.j2 @@ -262,14 +262,27 @@ class MyWorkflow(BaseWorkflow): ``` ### Custom Activities -Add custom activities for external operations: +Add custom activities for external operations. **Important**: Always specify appropriate timeouts (recommended: 10 minutes): ```python # In project/activities.py -@activity.defn +from datetime import timedelta +from temporalio import activity +from temporalio.common import RetryPolicy + +@activity.defn(name="call_external_api") async def call_external_api(data): # HTTP requests, database operations, etc. pass + +# In your workflow, call it with a timeout: +result = await workflow.execute_activity( + "call_external_api", + data, + start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout + heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat monitoring + retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry policy +) ``` ### Integration with External Services diff --git a/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 b/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 new file mode 100644 index 00000000..6144b234 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 @@ -0,0 +1,77 @@ +""" +Custom Temporal Activities Template +==================================== +This file is for defining custom Temporal activities that can be executed +by your workflow. Activities are used for: +- External API calls +- Database operations +- File I/O operations +- Heavy computations +- Any non-deterministic operations + +IMPORTANT: All activities should have appropriate timeouts! +Default recommendation: start_to_close_timeout=timedelta(minutes=10) +""" + +from datetime import timedelta +from typing import Any, Dict + +from pydantic import BaseModel +from temporalio import activity +from temporalio.common import RetryPolicy + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + + +# Example activity parameter models +class ExampleActivityParams(BaseModel): + """Parameters for the example activity""" + data: Dict[str, Any] + task_id: str + + +# Example custom activity +@activity.defn(name="example_custom_activity") +async def example_custom_activity(params: ExampleActivityParams) -> Dict[str, Any]: + """ + Example custom activity that demonstrates best practices. + + When calling this activity from your workflow, use: + ```python + result = await workflow.execute_activity( + "example_custom_activity", + ExampleActivityParams(data={"key": "value"}, task_id=task_id), + start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout + heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat every minute + retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry up to 3 times + ) + ``` + """ + logger.info(f"Processing activity for task {params.task_id} with data: {params.data}") + + # Your activity logic here + # This could be: + # - API calls + # - Database operations + # - File processing + # - ML model inference + # - etc. + + result = { + "status": "success", + "processed_data": params.data, + "task_id": params.task_id + } + + return result + + +# Add more custom activities below as needed +# Remember to: +# 1. Use appropriate timeouts (default: 10 minutes) +# 2. Define clear parameter models with Pydantic +# 3. Handle errors appropriately +# 4. Use logging for debugging +# 5. Keep activities focused on a single responsibility diff --git a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 index 47a2a5d9..bdff0718 100644 --- a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 +++ b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 @@ -1,4 +1,7 @@ import asyncio +import importlib.util +import sys +from pathlib import Path from agentex.lib.core.temporal.activities import get_all_activities from agentex.lib.core.temporal.workers.worker import AgentexWorker @@ -14,6 +17,29 @@ environment_variables = EnvironmentVariables.refresh() logger = make_logger(__name__) +def load_custom_activities(): + """Load custom activities from project/activities.py if it exists""" + activities_path = Path(__file__).parent / "activities.py" + custom_activities = [] + + if activities_path.exists(): + logger.info(f"Loading custom activities from {activities_path}") + spec = importlib.util.spec_from_file_location("project.activities", activities_path) + if spec and spec.loader: + module = importlib.util.module_from_spec(spec) + sys.modules["project.activities"] = module + spec.loader.exec_module(module) + + # Find all activity functions (decorated with @activity.defn) + for name in dir(module): + obj = getattr(module, name) + if callable(obj) and hasattr(obj, "__temporal_activity_definition__"): + custom_activities.append(obj) + logger.info(f" - Registered custom activity: {name}") + + return custom_activities + + async def main(): # Setup debug mode if enabled setup_debug_if_enabled() @@ -22,13 +48,18 @@ async def main(): if task_queue_name is None: raise ValueError("WORKFLOW_TASK_QUEUE is not set") + # Load all activities (both core and custom) + all_activities = get_all_activities() + custom_activities = load_custom_activities() + all_activities.extend(custom_activities) + # Create a worker with automatic tracing worker = AgentexWorker( task_queue=task_queue_name, ) await worker.run( - activities=get_all_activities(), + activities=all_activities, workflow={{ workflow_class }}, ) From ffe0d78f2ced2b19aad42eaf794e7b98b665c4ac Mon Sep 17 00:00:00 2001 From: Eugenia Guerrero Quintana Date: Tue, 2 Sep 2025 11:31:58 -0600 Subject: [PATCH 2/2] simplify --- .../lib/cli/templates/temporal/README.md.j2 | 3 ++ .../temporal/project/run_worker.py.j2 | 31 +------------------ 2 files changed, 4 insertions(+), 30 deletions(-) diff --git a/src/agentex/lib/cli/templates/temporal/README.md.j2 b/src/agentex/lib/cli/templates/temporal/README.md.j2 index 1f306ecf..ee71d02b 100644 --- a/src/agentex/lib/cli/templates/temporal/README.md.j2 +++ b/src/agentex/lib/cli/templates/temporal/README.md.j2 @@ -283,6 +283,9 @@ result = await workflow.execute_activity( heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat monitoring retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry policy ) + +# Don't forget to register your custom activities in run_worker.py: +# all_activities = get_all_activities() + [your_custom_activity_function] ``` ### Integration with External Services diff --git a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 index bdff0718..1721abac 100644 --- a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 +++ b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 @@ -1,7 +1,4 @@ import asyncio -import importlib.util -import sys -from pathlib import Path from agentex.lib.core.temporal.activities import get_all_activities from agentex.lib.core.temporal.workers.worker import AgentexWorker @@ -17,29 +14,6 @@ environment_variables = EnvironmentVariables.refresh() logger = make_logger(__name__) -def load_custom_activities(): - """Load custom activities from project/activities.py if it exists""" - activities_path = Path(__file__).parent / "activities.py" - custom_activities = [] - - if activities_path.exists(): - logger.info(f"Loading custom activities from {activities_path}") - spec = importlib.util.spec_from_file_location("project.activities", activities_path) - if spec and spec.loader: - module = importlib.util.module_from_spec(spec) - sys.modules["project.activities"] = module - spec.loader.exec_module(module) - - # Find all activity functions (decorated with @activity.defn) - for name in dir(module): - obj = getattr(module, name) - if callable(obj) and hasattr(obj, "__temporal_activity_definition__"): - custom_activities.append(obj) - logger.info(f" - Registered custom activity: {name}") - - return custom_activities - - async def main(): # Setup debug mode if enabled setup_debug_if_enabled() @@ -48,10 +22,7 @@ async def main(): if task_queue_name is None: raise ValueError("WORKFLOW_TASK_QUEUE is not set") - # Load all activities (both core and custom) - all_activities = get_all_activities() - custom_activities = load_custom_activities() - all_activities.extend(custom_activities) + all_activities = get_all_activities() + [] # add your own activities here # Create a worker with automatic tracing worker = AgentexWorker(