In [None]:
# President: 2016 (Trump), 2020 (Biden), 2024 (Trump)
# Governor: 2018 (Whitmer), 2022 (Whitmer)
# Secretary of State: 2018 (Benson), 2022 (Benson)
# Attorney General: 2018 (Nessel), 2022 (Nessel)
# U.S. Senate: 2014 (Peters), 2018 (Stabenow), 2020 (Peters), 2024 (Slotkin)
# U.S. House: every cycle
# State Senate: 2014, 2018, 2022
# State House: every cycle
# State Board of Education: 2014, 2016, 2018, 2020, 2022, 2024 (every even-numbered year)
# University of Michigan Board of Regents: 2014, 2016, 2018, 2020, 2022, 2024 (every two years)
# Michigan State University Board of Trustees: 2014, 2016, 2018, 2020, 2022, 2024 (every two years)
# Wayne State University Board of Governors: 2014, 2016, 2018, 2020, 2022, 2024 (every two years)
# Straight Party – 2020 and later (OpenElection data only)

# Active run configuration: the offices and election years that the
# processing loop further down iterates over.
OFFICES = ["U.S. House", "State House"]
YEARS = ["2022"]  # Remove first two cycles

# Alternative presets. To switch, comment out the active pair above and
# uncomment exactly one OFFICES/YEARS pair below.

# OFFICES = ["U.S. Senate"]
# YEARS = ["2014", "2018", "2020", "2024"]

# OFFICES = ["State Senate"]
# YEARS = ["2014", "2018", "2022"]

# OFFICES = ["President"]
# YEARS = ["2024"]  # you can only do 2024, given only two previous elections

# OFFICES = ["Governor", "Secretary of State", "Attorney General"]
# YEARS = ["2018", "2022"]


In [None]:
import warnings
# Install a blanket "ignore" filter: with no message/category/module
# arguments this suppresses every warning raised after this cell runs.
# NOTE(review): this also hides warnings emitted by the modelling libraries
# used below; consider narrowing it to specific categories once the noisy
# sources are known.
warnings.filterwarnings("ignore")

In [None]:
from functools import reduce
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.feature_selection import mutual_info_regression
from sklearn.impute import SimpleImputer
from sklearn.inspection import permutation_importance
from sklearn.linear_model import LassoCV, LinearRegression, LogisticRegression
from sklearn.metrics import r2_score, accuracy_score, f1_score, accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
import csv
import gc
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import shap
import xgboost as xgb

In [None]:
# pd.options.display.max_rows = None
# Show every column when a DataFrame is displayed (None = no column limit).
pd.options.display.max_columns = None

### Compute partisanship

In [None]:
def categorize_partisanship(row):
    """Label a row by its previous-cycle vote shares.

    Parties are checked in a fixed order: democrat, then republican, then
    independent (the "other" share). For the first party whose previous share
    reaches a threshold the label is returned:

    * share >= 0.667 -> "strong <party>"
    * share >= 0.501 -> "leans <party>"

    If no party reaches 0.501 the row is "neutral". NaN shares fail every
    comparison and therefore also end up as "neutral".

    Parameters
    ----------
    row : mapping
        Anything indexable by "dem_share_prev", "rep_share_prev" and
        "oth_share_prev" (e.g. a DataFrame row passed by ``apply(axis=1)``).

    Returns
    -------
    str
        One of the seven partisanship labels.
    """
    share_columns = (
        ("dem_share_prev", "democrat"),
        ("rep_share_prev", "republican"),
        ("oth_share_prev", "independent"),
    )

    for column, party in share_columns:
        previous_share = row[column]
        if previous_share >= 0.667:
            return f"strong {party}"
        if previous_share >= 0.501:
            return f"leans {party}"

    return "neutral"


In [None]:
# Ordered (exclusive lower bound on |change|, label prefix) pairs, largest
# first. The first band whose bound is exceeded wins.
_PARTISAN_CHANGE_BANDS = (
    (0.5, "gargantuanly more"),
    (0.35, "massively more"),
    (0.25, "much much more"),
    (0.15, "much more"),
    (0.1, "more"),
    (0.05, "slightly more"),
    (0.01, "very slightly more"),
    (0.005, "infinitesimally more"),
)


