## B2. Alignment Analysis – Need vs AI Implementation

**Description**  
This section evaluates the alignment between healthcare need and AI implementation levels across hospitals. Tertiles are computed for both variables to assess patterns of alignment. 

**Purpose**  
To examine whether AI implementation corresponds with areas of greatest need. 

**Method Summary**  
- Rank-based tertiles were created for HPSA, MUA, ADI, SVI scores.  
- AI implementation scores were already categorized into three categories (Low, Medium, High).  
- Cross-tabulations were generated and visualized using heatmaps.  


### 1 Load necessary libraries, functions, and pre-processed data 

In [None]:

# load necessary libraries 
import geopandas as gpd
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

In [None]:
# Load the merged AHA master file. low_memory=False makes pandas infer each
# column's dtype from the whole file instead of chunk by chunk.
AHA_master = pd.read_csv("../../data/AHA_master_external_data.csv", low_memory=False)

# Keep only rows with a non-missing id_it (presumably hospitals that answered
# the IT supplement -- confirm). The explicit .copy() makes AHA_IT an
# independent frame, so the column assignments performed on it in later cells
# are not chained assignments on a slice of AHA_master.
AHA_IT = AHA_master[AHA_master.id_it.notna()].copy()

### 2 Data engineering 

In [None]:
# Import all functions but only use what you need
import sys
sys.path.append('../')
from calculate_scores import (
    create_union_aipred_row, 
    calculate_base_ai_implementation_row_imputed
)

In [None]:
# The earlier cell only ran `from calculate_scores import ...`, which does not
# bind the module name itself; import the module here so the qualified calls
# below resolve instead of raising NameError.
import calculate_scores

# Row-wise union indicator of the AI/predictive-model survey items.
AHA_IT['aipred_it_union'] = AHA_IT.apply(calculate_scores.create_union_aipred_row, axis=1)

# Adds the AI implementation score columns; later cells read
# 'ai_base_score_imputed', presumably created by this call -- confirm.
AHA_IT = calculate_scores.apply_ai_scores_to_dataframe(AHA_IT)

In [None]:
# Two-letter state/territory codes grouped by census division. 'Mid Atlantic'
# is the label this notebook uses for that division; 'Territories' is a
# catch-all bucket that has no entry in the region lookup below.
_division_members = {
    'New England': ['ME', 'NH', 'VT', 'MA', 'RI', 'CT'],
    'Mid Atlantic': ['NY', 'NJ', 'PA'],
    'South Atlantic': ['DE', 'MD', 'DC', 'VA', 'WV', 'NC', 'SC', 'GA', 'FL'],
    'East North Central': ['OH', 'IN', 'IL', 'MI', 'WI'],
    'East South Central': ['KY', 'TN', 'AL', 'MS'],
    'West North Central': ['MN', 'IA', 'MO', 'ND', 'SD', 'NE', 'KS'],
    'West South Central': ['AR', 'LA', 'OK', 'TX'],
    'Mountain': ['MT', 'ID', 'WY', 'CO', 'NM', 'AZ', 'UT', 'NV'],
    'Pacific': ['WA', 'OR', 'CA', 'AK', 'HI'],
    'Territories': ['PR', 'GU', 'VI', 'AS', 'MP'],
}

# Flatten to a state -> division lookup.
state_to_division = {
    state: division
    for division, states in _division_members.items()
    for state in states
}

# Census divisions grouped by region.
_region_members = {
    'Northeast': ['New England', 'Mid Atlantic'],
    'Midwest': ['East North Central', 'West North Central'],
    'South': ['South Atlantic', 'East South Central', 'West South Central'],
    'West': ['Mountain', 'Pacific'],
}

# Flatten to a division -> region lookup.
division_to_region = {
    division: region
    for region, divisions in _region_members.items()
    for division in divisions
}
# Add census division and region columns to the dataframe.
# State codes missing from state_to_division get a NaN division; the
# 'Territories' division has no region entry, so its region is NaN as well.
AHA_IT['division'] = AHA_IT['mstate_it'].map(state_to_division)
AHA_IT['region'] = AHA_IT['division'].map(division_to_region)

