# 2016 Election "What-If" Simulator - Notebook
## Electoral College Edition

### SETUP & DATA LOADING
#### Cell 1: Load Raw Data

In [0]:
# Load the raw source table (named 1976_2020_president) and preview five rows.
raw_table_name = "workspace.default.1976_2020_president"
df = spark.table(raw_table_name)
df.show(5)

#### Cell 2: Create Bronze Table (Raw Data)

In [0]:
# Persist the raw DataFrame as the bronze Delta table, replacing any existing copy.
bronze_writer = df.write.format("delta").mode("overwrite")
bronze_writer.saveAsTable("elections_bronze")

#### Cell 3: Explore Bronze Data

In [0]:
# Reload the bronze table so exploration runs against what was persisted
bronze_df = spark.table("elections_bronze")

# List every available column, then preview the ones the later cells rely on
print(bronze_df.columns)
preview_columns = ["year", "state", "party_simplified", "candidate", "candidatevotes", "totalvotes"]
bronze_df.select(*preview_columns).show(10)

#### Cell 4: Clean & Transform (Silver Layer)

In [0]:
from pyspark.sql.functions import col, sum, first

# Silver layer: drop rows without a vote count, roll up to one row per
# (year, state, party_simplified, candidate), and derive each row's share
# of the state's total vote.
rows_with_votes = bronze_df.filter(col("candidatevotes").isNotNull())
votes_by_candidate = rows_with_votes.groupBy("year", "state", "party_simplified", "candidate")
aggregated_votes = votes_by_candidate.agg(
    sum("candidatevotes").alias("total_votes"),
    # first() takes one totalvotes value from each group as the state total
    first("totalvotes").alias("state_total_votes"),
)
silver_df = aggregated_votes.withColumn(
    "vote_share",
    col("total_votes") / col("state_total_votes"),
)

silver_df.show(10)

#### Cell 5: Save Silver Table

In [0]:
# Persist the cleaned, aggregated results as the silver Delta table (overwriting any prior run).
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("elections_silver")

#### Cell 6: Filter to 2016 Election

In [0]:
# Baseline for every simulation: the silver rows for the 2016 election only
silver_table = spark.table("elections_silver")
base_election = silver_table.filter(col("year") == 2016)
base_election.show()

### ELECTORAL COLLEGE SIMULATION
#### Cell 7: Electoral College Map

In [0]:
# Map states to electoral votes (2016 counts).
# Keys are upper-case state names (plus DISTRICT OF COLUMBIA); the simulation
# looks them up by each result row's `state` value.
electoral_votes = {
    'ALABAMA': 9, 'ALASKA': 3, 'ARIZONA': 11, 'ARKANSAS': 6,
    'CALIFORNIA': 55, 'COLORADO': 9, 'CONNECTICUT': 7, 'DELAWARE': 3,
    'DISTRICT OF COLUMBIA': 3, 'FLORIDA': 29, 'GEORGIA': 16, 'HAWAII': 4,
    'IDAHO': 4, 'ILLINOIS': 20, 'INDIANA': 11, 'IOWA': 6,
    'KANSAS': 6, 'KENTUCKY': 8, 'LOUISIANA': 8, 'MAINE': 4,
    'MARYLAND': 10, 'MASSACHUSETTS': 11, 'MICHIGAN': 16, 'MINNESOTA': 10,
    'MISSISSIPPI': 6, 'MISSOURI': 10, 'MONTANA': 3, 'NEBRASKA': 5,
    'NEVADA': 6, 'NEW HAMPSHIRE': 4, 'NEW JERSEY': 14, 'NEW MEXICO': 5,
    'NEW YORK': 29, 'NORTH CAROLINA': 15, 'NORTH DAKOTA': 3, 'OHIO': 18,
    'OKLAHOMA': 7, 'OREGON': 7, 'PENNSYLVANIA': 20, 'RHODE ISLAND': 4,
    'SOUTH CAROLINA': 9, 'SOUTH DAKOTA': 3, 'TENNESSEE': 11, 'TEXAS': 38,
    'UTAH': 6, 'VERMONT': 3, 'VIRGINIA': 13, 'WASHINGTON': 12,
    'WEST VIRGINIA': 5, 'WISCONSIN': 10, 'WYOMING': 3
}

# Earlier cells import pyspark.sql.functions.sum under the bare name `sum`,
# so call Python's builtin explicitly instead of relying on the global name.
import builtins

EXPECTED_TOTAL_EV = 538  # size of the Electoral College
total_ev = builtins.sum(electoral_votes.values())
print(f"Total Electoral Votes: {total_ev}")

# Catch a typo in the table above before it silently skews every simulation.
if total_ev != EXPECTED_TOTAL_EV:
    raise ValueError(
        f"electoral_votes sums to {total_ev}, expected {EXPECTED_TOTAL_EV}"
    )

#### Cell 8: Electoral College Simulation Function

In [0]:
from pyspark.sql.functions import rand, when, lit, sum, col
import random

def simulate_election_ec(base_df, swing_states, flip_probability, electoral_map):
    """
    Simulate one election under a winner-take-all Electoral College.

    Each state in ``swing_states`` is independently "flipped" with probability
    ``flip_probability``. Flipping a state swaps the vote totals of its top
    two major-party (DEMOCRAT / REPUBLICAN) rows, so the runner-up carries the
    state. Every state's electoral votes then go to the major-party candidate
    with the most (adjusted) votes in that state.

    Args:
        base_df: Spark DataFrame with at least the columns ``state``,
            ``candidate``, ``party_simplified`` and ``total_votes``.
        swing_states: Iterable of state names, spelled as in ``base_df.state``.
        flip_probability: Chance that a single swing state flips; a state
            flips when ``random.random() < flip_probability``.
        electoral_map: Mapping of state name -> electoral votes. States
            missing from the mapping contribute 0 electoral votes.

    Returns:
        dict with keys:
            ``candidate``       - candidate holding the most electoral votes
            ``electoral_votes`` - that candidate's electoral-vote total
            ``flipped_states``  - swing states selected to flip in this run
            ``all_results``     - {candidate: electoral votes} for every
                                  candidate that carried at least one state

    Raises:
        ValueError: if ``base_df`` has no DEMOCRAT/REPUBLICAN rows, so no
            state winner (and therefore no overall winner) can be determined.
    """
    # Imported locally, and builtins.max used below, so the function keeps
    # working when notebook cells have rebound the bare names `sum`, `min` or
    # `max` to their pyspark.sql.functions counterparts.
    import builtins
    import random

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    major_parties = ["DEMOCRAT", "REPUBLICAN"]

    # One random draw per swing state, in the order given.
    flipped_states = [
        state for state in swing_states if random.random() < flip_probability
    ]

    # Column expression for the post-flip vote count; stays equal to
    # total_votes unless at least one state actually gets swapped.
    adjusted_votes = F.col("total_votes")

    if flipped_states:
        # Fetch the top two major-party rows of every flipped state in a
        # single Spark job instead of one collect() per state.
        top_two_window = Window.partitionBy("state").orderBy(F.col("total_votes").desc())
        top_two_rows = (
            base_df.filter(F.col("state").isin(flipped_states))
            .filter(F.col("party_simplified").isin(major_parties))
            .withColumn("_top_two_rank", F.row_number().over(top_two_window))
            .filter(F.col("_top_two_rank") <= 2)
            .select("state", "candidate", "total_votes", "_top_two_rank")
            .collect()
        )

        # state -> [leader_row, runner_up_row], ordered by rank
        top_two_by_state = {}
        for row in sorted(top_two_rows, key=lambda r: r["_top_two_rank"]):
            top_two_by_state.setdefault(row["state"], []).append(row)

        swap_expr = None
        for state in flipped_states:
            pair = top_two_by_state.get(state, [])
            if len(pair) < 2:
                # Nothing to swap: the state has fewer than two major-party rows.
                continue
            leader, runner_up = pair
            # Leader receives the runner-up's votes and vice versa.
            swaps = (
                (leader["candidate"], runner_up["total_votes"]),
                (runner_up["candidate"], leader["total_votes"]),
            )
            for candidate_name, new_votes in swaps:
                condition = (F.col("state") == state) & (F.col("candidate") == candidate_name)
                if swap_expr is None:
                    swap_expr = F.when(condition, F.lit(new_votes))
                else:
                    swap_expr = swap_expr.when(condition, F.lit(new_votes))

        if swap_expr is not None:
            adjusted_votes = swap_expr.otherwise(F.col("total_votes"))

    # Rank major-party candidates within each state by adjusted votes and
    # keep only the top one (winner-take-all).
    state_window = Window.partitionBy("state").orderBy(F.col("state_total").desc())
    state_winner_rows = (
        base_df.withColumn("adjusted_votes", adjusted_votes)
        .filter(F.col("party_simplified").isin(major_parties))
        .groupBy("state", "candidate")
        .agg(F.sum("adjusted_votes").alias("state_total"))
        .withColumn("rank", F.row_number().over(state_window))
        .filter(F.col("rank") == 1)
        .drop("rank")
        .collect()
    )

    # Tally electoral votes per candidate.
    ec_results = {}
    for row in state_winner_rows:
        candidate = row["candidate"]
        ec_results[candidate] = ec_results.get(candidate, 0) + electoral_map.get(row["state"], 0)

    if not ec_results:
        raise ValueError(
            "base_df contains no DEMOCRAT/REPUBLICAN rows; cannot determine a winner"
        )

    # The winner is whoever holds the most electoral votes; the total is NOT
    # checked against the 270 majority threshold. On a tie, max() returns the
    # candidate that entered the tally first.
    winner_name, winner_ev = builtins.max(ec_results.items(), key=lambda item: item[1])

    return {
        'candidate': winner_name,
        'electoral_votes': winner_ev,
        'flipped_states': flipped_states,
        'all_results': ec_results
    }

#### Cell 9: Test Simulation Once

In [0]:
# Smoke test: a single simulated election over four swing states,
# each flipping with probability 0.5
swing_states = ["FLORIDA", "PENNSYLVANIA", "MICHIGAN", "WISCONSIN"]
result = simulate_election_ec(
    base_df=base_election,
    swing_states=swing_states,
    flip_probability=0.5,
    electoral_map=electoral_votes,
)

# Report the winner, their electoral-vote total, which states flipped,
# and the full per-candidate tally
report_fields = (
    ("Winner", "candidate"),
    ("Electoral Votes", "electoral_votes"),
    ("Flipped States", "flipped_states"),
    ("All Results", "all_results"),
)
for label, key in report_fields:
    print(f"{label}: {result[key]}")

### RUN SIMULATIONS
#### Cell 10: Run 1000 Simulations (Base Scenario)

In [0]:
# Run the base scenario 1000 times (4 swing states, 50% flip probability each)
# and keep one summary row per simulation.
num_simulations = 1000
swing_states = ["FLORIDA", "PENNSYLVANIA", "MICHIGAN", "WISCONSIN"]
results = []

for i in range(num_simulations):
    result = simulate_election_ec(base_election, swing_states, flip_probability=0.5, electoral_map=electoral_votes)
    results.append((
        i,
        result['candidate'],
        result['electoral_votes'],
        len(result['flipped_states']),
        # Stored as the list's string representation, e.g. "['FLORIDA']"
        str(result['flipped_states'])
    ))

# Convert to DataFrame
results_df = spark.createDataFrame(
    results,
    ["sim_id", "winner", "electoral_votes", "num_flips", "flipped_states"]
)

# overwriteSchema lets a rerun replace a table that was written with a different column layout
results_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("simulation_results")

#### Cell 11: Analyze Base Results

In [0]:
from pyspark.sql.functions import count, avg

# Summarise the base-scenario runs stored in simulation_results.
simulation_results = spark.table("simulation_results")

# Use the table's actual row count as the denominator so win_probability
# stays correct if the number of simulations is ever changed.
total_simulations = simulation_results.count()

# Count wins per candidate and express them as a percentage of all runs
win_counts = simulation_results \
    .groupBy("winner") \
    .agg(
        count("*").alias("wins"),
        (count("*") / total_simulations * 100).alias("win_probability")
    ) \
    .orderBy(col("wins").desc())

win_counts.show()

# Average electoral votes each candidate held in the runs they won
avg_ev = simulation_results \
    .groupBy("winner") \
    .agg(avg("electoral_votes").alias("avg_electoral_votes"))

avg_ev.show()

# Save as gold table
win_counts.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("elections_gold")

#### Cell 12: Aggressive Scenario (80% flip probability)

In [0]:
from pyspark.sql.functions import count

# Aggressive scenario: same swing states as the base run (reuses the
# notebook-level `swing_states`), but each flips with probability 0.8.
num_simulations = 1000
results_aggressive = []

for i in range(num_simulations):
    result = simulate_election_ec(base_election, swing_states, flip_probability=0.8, electoral_map=electoral_votes)
    results_aggressive.append((
        i,
        result['candidate'],
        result['electoral_votes']
    ))

results_aggressive_df = spark.createDataFrame(
    results_aggressive,
    ["sim_id", "winner", "electoral_votes"]
)

results_aggressive_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("simulation_aggressive")

# Check results: wins per candidate as a count and as a percentage.
# The denominator is the table's real row count rather than a literal.
aggressive_results = spark.table("simulation_aggressive")
aggressive_total = aggressive_results.count()

aggressive_results \
    .groupBy("winner") \
    .agg(
        count("*").alias("wins"),
        (count("*") / aggressive_total * 100).alias("win_probability")
    ) \
    .show()

#### Cell 13: More Swing States Scenario

In [0]:
# More-swing-states scenario: eight swing states, each flipping with probability 0.5
more_swing_states = ["FLORIDA", "PENNSYLVANIA", "MICHIGAN", "WISCONSIN",
                     "ARIZONA", "GEORGIA", "NORTH CAROLINA", "NEVADA"]

num_simulations = 1000
results_more_states = []

for i in range(num_simulations):
    result = simulate_election_ec(base_election, more_swing_states, flip_probability=0.5, electoral_map=electoral_votes)
    results_more_states.append((
        i,
        result['candidate'],
        result['electoral_votes']
    ))

results_more_df = spark.createDataFrame(
    results_more_states,
    ["sim_id", "winner", "electoral_votes"]
)

results_more_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("simulation_more_states")

# Check results: wins per candidate as a count and as a percentage.
# The denominator is the table's real row count rather than a literal.
more_states_results = spark.table("simulation_more_states")
more_states_total = more_states_results.count()

more_states_results \
    .groupBy("winner") \
    .agg(
        count("*").alias("wins"),
        (count("*") / more_states_total * 100).alias("win_probability")
    ) \
    .show()

### VISUALIZATION
#### Cell 14: Compare All Scenarios

In [0]:
import matplotlib.pyplot as plt
import pandas as pd

# Get win counts per candidate from all three scenarios, tagging each frame
# with a human-readable scenario label for the chart's x axis
base_wins = spark.table("simulation_results").groupBy("winner").count().toPandas()
base_wins['scenario'] = 'Base (50% flip, 4 states)'

aggressive_wins = spark.table("simulation_aggressive").groupBy("winner").count().toPandas()
aggressive_wins['scenario'] = 'Aggressive (80% flip, 4 states)'

more_states_wins = spark.table("simulation_more_states").groupBy("winner").count().toPandas()
more_states_wins['scenario'] = 'More States (50% flip, 8 states)'

# Combine
all_results = pd.concat([base_wins, aggressive_wins, more_states_wins])

# Plot: one group of bars per scenario, one bar per winner.
# fillna(0) covers candidates who never won in a given scenario.
fig, ax = plt.subplots(figsize=(12, 6))
all_results_pivot = all_results.pivot(index='scenario', columns='winner', values='count').fillna(0)
all_results_pivot.plot(kind='bar', ax=ax, width=0.8)

# Each scenario cell runs 1000 simulations; keep this title in sync with those loops.
plt.title('Election Outcomes Across Different Scenarios (1000 simulations each)', fontsize=14, fontweight='bold')
plt.ylabel('Number of Wins', fontsize=12)
plt.xlabel('Scenario', fontsize=12)
plt.legend(title='Winner', fontsize=10)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

#### Cell 15: Electoral Vote Distribution

In [0]:
# Show the distribution of the winner's electoral-vote total, one panel per
# scenario and one histogram per winning candidate.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

base_ev = spark.table("simulation_results").toPandas()
agg_ev = spark.table("simulation_aggressive").toPandas()
more_ev = spark.table("simulation_more_states").toPandas()

panels = [
    (axes[0], base_ev, 'Base Scenario'),
    (axes[1], agg_ev, 'Aggressive Scenario'),
    (axes[2], more_ev, 'More States Scenario'),
]

for ax, scenario_ev, panel_title in panels:
    for winner in scenario_ev['winner'].unique():
        winner_data = scenario_ev[scenario_ev['winner'] == winner]['electoral_votes']
        ax.hist(winner_data, bins=20, alpha=0.7, label=winner)

    # Draw the threshold line BEFORE calling legend(): legend() only lists
    # labelled artists that already exist on the axes when it is called.
    ax.axvline(270, color='red', linestyle='--', label='270 to win')

    ax.set_title(panel_title)
    ax.set_xlabel('Electoral Votes')
    ax.legend()

# Only the left-most panel carries the y-axis label
axes[0].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

### SUMMARY REPORT
#### Cell 16: Generate Summary Statistics

In [0]:
# Alias PySpark's min/max so Python's builtin min()/max() are not shadowed for
# the rest of the notebook session (simulate_election_ec calls builtin max()).
from pyspark.sql.functions import count, avg, min as spark_min, max as spark_max


def _summarize_scenario(table_name):
    """Return per-winner wins and avg/min/max electoral votes for one results table, as a pandas DataFrame."""
    return spark.table(table_name).groupBy("winner").agg(
        count("*").alias("wins"),
        avg("electoral_votes").alias("avg_ev"),
        spark_min("electoral_votes").alias("min_ev"),
        spark_max("electoral_votes").alias("max_ev")
    ).toPandas()


print("=" * 80)
print("2016 ELECTION 'WHAT-IF' SIMULATION SUMMARY")
print("=" * 80)
print()

# Scenario 1: Base
print("SCENARIO 1: Base Case (4 swing states, 50% flip probability)")
print("-" * 80)
base_summary = _summarize_scenario("simulation_results")
print(base_summary.to_string(index=False))
print()

# Scenario 2: Aggressive
print("SCENARIO 2: Aggressive (4 swing states, 80% flip probability)")
print("-" * 80)
agg_summary = _summarize_scenario("simulation_aggressive")
print(agg_summary.to_string(index=False))
print()

# Scenario 3: More states
print("SCENARIO 3: More Swing States (8 swing states, 50% flip probability)")
print("-" * 80)
more_summary = _summarize_scenario("simulation_more_states")
print(more_summary.to_string(index=False))
print()

print("=" * 80)