# Healthcare Administrative Assistant Agent

A multi-agent AI system that automates healthcare administrative tasks including appointment scheduling, patient intake, records management, insurance verification, and appointment follow-ups.

## Project Overview

**Problem:** Healthcare providers spend 20-40% of their time on administrative tasks.
**Solution:** Multi-agent system that automates key administrative workflows.

## Features

- **Orchestrator Agent** - Routes requests to appropriate specialized agents
- **Intake Agent** - Processes patient information and medical history
- **Scheduling Agent** - Manages appointment booking and rescheduling
- **Insurance Verification Agent** - Verifies coverage and eligibility
- **Followup Agent** - Sends reminders and post-visit communications

This notebook demonstrates the full system capabilities in a standalone environment.

In [1]:
# Install dependencies
!pip install -q "pydantic[email]"

# Import required libraries
import logging
import uuid
import hashlib
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, EmailStr

# Configure logging: INFO and above, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the agents defined in this file.
logger = logging.getLogger(__name__)

In [2]:
# --- Data Models ---

class AppointmentType(str, Enum):
    """Kinds of appointment that can be requested.

    Mixes in ``str``, so each member is also a string equal to its value
    (e.g. ``AppointmentType.CHECKUP == "checkup"``).
    """
    CHECKUP = "checkup"
    FOLLOWUP = "followup"
    CONSULTATION = "consultation"
    PROCEDURE = "procedure"
    URGENT = "urgent"

class AppointmentStatus(str, Enum):
    """Lifecycle states of an appointment.

    Mixes in ``str``, so each member is also a string equal to its value
    (e.g. ``AppointmentStatus.CANCELLED == "cancelled"``).
    """
    SCHEDULED = "scheduled"
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    RESCHEDULED = "rescheduled"

class PatientIntakeRequest(BaseModel):
    """New patient intake form"""
    # --- Identity and contact details (names must be non-empty) ---
    first_name: str = Field(..., min_length=1)
    last_name: str = Field(..., min_length=1)
    email: EmailStr
    # Digits only: optional leading "+", optional "1", then 9-15 digits.
    # Separators such as "-", spaces or parentheses do not match.
    phone: str = Field(..., pattern=r"^\+?1?\d{9,15}$")
    date_of_birth: datetime
    gender: Optional[str] = None
    # --- Postal address: all four parts are required (no defaults) ---
    address: str
    city: str
    state: str
    zip_code: str
    # --- Clinical background: every field is optional ---
    medical_history: Optional[str] = None
    allergies: Optional[List[str]] = None
    current_medications: Optional[List[str]] = None
    # --- Insurance: provider and member ID are required ---
    insurance_provider: str
    insurance_id: str
    insurance_group_number: Optional[str] = None

class AppointmentRequest(BaseModel):
    """Appointment scheduling request"""
    patient_id: str
    appointment_type: AppointmentType
    preferred_date: datetime
    # NOTE(review): presumably a provider ID such as "PROV_001" (the keys
    # SchedulingAgent looks providers up by) -- confirm against callers.
    preferred_provider: Optional[str] = None
    notes: Optional[str] = None
    # Defaults to a 30-minute appointment; no bounds are enforced here.
    duration_minutes: int = 30

class RescheduleRequest(BaseModel):
    """Appointment reschedule request"""
    # ID of the existing appointment to move.
    appointment_id: str
    # Date/time the appointment should be moved to.
    new_date: datetime
    # Optional free-text explanation for the change.
    reason: Optional[str] = None

class InsuranceVerificationRequest(BaseModel):
    """Insurance verification request"""
    patient_id: str
    insurance_id: str
    # NOTE(review): this field is named ``provider`` whereas
    # PatientIntakeRequest calls the same concept ``insurance_provider``;
    # confirm which key downstream consumers expect.
    provider: str

class AppointmentResponse(BaseModel):
    """Appointment confirmation response"""
    # All fields are required; there are no defaults.
    appointment_id: str
    patient_id: str
    appointment_type: AppointmentType
    scheduled_date: datetime
    provider_name: str
    location: str
    status: AppointmentStatus
    # Flags for the follow-up communications tied to this appointment.
    confirmation_sent: bool
    reminder_scheduled: bool

class InsuranceVerificationResponse(BaseModel):
    """Insurance verification response"""
    patient_id: str
    is_eligible: bool
    # Coverage and cost figures are optional and default to None.
    coverage_details: Optional[dict] = None
    copay: Optional[float] = None
    deductible: Optional[float] = None
    estimated_cost: Optional[float] = None
    # Human-readable summary of the verification outcome (required).
    message: str

In [3]:
# --- Base Agent ---

