Skip to content

makemore/django-agent-framework

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Django Agent Framework

⚠️ DEPRECATED ⚠️

This package has been merged into django-agent-runtime.

Please update your code to use django-agent-runtime>=0.4.0 instead.

Migration Guide

1. Update your dependencies

# Remove old package
pip uninstall django-agent-framework

# Install new package
pip install django-agent-runtime>=0.4.0

2. Update your imports

Old (deprecated):

from django_agent_framework import DjangoStepExecutor, DjangoRunContext

New:

from django_agent_runtime.steps import DjangoStepExecutor, DjangoRunContext

Or use the convenience imports:

from django_agent_runtime import DjangoStepExecutor, DjangoRunContext

3. Update INSTALLED_APPS

Old:

INSTALLED_APPS = [
    ...
    'django_agent_framework',
]

New:

INSTALLED_APPS = [
    ...
    'django_agent_runtime',
]

4. Run migrations

python manage.py migrate django_agent_runtime

Why the merge?

The functionality in django-agent-framework was specific to Django and overlapped with django-agent-runtime. Merging them:

  1. Simplifies installation - One Django package instead of two
  2. Reduces confusion - Clear separation between pure Python (agent-runtime-core) and Django (django-agent-runtime)
  3. Shares infrastructure - Step checkpoints can use the same persistence layer as other runtime features

Legacy Documentation (for reference)

Features

  • DjangoStepExecutor: Execute multi-step agent operations with database-backed checkpointing
  • DjangoRunContext: RunContext implementation using Django's cache and database
  • StepCheckpoint Model: Persistent storage for execution state with resume capability
  • StepEvent Model: Event logging for step execution history
  • Django Admin Integration: View and manage step execution history
  • Celery Integration: Optional async step execution via Celery tasks

Quick Start (Legacy)

1. Add to INSTALLED_APPS

INSTALLED_APPS = [
    ...
    'django_agent_framework',
]

2. Run Migrations

python manage.py migrate django_agent_framework

3. Define Steps

from agent_runtime_core.steps import Step

async def fetch_data(ctx, state):
    """Fetch data from external API."""
    data = await fetch_from_api()
    state['data'] = data
    return {'fetched': len(data)}

async def process_data(ctx, state):
    """Process the fetched data."""
    data = state['data']
    processed = transform(data)
    return {'processed': len(processed)}

async def save_results(ctx, state):
    """Save results to database."""
    # Save logic here
    return {'saved': True}

4. Execute Steps

from uuid import uuid4
from django_agent_framework import DjangoRunContext, DjangoStepExecutor
from agent_runtime_core.steps import Step

async def run_pipeline(user):
    # Create context
    ctx = DjangoRunContext(
        run_id=uuid4(),
        user=user,
    )
    
    # Create executor
    executor = DjangoStepExecutor(ctx)
    
    # Define and run steps
    steps = [
        Step("fetch", fetch_data),
        Step("process", process_data, retries=3, retry_delay=2.0),
        Step("save", save_results),
    ]
    
    results = await executor.run(steps)
    return results

Step Class Reference

The Step class from agent_runtime_core.steps defines a single step in a multi-step operation:

from agent_runtime_core.steps import Step

step = Step(
    name="process_data",           # Unique identifier for this step (required)
    fn=my_async_function,          # Async function: async def fn(ctx, state) -> result (required)
    retries=3,                     # Number of retry attempts on failure (default: 0)
    retry_delay=2.0,               # Seconds to wait between retries (default: 1.0)
    timeout=30.0,                  # Timeout in seconds for this step (default: None)
    description="Process input",   # Human-readable description for progress reporting (default: None)
    checkpoint=True,               # Whether to checkpoint after this step (default: True)
)

Step Function Signature

Step functions must be async and accept two arguments:

async def my_step(ctx: RunContext, state: dict) -> Any:
    """
    Args:
        ctx: The RunContext providing access to emit(), checkpoint(), cancelled(), etc.
        state: Mutable dict for passing data between steps (custom_state)
    
    Returns:
        Any JSON-serializable result (stored in step_results)
    """
    # Access shared state from previous steps
    previous_data = state.get('data')
    
    # Modify state for subsequent steps
    state['processed'] = True
    
    # Check for cancellation in long operations
    if ctx.cancelled():
        raise StepCancelledError("my_step")
    
    # Return result (will be stored in results["my_step"])
    return {"status": "complete", "count": 42}

ExecutionState

The ExecutionState class tracks the state of a multi-step execution:

