# Case Study 1: Claim Resubmission Ingestion Pipeline

This notebook implements a pipeline that processes insurance claim data from two Electronic Medical Records (EMR) systems: EMR Alpha (CSV) and EMR Beta (JSON). The solution addresses key data engineering challenges and evaluation criteria:

**Core Functionality:**
1. Data Ingestion: reading multiple EMR sources with different formats (CSV and JSON)
2. Schema Normalization: mapping source-specific field names and values to a unified schema
3. Eligibility Analysis: business rules that decide whether a claim can be resubmitted
4. Output Generation: JSON output for downstream automation systems

**Key Evaluation Criteria Addressed:**
- Data Wrangling & Schema Mapping: Handles CSV and JSON formats with different field structures
- Data Handling: Graceful processing of malformed, missing, and inconsistent data
- Modular Code: Separate functions for ingestion, normalization, classification, eligibility, and output, plus a metrics-tracking class
- Clear Communication: Logging, comments, and structured output
- Ambiguity Handling: Mocked (keyword-based) classifier standing in for an LLM on uncertain denial reasons

**Final Deliverables:**
- Working Jupyter notebook with all processing steps
- resubmission_candidates.json output file
- Metrics and logging
- Error handling for edge cases

### 1. Environment Setup and Library Imports

First, we will import all necessary libraries for data processing, file handling, and logging.

In [21]:
# Import necessary libraries for healthcare claims processing
import pandas as pd
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
import warnings
import numpy as np
import random
import re
from dataclasses import dataclass, asdict
from enum import Enum

# Configure logging to both a log file and the notebook output for pipeline monitoring
# (addresses the "Communication" evaluation criterion)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('healthcare_pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Suppress all Python warnings (including pandas warnings) for cleaner notebook output
warnings.filterwarnings('ignore')

### 2. Data Ingestion Functions

These functions handle reading data from different EMR sources with robust error handling.

In [22]:
def ingest_csv_data(filepath: str) -> List[Dict]:
    """
    Ingest data from CSV format (EMR Alpha system).
    Implements robust error handling for malformed data.
    
    Args:
        filepath: Path to the CSV file
        
    Returns:
        List of dictionaries containing claim data
        
    Error Handling:
        - File not found scenarios
        - Empty or corrupted files
        - Encoding issues
        - Malformed CSV structure
    """
    try:
        df = pd.read_csv(filepath)
        logger.info(f"Successfully loaded {len(df)} records from {filepath}")
        
        # Convert to list of dictionaries for uniform processing
        records = df.to_dict('records')
        return records
    except FileNotFoundError:
        logger.error(f"File not found: {filepath}")
        return []
    except pd.errors.EmptyDataError:
        logger.error(f"Empty or malformed CSV file: {filepath}")
        return []
    except pd.errors.ParserError as e:
        logger.error(f"CSV parsing error in {filepath}: {str(e)}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error reading CSV file {filepath}: {str(e)}")
        return []

def ingest_json_data(filepath: str) -> List[Dict]:
    """
    Ingest data from JSON format (EMR Beta system).
    Handles various JSON structure inconsistencies.
    
    Args:
        filepath: Path to the JSON file
        
    Returns:
        List of dictionaries containing claim data
        
    Error Handling:
        - Invalid JSON syntax
        - Unexpected JSON structure
        - Encoding issues
        - File access problems
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # Handle case where JSON is not a list
        if not isinstance(data, list):
            data = [data] if isinstance(data, dict) else []
        
        logger.info(f"Successfully loaded {len(data)} records from {filepath}")
        return data
    except FileNotFoundError:
        logger.error(f"File not found: {filepath}")
        return []
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON format in {filepath}: {str(e)}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error reading JSON file {filepath}: {str(e)}")
        return []
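Because `ingest_csv_data` returns records straight from `pandas.read_csv`, empty CSV cells arrive as `NaN` rather than `None` or an empty string, which is why the normalization step below checks `pd.isna`. A minimal sketch showing this on a small in-memory CSV; the two rows are hypothetical and only mimic the EMR Alpha column layout.

```python
import io

import pandas as pd

# Hypothetical EMR Alpha-style CSV: the second row has an empty patient_id cell
sample_csv = io.StringIO(
    "claim_id,patient_id,procedure_code,denial_reason,status,submitted_at\n"
    "A123,P001,99213,,paid,2025-07-01\n"
    "A124,,99214,Incorrect NPI,denied,2025-07-10\n"
)

# Same conversion ingest_csv_data performs: DataFrame -> list of dicts
records = pd.read_csv(sample_csv).to_dict("records")

# The empty cell comes back as a float NaN, so checks like == '' or `is None` miss it
missing_patient = records[1]["patient_id"]
print(repr(missing_patient), missing_patient == "", missing_patient is None, pd.isna(missing_patient))
```

Only `pd.isna` (or an equivalent NaN-aware check) catches the missing value, which is the case `normalize_alpha_record` guards against.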

### 3. Schema Normalization 

These functions transform data from different EMR sources into a unified schema, addressing the core evaluation criteria for data wrangling skills. The normalization process handles:

- Field Mapping: Different field names across systems (e.g., 'member' vs 'patient_id')
- Data Type Conversion: Consistent data types across sources
- Date Normalization: Multiple date formats to ISO standard
- Null Handling: Various representations of missing data
- Error Recovery: Graceful handling of malformed records

**Unified Schema Output:**
```json
{
    "claim_id": "string",
    "patient_id": "string or null", 
    "procedure_code": "string",
    "denial_reason": "string or null",
    "status": "string (normalized to lowercase)",
    "submitted_at": "ISO datetime string or null",
    "source_system": "alpha or beta"
}
```
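The beta-to-unified field mapping can also be written as a lookup table, which keeps it in one place if more fields or sources are added. A sketch under that assumption: `BETA_FIELD_MAP` and `rename_beta_fields` are hypothetical names, the mapping itself is the one listed in the `normalize_beta_record` docstring, and the sample record is made up. Null and date cleanup would still have to run afterwards.

```python
from typing import Any, Dict

# Hypothetical declarative version of the beta -> unified mapping
BETA_FIELD_MAP = {
    "id": "claim_id",
    "member": "patient_id",
    "code": "procedure_code",
    "error_msg": "denial_reason",
    "status": "status",
    "date": "submitted_at",
}

def rename_beta_fields(record: Dict[str, Any]) -> Dict[str, Any]:
    """Rename beta keys to unified names; unknown keys are dropped, missing keys become None."""
    renamed = {unified: record.get(beta) for beta, unified in BETA_FIELD_MAP.items()}
    renamed["source_system"] = "beta"
    return renamed

# Made-up sample record in the beta layout
sample = {"id": "B987", "member": "P010", "code": "99213",
          "error_msg": "Incorrect procedure", "status": "denied",
          "date": "2025-07-03T00:00:00"}
print(rename_beta_fields(sample))
```

The trade-off is that value cleanup (nulls, dates, lowercase status) is no longer interleaved with the renaming, so it needs its own pass.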

In [23]:
def normalize_alpha_record(record: Dict) -> Optional[Dict]:
    """
    Normalize a record from EMR Alpha (CSV format).
    Handles inconsistent data representations and missing values.
    
    Args:
        record: Raw record from alpha source
        
    Returns:
        Normalized record following unified schema or None if invalid
        
    Data Handling Features:
        - Null value standardization (empty strings, NaN, 'None' text)
        - Date format conversion (YYYY-MM-DD to ISO)
        - Status normalization (lowercase)
        - Error recovery for malformed records
    """
    try:
        # Handle multiple representations of missing patient_id
        patient_id = record.get('patient_id')
        if pd.isna(patient_id) or patient_id == '' or patient_id == 'None':
            patient_id = None
        
        # Normalize denial reason handling
        denial_reason = record.get('denial_reason')
        if pd.isna(denial_reason) or denial_reason == 'None' or denial_reason == '':
            denial_reason = None
        
        # Parse and normalize date with error handling
        submitted_at = record.get('submitted_at')
        normalized_date = None
        if submitted_at and not pd.isna(submitted_at):
            try:
                # Primary format: YYYY-MM-DD
                normalized_date = datetime.strptime(str(submitted_at), "%Y-%m-%d").isoformat()
            except ValueError:
                # Alternative formats for robust handling
                for fmt in ["%m/%d/%Y", "%d/%m/%Y", "%Y-%m-%d %H:%M:%S"]:
                    try:
                        normalized_date = datetime.strptime(str(submitted_at), fmt).isoformat()
                        break
                    except ValueError:
                        continue
                else:
                    logger.warning(f"Could not parse date: {submitted_at}")
        
        # Create normalized record with unified schema
        normalized = {
            "claim_id": record.get('claim_id'),
            "patient_id": patient_id,
            "procedure_code": record.get('procedure_code'),
            "denial_reason": denial_reason,
            "status": str(record.get('status', '')).lower(),
            "submitted_at": normalized_date,
            "source_system": "alpha"
        }
        
        return normalized
        
    except Exception as e:
        logger.warning(f"Error normalizing alpha record {record}: {str(e)}")
        return None

def normalize_beta_record(record: Dict) -> Optional[Dict]:
    """
    Normalize a record from EMR Beta (JSON format).
    Maps different field names to unified schema.
    
    Args:
        record: Raw record from beta source
        
    Returns:
        Normalized record following unified schema or None if invalid
        
    Schema Mapping:
        - 'member' -> 'patient_id'
        - 'id' -> 'claim_id'
        - 'code' -> 'procedure_code'
        - 'error_msg' -> 'denial_reason'
        - 'date' -> 'submitted_at' (ISO format conversion)
    """
    try:
        # Map beta field 'member' to unified 'patient_id'
        patient_id = record.get('member')
        if patient_id is None or patient_id == '' or str(patient_id).lower() == 'none':
            patient_id = None
        
        # Map beta field 'error_msg' to unified 'denial_reason'
        denial_reason = record.get('error_msg')
        if denial_reason is None or denial_reason == '' or str(denial_reason).lower() == 'none':
            denial_reason = None
        
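        # Parse ISO 8601 datetime from the beta system
        submitted_at = record.get('date')
        normalized_date = None
        if submitted_at and not pd.isna(submitted_at):
            try:
                # Convert a trailing 'Z' (UTC) to '+00:00'; datetime.fromisoformat
                # only accepts 'Z' natively from Python 3.11 onward
                raw_date = str(submitted_at).strip().replace('Z', '+00:00')
                dt = datetime.fromisoformat(raw_date)
                # Drop any timezone (keeping the wall-clock time) so the value can be
                # compared with the naive REFERENCE_DATE during eligibility checks
                normalized_date = dt.replace(tzinfo=None).isoformat()
            except ValueError:
                logger.warning(f"Could not parse beta date format: {submitted_at}")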
        
        # Create normalized record with unified schema
        normalized = {
            "claim_id": record.get('id'),
            "patient_id": patient_id,
            "procedure_code": record.get('code'),
            "denial_reason": denial_reason,
            "status": str(record.get('status', '')).lower(),
            "submitted_at": normalized_date,
            "source_system": "beta"
        }
        
        return normalized
        
    except Exception as e:
        logger.warning(f"Error normalizing beta record {record}: {str(e)}")
        return None
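The fallback loop in `normalize_alpha_record` tries date formats in a fixed order, so an ambiguous date such as `03/04/2025` is always read as month/day. The sketch below isolates that loop in a hypothetical helper, `parse_submission_date`, using the same formats in the same order, to make the ordering effect visible.

```python
from datetime import datetime
from typing import Optional

# Formats tried in order; the first match wins (same order as normalize_alpha_record)
DATE_FORMATS = ["%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y", "%Y-%m-%d %H:%M:%S"]

def parse_submission_date(value: str) -> Optional[str]:
    """Hypothetical helper: ISO string for the first matching format, else None."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt).isoformat()
        except ValueError:
            continue
    return None

for raw in ["2025-07-01", "03/04/2025", "13/04/2025", "2025-07-01 10:30:00", "July 1"]:
    print(raw, "->", parse_submission_date(raw))
```

`03/04/2025` resolves to March 4 because `%m/%d/%Y` is tried first; `%d/%m/%Y` only wins when the first number cannot be a month, as in `13/04/2025`. If a source is known to use day-first dates, the format list would need to be source-specific.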

### 4. Denial Reason Classification System

**Handling Ambiguity and Incomplete Input**

This section classifies each denial reason as retryable or non-retryable, addressing the evaluation criterion "Ability to handle ambiguity and incomplete input." The classifier applies, in order:

- Rule-Based Classification: Explicit business rules for known denial reasons
- Ambiguous-Case Mapping: Fixed decisions for known but ambiguous reasons (e.g., 'incorrect procedure')
- Mocked Classifier: Keyword matching that stands in for an AI/ML classifier on unrecognized reasons
- Fallback Logic: Missing or unmatched denial reasons default to non-retryable

**Classification Categories:**
- Retryable: Claims that can potentially be corrected and resubmitted
- Non-Retryable: Claims with fundamental issues that cannot be easily resolved

In [24]:
RETRYABLE_REASONS = {
    "missing modifier",
    "incorrect npi", 
    "prior auth required"
}

NON_RETRYABLE_REASONS = {
    "authorization expired",
    "incorrect provider type"
}

AMBIGUOUS_MAPPINGS = {
    "incorrect procedure": "retryable",  
    "form incomplete": "retryable",     
    "not billable": "non_retryable",    
    None: "non_retryable"               
}

def classify_denial_reason(denial_reason: Optional[str]) -> str:
    """
    Classify denial reason as retryable or non-retryable.
    Implements mocked classifier for ambiguous cases as per requirements.
    
    This function demonstrates:
    - Clear, testable logic for classification
    - Handling of ambiguous and incomplete input
    - Robust fallback mechanisms
    
    Args:
        denial_reason: The denial reason text (may be None)
        
    Returns:
        Classification result: 'retryable' or 'non_retryable'
    """
    # Handle None/missing denial reasons gracefully
    if denial_reason is None:
        logger.info("No denial reason provided, classifying as non-retryable")
        return "non_retryable"
    
    # Normalize for consistent comparison
    normalized_reason = denial_reason.lower().strip()
    
    # Apply explicit business rules first
    if normalized_reason in RETRYABLE_REASONS:
        logger.info(f"'{denial_reason}' matched explicit retryable rule")
        return "retryable"
    
    if normalized_reason in NON_RETRYABLE_REASONS:
        logger.info(f"'{denial_reason}' matched explicit non-retryable rule")
        return "non_retryable"
    
    # Handle ambiguous cases with mocked classifier logic
    if normalized_reason in AMBIGUOUS_MAPPINGS:
        result = AMBIGUOUS_MAPPINGS[normalized_reason]
        logger.info(f"'{denial_reason}' classified as {result} via ambiguous mapping")
        return result
    # This simulates an AI/ML classifier for handling uncertainty
    retryable_keywords = ['missing', 'incorrect', 'incomplete', 'required', 'invalid', 'wrong']
    non_retryable_keywords = ['expired', 'unauthorized', 'not covered', 'excluded', 'terminated']
    
    # Check for retryable indicators
    for keyword in retryable_keywords:
        if keyword in normalized_reason:
            logger.info(f"'{denial_reason}' classified as retryable based on keyword '{keyword}'")
            return "retryable"
    # Check for non-retryable indicators
    for keyword in non_retryable_keywords:
        if keyword in normalized_reason:
            logger.info(f"'{denial_reason}' classified as non-retryable based on keyword '{keyword}'")
            return "non_retryable"
    # Conservative fallback for completely unknown cases
    logger.warning(f"Unknown denial reason '{denial_reason}', defaulting to non-retryable")
    return "non_retryable"

def generate_recommended_changes(denial_reason: Optional[str]) -> str:
    """
    Generate actionable recommendations for claim resubmission.
    
    Args:
        denial_reason: The denial reason
        
    Returns:
        Specific recommendation string for claim processing teams
    """
    if denial_reason is None:
        return "Review claim details and resubmit with complete information"
    
    normalized_reason = denial_reason.lower().strip()
    
    # Specific recommendations for known issues
    recommendations = {
        "missing modifier": "Add appropriate CPT modifier codes and resubmit",
        "incorrect npi": "Review NPI number and resubmit",
        "prior auth required": "Obtain prior authorization before resubmission",
        "incorrect procedure": "Verify procedure code accuracy against documentation",
        "form incomplete": "Complete all required fields including diagnosis codes"
    }
    
    return recommendations.get(normalized_reason, 
                             f"Review and correct '{denial_reason}' issue before resubmission")

# Summarize the configured classification rules
print("Denial reason classification system configured")
print("Mocked classifier implemented for ambiguous cases")
print(f"Explicit retryable rules: {len(RETRYABLE_REASONS)}")
print(f"Explicit non-retryable rules: {len(NON_RETRYABLE_REASONS)}")
print(f"Ambiguous case mappings: {len(AMBIGUOUS_MAPPINGS)}")
print("Classification system ready for processing")

Denial reason classification system configured
Mocked classifier implemented for ambiguous cases
Explicit retryable rules: 3
Explicit non-retryable rules: 2
Ambiguous case mappings: 4
Classification system ready for processing
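The order of checks in `classify_denial_reason` matters because the keyword fallback uses substring matching: the keyword 'incorrect' alone would mark 'Incorrect provider type' as retryable if the explicit rule did not run first. A condensed sketch of the same precedence, with the explicit and ambiguous tables merged into one dictionary (equivalent here, since their keys do not overlap) and logging omitted; `EXACT_RULES`, `classify`, and the `use_exact_rules` flag are hypothetical and exist only to show the difference.

```python
from typing import Optional

# Explicit and ambiguous tables from the notebook, merged (their keys do not overlap)
EXACT_RULES = {
    "missing modifier": "retryable",
    "incorrect npi": "retryable",
    "prior auth required": "retryable",
    "authorization expired": "non_retryable",
    "incorrect provider type": "non_retryable",
    "incorrect procedure": "retryable",
    "form incomplete": "retryable",
    "not billable": "non_retryable",
}
RETRYABLE_KEYWORDS = ["missing", "incorrect", "incomplete", "required", "invalid", "wrong"]
NON_RETRYABLE_KEYWORDS = ["expired", "unauthorized", "not covered", "excluded", "terminated"]

def classify(reason: Optional[str], use_exact_rules: bool = True) -> str:
    """Condensed version of classify_denial_reason's precedence."""
    if reason is None:
        return "non_retryable"              # missing reason: conservative default
    text = reason.lower().strip()
    if use_exact_rules and text in EXACT_RULES:
        return EXACT_RULES[text]            # exact-match tables first
    if any(kw in text for kw in RETRYABLE_KEYWORDS):
        return "retryable"                  # then retryable substrings
    if any(kw in text for kw in NON_RETRYABLE_KEYWORDS):
        return "non_retryable"              # then non-retryable substrings
    return "non_retryable"                  # unknown reason

for reason in ["Incorrect provider type", "Service not covered", "Form incomplete", None]:
    print(reason, "->", classify(reason), "| keywords only:", classify(reason, use_exact_rules=False))
```

Because retryable keywords are checked before non-retryable ones, a reason containing words from both lists resolves to retryable; that is a design choice worth revisiting if new denial texts appear.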


### 5. Resubmission Eligibility Logic

This section implements the core business logic for determining claim resubmission eligibility.

**Eligibility Criteria:**

1. Status Check: Claim must have 'denied' status
2. Patient Validation: Patient ID must be present and non-empty
3. Timing Requirement: Claim must have been submitted more than 7 days before the reference date
4. Denial Classification: Reason must be classified as 'retryable'
In [25]:
# Reference "today" for date calculations (assumed to be July 30, 2025)
REFERENCE_DATE = datetime(2025, 7, 30) 
# Minimum waiting period per business rules
MIN_DAYS_FOR_RESUBMISSION = 7  

def is_eligible_for_resubmission(claim: Dict) -> Tuple[bool, str]:
    """
    Determine if a claim is eligible for automated resubmission.
    
    This function implements clear, testable business logic with comprehensive
    validation and transparent decision making.
    
    Eligibility Criteria (All must be satisfied):
    1. Status must be 'denied'
    2. Patient ID must not be null (data integrity requirement)
    3. Claim must be submitted more than 7 days ago (business rule)
    4. Denial reason must be classified as 'retryable' (intelligent classification)
    
    Args:
        claim: Normalized claim record dictionary
        
    Returns:
        Tuple of (is_eligible: bool, decision_reason: str)
        
    Example:
        eligible, reason = is_eligible_for_resubmission(claim)
        if eligible:
            ...  # process for resubmission
        else:
            ...  # log the exclusion reason
    """
    # Claims must be explicitly denied to be eligible for resubmission
    claim_status = claim.get('status', '').lower()
    if claim_status != 'denied':
        return False, f"Status is '{claim_status}', not 'denied'"
    # Patient ID is required for claim processing and tracking
    patient_id = claim.get('patient_id')
    if patient_id is None or str(patient_id).strip() == '':
        return False, "Patient ID is null or empty"
    # Claims need minimum waiting period before resubmission attempt
    submitted_at = claim.get('submitted_at')
    if submitted_at is None:
        return False, "Submission date is missing"
    
    try:
        submission_date = datetime.fromisoformat(submitted_at)
        days_since_submission = (REFERENCE_DATE - submission_date).days
        
        if days_since_submission <= MIN_DAYS_FOR_RESUBMISSION:
            return False, f"Submitted only {days_since_submission} days ago (requires >{MIN_DAYS_FOR_RESUBMISSION} days)"
    except ValueError as e:
        logger.error(f"Error parsing submission date '{submitted_at}': {str(e)}")
        return False, "Invalid submission date format"
    except Exception as e:
        logger.error(f"Unexpected error processing submission date: {str(e)}")
        return False, "Error processing submission date"
    # Only retryable denial reasons should be resubmitted
    denial_reason = claim.get('denial_reason')
    classification = classify_denial_reason(denial_reason)
    
    if classification != 'retryable':
        return False, f"Denial reason '{denial_reason}' classified as {classification}"
    
    # All criteria satisfied - eligible for resubmission
    return True, f"Eligible: denied claim with retryable reason, submitted {days_since_submission} days ago"

def create_resubmission_candidate(claim: Dict) -> Dict:
    """
    Create a resubmission candidate record in the required output format.
    
    This function generates the final deliverable format as specified:
    {
        "claim_id": "A124",
        "resubmission_reason": "Incorrect NPI", 
        "source_system": "alpha",
        "recommended_changes": "Review NPI number and resubmit"
    }
    
    Args:
        claim: Eligible normalized claim record
        
    Returns:
        Dictionary in the required resubmission candidate format
    """
    return {
        "claim_id": claim['claim_id'],
        "resubmission_reason": claim['denial_reason'],
        "source_system": claim['source_system'],
        "recommended_changes": generate_recommended_changes(claim['denial_reason'])
    }


print("Resubmission eligibility logic configured")
print(f"Reference date: {REFERENCE_DATE.strftime('%Y-%m-%d')}")
print(f"Minimum resubmission period: {MIN_DAYS_FOR_RESUBMISSION} days")
print("Clear, testable business rules implemented")
print("Ready to evaluate claims for automated resubmission")

Resubmission eligibility logic configured
Reference date: 2025-07-30
Minimum resubmission period: 7 days
Clear, testable business rules implemented
Ready to evaluate claims for automated resubmission


### 6. Main Data Processing Pipeline
This section implements the main orchestration pipeline:

- Modular Design: Calls the ingestion, normalization, and eligibility functions defined above
- Error Handling: Graceful processing of malformed data
- Logging: INFO-level progress messages, with per-claim eligibility decisions at DEBUG level
- Metrics Collection: Counts of processed, flagged, excluded, and failed claims for the final report

**Pipeline Steps:**
1. Data Ingestion: Load data from multiple EMR sources
2. Schema Normalization: Transform to unified format
3. Eligibility Analysis: Apply business rules for resubmission
4. Output Generation: Create final deliverable format

In [26]:
class HealthcarePipelineMetrics:
    """Class to track pipeline execution metrics."""
    
    def __init__(self):
        self.total_claims_processed = 0
        self.claims_by_source = {}
        self.flagged_for_resubmission = 0
        self.excluded_claims = 0
        self.exclusion_reasons = {}
        self.processing_errors = 0
    
    def add_exclusion_reason(self, reason: str):
        """Add an exclusion reason to metrics."""
        if reason not in self.exclusion_reasons:
            self.exclusion_reasons[reason] = 0
        self.exclusion_reasons[reason] += 1

def process_healthcare_claims(alpha_file: str, beta_file: str) -> Tuple[List[Dict], HealthcarePipelineMetrics]:
    """
    Main processing function to ingest, normalize, and analyze claims.
    
    Args:
        alpha_file: Path to alpha EMR CSV file
        beta_file: Path to beta EMR JSON file
        
    Returns:
        Tuple of (resubmission_candidates, metrics)
    """
    logger.info("Starting healthcare claims processing pipeline")
    metrics = HealthcarePipelineMetrics()
    
    try:
        # Step 1: Load data from both sources
        logger.info("Ingesting data from EMR sources...")
        alpha_records = ingest_csv_data(alpha_file)
        beta_records = ingest_json_data(beta_file)
        
        metrics.claims_by_source['alpha'] = len(alpha_records)
        metrics.claims_by_source['beta'] = len(beta_records)
        
        # Step 2: Normalize all records
        logger.info("Normalizing schemas...")
        normalized_claims = []
        
        # Process alpha records (from the CSV file)
        for record in alpha_records:
            try:
                normalized = normalize_alpha_record(record)
                if normalized:
                    normalized_claims.append(normalized)
                else:
                    metrics.processing_errors += 1
            except Exception as e:
                logger.error(f"Error processing alpha record: {e}")
                metrics.processing_errors += 1
        
        # Process beta records (from the JSON file)
        for record in beta_records:
            try:
                normalized = normalize_beta_record(record)
                if normalized:
                    normalized_claims.append(normalized)
                else:
                    metrics.processing_errors += 1
            except Exception as e:
                logger.error(f"Error processing beta record: {e}")
                metrics.processing_errors += 1
        
        metrics.total_claims_processed = len(normalized_claims)
        logger.info(f"Normalized {len(normalized_claims)} total claims")
        
        # Step 3: Analyze eligibility for resubmission
        logger.info("Analyzing resubmission eligibility...")
        resubmission_candidates = []
        
        for claim in normalized_claims:
            try:
                is_eligible, reason = is_eligible_for_resubmission(claim)
                
                if is_eligible:
                    candidate = create_resubmission_candidate(claim)
                    resubmission_candidates.append(candidate)
                    metrics.flagged_for_resubmission += 1
                    logger.debug(f"Claim {claim['claim_id']} eligible: {reason}")
                else:
                    metrics.excluded_claims += 1
                    metrics.add_exclusion_reason(reason)
                    logger.debug(f"Claim {claim['claim_id']} excluded: {reason}")
                    
            except Exception as e:
                logger.error(f"Error analyzing claim {claim.get('claim_id', 'unknown')}: {e}")
                metrics.processing_errors += 1
        
        logger.info(f"Pipeline completed successfully!")
        logger.info(f"Identified {len(resubmission_candidates)} claims for resubmission")
        
        return resubmission_candidates, metrics
        
    except Exception as e:
        logger.error(f"Pipeline execution failed: {str(e)}")
        raise
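`HealthcarePipelineMetrics` counts exclusion reasons with a hand-maintained dictionary. A sketch of an equivalent dataclass that uses `collections.Counter`, which treats missing keys as zero and provides `most_common()` for a sorted report; `PipelineMetricsSketch` is a hypothetical name, and its fields mirror the original class. The exclusion reasons below are messages that `is_eligible_for_resubmission` can produce.

```python
import json
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class PipelineMetricsSketch:
    """Hypothetical dataclass variant of HealthcarePipelineMetrics (same fields)."""
    total_claims_processed: int = 0
    claims_by_source: Dict[str, int] = field(default_factory=dict)
    flagged_for_resubmission: int = 0
    excluded_claims: int = 0
    exclusion_reasons: Counter = field(default_factory=Counter)
    processing_errors: int = 0

    def add_exclusion_reason(self, reason: str) -> None:
        # Counter starts unseen keys at 0, so no explicit initialisation is needed
        self.exclusion_reasons[reason] += 1

    def to_json(self) -> str:
        # Counter is a dict subclass, so json serializes it as a plain object
        return json.dumps(vars(self), indent=2)

m = PipelineMetricsSketch()
m.add_exclusion_reason("Patient ID is null or empty")
m.add_exclusion_reason("Patient ID is null or empty")
m.add_exclusion_reason("Status is 'paid', not 'denied'")
print(m.exclusion_reasons.most_common())
print(m.to_json())
```

`most_common()` returns reasons sorted by count, which is the same ordering the report builds manually with `sorted(..., key=lambda x: x[1], reverse=True)`.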

### 7. Output Generation and Validation

Functions to generate final output and validate the structure for downstream systems.

In [27]:
def validate_resubmission_candidates(candidates: List[Dict]) -> bool:
    """
    Validate the structure of resubmission candidates.
    
    Args:
        candidates: List of resubmission candidate records
        
    Returns:
        True if all candidates are valid, False otherwise
    """
    required_fields = {'claim_id', 'resubmission_reason', 'source_system', 'recommended_changes'}
    
    for i, candidate in enumerate(candidates):
        if not isinstance(candidate, dict):
            logger.error(f"Candidate {i} is not a dictionary")
            return False
        
        missing_fields = required_fields - set(candidate.keys())
        if missing_fields:
            logger.error(f"Candidate {i} missing fields: {missing_fields}")
            return False
        
        # Validate field types
        if not isinstance(candidate['claim_id'], str):
            logger.error(f"Candidate {i} claim_id is not a string")
            return False
    
    logger.info(f"All {len(candidates)} candidates passed validation")
    return True

def save_resubmission_candidates(candidates: List[Dict], output_file: str = "resubmission_candidates.json") -> bool:
    """
    Save resubmission candidates to JSON file with validation.
    
    Args:
        candidates: List of resubmission candidates
        output_file: Output filename
        
    Returns:
        True if saved successfully, False otherwise
    """
    try:
        # Validate before saving
        if not validate_resubmission_candidates(candidates):
            logger.error("Validation failed, not saving file")
            return False
        
        # Save to JSON file; UTF-8 is set explicitly because ensure_ascii=False can emit non-ASCII text
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(candidates, f, indent=2, ensure_ascii=False)
        
        logger.info(f"Results saved to {output_file}")
        
        # Verify file was created and readable
        try:
            with open(output_file, 'r', encoding='utf-8') as f:
                verification_data = json.load(f)
            logger.info(f"File verification successful: {len(verification_data)} records")
            return True
        except Exception as e:
            logger.error(f"File verification failed: {e}")
            return False
            
    except Exception as e:
        logger.error(f"Error saving results: {str(e)}")
        return False

def display_sample_output(candidates: List[Dict], max_samples: int = 3):
    """
    Display sample resubmission candidates for review.
    
    Args:
        candidates: List of resubmission candidates
        max_samples: Maximum number of samples to display
    """
    print(f"\n{'='*60}")
    print("SAMPLE RESUBMISSION CANDIDATES")
    print(f"{'='*60}")
    
    if not candidates:
        print("No candidates found for resubmission")
        return
    
    for i, candidate in enumerate(candidates[:max_samples], 1):
        print(f"\n{i}. Claim ID: {candidate['claim_id']}")
        print(f"Denial Reason: {candidate['resubmission_reason']}")
        print(f"Source System: {candidate['source_system'].upper()}")
        print(f"Recommended Action: {candidate['recommended_changes']}")
    
    if len(candidates) > max_samples:
        print(f"\n... and {len(candidates) - max_samples} more candidates")
    
    print(f"\n{'='*60}")
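`validate_resubmission_candidates` stops at the first problem and only type-checks `claim_id`. A sketch of a stricter variant that checks every required field and collects all problems in one pass; `find_invalid_candidates` and `ALLOWED_SOURCES` are hypothetical, and the allowed sources assume only the two EMR systems in this notebook. The valid sample is the output format shown in the `create_resubmission_candidate` docstring; the invalid one is made up.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = ("claim_id", "resubmission_reason", "source_system", "recommended_changes")
ALLOWED_SOURCES = {"alpha", "beta"}  # assumption: only the two EMR systems used here

def find_invalid_candidates(candidates: List[Dict[str, Any]]) -> List[str]:
    """Return one message per problem instead of stopping at the first failure."""
    problems = []
    for i, cand in enumerate(candidates):
        if not isinstance(cand, dict):
            problems.append(f"candidate {i}: not a dict")
            continue
        for name in REQUIRED_FIELDS:
            value = cand.get(name)
            if not isinstance(value, str) or not value.strip():
                problems.append(f"candidate {i}: '{name}' must be a non-empty string")
        source = cand.get("source_system")
        # Non-string sources were already reported above; only check known names here
        if isinstance(source, str) and source not in ALLOWED_SOURCES:
            problems.append(f"candidate {i}: unknown source_system {source!r}")
    return problems

good = {"claim_id": "A124", "resubmission_reason": "Incorrect NPI",
        "source_system": "alpha", "recommended_changes": "Review NPI number and resubmit"}
bad = {"claim_id": 124, "resubmission_reason": "", "source_system": "gamma",
       "recommended_changes": "n/a"}
print(find_invalid_candidates([good, bad]))
```

Collecting every problem makes a failed validation easier to debug, at the cost of scanning the whole list even when the first record is already invalid.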

### 8. Logging and Metrics Collection

Reporting functions that summarize pipeline execution metrics in the notebook and save them to a JSON file for further analysis.

In [28]:
def generate_pipeline_report(metrics: HealthcarePipelineMetrics, candidates: List[Dict]):
    """
    Generate comprehensive pipeline execution report.
    
    Args:
        metrics: Pipeline execution metrics
        candidates: List of resubmission candidates
    """
    print(f"\n{'='*70}")
    print("HEALTHCARE CLAIMS PIPELINE EXECUTION REPORT")
    print(f"{'='*70}")
    print(f"Execution Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Reference Date: {REFERENCE_DATE.strftime('%Y-%m-%d')}")
    print(f"\nPROCESSING SUMMARY")
    print(f"{'─'*40}")
    # Success rate is measured against all ingested records, because records that
    # fail normalization never reach total_claims_processed but do count as errors
    total_ingested = sum(metrics.claims_by_source.values())
    successful = total_ingested - metrics.processing_errors
    print(f"Total records ingested: {total_ingested}")
    print(f"Total claims processed: {metrics.total_claims_processed}")
    print(f"Processing errors: {metrics.processing_errors}")
    print(f"Success rate: {successful / max(total_ingested, 1) * 100:.1f}%")
    
    print(f"\nSOURCE BREAKDOWN")
    print(f"{'─'*40}")
    for source, count in metrics.claims_by_source.items():
        print(f"EMR {source.upper()}: {count} claims")
    
    print(f"\nELIGIBILITY ANALYSIS")
    print(f"{'─'*40}")
    print(f"Claims flagged for resubmission: {metrics.flagged_for_resubmission}")
    print(f"Claims excluded: {metrics.excluded_claims}")
    
    if metrics.exclusion_reasons:
        print(f"\nEXCLUSION REASONS")
        print(f"{'─'*40}")
        for reason, count in sorted(metrics.exclusion_reasons.items(), key=lambda x: x[1], reverse=True):
            print(f"{reason}: {count}")
    
    print(f"\nBUSINESS IMPACT")
    print(f"{'─'*40}")
    if metrics.flagged_for_resubmission > 0:
        print(f"{metrics.flagged_for_resubmission} claims ready for automated resubmission")
        print("Potential revenue recovery opportunity identified")
        print("Recommended next steps: Review and process flagged claims")
    else:
        print("No claims currently eligible for resubmission")
        print("Recommended: Review exclusion reasons for improvement opportunities")
    
    print(f"\nQUALITY METRICS")
    print(f"{'─'*40}")
    if candidates:
        source_distribution = {}
        for candidate in candidates:
            source = candidate['source_system']
            source_distribution[source] = source_distribution.get(source, 0) + 1
        
        print("Resubmission candidates by source:")
        for source, count in source_distribution.items():
            print(f"EMR {source.upper()}: {count}")
    
    print(f"{'='*70}")

def save_metrics_report(metrics: HealthcarePipelineMetrics, output_file: str = "pipeline_metrics.json"):
    """
    Save detailed metrics to JSON file for further analysis.
    
    Args:
        metrics: Pipeline execution metrics
        output_file: Output filename for metrics
    """
    try:
        metrics_dict = {
            "execution_timestamp": datetime.now().isoformat(),
            "reference_date": REFERENCE_DATE.isoformat(),
            "total_claims_processed": metrics.total_claims_processed,
            "claims_by_source": metrics.claims_by_source,
            "flagged_for_resubmission": metrics.flagged_for_resubmission,
            "excluded_claims": metrics.excluded_claims,
            "exclusion_reasons": metrics.exclusion_reasons,
            "processing_errors": metrics.processing_errors,
        }
        
        with open(output_file, 'w') as f:
            json.dump(metrics_dict, f, indent=2)
        
        logger.info(f"Metrics saved to {output_file}")
        return True
    except Exception as e:
        logger.error(f"Error saving metrics: {e}")
        return False
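Both `save_resubmission_candidates` and `save_metrics_report` write directly to the target path, so a crash mid-write could leave a truncated JSON file for downstream systems. One common pattern, sketched here under the assumption that the output directory is on a local POSIX filesystem, writes to a temporary file in the same directory and then swaps it in with `os.replace`; `write_json_atomically` is a hypothetical helper and the payload is a placeholder.

```python
import json
import os
import tempfile
from typing import Any

def write_json_atomically(data: Any, output_file: str) -> None:
    """Write JSON to a temp file next to output_file, then rename it over the target."""
    directory = os.path.dirname(os.path.abspath(output_file))
    # The temp file lives in the target's directory so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # On POSIX, readers see either the old file or the complete new one
        os.replace(tmp_path, output_file)
    except BaseException:
        # Remove the temp file if anything failed before or during the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Usage in a throwaway directory with a placeholder payload
with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "pipeline_metrics.json")
    write_json_atomically({"status": "ok"}, target)
    with open(target, encoding="utf-8") as f:
        loaded = json.load(f)

print(loaded)
```

The read-back verification in `save_resubmission_candidates` would still apply; the rename only guarantees that a partially written file is never visible under the final name.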

### 9. Pipeline Execution

Now let's run the complete healthcare claims processing pipeline with our sample data.

In [29]:
# Define file paths for the EMR data sources
ALPHA_FILE = "emr_alpha.csv"
BETA_FILE = "emr_beta.json"

# Execute the complete pipeline
try:
    print("Starting Healthcare Claims Processing Pipeline")
    print("=" * 60)
    
    # Process claims through the pipeline
    resubmission_candidates, pipeline_metrics = process_healthcare_claims(ALPHA_FILE, BETA_FILE)
    
    # Generate and display the summary report
    generate_pipeline_report(pipeline_metrics, resubmission_candidates)
    
    # Display sample results
    display_sample_output(resubmission_candidates, max_samples=5)
    
    # Save results to JSON files
    candidates_saved = save_resubmission_candidates(resubmission_candidates)
    metrics_saved = save_metrics_report(pipeline_metrics)
    
    if candidates_saved and metrics_saved:
        print("\nPipeline execution completed successfully!")
        print("Output files generated:")
        print("  resubmission_candidates.json")
        print("  pipeline_metrics.json")
    else:
        print("\nPipeline completed with file save issues")
    
except Exception as e:
    print(f"\nPipeline execution failed: {e}")
    logger.error(f"Pipeline failure: {e}", exc_info=True)

2025-08-16 16:09:57,380 - INFO - Starting healthcare claims processing pipeline
2025-08-16 16:09:57,384 - INFO - Ingesting data from EMR sources...


Starting Healthcare Claims Processing Pipeline


2025-08-16 16:09:57,467 - INFO - Successfully loaded 6 records from emr_alpha.csv
2025-08-16 16:09:57,481 - INFO - Successfully loaded 4 records from emr_beta.json
2025-08-16 16:09:57,483 - INFO - Normalizing schemas...
2025-08-16 16:09:57,485 - INFO - Normalized 10 total claims
2025-08-16 16:09:57,487 - INFO - Analyzing resubmission eligibility...
2025-08-16 16:09:57,488 - INFO - 'Missing modifier' matched explicit retryable rule
2025-08-16 16:09:57,488 - INFO - 'Incorrect NPI' matched explicit retryable rule
2025-08-16 16:09:57,490 - INFO - 'Prior auth required' matched explicit retryable rule
2025-08-16 16:09:57,491 - INFO - 'Incorrect provider type' matched explicit non-retryable rule
2025-08-16 16:09:57,493 - INFO - 'Missing modifier' matched explicit retryable rule
2025-08-16 16:09:57,494 - INFO - Pipeline completed successfully!
2025-08-16 16:09:57,495 - INFO - Identified 4 claims for resubmission
2025-08-16 16:09:57,496 - INFO - All 4 candidates passed validation
2025-08-16 16:


HEALTHCARE CLAIMS PIPELINE EXECUTION REPORT
Execution Date: 2025-08-16 16:09:57
Reference Date: 2025-07-30

PROCESSING SUMMARY
────────────────────────────────────────
Total claims processed: 10
Processing errors: 0
Success rate: 100.0%

 SOURCE BREAKDOWN
────────────────────────────────────────
EMR ALPHA: 6 claims
EMR BETA: 4 claims

ELIGIBILITY ANALYSIS
────────────────────────────────────────
Claims flagged for resubmission: 4
Claims excluded: 6

 EXCLUSION REASONS
────────────────────────────────────────
Patient ID is null or empty: 3
Status is 'approved', not 'denied': 2
Denial reason 'Incorrect provider type' classified as non_retryable: 1

 BUSINESS IMPACT
────────────────────────────────────────
 4 claims ready for automated resubmission
 Potential revenue recovery opportunity identified
 Recommended next steps: Review and process flagged claims

 QUALITY METRICS
────────────────────────────────────────
Resubmission candidates by source:
EMR ALPHA: 3
EMR BETA: 1

SAMPLE RESUBM
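
The EXCLUSION REASONS and QUALITY METRICS sections of the report are tallies over per-claim decisions. A minimal sketch of that aggregation with collections.Counter follows; the (source_system, is_eligible, reason) tuple shape, the summarize name, and the success-rate formula (claims processed without error as a share of the total) are assumptions, not the notebook's internals. That formula is at least consistent with the 100.0% reported for 10 claims and 0 processing errors.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple

# Hypothetical per-claim decision shape: (source_system, is_eligible, reason)
Decision = Tuple[str, bool, str]


def summarize(decisions: Iterable[Decision], processing_errors: int = 0) -> Dict[str, object]:
    """Aggregate per-claim decisions into report-style counts (hypothetical helper)."""
    rows: List[Decision] = list(decisions)
    total = len(rows)
    # Tally exclusion reasons and eligible claims per source in one pass each
    exclusion_reasons = Counter(reason for _, ok, reason in rows if not ok)
    candidates_by_source = Counter(src for src, ok, _ in rows if ok)
    # Assumed definition: share of claims processed without an error
    success_rate = 100.0 * (total - processing_errors) / total if total else 0.0
    return {
        "total": total,
        "flagged": sum(candidates_by_source.values()),
        "excluded": sum(exclusion_reasons.values()),
        # most_common() orders reasons from most to least frequent, as in the report
        "exclusion_reasons": dict(exclusion_reasons.most_common()),
        "candidates_by_source": dict(candidates_by_source),
        "success_rate": round(success_rate, 1),
    }
```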

### 10. Results Analysis and Verification

Let's reload the saved candidates file and re-run the eligibility checks claim by claim to verify the business logic.
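
The decisions printed by the analysis follow an ordered rule chain in which the first failing rule determines the exclusion reason. Claim A125, for example, has both a null patient ID and a non-retryable reason but is reported under "Patient ID is null or empty", which suggests the patient-ID check runs before reason classification. The sketch below reconstructs that pattern under stated assumptions: the remaining rule order, the MIN_DAYS_SINCE_SUBMISSION threshold of 7, and the check_eligibility name are hypothetical, and the real is_eligible_for_resubmission may differ.

```python
from datetime import datetime
from typing import Tuple

# Reason strings taken from the pipeline log; the threshold is an assumed value
RETRYABLE_REASONS = {"missing modifier", "incorrect npi", "prior auth required"}
MIN_DAYS_SINCE_SUBMISSION = 7  # hypothetical, not confirmed by this notebook


def check_eligibility(claim: dict, reference_date: datetime) -> Tuple[bool, str]:
    """Apply rules in a fixed order and return the first failing reason."""
    status = (claim.get("status") or "").lower()
    if status != "denied":
        return False, f"Status is '{status}', not 'denied'"
    if not claim.get("patient_id"):
        return False, "Patient ID is null or empty"
    submitted = claim.get("submitted_at")
    if not submitted:
        return False, "Submission date is missing"
    # Whole days between submission and the fixed reference date
    days_ago = (reference_date - datetime.fromisoformat(submitted)).days
    if days_ago <= MIN_DAYS_SINCE_SUBMISSION:
        return False, f"Submitted only {days_ago} days ago"
    reason = (claim.get("denial_reason") or "").strip()
    if reason.lower() not in RETRYABLE_REASONS:
        return False, f"Denial reason '{reason}' is not retryable"
    return True, f"Eligible: denied claim with retryable reason, submitted {days_ago} days ago"
```

Short-circuiting on the first failure keeps each exclusion attributed to exactly one reason, which is what lets the report's exclusion counts sum to the number of excluded claims.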

In [30]:
# Analyze the results in detail
print(" DETAILED RESULTS ANALYSIS")
print("=" * 50)

# Load the saved results for verification
try:
    with open("resubmission_candidates.json", 'r', encoding='utf-8') as f:
        saved_candidates = json.load(f)
    
    print(f" Successfully loaded {len(saved_candidates)} candidates from file")
    
    # Analyze each original claim to understand the decision logic
    print("\n CLAIM-BY-CLAIM ANALYSIS")
    print("-" * 40)
    
    # Re-read the original data for analysis
    alpha_data = ingest_csv_data(ALPHA_FILE)
    beta_data = ingest_json_data(BETA_FILE)
    
    all_claims = []
    
    # Normalize alpha records and keep only those that normalized successfully
    for record in alpha_data:
        normalized = normalize_alpha_record(record)
        if normalized:
            all_claims.append(normalized)
    
    # Normalize beta records and keep only those that normalized successfully
    for record in beta_data:
        normalized = normalize_beta_record(record)
        if normalized:
            all_claims.append(normalized)
    
    # Evaluate each claim once and keep the result, so the summary below
    # does not re-run the eligibility check (and its logging) a second time
    decisions = []
    for i, claim in enumerate(all_claims, 1):
        is_eligible, reason = is_eligible_for_resubmission(claim)
        decisions.append(is_eligible)
        status_icon = "Ok" if is_eligible else "Not Okay"
        
        print(f"\n{i}. Claim {claim['claim_id']} ({claim['source_system'].upper()}) {status_icon}")
        print(f"   Status: {claim['status']}")
        print(f"   Patient ID: {claim['patient_id'] or 'NULL'}")
        print(f"   Denial Reason: {claim['denial_reason'] or 'None'}")
        print(f"   Submitted: {claim['submitted_at']}")
        
        if claim['submitted_at']:
            days_ago = (REFERENCE_DATE - datetime.fromisoformat(claim['submitted_at'])).days
            print(f"   Days Since Submission: {days_ago}")
        
        if claim['denial_reason']:
            classification = classify_denial_reason(claim['denial_reason'])
            print(f"   Reason Classification: {classification}")
        
        print(f"   Decision: {reason}")
    
    # Summary statistics, reusing the cached decisions
    print("\n SUMMARY STATISTICS")
    print("-" * 30)
    eligible_count = sum(decisions)
    print(f"Total claims analyzed: {len(all_claims)}")
    print(f"Eligible for resubmission: {eligible_count}")
    
except FileNotFoundError:
    print(" Results file not found. Please run the pipeline first.")
except Exception as e:
    print(f" Error analyzing results: {e}")

2025-08-16 16:09:57,539 - INFO - Successfully loaded 6 records from emr_alpha.csv
2025-08-16 16:09:57,544 - INFO - Successfully loaded 4 records from emr_beta.json
2025-08-16 16:09:57,546 - INFO - 'Missing modifier' matched explicit retryable rule
2025-08-16 16:09:57,548 - INFO - 'Missing modifier' matched explicit retryable rule
2025-08-16 16:09:57,549 - INFO - 'Incorrect NPI' matched explicit retryable rule
2025-08-16 16:09:57,549 - INFO - 'Incorrect NPI' matched explicit retryable rule
2025-08-16 16:09:57,550 - INFO - 'Authorization expired' matched explicit non-retryable rule
2025-08-16 16:09:57,552 - INFO - 'Prior auth required' matched explicit retryable rule
2025-08-16 16:09:57,554 - INFO - 'Prior auth required' matched explicit retryable rule
2025-08-16 16:09:57,556 - INFO - 'Incorrect NPI' matched explicit retryable rule
2025-08-16 16:09:57,558 - INFO - 'Incorrect provider type' matched explicit non-retryable rule
2025-08-16 16:09:57,560 - INFO - 'Incorrect provider type' matc

 DETAILED RESULTS ANALYSIS
 Successfully loaded 4 candidates from file

 CLAIM-BY-CLAIM ANALYSIS
----------------------------------------

1. Claim A123 (ALPHA) Ok
   Status: denied
   Patient ID: P001
   Denial Reason: Missing modifier
   Submitted: 2025-07-01T00:00:00
   Days Since Submission: 29
   Reason Classification: retryable
   Decision: Eligible: denied claim with retryable reason, submitted 29 days ago

2. Claim A124 (ALPHA) Ok
   Status: denied
   Patient ID: P002
   Denial Reason: Incorrect NPI
   Submitted: 2025-07-10T00:00:00
   Days Since Submission: 20
   Reason Classification: retryable
   Decision: Eligible: denied claim with retryable reason, submitted 20 days ago

3. Claim A125 (ALPHA) Not Okay
   Status: denied
   Patient ID: NULL
   Denial Reason: Authorization expired
   Submitted: 2025-07-05T00:00:00
   Days Since Submission: 25
   Reason Classification: non_retryable
   Decision: Patient ID is null or empty

4. Claim A126 (ALPHA) Not Okay
   Status: approved
 

2025-08-16 16:09:57,567 - INFO - 'Incorrect NPI' matched explicit retryable rule
2025-08-16 16:09:57,569 - INFO - 'Prior auth required' matched explicit retryable rule
2025-08-16 16:09:57,571 - INFO - 'Incorrect provider type' matched explicit non-retryable rule
2025-08-16 16:09:57,572 - INFO - 'Missing modifier' matched explicit retryable rule


Total claims analyzed: 10
Eligible for resubmission: 4
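
The analysis prints the number of saved candidates (4) and the number recomputed as eligible (4), but matching counts do not guarantee matching claims. Comparing claim IDs as sets is a stricter check. The sketch below assumes each record in resubmission_candidates.json carries a claim_id field, which this section does not show; cross_check is a hypothetical helper name.

```python
from collections import Counter
from typing import Dict, Iterable, Set


def cross_check(saved: Iterable[dict], recomputed_eligible_ids: Iterable[str]) -> Dict[str, Set[str]]:
    """Compare claim IDs in the saved candidates file against a fresh recomputation."""
    # Assumes every saved candidate record carries a 'claim_id' key
    saved_ids = [record["claim_id"] for record in saved]
    counts = Counter(saved_ids)
    fresh = set(recomputed_eligible_ids)
    return {
        "missing_from_file": fresh - set(saved_ids),      # eligible now, absent from file
        "unexpected_in_file": set(saved_ids) - fresh,     # in file, not eligible now
        "duplicated_in_file": {cid for cid, n in counts.items() if n > 1},
    }
```

In the analysis cell this could be called with saved_candidates and the IDs of the claims for which is_eligible_for_resubmission(claim)[0] is true; all three sets should be empty when the file and the logic agree.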