def categorize_partisan_change(row, no_change_threshold=0.01):
    """Bucket the net partisan swing of a row into a descriptive label.

    The swing is modelled on a number line that ignores "other" parties:
    ``change = rep_share_change - dem_share_change``. Negative values mean a
    move towards the Democrats, positive values a move towards the
    Republicans. The magnitude is matched against the bands in
    ``_PARTISAN_CHANGE_BANDS`` and combined with the direction, e.g.
    "much more republican" or "slightly more democrat".

    Parameters
    ----------
    row : mapping
        Anything indexable by "rep_share_change" and "dem_share_change"
        (e.g. a DataFrame row passed by ``apply(axis=1)``).
    no_change_threshold : float, default 0.01
        Swings with ``|change|`` below this value are labelled "no change".
        With the default, the "infinitesimally more ..." band (0.005, 0.01]
        is only reachable at exactly +/-0.01; pass 0.005 to enable it.

    Returns
    -------
    str
        "no change", or "<intensity> more <republican|democrat>".

    Notes
    -----
    A NaN swing fails the threshold comparison and is labelled "no change".
    """
    change = row["rep_share_change"] - row["dem_share_change"]
    magnitude = abs(change)

    # Written as "not >=" so that NaN (for which every comparison is False)
    # lands in "no change".
    if not magnitude >= no_change_threshold:
        return "no change"

    direction = "republican" if change > 0 else "democrat"
    for lower_bound, intensity in _PARTISAN_CHANGE_BANDS:
        if magnitude > lower_bound:
            return f"{intensity} {direction}"

    # Only reachable when no_change_threshold is set below the smallest band:
    # the swing is too small to fit any named band.
    return "no change"


In [None]:
def categorize_partisan_change_amount(row):
    """Return the signed net partisan swing of a row.

    The swing is ``rep_share_change - dem_share_change`` ("other" parties are
    ignored): negative means a move towards the Democrats, positive a move
    towards the Republicans.

    Parameters
    ----------
    row : mapping
        Anything indexable by "rep_share_change" and "dem_share_change".

    Returns
    -------
    The difference of the two values, in whatever numeric type they have.
    """
    return row["rep_share_change"] - row["dem_share_change"]


### Predictions
Running U.S. House and State House for 2018, 2020, and 2022 takes roughly 120 minutes.

In [None]:
# Census table identifiers, one per generated CSV that is merged onto the
# precinct data. Each entry is "<table id>_<label>".
census_datasets = [
    "b02001_race",
    "b04007_ancestry",
    "b05012_nativity_us",
    "b08303_travel_time_work",
    "b25003_housing_rentership",
    "dp04_housing_characteristics",
    "dp05_age_race",
    "s0101_age_sex",
    "s1101_households_families",
    "s1201_marital_status",
    "s1501_educational_attainment",
    "s1701_income_poverty",
    "s1903_median_income",
    "s2101_veteran_status",
    "s2201_food_stamps",
    "s2301_employment_status",
    "s2401_occupation_sex",
    "s2403_industry_sex",
    "s2501_occupancy_characteristics",
    "s2503_financial_characteristics",
    "s2701_health_insurance",
]

# Candidate prediction targets; the processing loop walks through all of them
# and rebinds `active_target` as it goes.
all_targets = ["partisanship", "partisanship_change", "partisanship_change_amount", "turnout_pct"]
active_target = "partisanship_change"

# Rank n top features: [1, < infinity]
top_n_features_to_rank = 100_000_000

# Predict with n top features
# 16 = 0.4697 w/o engineered features
# 23 = 16 + 7 engineered fields we later throw away
top_n_feature_for_preds = 23


In [None]:
# Identifier, geometry, raw-count and current-cycle result columns that are
# removed before modelling. The list does not depend on year or office, so it
# is built once instead of on every loop iteration.
# Note: "turnout_pct" is dropped here, so that entry of `all_targets` is
# always reported as "not in data" and skipped below.
drop_cols = [
    "City/Township Description", "District Code", "Election Type",
    "Michigan County Code", "Office Description", "Precinct Label", "Status Code",
    "County Name", "Precinct Number", "Election Year",
    "total_votes", "registered_voters", "turnout_pct",
    "nearest_bound_census_tract", "nearest_bound_school_district", "nearest_bound_zipcode",
    "Office Code", "Census County Code", "City/Township Code", "Ward Number",
    "county", "office", "electionye", "registered_voters_change",
    "dem_votes", "rep_votes", "oth_votes",
    "dem_share", "rep_share", "oth_share",
    "dem_votes_change", "rep_votes_change", "oth_votes_change",
    "locale_full", "objectid", "precinct_num", "precinct_wp_id",
    "standardized_id", "geometry", "geometry_tract", "geoidfq_tract", "name_tract",
    "subdivision_fips", "ward_num", "county_fips",
    "nearest_tract", "awater_tract", "aland_tract", "tractce_tract", "shapestarea", "shapestlength", "geoid_tract"
]