# Drop the territories. Rows with a NaN division are kept, because
# NaN != 'Territories' evaluates to True. The .copy() makes AHA_IT_US an
# independent frame: later cells add many columns to it, which on a bare slice
# would be chained assignment (SettingWithCopyWarning).
AHA_IT_US = AHA_IT[AHA_IT['division'] != 'Territories'].copy()

In [None]:
# Translate the numeric imputed AI base score into a readable model-type
# label. Scores other than 0, 1 or 2 (including missing) become NaN.
_score_to_model_type = {
    0: 'No Models',
    1: 'Non-AI Predictive Models',
    2: 'AI Predictive Models',
}
AHA_IT_US['model_type'] = AHA_IT_US['ai_base_score_imputed'].map(_score_to_model_type)

### 3 Alignment analysis 

In [None]:
def create_rank_based_tertiles(df, column_name, labels=['Low Need', 'Medium Need', 'High Need']):
    """Split a column into three equal-sized groups based on row ranks.

    Values are ranked first, with ties broken by order of appearance
    (``method='first'``), and the ranks are then cut into tertiles. Because
    ranks are unique, the cut works even when the raw values are heavily tied.

    Args:
        df: DataFrame holding the column.
        column_name: Name of the numeric column to bin.
        labels: Three labels, assigned from lowest to highest rank.

    Returns:
        Categorical Series aligned with ``df``; missing values stay missing.
    """
    ranks = df[column_name].rank(method='first')
    return pd.qcut(ranks, q=3, labels=labels)

def create_standard_tertiles(df, column_name, labels=['Low Need', 'Medium Need', 'High Need']):
    """Split a column into tertiles using quantiles of the raw values.

    Args:
        df: DataFrame holding the column.
        column_name: Name of the numeric column to bin.
        labels: Three labels, assigned from the lowest to the highest tertile.

    Returns:
        Categorical Series aligned with ``df``; missing values stay missing.
    """
    values = df[column_name]
    return pd.qcut(values, q=3, labels=labels)

def create_designation_binary(df, column_name):
    """Return a 0/1 flag that is 1 where the column is strictly positive.

    Missing values compare as False and are therefore coded 0.

    Args:
        df: DataFrame holding the column.
        column_name: Name of the numeric score column.

    Returns:
        Integer Series of 0/1 flags aligned with ``df``.
    """
    is_positive = df[column_name].gt(0)
    return is_positive.astype(int)

def create_designation_MUA(df, column_name, threshold=62):
    """Return a 0/1 flag that is 1 where the column is at or below a cutoff.

    Missing values compare as False and are therefore coded 0.

    Args:
        df: DataFrame holding the column.
        column_name: Name of the numeric score column.
        threshold: Inclusive upper bound for a row to be flagged. Defaults to
            62 (presumably the HRSA Index of Medical Underservice cutoff --
            confirm), which preserves the previous hard-coded behavior.

    Returns:
        Integer Series of 0/1 flags aligned with ``df``.
    """
    return (df[column_name] <= threshold).astype(int)
    
def rr_from_joint_table(tbl, ai_label="AI Predictive Models", hi_label=None, lo_label=None):
    """Compute the high-vs-low risk ratio of AI adoption from a joint table.

    Each row of ``tbl`` is a need group and each column a model type. Cells
    may be counts or joint percentages: only within-row shares are used, so
    any common scaling cancels out.

    Args:
        tbl: DataFrame indexed by need group, with one column per model type.
        ai_label: Column whose within-row share is compared.
        hi_label: Row label of the high-need group. When None, the first row
            label containing "high" (case-insensitive) is used, falling back
            to the last row.
        lo_label: Row label of the low-need group. When None, the first row
            label containing "low" (case-insensitive) is used, falling back
            to the first row.

    Returns:
        Tuple ``(pH, pL, RR, rel, hi_label, lo_label)`` where ``pH`` and
        ``pL`` are the AI shares within the high and low rows, ``RR`` is
        ``pH / pL`` and ``rel`` is ``100 * (RR - 1)``. Any quantity that
        cannot be computed (empty row, zero ``pL``) is NaN.
    """
    # Resolve each label on its own, so that a caller who supplies only one of
    # them does not have it silently replaced by the inferred one.
    if hi_label is None:
        row_labels = list(tbl.index)
        hi_label = next(
            (lab for lab in row_labels if "high" in str(lab).lower()),
            row_labels[-1],
        )
    if lo_label is None:
        row_labels = list(tbl.index)
        lo_label = next(
            (lab for lab in row_labels if "low" in str(lab).lower()),
            row_labels[0],
        )

    def _ai_share(row_label):
        # Share of the row total that falls in the AI column.
        numerator = float(tbl.loc[row_label, ai_label])
        denominator = float(tbl.loc[row_label, :].sum())
        return numerator / denominator if denominator > 0 else np.nan

    pH = _ai_share(hi_label)
    pL = _ai_share(lo_label)
    RR = np.nan if (pL == 0 or np.isnan(pL)) else pH / pL
    rel = 100 * (RR - 1) if np.isfinite(RR) else np.nan
    return pH, pL, RR, rel, hi_label, lo_label

In [None]:
# Create AI implementation categories: label the numeric imputed score, then
# store the labels as an ordered categorical so that tables and plots keep
# the order No Models < Non-AI Predictive Models < AI Predictive Models.
_model_type_by_score = {
    0: 'No Models',
    1: 'Non-AI Predictive Models',
    2: 'AI Predictive Models',
}
_model_type_order = ['No Models', 'Non-AI Predictive Models', 'AI Predictive Models']

AHA_IT_US['model_type'] = AHA_IT_US['ai_base_score_imputed'].map(_model_type_by_score)
AHA_IT_US['model_type'] = pd.Categorical(
    AHA_IT_US['model_type'],
    categories=_model_type_order,
    ordered=True,
)

In [None]:
# Output tertile column -> source score column.
# HPSA/MUA measures use RANK-BASED tertiles: ranking with method='first'
# breaks ties by row order, so the three bins are always well defined.
# NOTE(review): the designation cell treats a LOWER MUA score (<= 62) as
# designated. If these mean_mua_* columns are oriented the same way, the
# 'High Need' label here marks the highest scores, not the most underserved
# areas -- confirm the orientation.
hpsa_mua_measures = {
    'primary_hpss_tertile': 'mean_primary_hpss',
    'mental_hpss_tertile': 'mean_mental_hpss',
    'dental_hpss_tertile': 'mean_dental_hpss',
    'mua_score_tertile': 'mean_mua_shortage',
    'mua_elder_tertile': 'mean_mua_elders_shortage',
    'mua_infant_tertile': 'mean_mua_infant_shortage',
}

# Socioeconomic measures use STANDARD (value-quantile) tertiles.
socio_measures = {
    'adi_tertile': 'national_adi_median',
    'svi_tertile': 'svi_themes_median',
}

# Apply the matching binning function to each group of measures, in order.
for measures, make_tertiles in (
    (hpsa_mua_measures, create_rank_based_tertiles),
    (socio_measures, create_standard_tertiles),
):
    for tertile_col, score_col in measures.items():
        AHA_IT_US[tertile_col] = make_tertiles(AHA_IT_US, score_col)


In [None]:
# Model-type labels, ordered from least to most AI involvement.
MODEL_LABELS = [
    'No Models',
    'Non-AI Predictive Models',
    'AI Predictive Models',
]
# The category whose adoption rate the risk ratios compare (last label above).
AI_LABEL = MODEL_LABELS[-1]

In [None]:
# Create socioeconomic cross-tabulations
# Display name -> tertile column on AHA_IT_US.
socioeconomic_measures = {
    'Area Deprivation Index': 'adi_tertile',
    'Social Vulnerability Index': 'svi_tertile'
}

socioeconomic_tables = {}   # name -> joint-% table (need tertile x model type)
socioeconomic_stats = {}    # name -> dict of counts, proportions and risk ratio

# Hospitals with a known model type. The filter is the same for every
# measure, so it is done once instead of once per loop iteration.
valid_data = AHA_IT_US[AHA_IT_US['model_type'].notna()].copy()
total_all = len(valid_data)
is_ai = valid_data['model_type'] == AI_LABEL

for name, column in socioeconomic_measures.items():
    print(f"\nProcessing {name}...")

    # ========================================
    # Method 1: From raw counts (for verification)
    # ========================================
    high_mask = valid_data[column] == 'High Need'
    low_mask = valid_data[column] == 'Low Need'

    nH = int(high_mask.sum())
    nL = int(low_mask.sum())

    # Count AI in each group
    aH = int((is_ai & high_mask).sum())
    aL = int((is_ai & low_mask).sum())

    # Calculate proportions
    pH_counts = aH / nH if nH > 0 else np.nan
    pL_counts = aL / nL if nL > 0 else np.nan
    RR_counts = pH_counts / pL_counts if (pL_counts > 0 and not np.isnan(pL_counts)) else np.nan

    # ========================================
    # Method 2: From cross-tabulation table
    # ========================================
    # Joint percentages: all cells of the table sum to 100.
    cross_tab = pd.crosstab(
        valid_data[column],
        valid_data['model_type'],
        normalize=True
    ) * 100

    # Put High Need at TOP and Low Need at BOTTOM, then force the expected
    # column set and order (a model type absent from the data becomes 0).
    cross_tab = cross_tab.reindex(['High Need', 'Medium Need', 'Low Need'])
    cross_tab = cross_tab.reindex(columns=MODEL_LABELS, fill_value=0)

    socioeconomic_tables[name] = cross_tab

    # Calculate RR from table
    pH_table, pL_table, RR_table, rel_table, hi_label, lo_label = rr_from_joint_table(
        cross_tab,
        ai_label=AI_LABEL
    )

    # Actually perform the verification: both methods must agree.
    if not np.isclose(RR_counts, RR_table, equal_nan=True):
        print(f"  WARNING: count-based RR ({RR_counts:.3f}) differs from "
              f"table-based RR ({RR_table:.3f})")

    # ========================================
    # Store comprehensive statistics
    # ========================================
    socioeconomic_stats[name] = {
        # Sample sizes
        'nH': nH,  # High Need hospitals
        'nL': nL,  # Low Need hospitals
        'total': total_all,

        # AI counts
        'aH': aH,  # AI in High Need
        'aL': aL,  # AI in Low Need

        # Proportions (use table-based as primary)
        'pH': pH_table,  # P(AI | High Need)
        'pL': pL_table,  # P(AI | Low Need)

        # Risk Ratio
        'RR': RR_table,
        'rel': rel_table,  # Relative change (%)

        # Labels
        'hi_label': hi_label,
        'lo_label': lo_label,
    }

    # Print summary
    print(f"  High Need: n={nH}, AI={aH} ({100*pH_table:.1f}%)")
    print(f"  Low Need:  n={nL}, AI={aL} ({100*pL_table:.1f}%)")
    print(f"  RR = {RR_table:.3f} ({rel_table:+.1f}%)")

# ----------------------------------------
# Summary table: one row per socioeconomic measure
# ----------------------------------------
banner = "=" * 80
print("\n" + banner)
print("SUMMARY: SOCIOECONOMIC MEASURES")
print(banner + "\n")

# Format every stored statistic as display text.
summary_data = [
    {
        'Measure': name,
        'High Need (n)': stats['nH'],
        'Low Need (n)': stats['nL'],
        'P(AI | High)': f"{100*stats['pH']:.1f}%",
        'P(AI | Low)': f"{100*stats['pL']:.1f}%",
        'Risk Ratio': f"{stats['RR']:.3f}",
        'Change': f"{stats['rel']:+.1f}%",
    }
    for name, stats in socioeconomic_stats.items()
]

summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))

# Plain-language reading of each risk ratio (High Need vs Low Need tertile).
print("\n" + "="*80)
print("INTERPRETATION:")
print("="*80)
for name, stats in socioeconomic_stats.items():
    rr = stats['RR']
    if not np.isfinite(rr):
        # A NaN ratio fails both "< 1" and "> 1"; without this branch it
        # would be reported as EQUAL.
        print(f"• {name}: risk ratio is UNDEFINED (empty group or no AI adoption in the low group)")
    elif rr < 1:
        print(f"• {name}: AI adoption is {(1-rr)*100:.1f}% LOWER in high disadvantage areas")
    elif rr > 1:
        print(f"• {name}: AI adoption is {(rr-1)*100:.1f}% HIGHER in high disadvantage areas")
    else:
        print(f"• {name}: AI adoption is EQUAL across disadvantage levels")