from agent_runtime_core.steps import ExecutionState

state = ExecutionState(
    execution_id=UUID,           # Unique ID for this execution
    current_step_index=0,        # Index of current/next step to run
    completed_steps=[],          # List of completed step names
    step_results={},             # Dict mapping step names to their results
    started_at=datetime,         # When execution started
    custom_state={},             # User-defined state passed between steps
)

# Serialize for checkpointing
state_dict = state.to_dict()

# Restore from checkpoint
state = ExecutionState.from_dict(state_dict)

Events

The StepExecutor emits events during execution that are captured by DjangoRunContext and stored in the StepEvent model:

STEP_STARTED

Emitted when a step begins execution.

{
    "step_name": "fetch_data",
    "step_index": 0,
    "total_steps": 3,
    "attempt": 1,              # Current attempt number
    "max_attempts": 4,         # Total attempts (1 + retries)
    "description": "Fetch data from API",
}

STEP_COMPLETED

Emitted when a step completes successfully.

{
    "step_name": "fetch_data",
    "step_index": 0,
    "total_steps": 3,
    "result": {"fetched": 100},
    "duration_seconds": 1.23,
}

STEP_FAILED

Emitted when a step fails after all retries.

{
    "step_name": "fetch_data",
    "step_index": 0,
    "total_steps": 3,
    "error": "Connection timeout",
    "attempts": 4,
}

STEP_RETRYING

Emitted when a step fails but will be retried.

{
    "step_name": "fetch_data",
    "step_index": 0,
    "attempt": 2,
    "max_attempts": 4,
    "error": "Connection timeout",
    "retry_delay": 2.0,
}

STEP_SKIPPED

Emitted when a step is skipped (already completed on resume).

{
    "step_name": "fetch_data",
    "step_index": 0,
    "total_steps": 3,
    "reason": "already_completed",
}

PROGRESS_UPDATE

Emitted to report execution progress.

{
    "step_name": "fetch_data",
    "step_index": 0,
    "total_steps": 3,
    "progress_percent": 33.3,
    "description": "Fetching data from API",
}

Resuming from Checkpoint

If execution is interrupted, you can resume from the last checkpoint:

# Resume using the same run_id
ctx = DjangoRunContext(run_id=original_run_id, user=user)
executor = DjangoStepExecutor(ctx)

# Pass the same steps - completed ones will be skipped
results = await executor.run(steps, resume=True)

Celery Integration

For background execution:

from django_agent_framework.celery import run_steps_task

# Define steps as serializable config
steps_config = [
    {"name": "fetch", "module": "myapp.steps", "function": "fetch_data"},
    {"name": "process", "module": "myapp.steps", "function": "process_data", "retries": 3},
    {"name": "save", "module": "myapp.steps", "function": "save_results"},
]

# Queue the task
result = run_steps_task.delay(
    run_id=str(uuid4()),
    steps_config=steps_config,
    user_id=request.user.id,
)

Admin Interface

The package includes Django admin views for:

  • Viewing checkpoint status and progress
  • Inspecting step results and custom state
  • Browsing event history
  • Filtering by status, user, and date

Models

StepCheckpoint

Stores execution state for resumable step sequences:

  • execution_id: Unique execution identifier
  • run_id: Associated run ID
  • status: pending, running, completed, failed, cancelled
  • completed_steps: List of completed step names
  • step_results: Results from each step
  • custom_state: User-defined state passed between steps

StepEvent

Records events during step execution:

  • event_type: step_started, step_completed, step_failed, etc.
  • step_name: Name of the step
  • payload: Event details
  • timestamp: When the event occurred

Configuration

DjangoRunContext Options

ctx = DjangoRunContext(
    run_id=uuid4(),
    user=request.user,                    # Optional Django user
    conversation_id=conversation_id,       # Optional conversation link
    input_messages=[...],                  # Input messages
    params={'key': 'value'},              # Run parameters
    metadata={'source': 'api'},           # Run metadata
    use_cache_for_events=True,            # Cache events for quick access
    event_cache_timeout=3600,             # Cache timeout in seconds
)

DjangoStepExecutor Options

executor = DjangoStepExecutor(
    ctx,
    checkpoint_key="_step_executor_state",  # Key for checkpoint storage
    cancel_check_interval=0.5,              # Cancellation check interval
    auto_update_status=True,                # Auto-update checkpoint status
)

Requirements

  • Python >= 3.10
  • Django >= 4.2
  • agent-runtime-core >= 0.5.0

License

MIT

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages