# 02 - Data Processing & Harmonization

This notebook processes raw data into an analysis-ready format with consistent FIPS codes, time windows, and calculated metrics.

In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
from pathlib import Path
import logging
from scipy import stats

# Project directories, resolved relative to the parent of the current
# working directory (assumes the notebook is launched from a subfolder of
# the project root -- confirm against the repository layout).
project_root = Path.cwd().parent
data_raw = project_root.joinpath('data', 'raw')
data_processed = project_root.joinpath('data', 'processed')

# Module-level logger; INFO level so the per-step progress messages show up.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

## 1. Process Electoral Data

In [None]:
def process_election_data(year):
    """Process county presidential election returns for one election year.

    Reads ``county_presidential_{year}.csv`` from ``data_raw / 'elections'``,
    builds a 5-character county FIPS code from the ``state_fips`` and
    ``county_fips`` columns, and aggregates ``candidatevotes`` per county.

    Args:
        year: Election year. Must be one of the years with a known
            Democratic nominee (2016, 2020, 2024).

    Returns:
        DataFrame with one row per county and the columns ``fips``,
        ``trump_votes_{year}``, ``dem_votes_{year}``, ``total_votes_{year}``,
        ``trump_share_{year}`` (two-party share, percent) and
        ``trump_margin_{year}`` (margin over all votes, percent).

    Raises:
        ValueError: If ``year`` has no known Democratic nominee. Previously
            every year other than 2016 was silently matched against BIDEN.
    """
    # Democratic nominee whose name is matched in the `candidate` column.
    dem_candidates = {2016: 'CLINTON', 2020: 'BIDEN', 2024: 'HARRIS'}
    try:
        dem_name = dem_candidates[int(year)]
    except KeyError:
        raise ValueError(
            f"Unsupported election year {year!r}; "
            f"expected one of {sorted(dem_candidates)}"
        ) from None

    # Load raw data
    file_path = data_raw / 'elections' / f'county_presidential_{year}.csv'
    df = pd.read_csv(file_path)

    # Standardize column names
    df.columns = df.columns.str.lower()

    # Create FIPS code (state + county). pandas reads an integer column that
    # contains any blank cell as float, and str(1.0) is '1.0', so go through
    # a numeric -> int -> str conversion instead of a direct astype(str).
    state_code = pd.to_numeric(df['state_fips'], errors='coerce')
    county_code = pd.to_numeric(df['county_fips'], errors='coerce')
    has_fips = state_code.notna() & county_code.notna()

    # Rows without a usable FIPS cannot be assigned to a county; dropping
    # them avoids creating a bogus 'nannan' county in the output.
    n_dropped = int((~has_fips).sum())
    if n_dropped:
        logger.warning(
            "Dropping %d rows without a valid state/county FIPS (%s)",
            n_dropped, year,
        )
    df = df.loc[has_fips].copy()
    df['fips'] = (
        state_code[has_fips].astype(int).astype(str).str.zfill(2)
        + county_code[has_fips].astype(int).astype(str).str.zfill(3)
    )

    # Filter for major candidates (case-insensitive substring match)
    is_trump = df['candidate'].str.contains('TRUMP', case=False, na=False)
    is_dem = df['candidate'].str.contains(dem_name, case=False, na=False)

    trump_votes = df[is_trump].groupby('fips')['candidatevotes'].sum()
    dem_votes = df[is_dem].groupby('fips')['candidatevotes'].sum()
    total_votes = df.groupby('fips')['candidatevotes'].sum()

    # Calculate metrics; the Series align on the county FIPS index, so a
    # county missing one of the candidates gets NaN for the derived columns.
    results = pd.DataFrame({
        f'trump_votes_{year}': trump_votes,
        f'dem_votes_{year}': dem_votes,
        f'total_votes_{year}': total_votes,
        f'trump_share_{year}': trump_votes / (trump_votes + dem_votes) * 100,
        f'trump_margin_{year}': ((trump_votes - dem_votes) / total_votes * 100)
    })

    results.index.name = 'fips'
    results = results.reset_index()

    logger.info(f"Processed {year} election data: {len(results)} counties")
    return results

# Process 2016 and 2020 elections
# election_2016 = process_election_data(2016)
# election_2020 = process_election_data(2020)

## 2. Process CDC WONDER Mortality Data

