Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/agentex/lib/cli/templates/temporal/README.md.j2
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,30 @@ class MyWorkflow(BaseWorkflow):
```

### Custom Activities
Add custom activities for external operations:
Add custom activities for external operations. **Important**: Always specify appropriate timeouts (recommended: 10 minutes):

```python
# In project/activities.py
@activity.defn
from datetime import timedelta
from temporalio import activity
from temporalio.common import RetryPolicy

@activity.defn(name="call_external_api")
async def call_external_api(data):
# HTTP requests, database operations, etc.
pass

# In your workflow, call it with a timeout:
result = await workflow.execute_activity(
"call_external_api",
data,
start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout
heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat monitoring
retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry policy
)

# Don't forget to register your custom activities in run_worker.py:
# all_activities = get_all_activities() + [your_custom_activity_function]
```

### Integration with External Services
Expand Down
77 changes: 77 additions & 0 deletions src/agentex/lib/cli/templates/temporal/project/activities.py.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
Custom Temporal Activities Template
====================================
This file is for defining custom Temporal activities that can be executed
by your workflow. Activities are used for:
- External API calls
- Database operations
- File I/O operations
- Heavy computations
- Any non-deterministic operations

IMPORTANT: All activities should have appropriate timeouts!
Default recommendation: start_to_close_timeout=timedelta(minutes=10)
"""

from datetime import timedelta
from typing import Any, Dict

from pydantic import BaseModel
from temporalio import activity
from temporalio.common import RetryPolicy

from agentex.lib.utils.logging import make_logger

logger = make_logger(__name__)


# Example activity parameter models
class ExampleActivityParams(BaseModel):
"""Parameters for the example activity"""
data: Dict[str, Any]
task_id: str


# Example custom activity
@activity.defn(name="example_custom_activity")
async def example_custom_activity(params: ExampleActivityParams) -> Dict[str, Any]:
"""
Example custom activity that demonstrates best practices.

When calling this activity from your workflow, use:
```python
result = await workflow.execute_activity(
"example_custom_activity",
ExampleActivityParams(data={"key": "value"}, task_id=task_id),
start_to_close_timeout=timedelta(minutes=10), # Recommended: 10 minute timeout
heartbeat_timeout=timedelta(minutes=1), # Optional: heartbeat every minute
retry_policy=RetryPolicy(maximum_attempts=3) # Optional: retry up to 3 times
)
```
"""
logger.info(f"Processing activity for task {params.task_id} with data: {params.data}")

# Your activity logic here
# This could be:
# - API calls
# - Database operations
# - File processing
# - ML model inference
# - etc.

result = {
"status": "success",
"processed_data": params.data,
"task_id": params.task_id
}

return result


# Add more custom activities below as needed
# Remember to:
# 1. Use appropriate timeouts (default: 10 minutes)
# 2. Define clear parameter models with Pydantic
# 3. Handle errors appropriately
# 4. Use logging for debugging
# 5. Keep activities focused on a single responsibility
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ async def main():
if task_queue_name is None:
raise ValueError("WORKFLOW_TASK_QUEUE is not set")

all_activities = get_all_activities() + [] # add your own activities here

# Create a worker with automatic tracing
worker = AgentexWorker(
task_queue=task_queue_name,
)

await worker.run(
activities=get_all_activities(),
activities=all_activities,
workflow={{ workflow_class }},
)

Expand Down
Loading