# AI On-Call Agent System Documentation

This documentation covers the architecture, setup, deployment, and usage of the AI On-Call Agent. The system monitors ETL infrastructure and resolves common issues automatically.

## Table of Contents

1. [System Architecture Overview](#architecture)
2. [Service Components and Dependencies](#services)
3. [Celery Task Queue Configuration](#celery)
4. [Database Schema and Models](#database)
5. [API Endpoints Documentation](#api)
6. [Model Training in Notebooks](#training-notebooks)
7. [Model Training via API](#training-api)
8. [Development Environment Setup](#dev-setup)
9. [Production Deployment Setup](#prod-setup)
10. [Testing Endpoints with Examples](#testing)
11. [Configuration Management](#config)
12. [Monitoring and Logging Setup](#monitoring)

## 1. System Architecture Overview {#architecture}

The AI On-Call Agent is a distributed system designed for intelligent ETL infrastructure monitoring and automated incident resolution. The architecture follows microservices patterns with event-driven communication.

### Core Components

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Log Monitor   │    │  AI Decision    │    │ Action Engine   │
│                 │────│    Engine       │────│                 │
│ - File polling  │    │ - Classification│    │ - Airflow API   │
│ - Pattern match │    │ - Confidence    │    │ - Service ops   │
│ - Anomaly detect│    │- Recommendations│    │ - Rollbacks     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Message Queue (Redis)                       │
└─────────────────────────────────────────────────────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Knowledge     │    │   PostgreSQL    │    │    FastAPI      │
│     Base        │    │   Database      │    │   Web Server    │
│                 │    │                 │    │                 │
│ - Solutions     │    │ - Incidents     │    │ - REST APIs     │
│ - Patterns      │    │ - Training data │    │ - Webhooks      │
│ - History       │    │ - Metrics       │    │ - Monitoring    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```

### Technology Stack

- **Backend Framework**: FastAPI (Python 3.9+)
- **Database**: PostgreSQL 13+ with SQLAlchemy ORM
- **Message Queue**: Redis for Celery task queue
- **Task Queue**: Celery for background processing
- **Machine Learning**: scikit-learn, pandas, numpy
- **AI Integration**: OpenAI GPT for advanced analysis
- **Monitoring**: Structured logging with JSON format
- **Infrastructure**: Docker & Docker Compose

In [None]:
# System Data Flow Example
"""
This demonstrates the typical data flow through the system:

1. Log Monitor detects anomaly
2. Creates incident in database
3. AI Engine analyzes and provides recommendations
4. Action Engine executes automated fixes
5. Results tracked and stored for training
"""

# Example incident flow
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum

class IncidentSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class IncidentStatus(Enum):
    OPEN = "open"
    IN_PROGRESS = "in_progress"
    RESOLVED = "resolved"

@dataclass
class Incident:
    id: str
    title: str
    description: str
    severity: IncidentSeverity
    status: IncidentStatus
    category: str
    metadata: Dict[str, Any]
    
    def to_dict(self):
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "severity": self.severity.value,
            "status": self.status.value,
            "category": self.category,
            "metadata": self.metadata
        }

# Example incident creation
sample_incident = Incident(
    id="inc-001",
    title="Airflow DAG Failure - ETL Pipeline",
    description="ETL pipeline failed during data transformation step",
    severity=IncidentSeverity.HIGH,
    status=IncidentStatus.OPEN,
    category="airflow_dag",
    metadata={
        "dag_id": "etl_daily_pipeline",
        "task_id": "transform_data",
        "execution_date": "2025-07-31T10:00:00Z",
        "retry_count": 2,
        "max_retries": 3
    }
)

print("Sample Incident:")
print(sample_incident.to_dict())

## 2. Service Components and Dependencies {#services}

### Core Services

#### 2.1 Enhanced Incident Service (`enhanced_incident_service.py`)

The central orchestrator for incident management, AI analysis, and automated resolution.

**Responsibilities:**
- Incident lifecycle management (create, update, resolve)
- AI decision engine integration
- Automated action execution
- Training data collection
- Resolution tracking

**Dependencies:**
- PostgreSQL (incident storage)
- AI Decision Engine
- Action Service
- Redis (for caching)
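
The lifecycle responsibility above can be pictured as a small state machine. The following is a minimal sketch, assuming the three statuses used elsewhere in this document and assuming that transitions only move forward (the real service may well allow reopening). `ALLOWED_TRANSITIONS` and `advance` are illustrative names, not part of `enhanced_incident_service.py`.

```python
from enum import Enum


class IncidentStatus(Enum):
    OPEN = "open"
    IN_PROGRESS = "in_progress"
    RESOLVED = "resolved"


# Assumed forward-only lifecycle: open -> in_progress -> resolved.
ALLOWED_TRANSITIONS = {
    IncidentStatus.OPEN: {IncidentStatus.IN_PROGRESS, IncidentStatus.RESOLVED},
    IncidentStatus.IN_PROGRESS: {IncidentStatus.RESOLVED},
    IncidentStatus.RESOLVED: set(),
}


def advance(current: IncidentStatus, target: IncidentStatus) -> IncidentStatus:
    """Return the target status, or raise if the move is not allowed."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(
            f"cannot move incident from {current.value} to {target.value}"
        )
    return target
```

Keeping the allowed moves in one table makes it easy to audit which status changes the service accepts.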

#### 2.2 AI Decision Engine (`ai/__init__.py`)

Provides intelligent analysis and action recommendations using machine learning models.

**Responsibilities:**
- Incident classification
- Confidence scoring
- Action recommendation
- Pattern recognition
- Model training and updates

**Models Used:**
- RandomForestClassifier for incident categorization
- LogisticRegression for action recommendation
- TF-IDF Vectorizer for text analysis
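
One way the confidence score could gate automation is to act only when the classifier is confident and hand off to a human otherwise. This is a sketch under assumptions: the 0.8 threshold, the `escalate_to_human` label, and the function name are hypothetical, and the input dict mirrors the keys returned by `analyze_incident` in the initialization examples in this section.

```python
from typing import Any, Dict, List


def select_actions(analysis: Dict[str, Any], min_confidence: float = 0.8) -> List[str]:
    """Return the recommended actions if confidence is high enough.

    Falls back to a single escalation action (hypothetical label) when the
    model is unsure or recommends nothing.
    """
    actions = list(analysis.get("recommended_actions", []))
    confidence = float(analysis.get("confidence_score", 0.0))
    if confidence >= min_confidence and actions:
        return actions
    return ["escalate_to_human"]
```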

#### 2.3 Action Service (`actions.py`)

Executes automated remediation actions across different systems.

**Responsibilities:**
- Airflow DAG restarts via REST API
- Service restarts (K8s, Docker, systemd)
- Database operations
- Cache clearing
- Spark job management
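
Because the Action Service handles several unrelated action types, a dispatch table is one possible way to organize it. The sketch below is illustrative only: the handlers are placeholders that return canned results rather than calling Airflow or a container runtime, and `HANDLERS` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def restart_airflow_dag(params: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder: a real handler would call the Airflow REST API here.
    return {"action": "restart_airflow_dag", "dag_id": params.get("dag_id"), "success": True}


async def restart_service(params: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder: a real handler would talk to K8s, Docker, or systemd.
    return {"action": "restart_service", "service": params.get("service"), "success": True}


# Maps action types to handlers; adding an action means adding one entry.
HANDLERS: Dict[str, Handler] = {
    "restart_airflow_dag": restart_airflow_dag,
    "restart_service": restart_service,
}


async def execute_action(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
    handler = HANDLERS.get(action_type)
    if handler is None:
        # Unknown actions are reported instead of silently returning None.
        return {"action": action_type, "success": False, "error": "unsupported action type"}
    return await handler(params)
```

From synchronous code this could be driven with `asyncio.run(execute_action("restart_airflow_dag", {"dag_id": "etl_daily_pipeline"}))`.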

#### 2.4 Log Monitor (`monitoring/__init__.py`)

Real-time log monitoring and anomaly detection.

**Responsibilities:**
- File-based log polling
- Elasticsearch integration
- Pattern matching
- Alert generation
- Log archiving
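
The pattern-matching responsibility can be sketched with the standard `re` module. The patterns and category names below are hypothetical; a real deployment would presumably load them from the knowledge base's `error_patterns`.

```python
import re
from typing import Dict, Iterable, List

# Hypothetical patterns, for illustration only.
ERROR_PATTERNS: Dict[str, re.Pattern] = {
    "db_connection": re.compile(r"failed to connect to database", re.IGNORECASE),
    "out_of_memory": re.compile(r"out ?of ?memory|OOMKilled", re.IGNORECASE),
}


def scan_lines(lines: Iterable[str]) -> List[Dict[str, str]]:
    """Return one alert per line that matches a known error pattern."""
    alerts = []
    for line in lines:
        for category, pattern in ERROR_PATTERNS.items():
            if pattern.search(line):
                alerts.append({"category": category, "line": line.strip()})
                break  # first matching category wins
    return alerts
```

A file-polling loop would feed newly appended lines into `scan_lines` and turn each alert into an incident.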

In [None]:
# Service Initialization Examples
import asyncio
import joblib

# 1. Enhanced Incident Service Setup
class EnhancedIncidentService:
    def __init__(self):
        self.action_service = ActionService()
        self.ai_engine = None  # Injected later
        
    def set_ai_engine(self, ai_engine):
        """Dependency injection for AI engine"""
        self.ai_engine = ai_engine
        
    async def create_incident_with_async_resolution(self, request, auto_resolve=True):
        """Main entry point for incident creation with AI analysis"""
        incident = await self.create_incident(request)

        # Only report the workflow as triggered when it was actually scheduled
        triggered = bool(auto_resolve and self.ai_engine)
        if triggered:
            # Background task for AI analysis and resolution
            asyncio.create_task(self._async_resolution_workflow(incident))

        return {"incident_id": incident.id, "async_resolution_triggered": triggered}

# 2. AI Decision Engine Setup
class AIDecisionEngine:
    def __init__(self):
        self.incident_classifier = None
        self.action_recommender = None
        self.text_vectorizer = None
        self._load_models()
        
    def _load_models(self):
        """Load or initialize ML models"""
        try:
            # Load existing models or train new ones
            self.incident_classifier = joblib.load('models/incident_classifier.pkl')
            self.action_recommender = joblib.load('models/action_recommender.pkl')
            self.text_vectorizer = joblib.load('models/text_vectorizer.pkl')
        except FileNotFoundError:
            print("Models not found, will train from scratch")
            
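    async def analyze_incident(self, incident):
        """Analyze incident and provide recommendations"""
        if self.incident_classifier is None or self.action_recommender is None:
            # _load_models() leaves the models unset when no saved files exist
            raise RuntimeError("Models are not loaded; train them before analysis")

        features = self._extract_features(incident)
        category = self.incident_classifier.predict([features])[0]
        actions = self.action_recommender.predict([features])
        confidence = self.incident_classifier.predict_proba([features]).max()

        # Convert NumPy types to plain Python so the result is JSON-serializable
        return {
            "category": str(category),
            "recommended_actions": actions.tolist(),
            "confidence_score": float(confidence)
        }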

# 3. Action Service Setup  
class ActionService:
    def __init__(self):
        self.airflow_client = None
        self.k8s_client = None
        self.docker_client = None
        self._init_clients()
        
    def _init_clients(self):
        """Initialize external service clients"""
        try:
            import docker
            self.docker_client = docker.from_env()
        except Exception:
            print("Docker client not available")
            
    async def execute_action(self, action_type, parameters, incident_id):
        """Execute automated action"""
        if action_type == "restart_airflow_dag":
            return await self._restart_airflow_dag(parameters.get("dag_id"))
        elif action_type == "restart_service":
            return await self._restart_service(parameters.get("service"))
        # ... other action types

print("Service initialization examples completed")

## 3. Celery Task Queue Configuration {#celery}

### Overview

Celery is used for background task processing, allowing the system to handle long-running operations asynchronously without blocking API responses.

### Task Types

1. **AI Analysis Tasks**: Model training, incident classification
2. **Action Execution Tasks**: Service restarts, API calls
3. **Monitoring Tasks**: Log processing, metric collection
4. **Maintenance Tasks**: Database cleanup, model updates
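
Separating these task types onto different queues keeps slow training jobs from delaying remediation. The mapping below is a sketch: the queue names are assumptions, the task names are the ones registered in the examples later in this section, and `queue_for` is a hypothetical helper that only mimics the lookup.

```python
from typing import Dict

# Queue names are illustrative; task names match the examples in this section.
TASK_ROUTES: Dict[str, Dict[str, str]] = {
    "train_incident_classifier": {"queue": "ai"},
    "analyze_incident_async": {"queue": "ai"},
    "execute_airflow_restart": {"queue": "actions"},
}

DEFAULT_QUEUE = "celery"  # Celery's default queue name


def queue_for(task_name: str) -> str:
    """Look up the queue a task would be routed to under TASK_ROUTES."""
    return TASK_ROUTES.get(task_name, {}).get("queue", DEFAULT_QUEUE)
```

Such a mapping could be applied with `celery_app.conf.update(task_routes=TASK_ROUTES)`, and a worker can be restricted to one queue with the `-Q` option (for example `-Q ai`).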

### Configuration Structure

```
src/
├── celery_app.py          # Celery application setup
├── tasks/                 # Task definitions
│   ├── __init__.py
│   ├── ai_tasks.py        # AI/ML related tasks
│   ├── action_tasks.py    # Action execution tasks
│   └── monitoring_tasks.py # Monitoring tasks
└── config/
    └── celery_config.py   # Celery settings
```

### Broker Configuration

- **Development**: Redis (localhost:6379)
- **Production**: Redis Cluster or RabbitMQ
- **Result Backend**: Same as broker for simplicity

In [None]:
# Celery Configuration Examples

# 1. celery_app.py - Main Celery Application
from celery import Celery
import os

# Create Celery instance
celery_app = Celery(
    "on_call_agent",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    include=[
        "src.tasks.ai_tasks",
        "src.tasks.action_tasks", 
        "src.tasks.monitoring_tasks"
    ]
)

# Configuration
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    task_soft_time_limit=25 * 60,  # 25 minutes
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
)

# 2. Task Examples - ai_tasks.py
from celery import shared_task
import asyncio

@shared_task(bind=True, name="train_incident_classifier")
def train_incident_classifier_task(self, min_samples=1000):
    """Background task to train the incident classifier model"""
    try:
        self.update_state(state="PROGRESS", meta={"progress": 0})
        
        # Load training data
        from src.ai import AIDecisionEngine
        ai_engine = AIDecisionEngine()
        
        self.update_state(state="PROGRESS", meta={"progress": 25})
        
        # Train models
        metrics = ai_engine.train_models(min_samples=min_samples)
        
        self.update_state(state="PROGRESS", meta={"progress": 100})
        
        return {
            "status": "completed",
            "metrics": metrics,
            "model_version": ai_engine.model_version
        }
    except Exception as exc:
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc), "progress": 0}
        )
        raise

@shared_task(name="analyze_incident_async")
def analyze_incident_async(incident_data):
    """Analyze incident in background"""
    from src.ai import AIDecisionEngine
    ai_engine = AIDecisionEngine()
    
    # Run the async analysis to completion on a fresh event loop
    return asyncio.run(ai_engine.analyze_incident(incident_data))

# 3. Action Tasks - action_tasks.py
@shared_task(bind=True, name="execute_airflow_restart")
def execute_airflow_restart_task(self, dag_id, incident_id=None):
    """Execute Airflow DAG restart as background task"""
    try:
        from src.services.actions import ActionService
        action_service = ActionService()
        
        # Execute restart
        result = asyncio.run(
            action_service._restart_airflow_dag(dag_id)
        )
        
        return {
            "success": result[0],
            "error": result[1],
            "dag_id": dag_id,
            "incident_id": incident_id
        }
    except Exception as exc:
        self.update_state(
            state="FAILURE", 
            meta={"error": str(exc), "dag_id": dag_id}
        )
        raise

# 4. Worker Startup Commands
print("Celery worker startup commands:")
print("# Development:")
print("celery -A src.celery_app worker --loglevel=info --concurrency=2")
print()
print("# Production:")
print("celery -A src.celery_app worker --loglevel=warning --concurrency=4")
print()
print("# Monitor:")
print("celery -A src.celery_app flower --port=5555")

## 4. Database Schema and Models {#database}

### Schema Overview

The system uses PostgreSQL with SQLAlchemy ORM for data persistence. Key tables include:

- **incidents**: Core incident tracking
- **training_data**: ML model training data  
- **knowledge_base**: Solution patterns and fixes
- **actions**: Executed action history
- **logs**: System and application logs

### Entity Relationships

```
┌─────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  incidents  │────│ training_data   │    │ knowledge_base  │
│             │    │                 │    │                 │
│ id (PK)     │    │ incident_id(FK) │    │ id (PK)         │
│ title       │    │ input_features  │    │ title           │
│ severity    │    │ target_actions  │    │ error_patterns  │
│ status      │    │ outcome_score   │    │ solution_steps  │
│ metadata    │    │ timestamp       │    │ success_rate    │
│ created_at  │    └─────────────────┘    └─────────────────┘
│ resolved_at │                    
└─────────────┘                    
      │                            
      ▼                            
┌─────────────────┐                
│    actions      │                
│                 │                
│ id (PK)         │                
│ incident_id(FK) │                
│ action_type     │                
│ parameters      │                
│ status          │                
│ result          │                
│ executed_at     │                
└─────────────────┘                
```

In [None]:
# SQLAlchemy Model Definitions

from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, JSON, ForeignKey
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, relationship
from datetime import datetime

Base = declarative_base()

# 1. Incidents Table
class Incident(Base):
    __tablename__ = 'incidents'
    
    id = Column(String, primary_key=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=False)
    severity = Column(String(20), nullable=False)  # critical, high, medium, low
    service = Column(String(100))
    status = Column(String(20), nullable=False)    # open, in_progress, resolved
    tags = Column(JSON, default=list)
    
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    resolved_at = Column(DateTime)
    
    resolution_notes = Column(Text)
    actions_taken = Column(JSON, default=list)
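    # "metadata" is a reserved attribute on declarative classes, so the column
    # keeps its name in the table but is mapped under a different attribute
    incident_metadata = Column("metadata", JSON, default=dict)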
    
    # Relationships
    training_data = relationship("TrainingData", back_populates="incident")
    actions = relationship("Action", back_populates="incident")

# 2. Training Data Table
class TrainingData(Base):
    __tablename__ = 'training_data'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    incident_id = Column(String, ForeignKey('incidents.id'), nullable=False)
    
    # Features for ML training
    input_features = Column(JSON, nullable=False)
    target_actions = Column(JSON, nullable=False)
    outcome_score = Column(Integer)  # 0-100 effectiveness score
    resolution_time_minutes = Column(Integer)
    
    timestamp = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    incident = relationship("Incident", back_populates="training_data")

# 3. Knowledge Base Table
class KnowledgeEntry(Base):
    __tablename__ = 'knowledge_base'
    
    id = Column(String, primary_key=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=False)
    category = Column(String(50), nullable=False)
    
    # Pattern matching
    error_patterns = Column(JSON, nullable=False)  # List of regex patterns
    solution_steps = Column(JSON, nullable=False)  # List of steps
    automated_actions = Column(JSON, default=list) # Automated fix actions
    
    # Metadata
    success_rate = Column(Integer, default=0)      # 0-100
    last_used = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = Column(String(100))

# 4. Actions Table  
class Action(Base):
    __tablename__ = 'actions'
    
    id = Column(String, primary_key=True)
    incident_id = Column(String, ForeignKey('incidents.id'))
    
    action_type = Column(String(50), nullable=False)
    parameters = Column(JSON, default=dict)
    status = Column(String(20), nullable=False)    # pending, running, success, failed
    
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    
    result = Column(JSON, default=dict)
    error_message = Column(Text)
    executed_by = Column(String(100))
    is_manual = Column(Boolean, default=False)
    
    # Relationships
    incident = relationship("Incident", back_populates="actions")

# 5. Database Migration Example
from alembic import op
import sqlalchemy as sa

def upgrade():
    """Create incidents table migration"""
    op.create_table('incidents',
        sa.Column('id', sa.String(), nullable=False),
        sa.Column('title', sa.String(length=200), nullable=False),
        sa.Column('description', sa.Text(), nullable=False),
        sa.Column('severity', sa.String(length=20), nullable=False),
        sa.Column('service', sa.String(length=100), nullable=True),
        sa.Column('status', sa.String(length=20), nullable=False),
        sa.Column('tags', sa.JSON(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.Column('resolved_at', sa.DateTime(), nullable=True),
        sa.Column('resolution_notes', sa.Text(), nullable=True),
        sa.Column('actions_taken', sa.JSON(), nullable=True),
        sa.Column('metadata', sa.JSON(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    
    # Create indexes
    op.create_index('ix_incidents_status', 'incidents', ['status'])
    op.create_index('ix_incidents_severity', 'incidents', ['severity'])
    op.create_index('ix_incidents_created_at', 'incidents', ['created_at'])

print("Database models defined successfully")

## 5. API Endpoints Documentation {#api}

### API Base URL
- **Development**: `http://localhost:8000`
- **Production**: `https://api.on-call-agent.yourdomain.com`

### Authentication
The API currently uses API key authentication. OAuth2/JWT support is planned.
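
A server-side API key check might look like the sketch below. The header name `X-API-Key` and the function name are assumptions, since this document does not specify how the key is transmitted.

```python
import hmac
from typing import Mapping, Optional

API_KEY_HEADER = "X-API-Key"  # assumed header name, not confirmed by this document


def is_authorized(headers: Mapping[str, str], expected_key: Optional[str]) -> bool:
    """Check the request's API key against the configured one."""
    provided = headers.get(API_KEY_HEADER, "")
    if not expected_key or not provided:
        return False
    # compare_digest avoids leaking key contents through timing differences.
    return hmac.compare_digest(provided.encode(), expected_key.encode())
```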

### Core Endpoints

#### 5.1 Enhanced Incidents

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/api/v1/enhanced-incidents/` | Create incident with auto-resolution |
| GET | `/api/v1/enhanced-incidents/` | List incidents with filtering |
| GET | `/api/v1/enhanced-incidents/{id}` | Get incident details |
| PUT | `/api/v1/enhanced-incidents/{id}` | Update incident |

#### 5.2 AI Training

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/api/v1/ai/retrain` | Trigger model retraining |
| GET | `/api/v1/ai/model-status` | Get current model status |
| GET | `/api/v1/ai/training-stats` | Get training statistics |

#### 5.3 Actions

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/v1/actions/` | List executed actions |
| POST | `/api/v1/actions/execute` | Execute manual action |
| GET | `/api/v1/actions/{id}` | Get action details |

#### 5.4 Knowledge Base

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/v1/knowledge/` | List knowledge entries |
| POST | `/api/v1/knowledge/` | Create knowledge entry |
| PUT | `/api/v1/knowledge/{id}` | Update knowledge entry |
| DELETE | `/api/v1/knowledge/{id}` | Delete knowledge entry |

#### 5.5 Testing & Demo

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/v1/testing/scenarios` | List test scenarios |
| POST | `/api/v1/testing/simple-incident` | Create test incident |
| POST | `/api/v1/testing/force-action/{type}` | Test specific actions |

In [None]:
# API Request/Response Examples

# 1. Create Enhanced Incident
import requests
import json

# Enhanced Incident Creation
incident_data = {
    "title": "Airflow DAG Failed",
    "description": "The data_processing_dag failed with connection error",
    "severity": "high",
    "service_name": "data_pipeline",
    "log_data": "ERROR: Failed to connect to database",
    "context": {
        "environment": "production",
        "dag_id": "data_processing_dag",
        "task_id": "extract_data",
        "run_id": "manual_2024_01_31_10_30_00"
    },
    "auto_resolve": True,
    "metadata": {
        "source": "airflow",
        "dag_id": "data_processing_dag",
        "severity_level": 3
    }
}

# POST Request
response = requests.post(
    "http://localhost:8000/api/v1/enhanced-incidents/",
    json=incident_data,
    headers={"Content-Type": "application/json"}
)

print("Response Status:", response.status_code)
print("Response Body:", response.json())

# Expected Response:
{
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "title": "Airflow DAG Failed",
    "status": "resolved",
    "ai_analysis": {
        "classification": "airflow_dag_failure",
        "confidence": 0.95,
        "recommended_action": "restart_dag"
    },
    "actions_taken": [
        {
            "type": "restart_dag",
            "status": "success",
            "dag_id": "data_processing_dag"
        }
    ],
    "created_at": "2024-01-31T10:30:00Z",
    "resolved_at": "2024-01-31T10:32:15Z"
}

In [None]:
# 2. Trigger AI Model Retraining
retrain_request = {
    "force_retrain": True,
    "model_types": ["classification", "action_prediction"],
    "include_recent_data": True
}

response = requests.post(
    "http://localhost:8000/api/v1/ai/retrain",
    json=retrain_request
)

# Expected Response:
{
    "task_id": "retrain_models_task_123",
    "status": "queued",
    "message": "Model retraining task queued successfully",
    "estimated_completion": "2024-01-31T11:00:00Z"
}

# 3. Get Training Statistics
response = requests.get("http://localhost:8000/api/v1/ai/training-stats")

# Expected Response:
{
    "classification_model": {
        "accuracy": 0.94,
        "precision": 0.92,
        "recall": 0.89,
        "f1_score": 0.91,
        "last_trained": "2024-01-30T15:30:00Z",
        "training_samples": 1540
    },
    "action_prediction_model": {
        "accuracy": 0.87,
        "precision": 0.85,
        "recall": 0.88,
        "f1_score": 0.86,
        "last_trained": "2024-01-30T15:30:00Z",
        "training_samples": 890
    }
}

# 4. Test Simple Incident Creation (for testing)
test_incident = {
    "title": "Test DAG Issue",
    "description": "Testing fallback logic",
    "severity": "medium",
    "service_name": "test_service",
    "log_data": "Test error message",
    "context": {"environment": "development"},
    "auto_resolve": True,
    "metadata": {"dag_id": "simple_test_dag"}
}

response = requests.post(
    "http://localhost:8000/api/v1/testing/simple-incident",
    json=test_incident
)

print("Test Response:", response.json())

## 6. Model Training in Notebooks {#training-notebooks}

### 6.1 Training Data Collection

The system collects training data from resolved incidents to continuously improve AI models:

- **Classification Training**: Incident descriptions → problem categories
- **Action Prediction**: Problem types → successful resolution actions
- **Severity Assessment**: Log patterns → severity levels
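
As a rough illustration of the collection step, a resolved incident could be converted into a row shaped like the `training_data` table. The choice of input features and the function name are assumptions; timestamps are taken to be ISO 8601 strings without a trailing `Z`.

```python
from datetime import datetime
from typing import Any, Dict, List


def build_training_record(
    incident: Dict[str, Any], actions: List[str], outcome_score: int
) -> Dict[str, Any]:
    """Turn a resolved incident into a row shaped like the training_data table."""
    created = datetime.fromisoformat(incident["created_at"])
    resolved = datetime.fromisoformat(incident["resolved_at"])
    minutes = int((resolved - created).total_seconds() // 60)
    return {
        "incident_id": incident["id"],
        "input_features": {
            "title": incident["title"],
            "description": incident["description"],
            "severity": incident["severity"],
        },
        "target_actions": actions,
        "outcome_score": outcome_score,  # 0-100 effectiveness score
        "resolution_time_minutes": minutes,
    }
```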

### 6.2 Notebook-Based Training Workflow

#### Location
- Training notebooks: `notebooks/training/`
- Data preparation: `notebooks/data_preparation/`
- Model evaluation: `notebooks/evaluation/`

#### Key Notebooks

1. **`01_data_preparation.ipynb`**: Data cleaning and feature engineering
2. **`02_classification_training.ipynb`**: Incident classification model
3. **`03_action_prediction.ipynb`**: Action recommendation model
4. **`04_model_evaluation.ipynb`**: Performance analysis and validation
5. **`05_hyperparameter_tuning.ipynb`**: Model optimization

### 6.3 Training Pipeline Components

#### Data Sources
- PostgreSQL incidents table
- Historical action success rates
- Log pattern libraries
- Knowledge base entries

In [None]:
# Training Code Examples for Notebooks

# Example from 02_classification_training.ipynb
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import joblib
import os

# 1. Data Loading and Preparation
def load_training_data():
    """Load training data from PostgreSQL"""
    from sqlalchemy import create_engine
    
    engine = create_engine(os.getenv('DATABASE_URL'))
    query = """
    SELECT 
        i.title, 
        i.description, 
        i.log_data,
        i.severity,
        td.problem_category,
        td.action_taken,
        td.success_rate
    FROM incidents i
    JOIN training_data td ON i.id = td.incident_id
    WHERE i.status = 'resolved'
    AND td.feedback_score >= 0.7
    """
    
    df = pd.read_sql(query, engine)
    return df

# 2. Feature Engineering
def prepare_features(df):
    """Create features for model training"""
    # Combine text fields
    df['combined_text'] = (
        df['title'].fillna('') + ' ' + 
        df['description'].fillna('') + ' ' + 
        df['log_data'].fillna('')
    )
    
    # TF-IDF Vectorization
    vectorizer = TfidfVectorizer(
        max_features=1000,
        stop_words='english',
        ngram_range=(1, 2)
    )
    
    X_text = vectorizer.fit_transform(df['combined_text'])
    
    # Save vectorizer
    joblib.dump(vectorizer, 'models/tfidf_vectorizer.pkl')
    
    return X_text, df['problem_category']

# 3. Model Training
def train_classification_model():
    """Train incident classification model"""
    df = load_training_data()
    X, y = prepare_features(df)
    
    # Train-test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    # Train Random Forest
    model = RandomForestClassifier(
        n_estimators=100,
        max_depth=10,
        random_state=42,
        class_weight='balanced'
    )
    
    model.fit(X_train, y_train)
    
    # Evaluate
    y_pred = model.predict(X_test)
    print(classification_report(y_test, y_pred))
    
    # Save model
    joblib.dump(model, 'models/classification_model.pkl')
    
    return model

# 4. Action Prediction Model Training
def train_action_prediction_model():
    """Train action recommendation model"""
    df = load_training_data()
    
    # Features: problem_category + severity + historical success
    feature_cols = ['problem_category', 'severity']
    X = pd.get_dummies(df[feature_cols])
    y = df['action_taken']
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    model = RandomForestClassifier(
        n_estimators=50,
        random_state=42
    )
    
    model.fit(X_train, y_train)
    
    # Save model
    joblib.dump(model, 'models/action_prediction_model.pkl')
    
    return model

# 5. Model Validation
def validate_models():
    """Cross-validation and performance metrics"""
    from sklearn.model_selection import cross_val_score
    
    df = load_training_data()
    X, y = prepare_features(df)
    
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    
    # Cross-validation
    cv_scores = cross_val_score(model, X, y, cv=5, scoring='f1_weighted')
    
    print(f"Cross-validation F1 scores: {cv_scores}")
    print(f"Mean F1 score: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
    
    return cv_scores

# Example usage in notebook:
if __name__ == "__main__":
    print("Training Classification Model...")
    classification_model = train_classification_model()
    
    print("Training Action Prediction Model...")
    action_model = train_action_prediction_model()
    
    print("Validating Models...")
    validation_scores = validate_models()

## 7. Model Training via API {#training-api}

### 7.1 Automated Training Pipeline

The system provides REST API endpoints for triggering model retraining without manual notebook execution.

#### Training Triggers
- **Scheduled**: Cron-based automatic retraining (daily/weekly)
- **Threshold-based**: Retrain when model performance drops below threshold
- **Manual**: On-demand training via API call
- **Data-driven**: Retrain when new training samples reach minimum threshold

### 7.2 Training API Endpoints

#### Trigger Training
```http
POST /api/v1/ai/retrain
Content-Type: application/json

{
    "force_retrain": true,
    "model_types": ["classification", "action_prediction"],
    "include_recent_data": true,
    "training_config": {
        "test_size": 0.2,
        "cv_folds": 5,
        "min_samples": 100
    }
}
```

#### Monitor Training Progress
```http
GET /api/v1/ai/training-status/{task_id}
```
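
A client would typically poll this endpoint until training finishes. The sketch below keeps the HTTP call pluggable so it stays self-contained; the `status` field in the response and the use of Celery state names as its values are assumptions about the response shape, and `wait_for_training` is a hypothetical helper.

```python
import time
from typing import Any, Callable, Dict

# Celery's ready states; assumed to be what the endpoint reports.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_training(
    fetch_status: Callable[[str], Dict[str, Any]],
    task_id: str,
    interval_s: float = 5.0,
    max_polls: int = 120,
) -> Dict[str, Any]:
    """Poll until the task reaches a terminal state or polls run out."""
    for _ in range(max_polls):
        status = fetch_status(task_id)
        if status.get("status") in TERMINAL_STATES:
            return status
        time.sleep(interval_s)
    raise TimeoutError(f"task {task_id} did not finish after {max_polls} polls")
```

In practice `fetch_status` would issue a `GET` to `/api/v1/ai/training-status/{task_id}` and return the parsed JSON body.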

#### Get Model Performance
```http
GET /api/v1/ai/model-metrics
```

### 7.3 Training Configuration

#### Environment Variables
- `MIN_TRAINING_SAMPLES`: Minimum samples required for training (default: 100)
- `RETRAIN_THRESHOLD`: Performance threshold for automatic retraining (default: 0.85)
- `TRAINING_SCHEDULE`: Cron expression for scheduled training
- `MODEL_BACKUP_COUNT`: Number of model versions to keep
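
The first two variables could drive a retraining decision along the following lines. This is a sketch, not the system's actual logic: it assumes that a forced request always retrains, and that automatic retraining requires both enough new samples and a score below the threshold. `should_retrain` is a hypothetical name.

```python
import os
from typing import Mapping


def should_retrain(
    current_score: float,
    new_samples: int,
    force: bool = False,
    env: Mapping[str, str] = os.environ,
) -> bool:
    """Decide whether to retrain, using the documented environment variables."""
    min_samples = int(env.get("MIN_TRAINING_SAMPLES", "100"))
    threshold = float(env.get("RETRAIN_THRESHOLD", "0.85"))
    if force:
        return True  # manual trigger overrides all checks
    if new_samples < min_samples:
        return False  # assumed: too little data to retrain safely
    return current_score < threshold
```

Passing `env` explicitly makes the decision easy to unit-test without touching the process environment.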

In [None]:
# API Training Implementation

# FastAPI endpoint for model retraining
from fastapi import APIRouter, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import uuid

router = APIRouter(prefix="/api/v1/ai", tags=["AI Training"])

class TrainingRequest(BaseModel):
    force_retrain: bool = False
    model_types: List[str] = ["classification", "action_prediction"]
    include_recent_data: bool = True
    training_config: Optional[dict] = None

class TrainingResponse(BaseModel):
    task_id: str
    status: str
    message: str
    estimated_completion: str

@router.post("/retrain", response_model=TrainingResponse)
async def trigger_model_retraining(
    request: TrainingRequest,
    background_tasks: BackgroundTasks
):
    """Trigger model retraining via Celery task"""
    
    # Queue the Celery task; the ID that Celery assigns is returned to the caller
    from services.ai_service import retrain_models_task
    
    task = retrain_models_task.delay(
        model_types=request.model_types,
        force_retrain=request.force_retrain,
        include_recent_data=request.include_recent_data,
        config=request.training_config or {}
    )
    
    return TrainingResponse(
        task_id=task.id,
        status="queued",
        message="Model retraining task queued successfully",
        estimated_completion="2024-01-31T11:00:00Z"  # hard-coded placeholder; compute a real estimate in practice
    )

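# Celery task implementation
import logging
from datetime import datetime

import joblib
import pandas as pd

from app.core.celery import celery_app  # assumed location, matching `celery -A app.core.celery`
from services.database import get_db
from ai.model_trainer import ModelTrainer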

@celery_app.task(bind=True)
def retrain_models_task(self, model_types, force_retrain=False, include_recent_data=True, config=None):
    """Background task for model retraining"""
    
    try:
        # Update task status
        self.update_state(state='PROGRESS', meta={'status': 'Loading training data'})
        
        # Initialize trainer
        trainer = ModelTrainer(config=config)
        
        # Load training data
        training_data = trainer.load_training_data(include_recent=include_recent_data)
        
        self.update_state(state='PROGRESS', meta={'status': 'Training models'})
        
        results = {}
        
        # Train each requested model type
        for model_type in model_types:
            if model_type == "classification":
                results['classification'] = trainer.train_classification_model(training_data)
            elif model_type == "action_prediction":
                results['action_prediction'] = trainer.train_action_prediction_model(training_data)
        
        # Save models; save_models() also records each version in the model registry
        self.update_state(state='PROGRESS', meta={'status': 'Saving models'})
        trainer.save_models(results)
        
        return {
            'status': 'completed',
            'results': results,
            'training_samples': len(training_data),
            'completion_time': datetime.utcnow().isoformat()
        }
        
    except Exception:
        # Log with traceback and re-raise; Celery records the FAILURE state itself
        logging.exception("Model training failed")
        raise

# Model Trainer Class
class ModelTrainer:
    def __init__(self, config=None):
        self.config = config or {}
        self.db = get_db()
    
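    def load_training_data(self, include_recent=True):
        """Load resolved incidents joined with their training data.

        Note: when include_recent is True the query is limited to the last 30 days.
        """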
        query = """
        SELECT i.*, td.* FROM incidents i
        JOIN training_data td ON i.id = td.incident_id
        WHERE i.status = 'resolved'
        """
        
        if include_recent:
            query += " AND i.created_at >= NOW() - INTERVAL '30 days'"
        
        return pd.read_sql(query, self.db.connection())
    
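    def train_classification_model(self, data):
        """Train incident classification model.

        Expected to return {'model': <fitted estimator>, 'metrics': {...}},
        which is the shape save_models() reads.
        """
        # Implementation similar to notebook version
        # but optimized for production use
        raise NotImplementedError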
    
    def save_models(self, results):
        """Save trained models with versioning"""
        for model_type, model_data in results.items():
            model_path = f"models/{model_type}_v{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            joblib.dump(model_data['model'], model_path)
            
            # Update model registry in database
            self.update_model_registry(model_type, model_path, model_data['metrics'])

# Usage example
import time
import requests

def retrain_models_example():
    """Example of triggering model retraining and polling until it finishes"""
    
    # API call to trigger retraining
    response = requests.post(
        "http://localhost:8000/api/v1/ai/retrain",
        json={
            "force_retrain": True,
            "model_types": ["classification", "action_prediction"],
            "include_recent_data": True,
            "training_config": {
                "test_size": 0.2,
                "cv_folds": 5,
                "min_samples": 100
            }
        },
        timeout=10
    )
    response.raise_for_status()
    
    task_id = response.json()["task_id"]
    print(f"Training task queued: {task_id}")
    
    # Poll for completion (requests is blocking, so a plain loop with time.sleep is used)
    while True:
        status_response = requests.get(
            f"http://localhost:8000/api/v1/ai/training-status/{task_id}",
            timeout=10
        )
        status = status_response.json()
        
        if status["state"] in ["SUCCESS", "FAILURE"]:
            print(f"Training completed with status: {status['state']}")
            break
        
        time.sleep(30)  # Check every 30 seconds

## 8. Development Environment Setup {#dev-setup}

### 8.1 Prerequisites

#### System Requirements
- **Python**: 3.9+ 
- **Docker**: 20.10+
- **Docker Compose**: 2.0+
- **Git**: Latest version
- **Node.js**: 16+ (for frontend development)

#### Development Tools
- VS Code with Python extension
- Jupyter Lab/Notebook
- PostgreSQL client (psql or GUI tool)
- Redis CLI
- Postman or similar API testing tool

### 8.2 Local Development Setup

#### Step 1: Repository Setup
```bash
# Clone repository
git clone https://github.com/your-org/on-call-agent.git
cd on-call-agent

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt
```

#### Step 2: Environment Configuration
```bash
# Copy environment template
cp .env.example .env

# Edit .env with your settings
# Key variables:
DATABASE_URL=postgresql://user:password@localhost:5432/oncollagent_dev
REDIS_URL=redis://localhost:6379/0
AIRFLOW_API_URL=http://localhost:8080/api/v1
OPENAI_API_KEY=your_openai_key
DEFAULT_DAG_ID=simple_test_dag
```

#### Step 3: Database Setup
```bash
# Start PostgreSQL and Redis
docker-compose up -d postgres redis

# Run database migrations
alembic upgrade head

# Seed test data (optional)
python scripts/seed_test_data.py
```

### 8.3 Development Services

#### Service Start Order
1. **Database & Cache**: PostgreSQL, Redis
2. **Background Workers**: Celery worker, Celery beat
3. **API Server**: FastAPI application
4. **Monitoring**: Airflow (if testing integrations)
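
The start order matters because the workers and the API server fail on startup if PostgreSQL or Redis is not yet accepting connections. A minimal sketch of a pre-start check follows; `wait_for_port` is a hypothetical helper, and the ports 5432 and 6379 mentioned afterwards are the defaults used in the `.env` example above.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, calling `wait_for_port("localhost", 5432)` and `wait_for_port("localhost", 6379)` before launching the Celery worker covers step 1. An open port only shows that the process is listening, not that it is ready to serve queries, so treat this as a coarse check.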

In [None]:
# Development Startup Commands

# 1. Start Infrastructure Services
docker-compose up -d postgres redis

# 2. Start Celery Worker (Terminal 1)
celery -A app.core.celery worker --loglevel=info --concurrency=4

# 3. Start Celery Beat Scheduler (Terminal 2)  
celery -A app.core.celery beat --loglevel=info

# 4. Start FastAPI Development Server (Terminal 3)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# 5. Optional: Start Jupyter Lab for notebook development
jupyter lab --port 8888 --no-browser

# Alternative: One-command startup using Docker Compose
docker-compose up --build

# Development helper scripts
chmod +x scripts/dev-setup.sh
./scripts/dev-setup.sh

# Run tests
pytest tests/ -v --cov=app

# Database operations
alembic revision --autogenerate -m "description"
alembic upgrade head
alembic downgrade -1

# Lint and format code
black app/ tests/
isort app/ tests/
flake8 app/ tests/

# View logs
docker-compose logs -f api
docker-compose logs -f worker
docker-compose logs -f postgres

## 9. Production Deployment Setup {#prod-setup}

### 9.1 Infrastructure Requirements

#### Compute Resources
- **API Server**: 2+ CPU cores, 4GB+ RAM
- **Worker Nodes**: 1+ CPU core, 2GB+ RAM per worker
- **Database**: 2+ CPU cores, 8GB+ RAM, SSD storage
- **Cache**: 1 CPU core, 1GB+ RAM

#### External Dependencies
- PostgreSQL 13+ (managed service recommended)
- Redis 6+ (managed service recommended)
- Load balancer (nginx, ALB, etc.)
- SSL certificates
- Monitoring & logging infrastructure

### 9.2 Container Orchestration

#### Docker Compose (Simple Deployment)
```yaml
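# docker-compose.prod.yml
# Note: the postgres and redis services referenced by depends_on are not shown here.
# Define them in this file, or remove depends_on when using managed services.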
version: '3.8'
services:
  api:
    image: on-call-agent:latest
    environment:
      - ENV=production
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
    
  worker:
    image: on-call-agent:latest
    command: celery -A app.core.celery worker --loglevel=info
    environment:
      - ENV=production
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
    
  beat:
    image: on-call-agent:latest
    command: celery -A app.core.celery beat --loglevel=info
    environment:
      - ENV=production
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - postgres
      - redis
    restart: unless-stopped
```

#### Kubernetes (Advanced Deployment)
- Helm charts in `deploy/kubernetes/`
- Horizontal Pod Autoscaling (HPA)
- Resource quotas and limits
- Health checks and readiness probes
- Secret management for sensitive data

### 9.3 Environment Configuration

#### Production Environment Variables
```bash
# Application
ENV=production
DEBUG=false
LOG_LEVEL=INFO

# Database
DATABASE_URL=postgresql://user:pass@prod-db:5432/oncollagent
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10

# Redis
REDIS_URL=redis://prod-redis:6379/0
REDIS_POOL_SIZE=10

# Security
SECRET_KEY=your-secure-secret-key
API_KEY_HEADER=X-API-Key
ALLOWED_HOSTS=api.on-call-agent.com

# External APIs
AIRFLOW_API_URL=https://airflow.company.com/api/v1
AIRFLOW_USERNAME=on_call_agent
AIRFLOW_PASSWORD=secure_password
OPENAI_API_KEY=your_openai_key

# Monitoring
SENTRY_DSN=https://your-sentry-dsn
PROMETHEUS_METRICS=true
LOG_FORMAT=json
```
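
With `LOG_FORMAT=json`, each log record is typically written as one JSON object per line so that log aggregators can parse it. The sketch below shows one way to do this with the standard library only. `JsonFormatter`, `configure_logging`, the logger name `on_call_agent`, and the chosen field names are all assumptions, not the project's actual logging setup.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def configure_logging(level: str = "INFO", log_format: str = "json") -> logging.Logger:
    """Configure a logger from LOG_LEVEL / LOG_FORMAT style values."""
    handler = logging.StreamHandler()
    if log_format == "json":
        handler.setFormatter(JsonFormatter())
    logger = logging.getLogger("on_call_agent")  # hypothetical logger name
    logger.handlers = [handler]
    logger.setLevel(level)
    logger.propagate = False
    return logger
```

At startup the application would call something like `configure_logging(os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("LOG_FORMAT", "json"))`.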

In [None]:
# Production Deployment Commands

# 1. Build Production Image
docker build -t on-call-agent:latest -f Dockerfile.prod .

# 2. Deploy with Docker Compose
docker-compose -f docker-compose.prod.yml up -d

# 3. Database Migration (Production)
docker-compose -f docker-compose.prod.yml exec api alembic upgrade head

# 4. Health Check
curl -f http://localhost:8000/health || exit 1

# 5. Kubernetes Deployment (if using K8s)
helm install on-call-agent ./deploy/helm/on-call-agent \
  --values ./deploy/helm/values.prod.yaml \
  --namespace production

# 6. Scaling Workers
docker-compose -f docker-compose.prod.yml up -d --scale worker=3

# 7. Log Monitoring
docker-compose -f docker-compose.prod.yml logs -f api worker beat

# 8. Backup Database
pg_dump "$DATABASE_URL" > "backup_$(date +%Y%m%d_%H%M%S).sql"

# 9. Rolling Update
docker-compose -f docker-compose.prod.yml pull
docker-compose -f docker-compose.prod.yml up -d --no-deps api worker

# 10. Production Testing
pytest tests/integration/ --env=production

# SSL Certificate Setup (Let's Encrypt)
certbot --nginx -d api.on-call-agent.com

# Nginx Configuration
upstream on_call_api {
    server 127.0.0.1:8000;
}

server {
    listen 443 ssl;
    server_name api.on-call-agent.com;
    
    ssl_certificate /etc/letsencrypt/live/api.on-call-agent.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/api.on-call-agent.com/privkey.pem;
    
    location / {
        proxy_pass http://on_call_api;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

# Monitoring Commands
# Prometheus metrics endpoint
curl http://localhost:8000/metrics

# Application logs
tail -f /var/log/on-call-agent/app.log

# System monitoring
htop  # CPU/Memory usage
iostat  # Disk I/O
netstat -tulpn  # Network connections

## 10. Testing Guide {#testing}

### 10.1 Testing Strategy

#### Test Types
- **Unit Tests**: Individual functions and classes
- **Integration Tests**: Service interactions
- **API Tests**: HTTP endpoint validation
- **End-to-End Tests**: Complete workflow testing
- **Performance Tests**: Load and stress testing

#### Test Structure
```
tests/
├── unit/
│   ├── test_ai_service.py
│   ├── test_enhanced_incident_service.py
│   └── test_action_service.py
├── integration/
│   ├── test_database_operations.py
│   ├── test_celery_tasks.py
│   └── test_airflow_integration.py
├── api/
│   ├── test_incident_endpoints.py
│   ├── test_ai_endpoints.py
│   └── test_auth_endpoints.py
└── e2e/
    ├── test_incident_workflow.py
    └── test_training_pipeline.py
```

### 10.2 API Endpoint Testing

#### Test Environment Setup
```bash
# Start test environment
docker-compose -f docker-compose.test.yml up -d

# Run API tests
pytest tests/api/ -v --env=test
```

#### Testing Tools
- **pytest**: Test framework
- **httpx**: Async HTTP client
- **pytest-asyncio**: Async test support
- **factories**: Test data generation
- **fixtures**: Test setup/teardown
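
A test-data factory can be as small as a function that returns a fresh payload with per-test overrides. The sketch below is illustrative: `make_incident_payload` is a hypothetical helper, and its fields are assumed to match the incident payload used by the API tests in this section.

```python
import itertools
from typing import Any, Dict

# Module-level counter so every generated incident gets a distinct title.
_sequence = itertools.count(1)


def make_incident_payload(**overrides: Any) -> Dict[str, Any]:
    """Build a valid incident payload, replacing any field passed as a keyword."""
    n = next(_sequence)
    payload: Dict[str, Any] = {
        "title": f"Test Incident {n}",
        "description": "Generated by test factory",
        "severity": "medium",
        "service_name": "test_service",
        "log_data": "ERROR: Test error message",
        "context": {"environment": "test"},
        "auto_resolve": False,
        "metadata": {"source": "airflow"},
    }
    payload.update(overrides)
    return payload
```

A test that needs a high-severity incident calls `make_incident_payload(severity="high")` and leaves every other field at its default.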

In [None]:
# Comprehensive Testing Examples

# 1. API Endpoint Testing
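import pytest
import pytest_asyncio
import httpx
import asyncio
from datetime import datetime

@pytest_asyncio.fixture
async def client():
    """Create an in-process test client (no running server needed)"""
    from app.main import app
    # Recent httpx releases no longer accept app=; route requests through ASGITransport
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        yield client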

@pytest.fixture
def sample_incident_data():
    """Sample incident data for testing"""
    return {
        "title": "Test DAG Failed",
        "description": "Test incident for API validation",
        "severity": "high",
        "service_name": "test_service",
        "log_data": "ERROR: Test error message",
        "context": {
            "environment": "test",
            "dag_id": "simple_test_dag",
            "task_id": "test_task"
        },
        "auto_resolve": True,
        "metadata": {
            "source": "airflow",
            "dag_id": "simple_test_dag"
        }
    }

# Test Enhanced Incidents Endpoint
@pytest.mark.asyncio
async def test_create_enhanced_incident(client, sample_incident_data):
    """Test incident creation with auto-resolution"""
    response = await client.post(
        "/api/v1/enhanced-incidents/",
        json=sample_incident_data
    )
    
    assert response.status_code == 201
    data = response.json()
    assert data["title"] == sample_incident_data["title"]
    assert data["status"] in ["processing", "resolved"]
    assert "ai_analysis" in data
    assert "id" in data

@pytest.mark.asyncio
async def test_get_incident_details(client, sample_incident_data):
    """Test retrieving incident details"""
    # First create incident
    create_response = await client.post(
        "/api/v1/enhanced-incidents/",
        json=sample_incident_data
    )
    incident_id = create_response.json()["id"]
    
    # Then retrieve it
    response = await client.get(f"/api/v1/enhanced-incidents/{incident_id}")
    
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == incident_id
    assert "ai_analysis" in data
    assert "actions_taken" in data

@pytest.mark.asyncio
async def test_list_incidents_with_filters(client):
    """Test incident listing with filters"""
    response = await client.get(
        "/api/v1/enhanced-incidents/",
        params={
            "severity": "high",
            "status": "resolved",
            "limit": 10
        }
    )
    
    assert response.status_code == 200
    data = response.json()
    assert "items" in data
    assert "total" in data
    assert len(data["items"]) <= 10

# Test AI Training Endpoints
@pytest.mark.asyncio
async def test_trigger_model_retraining(client):
    """Test model retraining trigger"""
    training_request = {
        "force_retrain": True,
        "model_types": ["classification"],
        "include_recent_data": True,
        "training_config": {
            "test_size": 0.2,
            "min_samples": 10  # Lower for testing
        }
    }
    
    response = await client.post(
        "/api/v1/ai/retrain",
        json=training_request
    )
    
    assert response.status_code == 200
    data = response.json()
    assert "task_id" in data
    assert data["status"] == "queued"

@pytest.mark.asyncio
async def test_get_model_metrics(client):
    """Test model performance metrics"""
    response = await client.get("/api/v1/ai/model-metrics")
    
    assert response.status_code == 200
    data = response.json()
    assert "classification_model" in data or "no_models_trained" in data

# Test Action Endpoints
@pytest.mark.asyncio
async def test_list_actions(client):
    """Test action listing"""
    response = await client.get("/api/v1/actions/")
    
    assert response.status_code == 200
    data = response.json()
    assert isinstance(data, list)

@pytest.mark.asyncio
async def test_execute_manual_action(client):
    """Test manual action execution"""
    action_request = {
        "action_type": "restart_dag",
        "parameters": {
            "dag_id": "simple_test_dag"
        },
        "reason": "Manual test execution"
    }
    
    response = await client.post(
        "/api/v1/actions/execute",
        json=action_request
    )
    
    assert response.status_code == 200
    data = response.json()
    assert "action_id" in data
    assert data["status"] in ["queued", "executing"]

# Integration Tests
@pytest.mark.asyncio
async def test_complete_incident_workflow(client, sample_incident_data):
    """Test complete incident resolution workflow"""
    
    # 1. Create incident
    create_response = await client.post(
        "/api/v1/enhanced-incidents/",
        json=sample_incident_data
    )
    incident_id = create_response.json()["id"]
    
    # 2. Wait for processing (in real tests, use polling)
    await asyncio.sleep(2)
    
    # 3. Check incident status
    status_response = await client.get(f"/api/v1/enhanced-incidents/{incident_id}")
    incident_data = status_response.json()
    
    # 4. Verify AI analysis was performed
    assert "ai_analysis" in incident_data
    assert incident_data["ai_analysis"]["classification"] is not None
    
    # 5. Verify actions were taken if auto_resolve=True
    if sample_incident_data["auto_resolve"]:
        assert len(incident_data["actions_taken"]) > 0
        assert incident_data["status"] in ["resolved", "processing"]

# Performance Testing
@pytest.mark.asyncio
async def test_api_performance(client, sample_incident_data):
    """Test API response times"""
    import time
    
    start_time = time.time()
    
    # Create multiple incidents concurrently
    tasks = []
    for i in range(5):
        task = client.post(
            "/api/v1/enhanced-incidents/",
            json={**sample_incident_data, "title": f"Test Incident {i}"}
        )
        tasks.append(task)
    
    responses = await asyncio.gather(*tasks)
    
    end_time = time.time()
    total_time = end_time - start_time
    
    # Verify all requests succeeded
    for response in responses:
        assert response.status_code == 201
    
    # Verify reasonable performance (adjust threshold as needed)
    assert total_time < 10.0  # Should complete within 10 seconds

# Database Testing
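def test_database_connection():
    """Test database connectivity"""
    from sqlalchemy import text
    from app.core.database import get_db
    
    db = next(get_db())
    # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
    result = db.execute(text("SELECT 1")).scalar()
    assert result == 1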

def test_incident_model_creation():
    """Test incident model CRUD operations"""
    from app.models.incident import Incident
    from app.core.database import get_db
    
    db = next(get_db())
    
    # Create
    incident = Incident(
        title="Test Incident",
        description="Test description",
        severity="medium",
        service_name="test_service"
    )
    db.add(incident)
    db.commit()
    
    # Read
    retrieved = db.query(Incident).filter(Incident.title == "Test Incident").first()
    assert retrieved is not None
    assert retrieved.severity == "medium"
    
    # Update
    retrieved.status = "resolved"
    db.commit()
    
    # Verify update
    updated = db.query(Incident).filter(Incident.id == retrieved.id).first()
    assert updated.status == "resolved"
    
    # Delete
    db.delete(updated)
    db.commit()

# Manual Testing Scripts
def manual_test_simple_incident():
    """Manual test script for simple incident creation"""
    import requests
    
    test_data = {
        "title": "Manual Test DAG Issue",
        "description": "Testing the system manually",
        "severity": "medium",
        "service_name": "test_service",
        "log_data": "ERROR: Manual test error",
        "context": {"environment": "development"},
        "auto_resolve": True,
        "metadata": {"dag_id": "simple_test_dag"}
    }
    
    response = requests.post(
        "http://localhost:8000/api/v1/enhanced-incidents/",
        json=test_data
    )
    
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    
    return response.json()["id"] if response.status_code == 201 else None

# Load Testing Example
async def load_test_api():
    """Simple load test for API endpoints"""
    import aiohttp
    import asyncio
    
    async def create_incident(session, incident_num):
        async with session.post(
            "http://localhost:8000/api/v1/enhanced-incidents/",
            json={
                "title": f"Load Test Incident {incident_num}",
                "description": "Load testing",
                "severity": "low",
                "service_name": "load_test",
                "auto_resolve": False
            }
        ) as response:
            response.raise_for_status()  # count 4xx/5xx responses as failures
            return await response.json()
    
    async with aiohttp.ClientSession() as session:
        tasks = [create_incident(session, i) for i in range(50)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Successful requests: {successful}/50")

if __name__ == "__main__":
    # Run manual tests
    incident_id = manual_test_simple_incident()
    if incident_id:
        print(f"Created incident: {incident_id}")
        
    # Run load test
    asyncio.run(load_test_api())

## 11. Quick Reference {#quick-reference}

### 11.1 Development Commands Cheatsheet

```bash
# Environment Setup
python -m venv venv && source venv/bin/activate
pip install -r requirements.txt
cp .env.example .env

# Start Services
docker-compose up -d postgres redis
celery -A app.core.celery worker --loglevel=info
uvicorn app.main:app --reload

# Database
alembic upgrade head
alembic revision --autogenerate -m "description"

# Testing
pytest tests/ -v --cov=app
pytest tests/api/ -k "test_incident"

# Code Quality
black app/ && isort app/ && flake8 app/
```

### 11.2 API Quick Reference

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/v1/enhanced-incidents/` | POST | Create incident with auto-resolution |
| `/api/v1/enhanced-incidents/{id}` | GET | Get incident details |
| `/api/v1/ai/retrain` | POST | Trigger model retraining |
| `/api/v1/ai/model-metrics` | GET | Get model performance |
| `/api/v1/actions/execute` | POST | Execute manual action |
| `/health` | GET | Health check |
| `/docs` | GET | API documentation |

### 11.3 Key Configuration Variables

```bash
# Core
DATABASE_URL=postgresql://user:pass@host:5432/db
REDIS_URL=redis://host:6379/0
DEFAULT_DAG_ID=simple_test_dag

# AI/ML
OPENAI_API_KEY=your_key
MIN_TRAINING_SAMPLES=100
RETRAIN_THRESHOLD=0.85

# External Services
AIRFLOW_API_URL=http://airflow:8080/api/v1
AIRFLOW_USERNAME=admin
AIRFLOW_PASSWORD=password
```

### 11.4 Troubleshooting Guide

#### Common Issues

**Database Connection Errors**
- Check `DATABASE_URL` format
- Verify PostgreSQL is running
- Check network connectivity
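
The format check can be automated before the application tries to connect. The function below is a rough sketch using only the standard library: `check_database_url` is a hypothetical helper, and it assumes a `postgresql://` or `postgresql+driver://` URL of the shape shown in the configuration examples.

```python
from typing import List
from urllib.parse import urlsplit


def check_database_url(url: str) -> List[str]:
    """Return a list of problems found in a PostgreSQL connection URL (empty if none)."""
    problems: List[str] = []
    parts = urlsplit(url)
    if parts.scheme != "postgresql" and not parts.scheme.startswith("postgresql+"):
        problems.append(f"unexpected scheme {parts.scheme!r}")
    if not parts.hostname:
        problems.append("missing host")
    if not parts.path.lstrip("/"):
        problems.append("missing database name")
    try:
        parts.port  # accessing .port raises ValueError for a non-numeric or out-of-range port
    except ValueError:
        problems.append("invalid port")
    return problems
```

An empty list means the URL is well formed. It does not prove the server is reachable or that the credentials are valid.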

**Celery Worker Issues**
- Verify Redis is accessible
- List the tasks workers are currently running: `celery -A app.core.celery inspect active`
- Restart workers if stuck

**Model Training Failures**
- Ensure minimum training samples exist
- Check feature extraction pipeline
- Verify model directory permissions
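
The directory-permission check can be run as a preflight step before training starts. This is a small sketch: `model_dir_is_writable` is a hypothetical helper, and the `models` directory mentioned afterwards is assumed from the path used by `save_models` in section 7.

```python
import tempfile
from pathlib import Path
from typing import Union


def model_dir_is_writable(path: Union[str, Path]) -> bool:
    """Return True if path is an existing directory in which a file can be created."""
    directory = Path(path)
    if not directory.is_dir():
        return False
    try:
        # Creating (and automatically deleting) a temp file proves write access.
        with tempfile.TemporaryFile(dir=directory):
            return True
    except OSError:
        return False
```

Calling `model_dir_is_writable("models")` from the worker's working directory catches a missing or read-only model directory before a long training run is wasted.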

**Airflow Integration Issues**
- Test API connectivity: `curl $AIRFLOW_API_URL/health`
- Verify credentials and permissions
- Check DAG exists and is active

### 11.5 Monitoring & Logs

```bash
# Application Logs
tail -f logs/app.log

# Docker Services
docker-compose logs -f api worker beat

# Database Queries
psql $DATABASE_URL -c "SELECT COUNT(*) FROM incidents;"

# Redis Queue Status
redis-cli -u $REDIS_URL info

# System Resources
htop
df -h
```

---

## Conclusion

This AI On-Call Agent system provides a comprehensive automation solution for ETL infrastructure monitoring and incident resolution. The system combines:

- **Intelligent Analysis**: AI-powered incident classification and action recommendation
- **Automated Resolution**: Seamless integration with external systems like Airflow
- **Scalable Architecture**: Microservices design with async processing
- **Production Ready**: Docker containerization, monitoring, and deployment guides

The documentation covers all aspects from development setup to production deployment, ensuring teams can effectively implement, maintain, and extend the system.

For additional support or questions, refer to the API documentation at `/docs` or check the project repository for the latest updates.