In [None]:
def process_cdc_wonder(file_path, metric_name):
    """Process a tab-separated CDC WONDER mortality export.

    Args:
        file_path: Path to the tab-delimited export. It must contain the
            columns ``County Code``, ``Deaths``, ``Population`` and
            ``Age Adjusted Rate``.
        metric_name: Prefix for the output columns (e.g. ``'od_1316'``).

    Returns:
        DataFrame with one row per county and the columns ``fips``,
        ``{metric_name}_deaths`` (sum), ``{metric_name}_pop`` (mean) and
        ``{metric_name}_rate`` (mean of the age-adjusted rate), rounded to
        two decimals. A county whose deaths are all non-numeric
        (e.g. 'Suppressed') gets NaN deaths rather than 0.
    """
    df = pd.read_csv(file_path, sep='\t')

    # Extract FIPS. Rows without a numeric county code (presumably the
    # notes/footer rows CDC WONDER appends -- confirm against the export)
    # are dropped; they also force the column to float, where a direct
    # astype(str) would yield '1001.0' instead of '01001'.
    county_code = pd.to_numeric(df['County Code'], errors='coerce')
    has_code = county_code.notna()
    df = df.loc[has_code].copy()
    df['fips'] = county_code[has_code].astype(int).astype(str).str.zfill(5)

    # Handle suppressed data: text markers such as 'Suppressed' become NaN.
    # Population is coerced too so a text marker cannot break the mean.
    for column in ('Deaths', 'Population', 'Age Adjusted Rate'):
        df[column] = pd.to_numeric(df[column], errors='coerce')

    # Calculate period aggregates per county. min_count=1 keeps the death
    # total missing when every value was suppressed, instead of reporting 0.
    by_county = df.groupby('fips')
    period_avg = pd.DataFrame({
        f'{metric_name}_deaths': by_county['Deaths'].sum(min_count=1),
        f'{metric_name}_pop': by_county['Population'].mean(),
        f'{metric_name}_rate': by_county['Age Adjusted Rate'].mean(),
    }).round(2)

    logger.info(f"Processed {metric_name}: {len(period_avg)} counties")
    return period_avg.reset_index()

# Process each mortality type
# overdose_1316 = process_cdc_wonder(data_raw / 'cdc_wonder/overdose_2013_2016.txt', 'od_1316')
# overdose_1720 = process_cdc_wonder(data_raw / 'cdc_wonder/overdose_2017_2020.txt', 'od_1720')

## 3. Process CDC PLACES Health Indicators

In [None]:
def process_cdc_places():
    """Reshape the CDC PLACES county file into one row per county.

    Reads ``cdc_places/places_county_2023.csv`` under ``data_raw``, keeps the
    rows whose ``DataValueTypeID`` is ``'AgeAdjPrv'`` and whose ``MeasureId``
    is one of the selected indicators, and pivots ``Data_Value`` to wide
    format.

    Returns:
        DataFrame with a ``fips`` column (``LocationID`` zero-padded to five
        characters) plus one renamed column per indicator present in the file.
    """
    # PLACES measure id -> output column name
    pain_indicators = {
        'ARTHRITIS': 'arthritis_pct',
        'PHLTH': 'freq_phys_distress_pct',
        'DISABILITY': 'disability_pct',
        'DEPRESSION': 'depression_pct',
        'BPHIGH': 'high_bp_pct',
        'DIABETES': 'diabetes_pct'
    }

    raw = pd.read_csv(data_raw / 'cdc_places/places_county_2023.csv')

    # Keep only age-adjusted prevalence rows for the indicators of interest
    is_age_adjusted = raw['DataValueTypeID'] == 'AgeAdjPrv'
    is_selected = raw['MeasureId'].isin(list(pain_indicators))
    selected = raw[is_age_adjusted & is_selected]

    # Long -> wide: one row per location, one column per measure
    places_wide = selected.pivot(
        index='LocationID',
        columns='MeasureId',
        values='Data_Value'
    )
    places_wide = places_wide.rename(columns=pain_indicators)

    # The location id becomes the zero-padded FIPS key
    fips_index = places_wide.index.astype(str).str.zfill(5)
    places_wide.index = fips_index.rename('fips')

    logger.info(f"Processed CDC PLACES: {len(places_wide)} counties, {places_wide.shape[1]} indicators")
    return places_wide.reset_index()

# places_data = process_cdc_places()

## 4. Process USDA Rural-Urban Codes

In [None]:
def process_rucc_codes():
    """Load the USDA Rural-Urban Continuum Codes and derive rurality fields.

    Reads ``usda/rucc_2023.xlsx`` under ``data_raw`` and expects the columns
    ``FIPS`` and ``RUCC_2023``.

    Returns:
        DataFrame with the columns ``fips`` (zero-padded to five characters),
        ``rucc`` (the original code), ``rural`` (1 when the code is >= 4,
        else 0) and ``rucc_category`` (codes in (0, 3] -> 'Metro',
        (3, 6] -> 'Micropolitan', (6, 9] -> 'Rural').
    """
    table = pd.read_excel(data_raw / 'usda/rucc_2023.xlsx')
    codes = table['RUCC_2023']

    # Derived columns: FIPS key, binary rural flag, three-level category
    enriched = table.assign(
        fips=table['FIPS'].astype(str).str.zfill(5),
        rural=(codes >= 4).astype(int),
        rucc_category=pd.cut(
            codes,
            bins=[0, 3, 6, 9],
            labels=['Metro', 'Micropolitan', 'Rural']
        ),
    )

    # Keep the analysis columns, exposing the raw code under a short name
    rucc_processed = enriched[['fips', 'RUCC_2023', 'rural', 'rucc_category']].rename(
        columns={'RUCC_2023': 'rucc'}
    )

    logger.info(f"Processed RUCC: {len(rucc_processed)} counties")
    return rucc_processed

