In [5]:
import geopandas as gpd
import pandas as pd
import warnings

from tqdm import tqdm
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
tqdm.pandas()
warnings.filterwarnings("ignore")

In [6]:
area_df = gpd.read_file('data/processed/areas.shp')
area_df['latlng'] = area_df.lookup_lat.fillna(area_df.center_lat).apply(eval)
area_df = area_df.rename(columns={'intersecti': 'intersection_id'})
area_df.head()

Unnamed: 0,intersection_id,zip,county,district,population,state,state_id,center_lat,lookup_lat,best_width,link,geometry,latlng
0,1,68791,Cuming,1,1837.0,Nebraska,NE,"(41.98449635881243, -96.93118532229558)","(41.99103, -96.93245)",12607.980224,https://edbltn.github.io/show-me-the-ballot/da...,"POLYGON ((-84034.004 2097379.512, -84032.239 2...","(41.99103, -96.93245)"
1,3,68047,Cuming,1,1873.0,Nebraska,NE,"(42.062632719821345, -96.75557887109892)","(42.062632719821345, -96.75557887109892)",6308.918559,https://edbltn.github.io/show-me-the-ballot/da...,"POLYGON ((-72486.716 2121367.628, -72443.255 2...","(42.062632719821345, -96.75557887109892)"
2,5,68057,Cuming,1,1429.0,Nebraska,NE,"(41.74840889363864, -96.5930601332286)","(41.74840889363864, -96.5930601332286)",1515.191617,https://edbltn.github.io/show-me-the-ballot/da...,"MULTIPOLYGON (((-47814.365 2082200.786, -47831...","(41.74840889363864, -96.5930601332286)"
3,6,68038,Cuming,1,1220.0,Nebraska,NE,"(41.91353394503433, -96.57127049697733)","(41.91353394503433, -96.57127049697733)",1695.223324,https://edbltn.github.io/show-me-the-ballot/da...,"MULTIPOLYGON (((-45636.531 2112792.554, -45636...","(41.91353394503433, -96.57127049697733)"
4,7,68641,Cuming,1,1076.0,Nebraska,NE,"(41.802224568274966, -96.99334413039195)","(41.802224568274966, -96.99334413039195)",3324.730133,https://edbltn.github.io/show-me-the-ballot/da...,"POLYGON ((-79594.505 2082551.119, -79951.676 2...","(41.802224568274966, -96.99334413039195)"


In [7]:
area_df['district'] = area_df.district.str.replace('00', '01')

# Define Ballotpedia API lookup

Caches and retrieves Ballotpedia geographic data for a given latitude/longitude. Uses polite rate limiting and request headers. Function has a 100,000 item cache to avoid duplicate API calls.

In [8]:
import requests
from functools import lru_cache

@lru_cache(maxsize=100_000)
def get_ballotpedia_data_rigorous(lat, lng, rate_limit=2):
    url = "https://api4.ballotpedia.org/myvote_redistricting_with_historical"
    params = {
        'long': str(lng),
        'lat': str(lat),
        'include_volunteer': 'true'
    }
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
        'Referer': 'https://sblv3.ballotpedia.org/',
        'Origin': 'https://sblv3.ballotpedia.org'
    }
    
    response = requests.get(url, params=params, headers=headers)

    # Rate limit
    time.sleep(rate_limit)
    
    if response.json().get('message') == 'Forbidden':
        raise PermissionError
    
    return response.json()
    

# Define processing functions

Functions to transform raw API election data into a structured format:
- Process ballot measures and candidate information
- Calculate decision metrics (number of races and options)
- Format final output as a pandas DataFrame with location data

In [9]:
import pandas as pd
from typing import Dict, List, Optional
from collections import defaultdict

def extract_election_data(api_response: Dict) -> List[Dict]:
    return api_response.get('data', {}).get('elections', [])

def process_ballot_measure(measure: Dict, common_data: Dict) -> Dict:
    return {
        **common_data,
        'race_type': 'Ballot Measure',
        'office.name': measure['name'],
        'office.type': 'Ballot Measure',
        'office.level': common_data['district_type'],
        'office.branch': 'N/A',
        'number_of_seats': 1,
        'person.name': 'Yes/No Question',
        'person.url': None,
        'party_affiliation': None,
        'status': 'On the Ballot',
        'is_incumbent': False,
        'running_mate.name': None,
        'measure_id': measure['id'],
        'measure_district_type': measure['district_type']
    }

def process_candidate(candidate: Dict, race: Dict, common_data: Dict) -> Dict:
    office = race['office']
    return {
        **common_data,
        'race_type': 'Candidate',
        'office.name': office['name'],
        'office.type': office['type'],
        'office.level': office['level'],
        'office.branch': office['branch'],
        'number_of_seats': race['number_of_seats'],
        'person.name': candidate['person']['name'],
        'person.url': candidate['person']['url'],
        'party_affiliation': candidate['party_affiliation'],
        'status': candidate['status'],
        'is_incumbent': candidate['is_incumbent'],
        'running_mate.name': candidate['running_mate']['name'] if candidate.get('running_mate') else None,
        'measure_id': None,
        'measure_district_type': None
    }

def process_district(district: Dict, election_date: str) -> List[Dict]:
    common_data = {
        'election_date': election_date,
        'district_name': district['name'],
        'district_type': district['type']
    }
    
    ballot_measures = [process_ballot_measure(measure, common_data) for measure in district.get('ballot_measures') or []]
    candidates = [process_candidate(candidate, race, common_data) 
                  for race in district.get('races')  or []
                  for candidate in race['candidates']]
    
    return ballot_measures + candidates

def calculate_decision_metrics(df: pd.DataFrame) -> pd.DataFrame:
    def count_decisions_and_options(group):
        decisions = defaultdict(int)
        for _, row in group.iterrows():
            if row['race_type'] == 'Ballot Measure':
                decisions[row['office.name']] = 2  # Yes/No options
            else:
                decisions[row['office.name']] += 1
        
        unique_decisions = len(decisions)
        total_options = sum(decisions.values())
        
        group['unique_decisions'] = unique_decisions
        group['total_options'] = total_options
        return group

    return df.groupby(['district_name', 'district_type']).apply(count_decisions_and_options).reset_index(drop=True)

def process_api_response(api_response: Dict, lat: float, lng: float) -> Optional[pd.DataFrame]:
    if not api_response:
        return None
        
    elections = extract_election_data(api_response)
    if not elections:
        return None
    
    processed_data = [
        item for election in elections
        for district in election['districts']
        for item in process_district(district, election['date'])
    ]
    
    df = pd.DataFrame(processed_data)
    df['group_id'] = df.groupby(['district_name', 'district_type']).ngroup()
    df['lat'], df['lng'] = lat, lng
    
    df = calculate_decision_metrics(df)
    
    return df

# Define main processing loop

Processes and accumulates data for each geographic point through the Ballotpedia API

In [71]:
import json
result_df = pd.read_csv('data/processed/results_2024-11-03.csv')

In [72]:
result_df['zip'] = result_df.zip.astype(str)
result_df['district'] = result_df.district.astype(str).str.slice(-2)
response_lookup = dict(result_df.set_index(['zip', 'county', 'district']).response.apply(json.loads))

In [78]:
import time

def process_dataframe(df):
    all_results = []
    
    for row in tqdm(df.itertuples(), total=len(df), desc="Processing rows"):

        lat, lng = row.latlng
        if (row.zip, row.county, row.district) in response_lookup:
            response = response_lookup[row.zip, row.county, row.district]
        else:
            response = None
            for i in range(60):
                try:
                    response = get_ballotpedia_data_rigorous(lat, lng, rate_limit=0.1)
                    if response['data']['districts'] is not None:
                        response['data']['districts'][0]
                    break
                except PermissionError:
                    if i == 0:
                        print('PermissionError:', lat, lng, row.state, row.county)
                    os.system('say "beep"')  # Uses system text-to-speech to make a sound
                    time.sleep(3**(i))
                    continue
                except Exception as e:
                    print('Exception:', e, lat, lng, row.state, row.county)
                    print(response)
                    print(e)
                    time.sleep(2**(i))
                    continue
                
        result_df = process_api_response(response, lat, lng)
        if result_df is None:
            input(print(response, row.county, row.state))
            continue
        else:
            response_lookup[row.zip, row.county, row.district] = response
            
        result_df['response'] = json.dumps(response)
        result_df['intersection_id'] = row.intersection_id
        result_df['district'] = row.district
        result_df['county'] = row.county
        result_df['state'] = row.state
        result_df['zip'] = row.zip
        results = result_df.to_dict(orient='records')

        
        all_results.extend(results)
        
    return pd.DataFrame(all_results)

In [79]:
result_df = process_dataframe(area_df)

Processing rows:  49%|█████████▋          | 28995/59527 [03:33<1:27:04,  5.84it/s]

PermissionError: 45.1845 -68.58681 Maine Penobscot


Processing rows:  53%|███████████          | 31252/59527 [06:58<02:02, 230.62it/s]

PermissionError: 39.62188 -74.22421 New Jersey Ocean


Processing rows:  60%|█████████████▎        | 36007/59527 [08:29<21:02, 18.63it/s]

PermissionError: 42.98899 -71.12494 New Hampshire Rockingham


Processing rows:  66%|██████████████▍       | 39089/59527 [10:01<08:53, 38.30it/s]

PermissionError: 44.5189620599165 -71.86961429979132 Vermont Caledonia


Processing rows:  68%|██████████████▉       | 40378/59527 [11:43<47:47,  6.68it/s]

PermissionError: 40.794977818158124 -74.20997249777953 New Jersey Essex


Processing rows:  71%|██████████████▏     | 42252/59527 [13:46<2:07:57,  2.25it/s]

PermissionError: 40.89161 -73.97368 New Jersey Bergen


Processing rows:  82%|█████████████████▉    | 48535/59527 [16:05<17:28, 10.49it/s]

PermissionError: 43.4776296551475 -72.7666295589081 Vermont Windsor


Processing rows:  82%|████████████████▎   | 48553/59527 [16:29<1:09:34,  2.63it/s]

Exception: HTTPSConnectionPool(host='api4.ballotpedia.org', port=443): Max retries exceeded with url: /myvote_redistricting_with_historical?long=-72.45177&lat=43.72791&include_volunteer=true (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x4fe8031d0>: Failed to establish a new connection: [Errno 65] No route to host')) 43.72791 -72.45177 Vermont Windsor
None
HTTPSConnectionPool(host='api4.ballotpedia.org', port=443): Max retries exceeded with url: /myvote_redistricting_with_historical?long=-72.45177&lat=43.72791&include_volunteer=true (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x4fe8031d0>: Failed to establish a new connection: [Errno 65] No route to host'))


Processing rows:  84%|██████████████████▍   | 49934/59527 [17:45<14:24, 11.10it/s]

PermissionError: 44.14946 -72.67239 Vermont Washington


Processing rows:  90%|███████████████████▊  | 53751/59527 [19:06<04:55, 19.53it/s]

PermissionError: 47.27371 -68.39936 Maine Aroostook


Processing rows:  93%|████████████████████▍ | 55456/59527 [20:59<02:51, 23.77it/s]

PermissionError: 41.73 -70.81 Massachusetts Plymouth


Processing rows:  95%|████████████████████▉ | 56671/59527 [22:52<10:44,  4.43it/s]

PermissionError: 44.67342 -72.21171 Vermont Orleans


Processing rows: 100%|█████████████████████▉| 59460/59527 [24:53<00:00, 70.18it/s]

{'success': True, 'data': {'districts': None, 'polling_place_locator': None, 'elections': None}, 'message': None} Hawaii Hawaii


Processing rows: 100%|█████████████████████▉| 59460/59527 [25:04<00:00, 70.18it/s]

None 


Processing rows: 100%|█████████████████████▉| 59463/59527 [25:13<00:21,  2.91it/s]

PermissionError: 13.45472 144.75128 Hawaii Kauai


Processing rows: 100%|██████████████████████| 59527/59527 [25:49<00:00, 38.43it/s]


In [None]:
result_df['district'] = result_df.district.str.replace('00', '01')

state_id_lookup = dict(zip(area_df.state, area_df.state_id))

import re

def extract_type_district_numbers(text, state):
    # Regular expression pattern to match <TYPE> District <NUMBER>
    pattern = f'U\\.S\\. House \\b{state}\\b (?:D|d)istrict (\\d+)'
    matches = re.findall(pattern, text)
    
    # Return matches as a list of tuples with <TYPE> and <NUMBER>
    return (matches or [None])[0]

In [206]:
result_df['state_id'] = result_df.state.map(state_id_lookup)
result_df['district'] = result_df.district.fillna(result_df.progress_apply(
    lambda x: extract_type_district_numbers(x.response, x.state) or 0, axis=1
)).str.slice(-2)
result_df['district'] = result_df.apply(lambda x: f'{x.state_id}-CD{x.district:02}', axis=1)

result_df = result_df[~result_df.district.str.endswith('00')]

100%|████████████████████████████████| 2320788/2320788 [00:43<00:00, 53621.24it/s]


In [207]:
from datetime import date

today = date.today().strftime("%Y-%m-%d")

result_df[[
    'zip', 'county', 'district', 'state', 'lat', 'lng', 'response'
]].drop_duplicates(['zip', 'county', 'district']).to_csv(f'data/processed/results_{today}.csv')

# Define ballot complexity analyzer

Function to analyze ballot complexity using GPT-4:
- Identifies non-partisan contests

Uses structured Pydantic models to validate GPT-4 outputs.

In [None]:
import json
import os
import re

from typing import List, Optional, Union, Literal
from pydantic import BaseModel, Field
from openai import OpenAI
from functools import lru_cache
from typing import List
from collections import Counter

# Initialize OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

class NonPartisanContest(BaseModel):
    is_non_partisan: bool

In [209]:
# Updated prompt for remaining indicators and analysis
SYSTEM_PROMPT = """
Determine if this is a non-partisan election contest.

Non-partisan contests typically:
- Don't list party affiliations
- Include local offices like school boards, city councils
- Include most judicial positions
- Are specifically designated as non-partisan

Partisan contests typically:
- List party affiliations
- Include races for Congress, President, Governor
- Are primary elections for political parties
- Include party committee positions

Provide analysis in JSON format:
{
    "is_non_partisan": boolean
}
"""
# Function to analyze a single ballot
@lru_cache(maxsize=100_000)
def analyze_race(race):
    
    # Create prompt for remaining analysis
    prompt = f"""
    Analyze the following race:
    {race}
    
    Provide the analysis in the specified structured JSON format.
    """
    
    try:
        completion = client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            temperature=0.0,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt}
            ],
            response_format=NonPartisanContest
        )
        
        # Parse the JSON response
        analysis = json.loads(completion.choices[0].message.content)
        return analysis['is_non_partisan']
    except Exception as e:
        print(f"Error analyzing ballot: {e}")
        return None

# Define ballot formatting functions

Functions to create a markdown-formatted ballot display:
- Sorts races by importance (President → Federal → State → Local → Measures)
- Formats candidate details including party, incumbent status, and links
- Creates checkboxes for voting options

In [210]:
import pandas as pd

def format_and_display_ballot_info(group):
    # Sort the races
    sorted_group = sort_races(group)
    
    # Start building the markdown output
    markdown_output = f"# Ballot for {sorted_group['county'].iloc[0]} County, {sorted_group['state'].iloc[0]}\n\n"
    markdown_output += f"Election Date: {sorted_group['election_date'].iloc[0]}\n\n"

    # Process races by office
    outputs = []
    for office, office_group in sorted_group.groupby('office.name'):
        outputs.append((format_office(office, office_group), office_group.key.iloc[0]))

    outputs.sort(key=lambda x: x[1])
    for output, key in outputs:
        markdown_output += output

    return markdown_output

def sort_races(group):
    def race_priority(row):
        office = row['office.name'].lower()
        if row['race_type'] == 'Ballot Measure':
            return 5
        elif 'president' in office:
            return 0
        elif office.startswith('u.s.'):
            return 1
        elif 'governor' in office:
            return 2
        elif row['office.level'] == 'State':
            return 3
        else:
            return 4

    group['key'] = group.apply(race_priority, axis=1)
    group = group.sort_values(by='key')

    return group

def format_office(office, office_group):
    first_row = office_group.iloc[0]
    output = f"## {office}\n\n"
    
    if first_row['race_type'] == 'Ballot Measure':
        output += format_ballot_measure(first_row)
    else:
        output += format_candidate_race(office_group)
    
    output += "---\n\n"
    return output

def format_ballot_measure(measure):
    output = f"**Type:** Ballot Measure\n"
    output += f"**District Type:** {measure['measure_district_type']}\n\n"
    output += "### Options:\n"
    output += "- [ ] Yes\n"
    output += "- [ ] No\n\n"
    return output

def format_candidate_race(race_group):
    first_row = race_group.iloc[0]
    output = f"**Level:** {first_row['office.level']}\n"
    output += f"**Branch:** {first_row['office.branch']}\n"
    output += f"**Number of Seats:** {first_row['number_of_seats']}\n\n"
    output += "### Candidates:\n"
    
    for _, candidate in race_group.iterrows():
        output += format_candidate(candidate)
    
    return output

def format_candidate(candidate):
    party = candidate['party_affiliation']
    party_name = party[0]['name'] if isinstance(party, list) and party else 'Unknown'
    
    output = f"- [ ] **{candidate['person.name']}** ({party_name})\n"
    output += f"  - Incumbent: {'Yes' if candidate['is_incumbent'] else 'No'}\n"
    if pd.notna(candidate['person.url']):
        output += f"  - [More Info]({candidate['person.url']})\n"
    if pd.notna(candidate['running_mate.name']):
        output += f"  - Running Mate: {candidate['running_mate.name']}\n"
    output += "\n"
    return output

In [211]:
tqdm.pandas()

rows = []
for group_name, group in tqdm(result_df.groupby(['intersection_id', 'state'])):
    subset = [c for c in group.columns if c not in ['lat', 'lng', 'group_id', 'zip']]
    ballot_text = format_and_display_ballot_info(group.drop_duplicates(['office.name', 'person.name']))
    
    races = [format_office(office, office_group)
             for office, office_group in group.groupby('office.name')
             if office_group.iloc[0].race_type != 'Ballot Measure']
    
    measures = [format_office(office, office_group)
                for office, office_group in group.groupby('office.name')
                if office_group.iloc[0].race_type == 'Ballot Measure']
    
    comp_races = [format_office(office, office_group)
                  for office, office_group in group.groupby('office.name')
                  if office_group.iloc[0].race_type != 'Ballot Measure' and len(office_group) > office_group.iloc[0].number_of_seats]
    
    non_partisan_races = [r.strip('# ').split('\n')[0] for r in races if analyze_race(r)]
    ballot_length = len(ballot_text)
    word_count = len(ballot_text.split())
    
    row = {
        'intersection_id': group_name[0],
        'lat': group['lat'].iloc[0], 'lng': group['lng'].iloc[0],
        'zip': group['zip'].iloc[0],
        'response': group['response'].iloc[0],
        'ballot_markdown': ballot_text, 
        'state_name': group_name[1],
        'county': group['county'].iloc[0],
        'district': group['district'].iloc[0],
        'unique_decisions':  group['office.name'].nunique(),
        'measures': measures,
        'races': races,
        'comp_races': comp_races,
        'non_partisan_races': non_partisan_races,
        "ballot_length": ballot_length,
        "word_count": word_count,
        'total_options': (group.race_type == 'Ballot Measure').sum() * 2 + (group.race_type != 'Ballot Measure').sum()
    }
    rows.append(row)

full_df = pd.DataFrame(rows).set_index(['intersection_id', 'state_name'])

100%|██████████████████████████████████████| 59252/59252 [06:28<00:00, 152.58it/s]


# Define readability calculator

Function to calculate Flesch-Kincaid Grade Level score for ballot text:
- Strips markdown formatting
- Returns reading grade level (higher score = more complex)

In [212]:
import re
from textstat import flesch_kincaid_grade

def calculate_flesch_kincaid(text):
    # Remove any markdown formatting
    clean_text = re.sub(r'[#*_`]', '', text)
    
    # Calculate Flesch-Kincaid Grade Level
    grade_level = flesch_kincaid_grade(clean_text)
    
    return round(grade_level, 2)

In [213]:
# Apply the analysis to the DataFramerow['ballot_markdown'])
full_df['flesch_kincaid_grade'] = full_df.ballot_markdown.progress_apply(calculate_flesch_kincaid)
full_df['grade_level'] = full_df.flesch_kincaid_grade.apply(lambda x: f'{x:.1f}')

100%|█████████████████████████████████████| 59252/59252 [00:15<00:00, 3903.48it/s]


In [214]:
full_df = full_df[full_df.ballot_markdown.notna()]

# Calculate complexity scores

Function to compute weighted ballot complexity scores based on multiple factors:
- Technical language and readability (Flesch-Kincaid)
- Ballot length and word count 
- Information density
- Number and complexity of decisions
- Non-partisan contest presence

Each factor is normalized against maximum values across all ballots.

In [215]:
full_df['avg_words_per_decision'] = full_df.word_count / full_df.unique_decisions
full_df['avg_options_per_decision'] = full_df.total_options / full_df.unique_decisions

In [216]:
def calculate_complexity_score(ballot_analysis, max_avg_options_per_decision, max_avg_words_per_decision, max_unique_decisions,
                               max_ballot_length, max_word_count, max_flesch_kincaid_grade):
    # Define weights for each indicator
    weights = {
        'ballot_length': 0.175,
        'word_count': 0.125,
        'avg_words_per_decision': 0.125,
        'unique_decisions': 0.15,
        'non_partisan_contests': 0.075,  # increased by 0.025
        'avg_options_per_decision': 0.125,
        'flesch_kincaid_grade': 0.125
    }    
    # Extract values with nested field access where necessary
    score = (
        (ballot_analysis['ballot_length'] * weights['ballot_length'] / max_ballot_length) +
        (ballot_analysis['word_count'] * weights['word_count'] / max_word_count) +
        (ballot_analysis['avg_words_per_decision'] * weights['avg_words_per_decision'] / max_avg_words_per_decision) +
        (ballot_analysis['avg_options_per_decision'] * weights['avg_options_per_decision'] / max_avg_options_per_decision) +
        (ballot_analysis['unique_decisions'] * weights['unique_decisions'] / max_unique_decisions) +
        (ballot_analysis['flesch_kincaid_grade'] * weights['flesch_kincaid_grade'] / max_flesch_kincaid_grade) +
        (int(not not ballot_analysis['non_partisan_races']) * weights['non_partisan_contests'])
    )
    
    return score

full_df['complexity_score'] = full_df.apply(
    calculate_complexity_score,
    max_ballot_length=full_df.ballot_length.max(),
    max_word_count=full_df.word_count.max(),
    max_avg_words_per_decision=full_df.avg_words_per_decision.max(),
    max_avg_options_per_decision=full_df.avg_options_per_decision.max(),
    max_unique_decisions=full_df.unique_decisions.max(),
    max_flesch_kincaid_grade=full_df.flesch_kincaid_grade.max(),
    axis=1
)

In [217]:
# # Sort and identify highest and lowest complexity ballots

full_df_sorted = full_df.reset_index().drop_duplicates('intersection_id').sort_values('complexity_score', ascending=False)
highest_complexity = full_df_sorted.head(2000)
lowest_complexity = full_df_sorted.tail(10000)
highest_complexity[['zip', 'county', 'district', 'state_name', 'complexity_score']].drop_duplicates(['state_name'], keep='first').head(10)
lowest_complexity[['zip', 'county', 'district', 'state_name', 'complexity_score']].drop_duplicates(['state_name'], keep='last').head(10)

Unnamed: 0,zip,county,district,state_name,complexity_score
23439,77375,Harris,TX-CD02,Texas,0.684015
47032,48363,Oakland,MI-CD09,Michigan,0.585903
6723,90201,Los Angeles,CA-CD38,California,0.552424
5000,97227,Multnomah,OR-CD03,Oregon,0.552055
19745,59701,Silver Bow,MT-CD01,Montana,0.495504
15427,85035,Maricopa,AZ-CD03,Arizona,0.48516
31257,98230,Whatcom,WA-CD02,Washington,0.477681


Unnamed: 0,zip,county,district,state_name,complexity_score
28489,71432,Winn,LA-CD05,Louisiana,0.214484
23299,96120,Alpine,CA-CD03,California,0.209561
29547,2911,Providence,RI-CD01,Rhode Island,0.208641
39847,38111,Shelby,TN-CD08,Tennessee,0.208509
30588,83344,Cassia,ID-CD02,Idaho,0.206419
20732,19973,Sussex,DE-CD01,Delaware,0.205911
58963,36528,Mobile,AL-CD10,Alabama,0.191256
59084,4570,Lincoln,ME-CD10,Maine,0.183272
45347,40177,Hardin,KY-CD02,Kentucky,0.183225
36037,3801,Rockingham,NH-CD01,New Hampshire,0.182229


# Define report generator

Function to create a markdown-formatted ballot complexity report with:
- Overall complexity score
- Decision complexity metrics (unique decisions, options, density)
- Language complexity (Flesch-Kincaid grade level)
- Ballot length statistics
- AI analysis disclaimer

In [218]:
def get_report_markdown(ballot_report, complexity_score):
    # Generate Markdown report with AI disclaimer
    report_md = f"""# Ballot Complexity Report
 
*This report provides an AI-assisted analysis of ballot complexity. Please note that this is a supplementary analysis and not a substitute for official election information.*

**Overall Complexity Score: {complexity_score*100:.1f}/100**
|                         |                                 |                                          |
|-------------------------|---------------------------------|------------------------------------------|
| **Decision Complexity** | Number of Questions             | {ballot_report['unique_decisions']}      |
|                         | Average Words per Question      | {ballot_report['avg_words_per_decision']:.1f}|
|                         | Average Options per Question    | {ballot_report['avg_options_per_decision']:.1f}|
|                         | Number of Races                 | {len(ballot_report['races'])}|
|                         | Number of Competitive Races     | {len(ballot_report['comp_races'])}|
|                         | Number of Ballot Measures       | {len(ballot_report['measures'])}|
"""
    
    # Conditionally add Non-Partisan Contests
    if ballot_report['non_partisan_races']:
        report_md += "|                         | Non-Partisan Races  | " + ", ".join(ballot_report['non_partisan_races'][:3]) + " |\n"
    
    # Language Complexity Section
    report_md += f"""| **Language Complexity** | [Flesch-Kincaid Grade Level](https://ballotpedia.org/Ballot_measure_readability_scores,_2024#Flesch-Kincaid_Grade_Level)""" + \
    f"""| {ballot_report['flesch_kincaid_grade']}  years of education      |
"""
    # Length Section
    report_md += f"""| **Length**              | Ballot Length                 | {ballot_report['ballot_length']:,} characters       |
|                         | Word Count                     | {ballot_report['word_count']:,}                         |
"""
    return report_md.replace('.0', '')

In [219]:
population_lookup = dict(zip(area_df.zip.apply(lambda x: f'{x:05}').astype(str), area_df.population))
full_df['zip'] = full_df.zip.apply(lambda x: f'{x:05}').astype(str)
full_df['population'] = full_df.zip.map(population_lookup)
full_df = full_df.sort_values(by='population', ascending=False)

zip_lookup = full_df.reset_index().rename(columns={
    'county_name': 'county',
    'state_name': 'state',
})[['state', 'county', 'zip', 'population']]

zip_lookup = zip_lookup.sort_values(by='population', ascending=False).drop(columns=['population'])
zip_lookup = zip_lookup.drop_duplicates(['state', 'county', 'zip'])
zip_lookup.head()

zip_lookup.to_csv('data/processed/zip_lookup.csv', index=False)

Unnamed: 0,state,county,zip
0,New Jersey,Ocean,87010
3,New Mexico,Santa Fe,87010
1,New Jersey,Ocean,8701
7,Texas,Fort Bend,77494
4,Texas,Harris,77494


In [220]:
full_df['full_markdown'] = full_df.apply(
    lambda x: get_report_markdown(x, x.complexity_score), axis=1
) + '\n---\n' + full_df.ballot_markdown

In [221]:
COLUMNS = {
    'unique_decisions': 'unique_questions',
    'avg_words_per_decision': 'avg_words_per_question',
    'avg_options_per_decision': 'avg_options_per_question',
    'races': 'races',
    'measures': 'measures',
    'avg_words_per_decision': 'avg_words_per_question',
    'comp_races': 'competitive_races',
    'non_partisan_races': 'non_partisan_races',
    'flesch_kincaid_grade': 'flesch_kincaid_grade',
    'ballot_length': 'ballot_length',
    'word_count': 'word_count'
}

In [228]:
from collections import defaultdict

full_df['pop'] = full_df.zip.map(population_lookup)
data_lookup = full_df.reset_index().sort_values(by='pop', ascending=False).rename(columns={
    'county_name': 'county',
    'state_name': 'state',
    **COLUMNS
})[['state', 'district', 'county', 'zip', 'full_markdown', *COLUMNS.values()]].drop_duplicates(
    ['state', 'district', 'county', 'zip']
)

In [229]:
grouping = ['state', 'district', 'county', 'zip', 'full_markdown']
data_lookup = data_lookup.groupby(grouping).apply(
    lambda df: pd.Series({
        **df.drop(columns=grouping).to_dict(orient='records')[0],
        'measures': len(df.measures.iloc[0]),
        'races': len(df.races.iloc[0]),
        'competitive_races': len(df.competitive_races.iloc[0]),
        'non_partisan_races': df.non_partisan_races.iloc[0],
    })
).reset_index()
data_lookup['district'] = data_lookup.district.str.replace('00', '01')
data_lookup['district'] = data_lookup.district.str.replace('DC-CD98', '')

In [230]:
from datetime import date

today = date.today().strftime("%Y%m%d")
data_lookup.to_csv(f'data/processed/data.csv')
data_lookup.to_csv(f'data/processed/data_{today}.csv')
for zip_code in tqdm(data_lookup.zip.unique()):
    data_lookup[data_lookup.zip == zip_code].to_csv(f'data/processed/zip_data_{zip_code}.csv'.lower().replace(' ', ''))

100%|██████████████████████████████████████| 35030/35030 [01:21<00:00, 427.64it/s]


# Perform repairs

Fix old reports without having to reprocess all the data.

In [405]:
data_df = pd.read_csv('data/processed/data_20241014.csv')

data_df['information_density'] = data_df.ballot_markdown.apply(
    lambda x: re.search(r'(Information Density\s+\|)\s(High|Medium|Low)', x)[2].lower()
)
data_df['word_count'] = data_df.ballot_markdown.apply(
    lambda x: int(re.search(r'(Word Count\s+\|)\s(\d+)', x)[2])
)
data_df['ballot_length'] = data_df.ballot_markdown.apply(
    lambda x: int(re.search(r'(Ballot Length\s+\|)\s(\d+)', x)[2])
)
data_df['information_density'] = data_df.ballot_markdown.apply(
    lambda x: re.search(r'(Information Density\s+\|)\s(High|Medium|Low)', x)[2].lower()
)
data_df['flesch_kincaid_grade'] = data_df.ballot_markdown.apply(
    lambda x: float((re.search(r'(Kincaid_Grade_Level\)\|)\s(\d+(\.\d+)?)', x) or [None] * 5)[2])
)
data_df['non_partisan_contests'] = data_df.ballot_markdown.apply(
    lambda x: bool(['No', 'Yes'].index((re.search(r'(Non-Partisan Contests Present\s+\|)\s(Yes|No)', x) or ['No'] * 3)[2]))
)

data_df['grade_level'] = data_df.flesch_kincaid_grade.apply(lambda x: f'{x:.1f}' if x <= 18 else '18+')
data_df['unique_decisions'] = data_df.ballot_markdown.str.count('---\n') - 1
data_df['unique_options'] = data_df.ballot_markdown.str.count('- \\[ \\]')
data_df['avg_options_per_decision'] = data_df['unique_options'] / data_df['unique_decisions']
data_df['avg_words_per_decision'] = data_df['word_count'] / data_df['unique_decisions']

In [406]:
def calculate_complexity_score(ballot_analysis, max_avg_options_per_decision, max_avg_words_per_decision, max_unique_decisions,
                               max_ballot_length, max_word_count, max_flesch_kincaid_grade):
    # Define weights for each indicator
    weights = {
        'ballot_length': 0.175,
        'word_count': 0.125,
        'avg_words_per_decision': 0.125,
        'unique_decisions': 0.15,
        'non_partisan_contests': 0.075,  # increased by 0.025
        'avg_options_per_decision': 0.125,
        'flesch_kincaid_grade': 0.125
    }
        
    # Extract values with nested field access where necessary
    score = (
        (ballot_analysis['ballot_length'] * weights['ballot_length'] / max_ballot_length) +
        (ballot_analysis['word_count'] * weights['word_count'] / max_word_count) +
        (ballot_analysis['avg_words_per_decision'] * weights['avg_words_per_decision'] / max_avg_words_per_decision) +
        (ballot_analysis['unique_decisions'] * weights['unique_decisions'] / max_unique_decisions) +
        (ballot_analysis['flesch_kincaid_grade'] * weights['flesch_kincaid_grade'] / max_flesch_kincaid_grade)
    )
    
    return score * 100
    
data_df['complexity_score'] = data_df.apply(
    calculate_complexity_score,
    max_ballot_length=data_df.ballot_length.max(),
    max_word_count=data_df.word_count.max(),
    max_avg_words_per_decision=data_df.avg_words_per_decision.max(),
    max_avg_options_per_decision=data_df.avg_options_per_decision.max(),
    max_unique_decisions=data_df.unique_decisions.max(),
    max_flesch_kincaid_grade=data_df.flesch_kincaid_grade.max(),
    axis=1
)

In [407]:
import re


def fix_row(row):
    text = row.ballot_markdown

    # Reformat numbers
    for metric, key, fmt in (
        ('Unique Decisions', 'unique_decisions', 'd'),
        ('Average Words per Decision', 'avg_words_per_decision', '.1f'),
        ('Average Options per Decision', 'avg_options_per_decision', '.1f'),
        ('Word Count', 'word_count', ','),
        ('Ballot Length', 'ballot_length', ','),
    ):
        text = re.sub(f'({metric}\\s*\\s?\\|)\\s(\\d+(\\.\\d+)?)', f'\\1 {getattr(row, key):{fmt}}', text)

    # Custom reformats
    text = re.sub(f'(Kincaid_Grade_Level\\)\\s*\\s?\\|)\\s(\\d+(\\.\\d+)?)', f'\\1 {row.grade_level:s} years of education', text)
    text = re.sub(f'(Overall Complexity Score: )(\\d+(\\.\\d+)?)', f'\\1 {row.complexity_score:.1f}', text)

    # Remove information density and non-partisan contests present
    text = re.sub(r'\|\s+\| Information Density\s+\|.*\|', '', text).replace('|\n\n|', '|\n|')
    text = re.sub(r'\|\s+\| Non-Partisan Contests Present\s+\|.*\|', '', text).replace('|\n\n|', '|\n|')
    
    # Remove type
    text = re.sub(r'\*\*Type:\*\* .+', '', text).replace('\n\n\n', '\n\n')
    

    # Rename unique decisions
    text = text.replace('Unique Decisions', 'Number of Questions')
    text = text.replace('Average Words per Decision', 'Average Words per Question')
    text = text.replace('Average Options per Decision', 'Average Options per Question')
    text = text.replace('Non-Partisan Contest Examples', 'Non-Partisan Contests')
    
    
    return text

data_df['fixed_markdown'] = data_df.apply(fix_row, axis=1)
data_df[['fixed_markdown', 'ballot_markdown']].apply(lambda x: [len(y) for y in x], axis=1)

0       [12341, 12847]
1       [25600, 26840]
2       [13049, 13585]
3         [5249, 5703]
4       [25693, 26914]
             ...      
4915      [8191, 8894]
4916      [4433, 4785]
4917      [6378, 6957]
4918      [4531, 4856]
4919      [8327, 8840]
Length: 4920, dtype: object

In [408]:
x = data_df.sample(1)
print(x.fixed_markdown.sample(1).iloc[0][:2000])
print(x.ballot_markdown.sample(1).iloc[0][:2000])

# Ballot Complexity Report
 
*This report provides an AI-assisted analysis of ballot complexity. Please note that this is a supplementary analysis and not a substitute for official election information.*

**Overall Complexity Score:  31.5/100**
|                         |                                 |                                          |
|-------------------------|---------------------------------|------------------------------------------|
| **Decision Complexity** | Number of Questions                | 15      |
|                         | Average Words per Question      | 56.1|
|                         | Average Options per Question    | 2.5|
|                         | Non-Partisan Contests  | U.S. Senate Texas, Texas Railroad Commission |
| **Language Complexity** | [Flesch-Kincaid Grade Level](https://ballotpedia.org/Ballot_measure_readability_scores,_2024#Flesch-Kincaid_Grade_Level)| 15.4 years of education       |
| **Length**              | Ballot Length            

In [412]:
columns = pd.read_csv('data/processed/data_20241014.csv').columns
output_df = data_df.drop(columns='ballot_markdown').rename(columns={'fixed_markdown': 'ballot_markdown'})[columns]
print(output_df.ballot_markdown.sample(1).iloc[0])

# Ballot Complexity Report
 
*This report provides an AI-assisted analysis of ballot complexity. Please note that this is a supplementary analysis and not a substitute for official election information.*

**Overall Complexity Score:  32.9/100**
|                         |                                 |                                          |
|-------------------------|---------------------------------|------------------------------------------|
| **Decision Complexity** | Number of Questions                | 17      |
|                         | Average Words per Question      | 53.6|
|                         | Average Options per Question    | 2.3|
|                         | Non-Partisan Contests  | U.S. Senate Texas - Tracy Andrus (Independent), Analisa Roche (Independent), Texas Railroad Commission - Richard McKibbin (Independent) |
| **Language Complexity** | [Flesch-Kincaid Grade Level](https://ballotpedia.org/Ballot_measure_readability_scores,_2024#Flesch-Kincaid_Grade_Le

In [411]:
output_df.to_csv('data/processed/data_20241027.csv', index=False)

# Map Analysis

In [240]:
data_df[['county', 'district', 'state', 'complexity_score']].iloc[0]

county                   Cook
district               IL-CD5
state                Illinois
complexity_score    49.065499
Name: 0, dtype: object

In [291]:
complexity_df.district.iloc[0]

'IL-CD5'

In [361]:
tipping_point_df.state.unique()

array(['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California',
       'Colorado', 'Connecticut', 'Delaware', 'District of Columbia',
       'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana',
       'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'ME-1', 'ME-2', 'Maine',
       'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
       'Mississippi', 'Missouri', 'Montana', 'NE-1', 'NE-2', 'NE-3',
       'Nebraska', 'Nevada', 'New Hampshire', 'New Jersey', 'New Mexico',
       'New York', 'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma',
       'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina',
       'South Dakota', 'Tennessee', 'Texas', 'Utah', 'Vermont',
       'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming'],
      dtype=object)

In [354]:
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LinearSegmentedColormap

def prepare_state_complexity(complexity_df):
    # Convert state names to uppercase for consistent matching
    complexity_df['state'] = complexity_df['state'].str.upper()
    
    # Group by state and calculate weighted median
    def weighted_median(group):
        # Sort complexity scores and corresponding weights
        sorted_data = group.sort_values('complexity_score')
        weights = sorted_data['voters']
        
        # Calculate cumulative weights
        cumsum = weights.cumsum()
        # Find middle point of total weight
        median_weight = weights.sum() / 2
        
        # Find the complexity score where cumulative weight crosses the median
        return sorted_data['complexity_score'][cumsum >= median_weight].iloc[0]
    
    return (complexity_df.groupby('state')
            .apply(weighted_median)
            .reset_index()
            .rename(columns={0: 'complexity_score'}))
    
def prepare_tipping_points(tipping_point_df):
    """Prepare tipping point probabilities."""
    tipping_data = tipping_point_df[['state', 'tipping']].copy()
    tipping_data['state'] = tipping_data['state'].str.upper()
    return tipping_data

def merge_state_data(state_complexity, tipping_data):
    """Merge complexity and tipping point data."""
    return state_complexity.merge(tipping_data, on='state', how='outer')

def create_color_scale():
    """Create a custom colormap from green to yellow to red."""
    colors = [(0.5, 0.5, 0.5),      # Green
             (0.75, 0.75, 0),         # Yellow
             (1, 0.5, 0)]       # Red
    return LinearSegmentedColormap.from_list('custom', colors)

def get_state_colors(merged_data, colormap):
    """Generate colors for each state based on complexity and tipping point."""
    def get_color(row):
        if pd.isna(row['complexity_normalized']):
            return (0.8, 0.8, 0.8)  # Grey for missing data
        
        # Get base color from complexity score
        base_color = colormap(row['complexity_normalized'])
        
        # Adjust saturation based on tipping point
        # Scale up the typically small tipping point values
        saturation = min(1.0, row['tipping'] * 20) if not pd.isna(row['tipping']) else 0.5
        
        # Mix with white based on saturation
        white = np.array([1, 1, 1])
        color = np.array(base_color[:3])
        return tuple(white * (1 - saturation) + color * saturation)

    # Normalize complexity scores
    complexity_min = merged_data['complexity_score'].min()
    complexity_max = merged_data['complexity_score'].max()
    merged_data['complexity_normalized'] = (
        (merged_data['complexity_score'] - complexity_min) / 
        (complexity_max - complexity_min)
    )
    
    return [get_color(row) for _, row in merged_data.iterrows()]

def setup_map_figure():
    """Create and setup the figure and axes for the map."""
    fig, ax = plt.subplots(figsize=(15, 10))
    ax.set_aspect('equal')
    ax.axis('off')
    return fig, ax

def add_colorbar(fig, ax, vmin, vmax):
    """Add a colorbar to the map."""
    cmap = create_color_scale()
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=vmin, vmax=vmax))
    cbar = fig.colorbar(sm, ax=ax, orientation='horizontal', pad=0.05)
    cbar.set_label('Complexity Score', labelpad=10)
    return cbar

In [360]:
population_lookup = dict(zip(
    df.zip, df.population
))
pair_population_lookup = {}
for zip_code, pairs in pair_lookup.items():
    population_lookup[zip_code] /= len(pairs)
    for p in pairs:
        pair_population_lookup[p] = pair_population_lookup.get(p, 0) + population_lookup[zip_code]

In [356]:
# Example usage:
complexity_df = data_df.copy()
complexity_df['voters'] = complexity_df.apply(lambda x: pair_population_lookup[x.district if str(x.district) != 'nan' else None, x.county], axis=1)

STATES = set(tipping_point_df.state.unique())
complexity_df['new_district'] = complexity_df.district.str.replace('CD', '')
complexity_df['state'] = complexity_df.apply(lambda x: x.state if x.new_district not in STATES else x.new_district, axis=1)

tipping_point_df = pd.read_excel('data/raw/State_Topline.xlsx').query('modeldate == "10/23/2024"')
states_gdf = gpd.read_file('data/raw/tl_2024_us_state/tl_2024_us_state.shp')

In [357]:
# Prepare data
state_complexity = prepare_state_complexity(complexity_df)
tipping_data = prepare_tipping_points(tipping_point_df)
merged_data = merge_state_data(state_complexity, tipping_data)

  .apply(weighted_median)


In [359]:
# Setup map
states_gdf = states_gdf.to_crs("ESRI:102003")
states_gdf['NAME'] = states_gdf.NAME.str.upper()
states_gdf = states_gdf.merge(merged_data, left_on='NAME', right_on='state', how='right')

# Generate colors
colormap = create_color_scale()
colors = get_state_colors(states_gdf, colormap)

# Create visualization
fig, ax = setup_map_figure()
states_gdf.plot(ax=ax, color=colors, edgecolor='black', linewidth=0.5)

# Add title and colorbar
plt.title('State Complexity Scores and Tipping Point Probabilities\n' +
          'Color: Green (Less Complex) to Red (More Complex)\n' +
          'Shade: White (Low Tipping Point Prob) to Full Color (High Tipping Point Prob)\n' +
          'Grey: No Data',
          pad=20)

add_colorbar(fig, ax, 
            vmin=states_gdf['complexity_score'].min(),
            vmax=states_gdf['complexity_score'].max())

plt.tight_layout()

plt.show()

KeyError: 'complexity_score'