In [None]:
# ========================================
# HRSA DESIGNATION MEASURES
# ========================================

# Display name -> 0/1 designation flag column on AHA_IT_US.
designation_measures = {
    'Primary HPSA': 'primary_hpsa_desig',
    'Mental HPSA': 'mental_hpsa_desig',
    'Dental HPSA': 'dental_hpsa_desig',
    'MUA Overall': 'mua_overall_desig'
}

# (Re)create the designation flags with the shared helpers; existing columns
# are overwritten. Missing scores compare as False and are coded 0, i.e. they
# are counted as "Not Designated".
# NOTE(review): the MUA flag reads 'mean_mua_score', whereas the tertile cell
# reads 'mean_mua_shortage' -- confirm both columns exist and are intended.
print("Creating designation flags...")
AHA_IT_US['primary_hpsa_desig'] = create_designation_binary(AHA_IT_US, 'mean_primary_hpss')
AHA_IT_US['mental_hpsa_desig'] = create_designation_binary(AHA_IT_US, 'mean_mental_hpss')
AHA_IT_US['dental_hpsa_desig'] = create_designation_binary(AHA_IT_US, 'mean_dental_hpss')
AHA_IT_US['mua_overall_desig'] = create_designation_MUA(AHA_IT_US, 'mean_mua_score')

designation_tables = {}   # name -> joint-% table (designation x model type)
designation_stats = {}    # name -> dict of counts, proportions and risk ratio

# Hospitals with a known model type. The filter is the same for every
# measure, so it is done once (after the flags exist) instead of per iteration.
valid_data = AHA_IT_US[AHA_IT_US['model_type'].notna()].copy()
total_all = len(valid_data)

for name, column in designation_measures.items():
    print(f"\nProcessing {name}...")

    # ========================================
    # Method 1: From raw counts (for verification)
    # ========================================
    desig_mask = valid_data[column] == 1
    not_desig_mask = valid_data[column] == 0

    nD = int(desig_mask.sum())
    nN = int(not_desig_mask.sum())

    # Count AI in each group
    aD = int((valid_data.loc[desig_mask, 'model_type'] == AI_LABEL).sum())
    aN = int((valid_data.loc[not_desig_mask, 'model_type'] == AI_LABEL).sum())

    # Calculate proportions
    pD_counts = aD / nD if nD > 0 else np.nan
    pN_counts = aN / nN if nN > 0 else np.nan
    RR_counts = pD_counts / pN_counts if (pN_counts > 0 and not np.isnan(pN_counts)) else np.nan

    # ========================================
    # Method 2: From cross-tabulation table
    # ========================================
    # Count by designation status and model type
    cnt_D = valid_data.loc[desig_mask, 'model_type'].value_counts().reindex(MODEL_LABELS, fill_value=0)
    cnt_N = valid_data.loc[not_desig_mask, 'model_type'].value_counts().reindex(MODEL_LABELS, fill_value=0)

    # Convert to GLOBAL % (all cells sum to 100%)
    pct_D = cnt_D / total_all * 100.0
    pct_N = cnt_N / total_all * 100.0

    # Create table
    cross_tab = pd.DataFrame([pct_D.values, pct_N.values],
                             index=["Designated", "Not Designated"],
                             columns=MODEL_LABELS)

    designation_tables[name] = cross_tab

    # Calculate RR from the table with the same helper the socioeconomic
    # measures use:
    # P(AI | Designated) = P(Designated, AI) / P(Designated)
    # P(AI | Not Designated) = P(Not Designated, AI) / P(Not Designated)
    pD_table, pN_table, RR_table, rel_table, _, _ = rr_from_joint_table(
        cross_tab,
        ai_label=AI_LABEL,
        hi_label="Designated",
        lo_label="Not Designated"
    )

    # Actually perform the verification: both methods must agree.
    if not np.isclose(RR_counts, RR_table, equal_nan=True):
        print(f"  WARNING: count-based RR ({RR_counts:.3f}) differs from "
              f"table-based RR ({RR_table:.3f})")

    # ========================================
    # Store comprehensive statistics
    # ========================================
    designation_stats[name] = {
        # Sample sizes
        'nD': nD,  # Designated hospitals
        'nN': nN,  # Not designated hospitals
        'total': total_all,

        # AI counts
        'aD': aD,  # AI in Designated
        'aN': aN,  # AI in Not Designated

        # Proportions (use table-based as primary)
        'pD': pD_table,  # P(AI | Designated)
        'pN': pN_table,  # P(AI | Not Designated)

        # Risk Ratio
        'RR': RR_table,
        'rel': rel_table,  # Relative change (%)

    }

    # Print summary
    print(f"  Designated:     n={nD}, AI={aD} ({100*pD_table:.1f}%)")
    print(f"  Not Designated: n={nN}, AI={aN} ({100*pN_table:.1f}%)")
    print(f"  RR = {RR_table:.3f} ({rel_table:+.1f}%)")