# rucc_data = process_rucc_codes()

## 5. Merge All Data Sources

In [None]:
def merge_all_data():
    """Build the base analysis dataset from the county boundary shapefile.

    Reads ``shapefiles/tl_2023_us_county.shp`` under ``data_raw``, copies
    ``GEOID`` into a ``fips`` column, removes the excluded state/territory
    codes, and returns the ``fips``, ``NAME``, ``STATEFP`` and ``geometry``
    columns. The merge of the processed tables and the derived variables are
    still commented out below.

    Returns:
        The filtered county frame with those four columns.
    """
    # State FIPS codes left out of the analysis (Alaska, Hawaii, territories)
    excluded_states = ['02', '15', '60', '66', '69', '72', '78']

    # Load county boundaries as base
    shapes = gpd.read_file(data_raw / 'shapefiles/tl_2023_us_county.shp')
    shapes['fips'] = shapes['GEOID']

    # Filter to continental US and start with geometry
    in_scope = ~shapes['STATEFP'].isin(excluded_states)
    merged = shapes.loc[in_scope, ['fips', 'NAME', 'STATEFP', 'geometry']].copy()

    # List of dataframes to merge
    # This would include all the processed dataframes from above
    # dfs_to_merge = [
    #     election_2016, election_2020,
    #     overdose_1316, overdose_1720,
    #     suicide_1316, suicide_1720,
    #     places_data,
    #     rucc_data,
    #     census_data
    # ]

    # for df in dfs_to_merge:
    #     merged = merged.merge(df, on='fips', how='left')

    # Calculate derived variables
    # merged['trump_shift_16_20'] = merged['trump_share_2020'] - merged['trump_share_2016']
    # merged['od_rate_change'] = merged['od_1720_rate'] - merged['od_1316_rate']

    logger.info(f"Final dataset: {len(merged)} counties, {merged.shape[1]} variables")
    return merged

# final_data = merge_all_data()
# final_data.to_file(data_processed / 'counties_analysis.geojson', driver='GeoJSON')

## 6. Data Quality Checks

In [None]:
def data_quality_report(df, id_cols=('fips', 'NAME'), missing_frac=0.3, z_threshold=3):
    """Generate a data quality report for a county-level table.

    Args:
        df: Table to inspect; must contain every column named in ``id_cols``.
        id_cols: Columns used to identify rows in ``high_missing_counties``.
        missing_frac: A row is flagged when more than this fraction of its
            columns is missing.
        z_threshold: A value counts as an outlier when its absolute z-score
            (computed over the non-missing values of its column) exceeds this.

    Returns:
        dict with the keys ``n_counties``, ``n_variables``,
        ``missing_by_column``, ``pct_missing_by_column``,
        ``high_missing_counties`` (list of records of ``id_cols``) and
        ``outliers_by_column`` (plain ``int`` counts per numeric column;
        columns with no non-missing values are omitted).
    """
    missing_counts = df.isnull().sum()
    report = {
        'n_counties': len(df),
        'n_variables': df.shape[1],
        'missing_by_column': missing_counts.to_dict(),
        'pct_missing_by_column': (missing_counts / len(df) * 100).round(2).to_dict()
    }

    # Identify counties with high missingness
    row_missingness = df.isnull().sum(axis=1)
    is_sparse = row_missingness > df.shape[1] * missing_frac
    high_missing = df.loc[is_sparse, list(id_cols)]
    report['high_missing_counties'] = high_missing.to_dict('records')

    # Check for outliers in the numeric variables
    outliers = {}
    for col in df.select_dtypes(include=[np.number]).columns:
        values = df[col].dropna()
        if values.empty:
            continue
        if values.nunique() < 2:
            # Zero variance: the z-score is undefined, so nothing can be an
            # outlier; skip the division by a zero standard deviation.
            outliers[col] = 0
            continue
        z_scores = np.abs(stats.zscore(values))
        # Cast to a builtin int so the report stays JSON-serializable.
        outliers[col] = int((z_scores > z_threshold).sum())

    report['outliers_by_column'] = outliers

    return report

# quality_report = data_quality_report(final_data)
# pd.DataFrame([quality_report]).T