diff --git a/src/agentex/lib/cli/templates/temporal/README.md.j2 b/src/agentex/lib/cli/templates/temporal/README.md.j2 index f6531978..ee71d02b 100644 --- a/src/agentex/lib/cli/templates/temporal/README.md.j2 +++ b/src/agentex/lib/cli/templates/temporal/README.md.j2 @@ -262,14 +262,30 @@ class MyWorkflow(BaseWorkflow): ``` ### Custom Activities -Add custom activities for external operations: +Add custom activities for external operations. **Important**: Always specify appropriate timeouts (recommended: 10 minutes): ```python # In project/activities.py -@activity.defn +from datetime import timedelta +from temporalio import activity +from temporalio.common import RetryPolicy + +@activity.defn(name="call_external_api") async def call_external_api(data): # HTTP requests, database operations, etc. pass + +# In your workflow, call it with a timeout: +result = await workflow.execute_activity( + "call_external_api", + data, + start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout + heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat monitoring + retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry policy +) + +# Don't forget to register your custom activities in run_worker.py: +# all_activities = get_all_activities() + [your_custom_activity_function] ``` ### Integration with External Services diff --git a/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 b/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 new file mode 100644 index 00000000..6144b234 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal/project/activities.py.j2 @@ -0,0 +1,77 @@ +""" +Custom Temporal Activities Template +==================================== +This file is for defining custom Temporal activities that can be executed +by your workflow. Activities are used for: +- External API calls +- Database operations +- File I/O operations +- Heavy computations +- Any non-deterministic operations + +IMPORTANT: All activities should have appropriate timeouts! +Default recommendation: start_to_close_timeout=timedelta(minutes=10) +""" + +from datetime import timedelta +from typing import Any, Dict + +from pydantic import BaseModel +from temporalio import activity +from temporalio.common import RetryPolicy + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + + +# Example activity parameter models +class ExampleActivityParams(BaseModel): + """Parameters for the example activity""" + data: Dict[str, Any] + task_id: str + + +# Example custom activity +@activity.defn(name="example_custom_activity") +async def example_custom_activity(params: ExampleActivityParams) -> Dict[str, Any]: + """ + Example custom activity that demonstrates best practices. + + When calling this activity from your workflow, use: + ```python + result = await workflow.execute_activity( + "example_custom_activity", + ExampleActivityParams(data={"key": "value"}, task_id=task_id), + start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout + heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat every minute + retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry up to 3 times + ) + ``` + """ + logger.info(f"Processing activity for task {params.task_id} with data: {params.data}") + + # Your activity logic here + # This could be: + # - API calls + # - Database operations + # - File processing + # - ML model inference + # - etc. + + result = { + "status": "success", + "processed_data": params.data, + "task_id": params.task_id + } + + return result + + +# Add more custom activities below as needed +# Remember to: +# 1. Use appropriate timeouts (default: 10 minutes) +# 2. Define clear parameter models with Pydantic +# 3. Handle errors appropriately +# 4. Use logging for debugging +# 5. Keep activities focused on a single responsibility diff --git a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 index 47a2a5d9..1721abac 100644 --- a/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 +++ b/src/agentex/lib/cli/templates/temporal/project/run_worker.py.j2 @@ -22,13 +22,15 @@ async def main(): if task_queue_name is None: raise ValueError("WORKFLOW_TASK_QUEUE is not set") + all_activities = get_all_activities() + [] # add your own activities here + # Create a worker with automatic tracing worker = AgentexWorker( task_queue=task_queue_name, ) await worker.run( - activities=get_all_activities(), + activities=all_activities, workflow={{ workflow_class }}, )