class BaseAgent(ABC):
    """Common foundation for every agent in the system.

    Stores the agent's identity (name, description, creation time) and its
    per-session state, and provides an audit-logging helper. Concrete agents
    must implement :meth:`process`.
    """

    def __init__(self, name: str, description: str):
        self.name, self.description = name, description
        self.created_at = datetime.now()
        # No session is attached until set_session_context() is called.
        self.session_id: Optional[str] = None
        self.context: Dict[str, Any] = {}

    @abstractmethod
    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request and return a response dictionary."""
        ...

    def set_session_context(self, session_id: str, context: Dict[str, Any]):
        """Attach a session ID and its context dictionary to this agent.

        The context is stored by reference, not copied.
        """
        self.session_id, self.context = session_id, context
        announcement = f"{self.name} - Session context set: {session_id}"
        logger.info(announcement)

    def get_session_context(self) -> Dict[str, Any]:
        """Return the context dictionary currently attached to this agent."""
        return self.context

    def log_action(self, action: str, details: Dict[str, Any]):
        """Write an audit-trail record to the log and return it.

        Args:
            action: Short identifier of what the agent did.
            details: Arbitrary extra data to include in the record.

        Returns:
            The record that was logged, with keys ``timestamp``, ``agent``,
            ``action``, ``session_id`` and ``details``.
        """
        audit_record = dict(
            timestamp=datetime.now().isoformat(),
            agent=self.name,
            action=action,
            session_id=self.session_id,
            details=details,
        )
        logger.info(f"Agent Action: {audit_record}")
        return audit_record

In [4]:
# --- Intake Agent ---

class IntakeAgent(BaseAgent):
    """
    Intake Agent processes new patient information including:
    - Parsing intake forms
    - Extracting demographics and medical history
    - Validating data completeness
    - Storing patient profiles in database
    - Flagging missing information

    Persistence is mocked: ``_store_patient_record`` writes nothing and
    always reports success.
    """

    def __init__(self):
        super().__init__(
            name="IntakeAgent",
            description="Processes patient intake forms and medical history"
        )
        # A form that omits (or leaves blank) any of these is rejected.
        self.required_fields = [
            "first_name", "last_name", "email", "phone",
            "date_of_birth", "insurance_provider", "insurance_id"
        ]
        # Informational only; nothing in this class reads this list.
        self.optional_fields = [
            "middle_name", "gender", "address", "medical_history",
            "allergies", "current_medications"
        ]

    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process patient intake request

        Args:
            request: Contains ``patient_info`` with the form data and,
                optionally, a ``request_id`` used to tag log lines.

        Returns:
            On success, a dict with ``success=True``, the generated
            ``patient_id``, the key patient fields and ``critical_info``.
            On failure, a dict with ``success=False`` and an ``error``
            (plus ``missing_fields`` when validation failed). This method
            does not raise; unexpected errors are logged and reported in
            the returned dict.
        """
        request_id = request.get("request_id", "INTAKE_REQUEST")

        try:
            # An explicit ``patient_info: None`` is treated like a missing
            # form so it is reported as a validation failure, not a crash.
            patient_info = request.get("patient_info") or {}

            # NOTE(review): this line (and the audit record below) writes the
            # patient's name to the log -- confirm that is acceptable for the
            # deployment's privacy requirements.
            logger.info(f"[{request_id}] Intake Agent processing: {patient_info.get('first_name')} {patient_info.get('last_name')}")

            # Step 1: Validate required fields
            validation_result = self._validate_intake_data(patient_info)
            if not validation_result["is_valid"]:
                logger.warning(f"[{request_id}] Validation failed: {validation_result['missing_fields']}")
                return {
                    "success": False,
                    "error": "Missing required fields",
                    "missing_fields": validation_result["missing_fields"],
                    "message": "Please provide all required information"
                }

            # Step 2: Parse and structure intake data
            parsed_data = self._parse_intake_form(patient_info)
            patient_name = f"{parsed_data['first_name']} {parsed_data['last_name']}"

            # Step 3: Extract critical information
            critical_info = self._extract_critical_info(parsed_data)

            # Step 4: Generate patient ID
            patient_id = self._generate_patient_id()

            # Step 5: Store in database (mock). A failed write must not be
            # reported to the caller as a successful registration.
            if not self._store_patient_record(patient_id, parsed_data):
                logger.error(f"[{request_id}] Failed to store patient record {patient_id}")
                return {
                    "success": False,
                    "error": "Failed to store patient record",
                    "message": "Failed to process intake"
                }

            # Log the action
            self.log_action("intake_processed", {
                "request_id": request_id,
                "patient_id": patient_id,
                "patient_name": patient_name,
                "validation_passed": True,
                "allergies_count": len(parsed_data.get("allergies", [])),
                "medications_count": len(parsed_data.get("current_medications", []))
            })

            return {
                "success": True,
                "patient_id": patient_id,
                "patient_name": patient_name,
                "email": parsed_data["email"],
                "phone": parsed_data["phone"],
                "date_of_birth": parsed_data["date_of_birth"],
                "insurance_provider": parsed_data["insurance_provider"],
                "insurance_id": parsed_data["insurance_id"],
                "critical_info": critical_info,
                "status": "intake_complete",
                "next_steps": ["Insurance Verification", "Schedule Appointment"],
                "message": f"Patient {patient_name} registered successfully"
            }

        except Exception as e:
            # Top-level boundary: report the failure instead of raising.
            logger.error(f"[{request_id}] Intake Agent error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to process intake"
            }

    @staticmethod
    def _is_blank(value: Any) -> bool:
        """Return True for missing, empty, or whitespace-only values."""
        if isinstance(value, str):
            return not value.strip()
        return not value

    def _validate_intake_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that all required fields are present and non-blank.

        Returns:
            ``{"is_valid": bool, "missing_fields": [names...]}`` where the
            names keep the order of ``self.required_fields``.
        """
        missing = [
            field for field in self.required_fields
            if self._is_blank(data.get(field))
        ]

        return {
            "is_valid": not missing,
            "missing_fields": missing
        }

    def _parse_intake_form(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse and normalize intake form data"""
        # In a real app, this would handle different form formats
        # For now, we just drop keys whose value is None
        return {k: v for k, v in data.items() if v is not None}

    def _extract_critical_info(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract critical medical info like allergies.

        ``None`` values for ``allergies`` or ``current_medications`` are
        treated as empty lists so the method is safe on unparsed form data.
        """
        allergies = data.get("allergies") or []
        medications = data.get("current_medications") or []

        critical_flags = ["HAS_ALLERGIES"] if allergies else []

        return {
            "has_allergies": bool(allergies),
            "allergies": allergies,
            "medications_count": len(medications),
            "critical_flags": critical_flags
        }

    def _generate_patient_id(self) -> str:
        """Generate a patient ID: ``PAT_`` plus 8 uppercase hex chars of a UUID4."""
        return f"PAT_{uuid.uuid4().hex[:8].upper()}"

    def _store_patient_record(self, patient_id: str, data: Dict[str, Any]) -> bool:
        """Mock storage of patient record; always returns True."""
        # In production, this would write to database
        return True

In [5]:
# --- Scheduling Agent ---

class SchedulingAgent(BaseAgent):
    """
    Scheduling Agent manages:
    - Provider availability queries
    - Appointment booking
    - Rescheduling requests
    - Cancellations

    Everything is mocked in memory: providers come from a fixed table and
    bookings live in ``self.scheduled_appointments``. Conflict detection and
    waitlist management are not implemented here.
    """

    def __init__(self):
        super().__init__(
            name="SchedulingAgent",
            description="Manages appointment scheduling and calendar operations"
        )
        # Mock provider database
        self.providers = self._init_mock_providers()
        # In-memory store of bookings, keyed by appointment ID.
        self.scheduled_appointments = {}

    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process scheduling request

        Args:
            request: Contains ``appointment_action`` (one of
                ``check_availability``, ``book``/``schedule``,
                ``reschedule``, ``cancel``; defaults to ``schedule``) plus
                the fields that action needs.

        Returns:
            The handler's response dict. Unknown actions and unexpected
            errors yield ``{"success": False, "error": ...}``; this method
            does not raise.
        """
        request_id = request.get("request_id", "SCHEDULE_REQUEST")
        request_type = request.get("appointment_action", "schedule")

        logger.info(f"[{request_id}] Scheduling Agent - Action: {request_type}")

        # "schedule" is the default action above, so it must be routable;
        # it is handled as a synonym for "book".
        handlers = {
            "check_availability": self._handle_availability_check,
            "book": self._handle_appointment_booking,
            "schedule": self._handle_appointment_booking,
            "reschedule": self._handle_rescheduling,
            "cancel": self._handle_cancellation,
        }

        try:
            handler = handlers.get(request_type)
            if handler is None:
                return {
                    "success": False,
                    "error": f"Unknown appointment action: {request_type}"
                }
            return await handler(request, request_id)

        except Exception as e:
            # Top-level boundary: report the failure instead of raising.
            logger.error(f"[{request_id}] Scheduling Agent error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to process scheduling request"
            }

    async def _handle_availability_check(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Return mock available slots starting from the preferred date."""
        logger.info(f"[{request_id}] Checking availability")

        preferred_date = request.get("preferred_date")
        appointment_type = request.get("appointment_type", "checkup")
        # Fall back to 30 minutes when the key is absent or None.
        duration_minutes = request.get("duration_minutes") or 30

        # Mock: Generate available slots
        available_slots = self._get_available_slots(
            preferred_date,
            appointment_type,
            duration_minutes
        )

        return {
            "success": True,
            "available_slots": available_slots,
            "total_slots": len(available_slots),
            "message": f"Found {len(available_slots)} available slots"
        }

    async def _handle_appointment_booking(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Book an appointment and record it in ``scheduled_appointments``.

        Requires ``patient_id`` and ``preferred_date``; returns an error
        dict when either is missing. ``preferred_provider`` defaults to
        ``PROV_001`` and ``duration_minutes`` to 30.
        """
        logger.info(f"[{request_id}] Booking appointment")

        patient_id = request.get("patient_id")
        appointment_datetime = request.get("preferred_date")

        # An appointment without a patient or a date is not meaningful.
        if not patient_id:
            return {
                "success": False,
                "error": "patient_id is required to book an appointment"
            }
        if not appointment_datetime:
            return {
                "success": False,
                "error": "preferred_date is required to book an appointment"
            }

        appointment_type = request.get("appointment_type", "checkup")
        # ``or`` also covers an explicit None, which the request model allows.
        provider_id = request.get("preferred_provider") or "PROV_001"
        duration_minutes = request.get("duration_minutes") or 30
        provider = self.providers.get(provider_id, {})

        # Generate appointment ID
        appointment_id = self._generate_appointment_id()

        # Mock: Book the appointment
        booking_result = {
            "appointment_id": appointment_id,
            "patient_id": patient_id,
            "provider_id": provider_id,
            "provider_name": provider.get("name", "Dr. Unknown"),
            "appointment_datetime": appointment_datetime,
            "appointment_type": appointment_type,
            "location": provider.get("location", "Downtown Clinic"),
            "duration_minutes": duration_minutes,
            "status": "scheduled",
            "booked_at": datetime.now().isoformat()
        }

        # Store appointment
        self.scheduled_appointments[appointment_id] = booking_result

        self.log_action("appointment_booked", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "patient_id": patient_id,
            "appointment_datetime": appointment_datetime
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "patient_id": patient_id,
            "provider_name": booking_result["provider_name"],
            "appointment_datetime": appointment_datetime,
            "location": booking_result["location"],
            "confirmation_token": f"CONF_{appointment_id}",
            "status": "scheduled",
            "next_steps": [
                "Insurance verification",
                "Send confirmation email",
                "Schedule reminders"
            ],
            "message": f"Appointment scheduled for {appointment_datetime}"
        }

    async def _handle_rescheduling(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Move an existing appointment to ``new_date``.

        Returns an error dict when the appointment is unknown, already
        cancelled, or no ``new_date`` is supplied.
        """
        logger.info(f"[{request_id}] Rescheduling appointment")

        appointment_id = request.get("appointment_id")
        new_date = request.get("new_date")

        # Check if appointment exists
        if appointment_id not in self.scheduled_appointments:
            return {
                "success": False,
                "error": f"Appointment {appointment_id} not found"
            }

        if not new_date:
            return {
                "success": False,
                "error": "new_date is required to reschedule an appointment"
            }

        appointment = self.scheduled_appointments[appointment_id]

        # Rescheduling must not silently revive a cancelled appointment.
        if appointment.get("status") == "cancelled":
            return {
                "success": False,
                "error": f"Appointment {appointment_id} is cancelled and cannot be rescheduled"
            }

        # Update appointment in place, remembering the previous time.
        previous_datetime = appointment["appointment_datetime"]
        appointment["status"] = "rescheduled"
        appointment["previous_datetime"] = previous_datetime
        appointment["appointment_datetime"] = new_date
        appointment["updated_at"] = datetime.now().isoformat()

        self.log_action("appointment_rescheduled", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "old_date": previous_datetime,
            "new_date": new_date
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "status": "rescheduled",
            "new_datetime": new_date,
            "old_datetime": previous_datetime,
            "message": f"Appointment rescheduled to {new_date}",
            "next_steps": [
                "Cancel old reminders",
                "Schedule new reminders",
                "Send updated confirmation"
            ]
        }

    async def _handle_cancellation(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Mark an existing appointment as cancelled, recording the reason."""
        logger.info(f"[{request_id}] Cancelling appointment")

        appointment_id = request.get("appointment_id")
        reason = request.get("reason", "No reason provided")

        # Check if appointment exists
        if appointment_id not in self.scheduled_appointments:
            return {
                "success": False,
                "error": f"Appointment {appointment_id} not found"
            }

        # Update appointment status
        appointment = self.scheduled_appointments[appointment_id]
        appointment["status"] = "cancelled"
        appointment["cancellation_reason"] = reason
        appointment["cancelled_at"] = datetime.now().isoformat()

        self.log_action("appointment_cancelled", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "reason": reason
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "status": "cancelled",
            "cancellation_timestamp": appointment["cancelled_at"],
            "message": "Appointment cancelled successfully"
        }

    @staticmethod
    def _coerce_base_date(preferred_date: Any) -> datetime:
        """Turn a preferred date into a ``datetime``, defaulting to tomorrow.

        Accepts a ``datetime`` or an ISO-8601 string (a trailing ``Z`` is
        read as UTC). Anything else, including ``None`` or an unparseable
        string, falls back to 24 hours from now.
        """
        if isinstance(preferred_date, datetime):
            return preferred_date
        if isinstance(preferred_date, str):
            try:
                return datetime.fromisoformat(preferred_date.replace('Z', '+00:00'))
            except ValueError:
                pass
        return datetime.now() + timedelta(days=1)

    def _get_available_slots(self, preferred_date: Any, appointment_type: str, duration_minutes: int) -> List[Dict[str, Any]]:
        """Generate five mock slots, one per day starting at the preferred date.

        Slot start times cycle through 9:00, 10:00 and 11:00 exactly; the
        minutes and seconds of the preferred date are discarded.
        """
        base_date = self._coerce_base_date(preferred_date)
        slots = []

        for i in range(5):
            slot_time = (base_date + timedelta(days=i)).replace(
                hour=9 + (i % 3),  # 9 AM, 10 AM, 11 AM
                minute=0,
                second=0,
                microsecond=0
            )

            slots.append({
                "start_time": slot_time.isoformat(),
                "end_time": (slot_time + timedelta(minutes=duration_minutes)).isoformat(),
                "duration_minutes": duration_minutes,
                "provider_name": "Dr. Jane Smith",
                "location": "Downtown Clinic",
                "appointment_type": appointment_type,
                "availability_id": f"SLOT_{i}"
            })

        return slots

    def _generate_appointment_id(self) -> str:
        """Generate an appointment ID: ``APT_`` plus 8 uppercase hex chars of a UUID4."""
        return f"APT_{uuid.uuid4().hex[:8].upper()}"

    def _init_mock_providers(self) -> Dict[str, Dict[str, Any]]:
        """Initialize mock provider database, keyed by provider ID."""
        return {
            "PROV_001": {
                "name": "Dr. Jane Smith",
                "specialty": "General Practice",
                "location": "Downtown Clinic",
                "phone": "+1-202-555-0123"
            },
            "PROV_002": {
                "name": "Dr. John Martinez",
                "specialty": "Cardiology",
                "location": "Medical Plaza",
                "phone": "+1-202-555-0124"
            },
            "PROV_003": {
                "name": "Dr. Sarah Chen",
                "specialty": "Dermatology",
                "location": "Downtown Clinic",
                "phone": "+1-202-555-0125"
            }
        }

In [6]:
# --- Verification Agent ---

class VerificationAgent(BaseAgent):
    """
    Insurance Verification Agent:
    - Verifies patient insurance coverage
    - Checks eligibility for services
    - Calculates copay and deductible
    - Estimates appointment costs
    - Identifies coverage gaps

    All data is mocked: providers come from a fixed in-memory table and
    eligibility is derived deterministically from a hash of the member ID.
    """

    def __init__(self):
        super().__init__(
            name="VerificationAgent",
            description="Verifies insurance coverage and eligibility"
        )
        # Keyed by upper-case provider name; see _provider_key().
        self.insurance_providers = self._init_mock_insurance_db()

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return ``value`` as a stripped string; ``None`` becomes ``""``."""
        return "" if value is None else str(value).strip()

    @staticmethod
    def _provider_key(provider: str) -> str:
        """Normalize a provider name to the key format of the mock database."""
        return provider.strip().upper()

    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process insurance verification request

        Args:
            request: Contains ``insurance_id``, the provider name under
                ``insurance_provider`` (or ``provider``, the field name used
                by ``InsuranceVerificationRequest``), and optionally
                ``patient_id`` and ``request_id``.

        Returns:
            On success, a dict with ``success=True``, coverage details and
            cost estimates. Otherwise a dict with ``success=False`` and an
            ``error``. This method does not raise; unexpected errors are
            logged and reported in the returned dict.
        """
        request_id = request.get("request_id", "VERIFY_REQUEST")

        logger.info(f"[{request_id}] Verification Agent processing insurance check")

        try:
            # Accept either key for the provider, and tolerate None values.
            insurance_provider = self._clean_text(
                request.get("insurance_provider") or request.get("provider")
            )
            insurance_id = self._clean_text(request.get("insurance_id"))
            patient_id = request.get("patient_id")

            # Validate inputs
            if not insurance_provider or not insurance_id:
                return {
                    "success": False,
                    "error": "Missing insurance provider or ID",
                    "is_eligible": False
                }

            # Check insurance eligibility
            verification_result = self._verify_eligibility(
                insurance_provider,
                insurance_id
            )

            if not verification_result["is_eligible"]:
                self.log_action("insurance_verification_failed", {
                    "request_id": request_id,
                    "patient_id": patient_id,
                    "insurance_provider": insurance_provider,
                    "reason": verification_result.get("reason", "Unknown")
                })

                return {
                    "success": False,
                    "is_eligible": False,
                    "error": verification_result.get("reason", "Insurance verification failed"),
                    "coverage_status": "inactive",
                    "message": "Insurance coverage not active or invalid",
                    "action_items": [
                        "Verify insurance information with patient",
                        "Check for alternative coverage",
                        "Consider uninsured pricing"
                    ]
                }

            # Calculate costs
            cost_estimate = self._estimate_costs(insurance_provider, insurance_id)

            # Use the same normalized key the eligibility check used, so a
            # provider given as e.g. "Aetna" still finds its coverage data.
            coverage_details = self.insurance_providers.get(
                self._provider_key(insurance_provider), {}
            ).get("coverage", {})

            self.log_action("insurance_verified", {
                "request_id": request_id,
                "patient_id": patient_id,
                "insurance_provider": insurance_provider,
                "copay": cost_estimate.get("copay")
            })

            now = datetime.now()

            return {
                "success": True,
                "patient_id": patient_id,
                "is_eligible": True,
                "coverage_status": "active",
                "insurance_provider": insurance_provider,
                "insurance_id": insurance_id,
                "coverage_details": coverage_details,
                "copay": cost_estimate["copay"],
                "coinsurance": cost_estimate["coinsurance"],
                "deductible": cost_estimate["deductible"],
                "deductible_met": cost_estimate["deductible_met"],
                "out_of_pocket_max": cost_estimate["out_of_pocket_max"],
                "estimated_appointment_cost": cost_estimate["estimated_appointment_cost"],
                "verification_timestamp": now.isoformat(),
                # Mock: coverage runs to the end of the current calendar
                # year, so the date never lies in the past.
                "valid_through": f"{now.year}-12-31",
                "message": "Insurance verified successfully",
                "action_items": [
                    "Proceed with appointment booking",
                    "Inform patient of copay amount",
                    "Schedule appointment confirmation"
                ],
                "disclaimers": [
                    "This is an estimate based on information available",
                    "Actual costs may vary based on specific services",
                    "Patient may have balance due after insurance processing"
                ]
            }

        except Exception as e:
            # Top-level boundary: report the failure instead of raising.
            logger.error(f"[{request_id}] Verification Agent error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to verify insurance"
            }

    def _verify_eligibility(self, provider: str, insurance_id: str) -> Dict[str, Any]:
        """Verify insurance eligibility (mock).

        Returns:
            ``{"is_eligible": True}`` or ``{"is_eligible": False,
            "reason": ...}`` when the provider is unknown, the ID is shorter
            than 6 characters, or the ID hashes into the inactive bucket.
        """
        # Mock verification - in production, would call insurance API

        if self._provider_key(provider) not in self.insurance_providers:
            return {
                "is_eligible": False,
                "reason": f"Insurance provider '{provider}' not found in system"
            }

        # Check insurance ID format
        if not insurance_id or len(insurance_id) < 6:
            return {
                "is_eligible": False,
                "reason": "Invalid insurance ID format"
            }

        # Mock: Verify status (90% of IDs are valid). MD5 is used only to get
        # a stable pseudo-random bucket per ID, not for security.
        hash_value = int(hashlib.md5(insurance_id.encode()).hexdigest(), 16)
        is_valid = (hash_value % 100) < 90

        if not is_valid:
            return {
                "is_eligible": False,
                "reason": "Insurance coverage is inactive or expired"
            }

        return {
            "is_eligible": True
        }

    def _estimate_costs(self, provider: str, insurance_id: str) -> Dict[str, Any]:
        """Estimate appointment costs from the provider's coverage table.

        The provider name is matched case-insensitively. Missing providers
        or coverage entries fall back to the defaults below.
        ``insurance_id`` is currently unused.
        """
        provider_data = self.insurance_providers.get(self._provider_key(provider), {})
        coverage = provider_data.get("coverage", {})
        office_visit_copay = coverage.get("office_visit_copay", 30)

        return {
            "copay": office_visit_copay,
            "coinsurance": coverage.get("coinsurance_percentage", 20),
            "deductible": coverage.get("annual_deductible", 1000),
            "deductible_met": coverage.get("deductible_met", 500),
            "out_of_pocket_max": coverage.get("out_of_pocket_max", 5000),
            # Mock: the estimate is simply the office-visit copay.
            "estimated_appointment_cost": office_visit_copay
        }

    def _init_mock_insurance_db(self) -> Dict[str, Dict[str, Any]]:
        """Initialize mock insurance provider database (upper-case keys)."""
        return {
            "BLUE SHIELD": {
                "name": "Blue Shield of California",
                "coverage": {
                    "office_visit_copay": 30,
                    "specialist_copay": 50,
                    "prescription_copay": 10,
                    "emergency_copay": 250,
                    "coinsurance_percentage": 20,
                    "annual_deductible": 1000,
                    "deductible_met": 500,
                    "out_of_pocket_max": 5000
                }
            },
            "AETNA": {
                "name": "Aetna Health",
                "coverage": {
                    "office_visit_copay": 25,
                    "specialist_copay": 45,
                    "prescription_copay": 15,
                    "emergency_copay": 300,
                    "coinsurance_percentage": 15,
                    "annual_deductible": 750,
                    "deductible_met": 400,
                    "out_of_pocket_max": 4500
                }
            },
            "UNITED": {
                "name": "UnitedHealth Group",
                "coverage": {
                    "office_visit_copay": 35,
                    "specialist_copay": 60,
                    "prescription_copay": 20,
                    "emergency_copay": 350,
                    "coinsurance_percentage": 25,
                    "annual_deductible": 1200,
                    "deductible_met": 600,
                    "out_of_pocket_max": 5500
                }
            },
            "CIGNA": {
                "name": "Cigna Health",
                "coverage": {
                    "office_visit_copay": 28,
                    "specialist_copay": 48,
                    "prescription_copay": 12,
                    "emergency_copay": 280,
                    "coinsurance_percentage": 18,
                    "annual_deductible": 950,
                    "deductible_met": 550,
                    "out_of_pocket_max": 4800
                }
            }
        }

In [7]:
# --- Followup Agent ---

class FollowupAgent(BaseAgent):
    """
    Followup Agent: appointment reminders and post-visit communications.

    Supported ``followup_action`` values (dispatched by :meth:`process`):
    - ``schedule_reminder``: queue a 24-hour and a 1-hour appointment reminder
    - ``cancel_reminders``: mark an appointment's stored reminders as cancelled
    - ``send_survey``: build a post-visit survey link
    - ``process_no_show``: record a no-show and cancel any stored reminders

    Reminders are held in memory only (``self.scheduled_reminders``, keyed by
    appointment id). This class builds reminder/survey descriptions; it does
    not deliver anything itself. Prescription refill reminders and test result
    follow-ups are not implemented here.
    """

    def __init__(self):
        super().__init__(
            name="FollowupAgent",
            description="Sends reminders and post-visit communications"
        )
        # appointment_id -> list of reminder dicts created by _schedule_reminders
        self.scheduled_reminders = {}
        self.email_templates = self._init_email_templates()
        self.sms_templates = self._init_sms_templates()

    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process followup request

        Args:
            request: Contains ``followup_action`` (defaults to
                ``"schedule_reminder"``), an optional ``request_id`` used for
                log correlation, and the action-specific details.

        Returns:
            The handler's response dict. Unknown actions and unexpected
            exceptions are reported as ``{"success": False, "error": ...}``
            rather than raised.
        """
        request_id = request.get("request_id", "FOLLOWUP_REQUEST")
        action = request.get("followup_action", "schedule_reminder")

        logger.info(f"[{request_id}] Followup Agent - Action: {action}")

        try:
            if action == "schedule_reminder":
                return await self._schedule_reminders(request, request_id)
            elif action == "cancel_reminders":
                return await self._cancel_reminders(request, request_id)
            elif action == "send_survey":
                return await self._send_post_visit_survey(request, request_id)
            elif action == "process_no_show":
                return await self._process_no_show(request, request_id)
            else:
                return {
                    "success": False,
                    "error": f"Unknown followup action: {action}"
                }

        except Exception as e:
            # Agent boundary: convert any handler failure into an error response.
            logger.error(f"[{request_id}] Followup Agent error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "message": "Failed to process followup action"
            }

    @staticmethod
    def _make_reminder(
        appointment_id: Any,
        appt_time: datetime,
        hours_before: int,
        channels: List[str]
    ) -> Dict[str, Any]:
        """Build one reminder dict due ``hours_before`` hours ahead of ``appt_time``."""
        label = f"{hours_before}h"
        return {
            "reminder_id": f"REM_{appointment_id}_{label.upper()}",
            "type": f"appointment_reminder_{label}",
            "scheduled_time": (appt_time - timedelta(hours=hours_before)).isoformat(),
            "delivery_channels": channels,
            "status": "scheduled",
            "message_template": f"reminder_{label}"
        }

    def _mark_reminders_cancelled(self, appointment_id: Any) -> int:
        """Set status "cancelled" on every stored reminder of the appointment.

        Returns the number of reminders stored for the appointment (0 when
        none are stored).
        """
        reminders = self.scheduled_reminders.get(appointment_id, [])
        for reminder in reminders:
            reminder["status"] = "cancelled"
        return len(reminders)

    async def _schedule_reminders(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Schedule the 24-hour and 1-hour reminders for an appointment.

        Reads ``appointment_id`` and ``appointment_datetime`` (ISO 8601
        string) from ``request``; ``patient_phone``, ``provider_name`` and
        ``location`` are optional. Any reminders already stored for the same
        appointment id are replaced.

        Returns a failure dict when the appointment id is missing or the
        datetime cannot be parsed.
        """
        logger.info(f"[{request_id}] Scheduling reminders")

        appointment_id = request.get("appointment_id")
        appointment_datetime = request.get("appointment_datetime")
        patient_phone = request.get("patient_phone")
        provider_name = request.get("provider_name", "Your Healthcare Provider")
        location = request.get("location", "Clinic")

        # Without an id the reminders would be filed under None ("REM_None_24H").
        if appointment_id in (None, ""):
            return {"success": False, "error": "Missing appointment_id"}

        # Parse appointment datetime. fromisoformat raises TypeError for
        # non-string input (e.g. a missing value) and ValueError for bad text.
        try:
            appt_time = datetime.fromisoformat(appointment_datetime)
        except (TypeError, ValueError):
            return {"success": False, "error": "Invalid appointment datetime"}

        # The day-before reminder leads with email, the last-hour one with SMS.
        reminders = [
            self._make_reminder(
                appointment_id, appt_time, 24,
                ["email", "sms"] if patient_phone else ["email"]
            ),
            self._make_reminder(
                appointment_id, appt_time, 1,
                ["sms", "email"] if patient_phone else ["email"]
            ),
        ]

        # Store reminders
        self.scheduled_reminders[appointment_id] = reminders

        # Build message previews
        message_preview = self._build_reminder_message(
            appointment_datetime,
            provider_name,
            location
        )

        self.log_action("reminders_scheduled", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "reminder_count": len(reminders),
            "delivery_channels": "email, sms" if patient_phone else "email"
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "reminders_scheduled": len(reminders),
            "reminders": reminders,
            "message_preview": message_preview,
            "status": "reminders_queued",
            "message": f"Scheduled {len(reminders)} reminders for appointment",
            "next_steps": [
                "24-hour reminder will be sent automatically",
                "1-hour reminder will be sent automatically",
                "Patient can reply to confirm attendance"
            ]
        }

    async def _cancel_reminders(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Cancel the stored reminders of ``request["appointment_id"]``.

        Returns a failure dict when no reminders are stored for that id.
        """
        logger.info(f"[{request_id}] Cancelling reminders")

        appointment_id = request.get("appointment_id")

        if appointment_id not in self.scheduled_reminders:
            return {
                "success": False,
                "error": f"No reminders found for appointment {appointment_id}"
            }

        cancelled_count = self._mark_reminders_cancelled(appointment_id)

        self.log_action("reminders_cancelled", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "reminders_cancelled": cancelled_count
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "reminders_cancelled": cancelled_count,
            "message": f"Cancelled {cancelled_count} reminders"
        }

    async def _send_post_visit_survey(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Build the post-visit survey link for an appointment and report it as queued."""
        logger.info(f"[{request_id}] Sending post-visit survey")

        patient_email = request.get("patient_email")
        appointment_id = request.get("appointment_id")

        survey_link = f"https://survey.healthcare.app/feedback/{appointment_id}"

        self.log_action("survey_sent", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "patient_email": patient_email
        })

        return {
            "success": True,
            "survey_id": f"SURV_{appointment_id}",
            "patient_email": patient_email,
            "survey_link": survey_link,
            "delivery_status": "queued",
            "message": "Post-visit survey scheduled for delivery",
            "survey_questions": 5,
            "estimated_completion_time": "3-5 minutes"
        }

    async def _process_no_show(self, request: Dict[str, Any], request_id: str) -> Dict[str, Any]:
        """Record a no-show and cancel the appointment's stored reminders.

        The response's ``reminders_cancelled`` is the number of stored
        reminders that were marked cancelled (0 when none were stored).
        """
        logger.info(f"[{request_id}] Processing no-show")

        appointment_id = request.get("appointment_id")
        patient_id = request.get("patient_id")
        patient_phone = request.get("patient_phone")

        # Make the "Cancelled all subsequent reminders" action below true for
        # the reminders this agent holds.
        cancelled_count = self._mark_reminders_cancelled(appointment_id)

        self.log_action("no_show_recorded", {
            "request_id": request_id,
            "appointment_id": appointment_id,
            "patient_id": patient_id
        })

        return {
            "success": True,
            "appointment_id": appointment_id,
            "status": "no_show",
            "reminders_cancelled": cancelled_count,
            "actions_taken": [
                "Recorded no-show in patient record",
                "Cancelled all subsequent reminders",
                "Triggered follow-up outreach",
                "Freed up appointment slot for others",
                "Sent apology message to patient"
            ],
            "followup_messages": {
                "email_scheduled": True,
                "phone_call_scheduled": bool(patient_phone),
                "reschedule_opportunity": True
            },
            "next_steps": [
                "Contact patient to reschedule",
                "Note in medical record",
                "Track no-show history"
            ]
        }

    def _build_reminder_message(self, appointment_datetime: str, provider_name: str, location: str) -> str:
        """Build reminder message preview"""
        return f"""
        Appointment Reminder
        ────────────────────
        Date & Time: {appointment_datetime}
        Provider: {provider_name}
        Location: {location}
        
        Please arrive 10 minutes early.
        Reply CONFIRM to confirm attendance.
        """

    def _init_email_templates(self) -> Dict[str, str]:
        """Return the email templates, keyed by template name ({...} are placeholders)."""
        return {
            "reminder_24h": "Your appointment with {provider} is scheduled for tomorrow at {time}.",
            "reminder_1h": "Your appointment with {provider} is in 1 hour. Please head to {location}.",
            "confirmation": "Your appointment has been confirmed for {datetime} with {provider}.",
            "cancellation": "Your appointment on {datetime} has been cancelled.",
            "survey": "Please help us improve by taking a 5-minute survey about your visit."
        }

    def _init_sms_templates(self) -> Dict[str, str]:
        """Return the SMS templates, keyed by template name ({...} are placeholders)."""
        return {
            "reminder_24h": "Reminder: Appointment tomorrow at {time} with {provider}. Arrive early.",
            "reminder_1h": "Reminder: Appointment in 1 hour at {location}. Reply Y to confirm.",
            "confirmation": "Confirmed: {date} {time} with {provider}. Location: {location}",
            "cancellation": "Cancelled: Your {date} appointment has been cancelled."
        }

In [8]:
# --- Orchestrator Agent ---

class OrchestratorAgent(BaseAgent):
    """
    Top-level routing agent.

    As implemented in this class it:
    - assigns a fresh request id to every incoming request
    - dispatches on ``request_type`` to the matching ``_handle_*`` coroutine
    - turns unknown request types and unexpected errors into failed responses

    The handlers currently only log the routing decision and return an
    acknowledgement dict; ``sub_agents`` is initialised empty and is not
    consulted anywhere in this class.
    """

    def __init__(self):
        super().__init__(
            name="Orchestrator",
            description="Master agent that routes requests to specialized agents"
        )
        self.sub_agents = {}
        # Monotonic counter that makes request ids unique within this instance.
        self.request_id_counter = 0

    async def process(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """
        Route an incoming request to its handler and return the handler's response.

        Args:
            request: Dictionary of request details. Keys read here:
                - request_type: selects the handler (defaults to "unknown")
                - session_id: optional; defaults to the generated request id

        Returns:
            The handler's response dict, or a ``success: False`` dict carrying
            the error text when routing raises.
        """
        self.request_id_counter += 1
        stamp = datetime.now().timestamp()
        request_id = f"REQ_{stamp}_{self.request_id_counter}"

        logger.info(f"[{request_id}] Orchestrator processing request: {request.get('request_type')}")

        try:
            kind = request.get("request_type", "unknown")
            session = request.get("session_id", request_id)

            result = await self._route_request(kind, request, request_id, session)

            # Audit trail entry for this routing decision.
            audit_details = {
                "request_id": request_id,
                "request_type": kind,
                "success": result.get("success", False)
            }
            self.log_action("route_request", audit_details)
        except Exception as e:
            logger.error(f"[{request_id}] Orchestrator error: {str(e)}")
            return {
                "request_id": request_id,
                "success": False,
                "error": str(e),
                "message": "Failed to process request"
            }

        return result

    async def _route_request(
        self,
        request_type: str,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """
        Look up the handler for ``request_type`` and await it.

        An unrecognised type yields a failure dict that lists the supported types.
        """
        handlers = {
            "new_patient_appointment": self._handle_new_patient,
            "schedule_appointment": self._handle_schedule,
            "reschedule_appointment": self._handle_reschedule,
            "verify_insurance": self._handle_verify,
            "get_records": self._handle_records,
            "send_reminder": self._handle_reminder,
            "intake_form": self._handle_intake,
        }

        if request_type in handlers:
            return await handlers[request_type](request, request_id, session_id)

        logger.warning(f"[{request_id}] Unknown request type: {request_type}")
        return {
            "request_id": request_id,
            "success": False,
            "error": f"Unknown request type: {request_type}",
            "supported_types": list(handlers)
        }

    @staticmethod
    def _acknowledge(
        request_id: str,
        session_id: str,
        message: str,
        agents: List[str],
        **extra: Any
    ) -> Dict[str, Any]:
        """Assemble the common "workflow initiated" response shared by all handlers."""
        response = {
            "request_id": request_id,
            "session_id": session_id,
            "success": True,
            "message": message,
            "agents_involved": agents
        }
        response.update(extra)
        return response

    async def _handle_new_patient(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge a new-patient appointment request and list its workflow steps."""
        logger.info(f"[{request_id}] Routing to Intake + Scheduling agents (parallel)")

        steps = [
            "Parse patient intake form",
            "Query provider availability",
            "Verify insurance coverage",
            "Send appointment confirmation"
        ]
        return self._acknowledge(
            request_id,
            session_id,
            "New patient appointment workflow initiated",
            ["Intake", "Scheduling", "Verification"],
            workflow_steps=steps
        )

    async def _handle_schedule(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge an appointment scheduling request."""
        logger.info(f"[{request_id}] Routing to Scheduling agent")

        return self._acknowledge(
            request_id, session_id, "Appointment scheduling initiated", ["Scheduling"]
        )

    async def _handle_reschedule(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge an appointment rescheduling request."""
        logger.info(f"[{request_id}] Routing to Scheduling + Followup agents")

        return self._acknowledge(
            request_id,
            session_id,
            "Appointment rescheduling initiated",
            ["Scheduling", "Followup"]
        )

    async def _handle_verify(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge an insurance verification request."""
        logger.info(f"[{request_id}] Routing to Verification agent")

        return self._acknowledge(
            request_id, session_id, "Insurance verification initiated", ["Verification"]
        )

    async def _handle_records(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge a records retrieval request."""
        logger.info(f"[{request_id}] Routing to Records agent")

        return self._acknowledge(
            request_id, session_id, "Records retrieval initiated", ["Records"]
        )

    async def _handle_reminder(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge an appointment reminder request."""
        logger.info(f"[{request_id}] Routing to Followup agent")

        return self._acknowledge(
            request_id, session_id, "Reminder scheduling initiated", ["Followup"]
        )

    async def _handle_intake(
        self,
        request: Dict[str, Any],
        request_id: str,
        session_id: str
    ) -> Dict[str, Any]:
        """Acknowledge a patient intake form submission."""
        logger.info(f"[{request_id}] Routing to Intake agent")

        return self._acknowledge(
            request_id, session_id, "Patient intake processing initiated", ["Intake"]
        )

In [9]:
# --- Main System Execution ---

class HealthcareAgentSystem:
    """Complete Healthcare Administrative Assistant System

    Holds one instance of each agent and provides three scripted demo
    workflows (new patient, reschedule, no-show) that call the agents in
    sequence and log each step.

    A workflow raises ``RuntimeError`` when an agent reports
    ``success: False`` for a step it cannot continue without, or when no
    appointment slot is available.
    """

    # Contact details of the demo patient used by every workflow.
    DEMO_PATIENT_EMAIL = "john.doe@example.com"
    DEMO_PATIENT_PHONE = "+12125551234"

    def __init__(self):
        self.orchestrator = OrchestratorAgent()
        self.intake_agent = IntakeAgent()
        self.scheduling_agent = SchedulingAgent()
        self.verification_agent = VerificationAgent()
        self.followup_agent = FollowupAgent()

        logger.info("✓ Healthcare Agent System initialized with all agents")

    @staticmethod
    def _log_banner(title: str) -> None:
        """Log ``title`` framed by two 70-character '=' rules."""
        logger.info("\n" + "=" * 70)
        logger.info(title)
        logger.info("=" * 70)

    @staticmethod
    def _log_step(title: str) -> None:
        """Log a step heading followed by a 70-character '-' rule."""
        logger.info("\n" + title)
        logger.info("-" * 70)

    @staticmethod
    def _require_success(response: Dict[str, Any], step: str) -> Dict[str, Any]:
        """Return ``response`` unchanged unless it reports ``success: False``.

        Raises:
            RuntimeError: naming ``step`` and the response's ``error`` text,
                so a failed agent call is not masked by a later ``KeyError``.
        """
        # A response without a "success" key is passed through untouched.
        if not response.get("success", True):
            raise RuntimeError(f"{step} failed: {response.get('error', 'unknown error')}")
        return response

    @staticmethod
    def _first_slot(availability_response: Dict[str, Any]) -> Dict[str, Any]:
        """Return the first available slot, raising ``RuntimeError`` if there is none."""
        slots = availability_response.get("available_slots") or []
        if not slots:
            raise RuntimeError("No available appointment slots returned")
        return slots[0]

    async def process_new_patient_workflow(self):
        """Process complete new patient appointment workflow

        Steps: intake, availability check, booking, insurance verification,
        reminder scheduling. A failed insurance verification is only logged
        as a warning; any other failed step raises ``RuntimeError``.

        Returns:
            Dict with ``patient_id``, ``appointment_id`` and ``status``.
        """
        self._log_banner("WORKFLOW 1: NEW PATIENT APPOINTMENT")

        # Step 1: Intake
        self._log_step("[STEP 1] INTAKE AGENT - Process Patient Information")
        patient_info = {
            "first_name": "John",
            "last_name": "Doe",
            "email": self.DEMO_PATIENT_EMAIL,
            "phone": self.DEMO_PATIENT_PHONE,
            "date_of_birth": "1985-01-15",
            "gender": "M",
            "address": "123 Main St",
            "city": "New York",
            "state": "NY",
            "zip_code": "10001",
            "medical_history": "Type 2 diabetes, hypertension",
            "allergies": ["Penicillin"],
            "current_medications": ["Metformin", "Lisinopril"],
            "insurance_provider": "Blue Shield",
            "insurance_id": "BSC123456",
            "insurance_group_number": "GRP789"
        }
        intake_request = {
            "request_id": "DEMO_001",
            "patient_info": patient_info
        }

        intake_response = self._require_success(
            await self.intake_agent.process(intake_request), "Patient intake"
        )
        logger.info(f"✓ Intake Response: Patient {intake_response['patient_name']} registered")
        logger.info(f"  Patient ID: {intake_response['patient_id']}")
        logger.info(f"  Critical Info: {intake_response['critical_info']['critical_flags']}")

        patient_id = intake_response['patient_id']

        # Step 2: Check Availability
        self._log_step("[STEP 2] SCHEDULING AGENT - Check Availability")
        tomorrow = (datetime.now() + timedelta(days=1)).isoformat()

        availability_request = {
            "request_id": "DEMO_001",
            "patient_id": patient_id,
            "appointment_action": "check_availability",
            "preferred_date": tomorrow,
            "appointment_type": "checkup",
            "duration_minutes": 30
        }

        availability_response = self._require_success(
            await self.scheduling_agent.process(availability_request), "Availability check"
        )
        first_slot = self._first_slot(availability_response)
        logger.info(f"✓ Found {availability_response['total_slots']} available slots")
        for i, slot in enumerate(availability_response['available_slots'][:2]):
            logger.info(f"  Slot {i+1}: {slot['start_time']} - {slot['provider_name']}")

        # Step 3: Book Appointment
        self._log_step("[STEP 3] SCHEDULING AGENT - Book Appointment")
        booking_request = {
            "request_id": "DEMO_001",
            "patient_id": patient_id,
            "appointment_action": "book",
            "appointment_type": "checkup",
            "preferred_date": first_slot['start_time'],
            "preferred_provider": "PROV_001"
        }

        booking_response = self._require_success(
            await self.scheduling_agent.process(booking_request), "Appointment booking"
        )
        logger.info(f"✓ Appointment Booked: {booking_response['appointment_id']}")
        logger.info(f"  Date/Time: {booking_response['appointment_datetime']}")
        logger.info(f"  Provider: {booking_response['provider_name']}")
        logger.info(f"  Location: {booking_response['location']}")

        appointment_id = booking_response['appointment_id']

        # Step 4: Verify Insurance
        self._log_step("[STEP 4] VERIFICATION AGENT - Insurance Verification")
        verification_request = {
            "request_id": "DEMO_001",
            "patient_id": patient_id,
            "insurance_provider": patient_info["insurance_provider"],
            "insurance_id": patient_info["insurance_id"]
        }

        verification_response = await self.verification_agent.process(verification_request)
        if verification_response['success']:
            logger.info(f"✓ Insurance Verified: {verification_response['coverage_status']}")
            logger.info(f"  Copay: ${verification_response['copay']}")
            logger.info(f"  Estimated Cost: ${verification_response['estimated_appointment_cost']}")
        else:
            # Non-fatal: the appointment stays booked without verified coverage.
            logger.warning(f"✗ Insurance Verification Failed: {verification_response['error']}")

        # Step 5: Schedule Reminders
        self._log_step("[STEP 5] FOLLOWUP AGENT - Schedule Reminders")
        reminder_request = {
            "request_id": "DEMO_001",
            "appointment_id": appointment_id,
            "followup_action": "schedule_reminder",
            "appointment_datetime": booking_response['appointment_datetime'],
            "patient_email": self.DEMO_PATIENT_EMAIL,
            "patient_phone": self.DEMO_PATIENT_PHONE,
            "provider_name": booking_response['provider_name'],
            "location": booking_response['location']
        }

        reminder_response = self._require_success(
            await self.followup_agent.process(reminder_request), "Reminder scheduling"
        )
        logger.info(f"✓ Reminders Scheduled: {reminder_response['reminders_scheduled']}")
        for reminder in reminder_response['reminders']:
            logger.info(f"  - {reminder['type']}: {reminder['scheduled_time']}")

        self._log_banner("✓ NEW PATIENT WORKFLOW COMPLETE")

        return {
            "patient_id": patient_id,
            "appointment_id": appointment_id,
            "status": "complete"
        }

    async def process_reschedule_workflow(self, appointment_id: str):
        """Process appointment rescheduling workflow

        Steps: availability check three days out, reschedule to the first
        slot, then cancel the old reminders and schedule new ones. Having no
        old reminders to cancel is logged as a warning and does not stop the
        workflow.
        """
        self._log_banner("WORKFLOW 2: RESCHEDULE APPOINTMENT")

        # Step 1: Check new availability
        self._log_step("[STEP 1] SCHEDULING AGENT - Check New Availability")
        new_date = (datetime.now() + timedelta(days=3)).isoformat()

        availability_request = {
            "request_id": "DEMO_002",
            "appointment_action": "check_availability",
            "preferred_date": new_date,
            "appointment_type": "checkup"
        }

        availability_response = self._require_success(
            await self.scheduling_agent.process(availability_request), "Availability check"
        )
        first_slot = self._first_slot(availability_response)
        logger.info(f"✓ Found {availability_response['total_slots']} available slots")

        # Step 2: Reschedule
        self._log_step("[STEP 2] SCHEDULING AGENT - Reschedule Appointment")
        reschedule_request = {
            "request_id": "DEMO_002",
            "appointment_id": appointment_id,
            "appointment_action": "reschedule",
            "new_date": first_slot['start_time'],
            "reason": "Schedule conflict"
        }

        reschedule_response = self._require_success(
            await self.scheduling_agent.process(reschedule_request), "Rescheduling"
        )
        logger.info("✓ Appointment Rescheduled")
        logger.info(f"  Old Date: {reschedule_response['old_datetime']}")
        logger.info(f"  New Date: {reschedule_response['new_datetime']}")

        # Step 3: Cancel old reminders and schedule new ones
        self._log_step("[STEP 3] FOLLOWUP AGENT - Update Reminders")

        cancel_request = {
            "request_id": "DEMO_002",
            "appointment_id": appointment_id,
            "followup_action": "cancel_reminders"
        }

        cancel_response = await self.followup_agent.process(cancel_request)
        if cancel_response.get("success"):
            logger.info(f"✓ Old Reminders Cancelled: {cancel_response['reminders_cancelled']}")
        else:
            # Nothing to cancel must not prevent the new reminders below.
            logger.warning(f"✗ Old reminders not cancelled: {cancel_response.get('error')}")

        # NOTE(review): provider and location are hard-coded demo values here;
        # confirm whether the reschedule response carries the real ones.
        schedule_request = {
            "request_id": "DEMO_002",
            "appointment_id": appointment_id,
            "followup_action": "schedule_reminder",
            "appointment_datetime": reschedule_response['new_datetime'],
            "patient_email": self.DEMO_PATIENT_EMAIL,
            "patient_phone": self.DEMO_PATIENT_PHONE,
            "provider_name": "Dr. Jane Smith",
            "location": "Downtown Clinic"
        }

        schedule_response = self._require_success(
            await self.followup_agent.process(schedule_request), "Reminder scheduling"
        )
        logger.info(f"✓ New Reminders Scheduled: {schedule_response['reminders_scheduled']}")

        self._log_banner("✓ RESCHEDULE WORKFLOW COMPLETE")

    async def process_no_show_workflow(self, appointment_id: str, patient_id: str):
        """Process no-show handling workflow

        Sends a single ``process_no_show`` request to the followup agent and
        logs the actions it reports.
        """
        self._log_banner("WORKFLOW 3: NO-SHOW HANDLING")

        self._log_step("[STEP 1] FOLLOWUP AGENT - Process No-Show")

        no_show_request = {
            "request_id": "DEMO_003",
            "appointment_id": appointment_id,
            "patient_id": patient_id,
            "followup_action": "process_no_show",
            "patient_email": self.DEMO_PATIENT_EMAIL,
            "patient_phone": self.DEMO_PATIENT_PHONE
        }

        no_show_response = self._require_success(
            await self.followup_agent.process(no_show_request), "No-show processing"
        )
        logger.info("✓ No-Show Recorded and Processed")
        for action in no_show_response['actions_taken']:
            logger.info(f"  ✓ {action}")

        self._log_banner("✓ NO-SHOW WORKFLOW COMPLETE")

async def main():
    """Main demo function

    Logs a title banner, builds a ``HealthcareAgentSystem`` and runs the new
    patient, reschedule and no-show workflows in that order, reusing the
    appointment created by the first one. Any exception raised by a workflow
    is logged with its traceback and not re-raised.
    """
    # Interior width of the title box; every row is padded to this width so
    # the right-hand border lines up.
    banner_width = 68

    logger.info("\n")
    logger.info("╔" + "=" * banner_width + "╗")
    for title in ("HEALTHCARE ADMINISTRATIVE ASSISTANT", "Multi-Agent System Demo"):
        logger.info("║" + title.center(banner_width) + "║")
    logger.info("╚" + "=" * banner_width + "╝")

    # Initialize system
    system = HealthcareAgentSystem()

    # Run workflows
    try:
        # Workflow 1: New Patient Appointment
        workflow1_result = await system.process_new_patient_workflow()
        appointment_id = workflow1_result['appointment_id']

        # Workflow 2: Reschedule
        await system.process_reschedule_workflow(appointment_id)

        # Workflow 3: No-Show Handling
        await system.process_no_show_workflow(
            appointment_id,
            workflow1_result['patient_id']
        )

        logger.info("\n" + "=" * 70)
        logger.info("✓✓✓ ALL WORKFLOWS COMPLETED SUCCESSFULLY ✓✓✓")
        logger.info("=" * 70)

    except Exception as e:
        # Demo boundary: report the failure instead of crashing the notebook cell.
        logger.error(f"Error running workflows: {str(e)}", exc_info=True)

# Run the main function.
# NOTE: a bare top-level ``await`` only works where the interpreter allows it
# (e.g. an IPython/Jupyter cell, which is where this code is run); in a plain
# Python script use ``asyncio.run(main())`` instead.
await main()

2025-11-26 20:31:46,449 - __main__ - INFO - 

2025-11-26 20:31:46,452 - __main__ - INFO - ║               HEALTHCARE ADMINISTRATIVE ASSISTANT                 ║
2025-11-26 20:31:46,452 - __main__ - INFO - ║                      Multi-Agent System Demo                       ║
2025-11-26 20:31:46,454 - __main__ - INFO - ✓ Healthcare Agent System initialized with all agents
2025-11-26 20:31:46,455 - __main__ - INFO - 
2025-11-26 20:31:46,455 - __main__ - INFO - WORKFLOW 1: NEW PATIENT APPOINTMENT
2025-11-26 20:31:46,457 - __main__ - INFO - 
[STEP 1] INTAKE AGENT - Process Patient Information
2025-11-26 20:31:46,459 - __main__ - INFO - ----------------------------------------------------------------------
2025-11-26 20:31:46,459 - __main__ - INFO - [DEMO_001] Intake Agent processing: John Doe
2025-11-26 20:31:46,460 - __main__ - INFO - Agent Action: {'timestamp': '2025-11-26T20:31:46.460604', 'agent': 'IntakeAgent', 'action': 'intake_processed', 'session_id': None, 'details': {'request_id':