# ----------------------------------------
# Summary table: one row per HRSA designation measure
# ----------------------------------------
banner = "=" * 80
print("\n" + banner)
print("SUMMARY: HRSA DESIGNATION MEASURES")
print(banner + "\n")

# Format every stored statistic as display text.
summary_data = [
    {
        'Designation': name,
        'Designated (n)': stats['nD'],
        'Not Designated (n)': stats['nN'],
        'P(AI | Designated)': f"{100*stats['pD']:.1f}%",
        'P(AI | Not Designated)': f"{100*stats['pN']:.1f}%",
        'Risk Ratio': f"{stats['RR']:.3f}",
        'Change': f"{stats['rel']:+.1f}%",
    }
    for name, stats in designation_stats.items()
]

summary_df = pd.DataFrame(summary_data)
print(summary_df.to_string(index=False))

# Plain-language reading of each risk ratio (Designated vs Not Designated).
print("\n" + "="*80)
print("INTERPRETATION:")
print("="*80)
for name, stats in designation_stats.items():
    rr = stats['RR']
    if not np.isfinite(rr):
        # A NaN ratio fails both "< 1" and "> 1"; without this branch it
        # would be reported as EQUAL.
        print(f"• {name}: risk ratio is UNDEFINED (empty group or no AI adoption in not designated areas)")
    elif rr < 1:
        print(f"• {name}: AI adoption is {(1-rr)*100:.1f}% LOWER in designated areas")
    elif rr > 1:
        print(f"• {name}: AI adoption is {(rr-1)*100:.1f}% HIGHER in designated areas")
    else:
        print(f"• {name}: AI adoption is EQUAL in designated vs not designated areas")

In [None]:
# ----------------------------------------
# DISPLAY ALL RESULTS
# ----------------------------------------

def _show_result_group(heading, tables, stats_by_name, first_key, second_key, contrast):
    """Print each table of one measure family, followed by its risk-ratio lines.

    ``first_key`` and ``second_key`` name the two conditional-probability
    entries of the stats dict; ``contrast`` is the text printed before them.
    """
    print(heading)
    print("="*60)
    for name, table in tables.items():
        stats = stats_by_name[name]
        print(f"\n{name}:")
        print(table.round(1))
        print(f"{contrast}: {100*stats[first_key]:.1f}% vs {100*stats[second_key]:.1f}%")
        print(f"RR = {stats['RR']:.2f} ({stats['rel']:+.0f}% relative)")


# Socioeconomic results (tertile-based)
_show_result_group(
    "Socioeconomic Measures (Tertiles)",
    socioeconomic_tables,
    socioeconomic_stats,
    'pH',
    'pL',
    "High vs Low AI",
)

# Designation results (binary)
_show_result_group(
    "\n\nHRSA Designation Measures (Binary)",
    designation_tables,
    designation_stats,
    'pD',
    'pN',
    "Designated vs Not Designated AI",
)