# Columns kept as text; everything else is coerced to numeric.
string_columns = ["standardized_id_num", "partisanship", "partisanship_change"]

for year in YEARS:
    print(f'Processing year {year}')

    for office in OFFICES:
        print(f'Processing office {office}')

        # File-name form of the office, e.g. "U.S. House" -> "US_House".
        office_slug = office.replace('.', '').replace(' ', '_')

        # Load precinct-level election data
        df_precincts = pd.read_csv(f'data/generated_data/df_06_tract_{year}_{office_slug}.csv')
        df_precincts['standardized_id_num'] = df_precincts['standardized_id_num'].astype(str).str.zfill(13)

        # Assign partisanship categories
        df_precincts["partisanship"] = df_precincts.apply(categorize_partisanship, axis=1)
        df_precincts["partisanship_change"] = df_precincts.apply(categorize_partisan_change, axis=1)
        df_precincts["partisanship_change_amount"] = df_precincts.apply(categorize_partisan_change_amount, axis=1)

        print('Loading census data')
        census_dataset_dfs = []
        for census_dataset in census_datasets:
            # The label is everything after the table id. Table ids differ in
            # length ("s0101", "b02001", "dp04"), so split on the first
            # underscore rather than slicing at a fixed offset. The previous
            # fixed-offset slicing only handled "s..." and "b..." ids, so the
            # "dp..." tables silently reused the label of the preceding table.
            census_dataset_label = census_dataset.lower().split('_', 1)[1]

            df_census_dataset = pd.read_csv(
                f'data/generated_data/df_06_{census_dataset_label}_{year}_{office_slug}.csv'
            ).rename(columns={f'geoid_{census_dataset_label}': 'geoidfq_tract'})
            census_dataset_dfs.append(df_census_dataset)

        # Merge all datasets
        dfs = [df_precincts] + census_dataset_dfs
        df = reduce(lambda left, right: pd.merge(left, right, on='geoidfq_tract', how='left'), dfs)

        print('Cleaning columns')
        df = df.drop(columns=drop_cols, errors='ignore')
        numeric_df = df.drop(columns=string_columns, errors='ignore').apply(pd.to_numeric, errors='coerce')
        numeric_df = numeric_df.fillna(numeric_df.median())
        df = pd.concat([df[string_columns], numeric_df], axis=1)
        # Reset the index after dropping rows: X is rebuilt below with a fresh
        # RangeIndex, and index-aligned operations between X and y (corrwith)
        # would otherwise pair up the wrong rows.
        df = df.dropna(subset=['partisanship_change', 'partisanship_change_amount']).reset_index(drop=True)

        # Per-target prediction
        for active_target in all_targets:
            if active_target not in df.columns:
                print(f"Skipping {active_target} (not in data)")
                continue

            print(f"Processing target: {active_target}")

            # Encode target labels as integers (classes sorted as strings).
            # NOTE(review): "partisanship_change_amount" is continuous; turning
            # each distinct value into its own class makes the classifier and
            # report below of doubtful value for that target — confirm intent.
            label_encoder = LabelEncoder()
            df[active_target] = label_encoder.fit_transform(df[active_target].astype(str))

            excluded_targets = [t for t in all_targets if t != active_target and t in df.columns]
            excluded_features = ["standardized_id_num", "registered_voters"] + excluded_targets
            X = df.drop(columns=[active_target] + excluded_features, errors='ignore')
            y = df[active_target]
            # NOTE(review): "rep_share_change" / "dem_share_change" (inputs of
            # the partisanship_change label) and the "*_share_prev" columns
            # (inputs of the partisanship label) are not in drop_cols, so they
            # appear to remain in X — possible target leakage, please confirm.

            # Keep numeric predictors only; "predicted_*" columns written for
            # earlier targets are strings and are removed here as well.
            X = X.select_dtypes(include=[np.number])
            # Columns that are entirely NaN cannot be median-imputed and are
            # dropped by SimpleImputer, which would break the column naming
            # below; drop them explicitly first.
            X = X.dropna(axis=1, how='all')
            imputer = SimpleImputer(strategy="median", add_indicator=True)
            X_imputed = imputer.fit_transform(X)
            new_column_names = list(X.columns) + [f"missing_{i}" for i in range(X_imputed.shape[1] - X.shape[1])]
            X = pd.DataFrame(X_imputed, columns=new_column_names)
            # y holds LabelEncoder output (integers, never NaN), so it needs
            # no imputation.

            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)
            X = pd.DataFrame(X_scaled, columns=X.columns)
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

            print('Measuring feature importance')
            # All five rankers see the training split only, so the held-out
            # rows do not influence which features are selected.
            correlations = X_train.corrwith(y_train).abs().sort_values(ascending=False)

            rf = RandomForestRegressor(n_estimators=100, random_state=42)
            rf.fit(X_train, y_train)
            rf_importances = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=False)

            lasso = LassoCV(cv=5, random_state=42).fit(X_train, y_train)
            lasso_importances = pd.Series(np.abs(lasso.coef_), index=X.columns).sort_values(ascending=False)

            # Seeded like the other estimators so reruns give the same ranking.
            mi_scores = mutual_info_regression(X_train, y_train, random_state=42)
            mi_importances = pd.Series(mi_scores, index=X.columns).sort_values(ascending=False)

            xgb_model = xgb.XGBRegressor(n_estimators=100, max_depth=5, learning_rate=0.1, random_state=42)
            xgb_model.fit(X_train, y_train)
            # SHAP is computed on at most 500 training rows.
            shap_sample = X_train.sample(n=min(500, len(X_train)), random_state=42)
            explainer = shap.Explainer(xgb_model, shap_sample)
            shap_values = explainer(shap_sample)
            shap_importances = pd.Series(np.abs(shap_values.values).mean(axis=0), index=X.columns).sort_values(ascending=False)

            feature_rankings = pd.DataFrame({
                "Correlation": correlations,
                "RandomForest": rf_importances,
                "Lasso": lasso_importances,
                "MutualInfo": mi_importances,
                "SHAP": shap_importances
            })
            # Percentile rank per method (largest importance -> smallest rank),
            # averaged across methods; a lower Average_Rank is better.
            feature_rankings_ranked = feature_rankings.rank(axis=0, pct=True, method="average", ascending=False)
            feature_rankings["Average_Rank"] = feature_rankings_ranked.mean(axis=1)
            feature_rankings.to_csv(f'data/generated_data/feature_rankings_{active_target}_{year}_{office_slug}.csv')

            top_features = feature_rankings.sort_values("Average_Rank", ascending=True).index[:top_n_feature_for_preds].tolist()
            valid_features = [f for f in top_features if f in X.columns]
            X_selected = X[valid_features]

            # Same test_size and random_state as above, so the same rows form
            # the test set.
            X_train, X_test, y_train, y_test = train_test_split(X_selected, y, test_size=0.2, random_state=42)
            model = RandomForestClassifier(n_estimators=100, random_state=42)
            model.fit(X_train, y_train)

            print('Make predictions')
            y_pred = model.predict(X_test)
            accuracy = accuracy_score(y_test, y_pred)
            print(f"Test Set Accuracy: {accuracy:.4f}")

            target_names = list(label_encoder.classes_)
            # Pass the full label set explicitly: without it,
            # classification_report raises ValueError whenever a rare class is
            # missing from the test split, because target_names then has more
            # entries than there are observed classes.
            report = classification_report(
                y_test,
                y_pred,
                labels=np.arange(len(target_names)),
                target_names=target_names,
                zero_division=0,
            )
            print("\nClassification Report:\n", report)

            # Predict every row (training rows included), so this accuracy is
            # not a held-out estimate.
            df[f"predicted_{active_target}"] = label_encoder.inverse_transform(model.predict(X_selected))
            decoded_y_true = label_encoder.inverse_transform(y)
            decoded_y_pred = df[f"predicted_{active_target}"]
            overall_accuracy = np.mean(decoded_y_true == decoded_y_pred)
            print(f"Full Dataset Accuracy (Excel-style): {overall_accuracy:.4f}")

            out_df = df[["standardized_id_num", f"predicted_{active_target}"]].copy()
            out_df["standardized_id_num"] = out_df["standardized_id_num"].apply(lambda x: str(x).replace('.0', '').zfill(13))
            out_df["true_label"] = decoded_y_true
            out_df.to_csv(f"data/generated_data/predicted_{active_target}_{year}_{office_slug}.csv", index=False)