In [2]:
import os
import re

import numpy as np
import pandas as pd
import geopandas as gpd
from tqdm import tqdm
tqdm.pandas()

from shapely.geometry import MultiPolygon, Polygon
from shapely.validation import make_valid

In [14]:
from constants import CEN_YEARS, FED_YEARS, ONTED_YEARS, FELXN_YEARS, ONTELXN_YEARS
from constants import YEAR_CODES

In [15]:
def get_census_year(year):
    """
    Returns the appropriate census year based on different rules for different periods:
    - Before 1961: return 1961
    - 1961-1980: round down to decade + 1 (1961, 1971)
    - After 1981: round down to nearest 5 + 1 (1981, 1986, 1991, etc.)
    
    Examples:
    1955 -> 1961
    1965 -> 1961
    1975 -> 1971
    1980 -> 1971
    1987 -> 1986
    2003 -> 2001
    """
    if year < 1961:
        return 1961
    elif year <= 1980:
        return year - ((year - 1961) % 10)
    else:
        return year - ((year - 1951) % 5)

def get_ed_year(year, is_ontario=True):
    """
    Returns the most recent electoral district year prior to the input year.
    
    Args:
        year (int): The year to look up
        is_ontario (bool): If True, use Ontario electoral districts years,
                       if False, use Federal electoral district years
    
    Returns:
        int: The most recent electoral district year

    Examples:
        get_ed_year(1962, is_ontario=True) -> 1962
        get_ed_year(1962, is_ontario=False) -> 1952
        get_ed_year(2003, is_ontario=True) -> 1996
        get_ed_year(2003, is_ontario=False) -> 2003
    """
    years = ONTED_YEARS if is_ontario else FED_YEARS
    valid_years = [y for y in years if y <= year]
    if not valid_years:
        return years[0]  # Return earliest year if input year is before all valid years
    return max(valid_years)

Create approximations for each of the census variables under consideration, for each election year. We want this at the level of electoral districts and approximated from census tracts.

In [16]:
def add_census_values_to_gdf(gdf_full, gdf_small, cen_year, cen_var):
    cols = YEAR_CODES[cen_year][cen_var]

    if len(cols) == 0:  
        gdf_small[cen_var] = np.nan
        if cen_var == 'num_not_vm_tot':
            gdf_small.rename(columns={'num_not_vm_tot': 'num_vm_tot'}, inplace=True)
    elif len(cols) > 1:  # New immigrants has multiple tags
        gdf_small.loc[:, cen_var] = gdf_full.loc[:, cols].sum(axis=1)
    elif cen_var == 'num_not_vm_tot':  # Need to compute total - NOT
        pop_total = gdf_full[YEAR_CODES[cen_year]['num_pop_tot'][0]]
        non_vm_total = gdf_full[cols[0]]  # Use [0] to get string from list
        gdf_small['num_vm_tot'] = pop_total - non_vm_total
    else:
        gdf_small.loc[:, cen_var] = gdf_full.loc[:, cols]

In [17]:
def compute_ed_stats(gdf_ed_gta, gdf_ct_gta, df_ct_cen, cen_year):
    gdf_ct_gta['geosid'] = gdf_ct_gta['geosid'].astype(str)
    df_ct_cen['geosid'] = df_ct_cen['geosid'].astype(str)
    df_ct_cen['geosid'] = df_ct_cen['geosid'].apply(lambda x: x[:-2] + '.00' if x.endswith('.0') else x)

    gdf_full = pd.merge(
        gdf_ct_gta,
        df_ct_cen,
        on='geosid',
        how='left'
    )

    gdf_small = gdf_full[['geosid', 'geometry']].copy()
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_pop_tot')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_imm_tot')
    # add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_imm_new')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_imm_2nd_tot')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'avg_hou_inc')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_not_vm_tot')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_vm_sa_tot')
    add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_vm_chn_tot')
    # add_census_values_to_gdf(gdf_full, gdf_small, cen_year, 'num_enfr_home_tot')

    # Find intersections between census tracts and electoral districts
    pairs = gpd.sjoin(gdf_small, gdf_ed_gta, how="inner", predicate="intersects")

    # Calculate area weights for overlapping geometries
    pairs['overlap_area'] = pairs.apply(
        lambda row: row['geometry'].intersection(
            gdf_ed_gta.loc[row['index_right'], 'geometry']
        ).area / row['geometry'].area,
        axis=1
    )

    # List of columns to aggregate (excluding geosid, geometry)
    value_columns = [col for col in gdf_small.columns 
                    if col not in ['geosid', 'geometry']]

    # Initialize result dataframe with electoral district geometries
    result = gdf_ed_gta.copy()

    # Calculate weighted sums for each value column
    for col in value_columns:
        if pairs[col].isna().all():  # If column is all NaN, preserve NaN in result
            result[col] = np.nan
        elif col == 'avg_hou_inc':  # Special case for weighted mean using population
            # Compute weighted population
            pairs['weighted_pop'] = pairs['num_pop_tot'] * pairs['overlap_area']
            # Normalize weights within each electoral district
            total_weighted_pop = pairs.groupby(pairs['index_right'])['weighted_pop'].sum()
            pairs['normalized_weight'] = pairs['weighted_pop'] / pairs['index_right'].map(total_weighted_pop)
            # Compute weighted mean for avg_hou_inc
            weighted_values = pairs[col] * pairs['normalized_weight']
            result[col] = weighted_values.groupby(pairs['index_right']).sum()
        else:
            weighted_values = pairs[col] * pairs['overlap_area']
            result[col] = weighted_values.groupby(pairs['index_right']).sum()

    # Compute relevant proportions 
    result['pct_imm'] = (result['num_imm_tot'] / result['num_pop_tot']) * 100
    # result['pct_imm_new'] = (result['num_imm_new'] / result['num_pop_tot']) * 100
    result['pct_imm_2nd'] = (result['num_imm_2nd_tot'] / result['num_pop_tot']) * 100
    result['pct_vm'] = (result['num_vm_tot'] / result['num_pop_tot']) * 100
    result['pct_vm_sa'] = (result['num_vm_sa_tot'] / result['num_pop_tot']) * 100
    result['pct_vm_chn'] = (result['num_vm_chn_tot'] / result['num_pop_tot']) * 100
    # result['pct_enfr_home'] = (result['num_enfr_home_tot'] / result['num_pop_tot']) * 100

    return result

In [18]:
for year in tqdm(ONTELXN_YEARS):
    ed_year = get_ed_year(year, is_ontario=True)
    cen_year = get_census_year(year)

    gdf_onted_gta = gpd.read_file(f'../data/geo/{ed_year}_ont-ed/ont-ed_gta_{ed_year}.gpkg')
    gdf_ct_gta = gpd.read_file(f"../data/geo/{cen_year}_ct/ct_gta_{cen_year}.gpkg")
    df_ct_cen = pd.read_csv(f"../data/census/{cen_year}_ct_wide/census_wide_{cen_year}_ct.csv")

    gdf_onted_stats = compute_ed_stats(gdf_onted_gta, gdf_ct_gta, df_ct_cen, cen_year)
    gdf_onted_stats = gdf_onted_stats[gdf_onted_stats.ct_overlap >= 0.75]

    gdf_onted_stats.to_file(f'../data/elections/{year}_ont-elxn/ont-ed_stats_{year}.gpkg')

for year in tqdm(FELXN_YEARS):
    ed_year = get_ed_year(year, is_ontario=False)
    cen_year = get_census_year(year)

    gdf_fed_gta = gpd.read_file(f'../data/geo/{ed_year}_fed/fed_gta_{ed_year}.gpkg')
    gdf_ct_gta = gpd.read_file(f"../data/geo/{cen_year}_ct/ct_gta_{cen_year}.gpkg")
    df_ct_cen = pd.read_csv(f"../data/census/{cen_year}_ct_wide/census_wide_{cen_year}_ct.csv")

    gdf_fed_stats = compute_ed_stats(gdf_fed_gta, gdf_ct_gta, df_ct_cen, cen_year)
    gdf_fed_stats = gdf_fed_stats[gdf_fed_stats.ct_overlap >= 0.75]

    gdf_fed_stats.to_file(f'../data/elections/{year}_felxn/fed_stats_{year}.gpkg')

100%|██████████| 1/1 [00:05<00:00,  5.07s/it]


In each of the data frames for the elections which contain census approximations for each electoral District, add the vote share for that riding in the given year for the three major parties as well as an other category.

In [19]:
def process_election_results(df, year):
    import warnings
    # Suppress specific deprecation warning for groupby operation
    warnings.filterwarnings('ignore', category=DeprecationWarning,
                          message='DataFrameGroupBy.apply operated on the grouping columns')
    
    # Standardize column names (strip spaces, lowercase)
    df.columns = df.columns.str.strip()
    
    # Detect Ontario elections by checking for 'Plurality' column
    is_ontario = 'Plurality' in df.columns
    
    if is_ontario:
        # Rename columns to match federal format
        df = df.rename(columns={
            'Electoral District': 'Constituency',
            'Party': 'Political_Affiliation',
            'Votes Cast': 'Votes'
        })
        
        # Clean constituency names without regex
        df['Constituency'] = df['Constituency'].str.split(' - ', n=1).str[-1]
    
    # Define party mappings (Ontario mappings include federal mappings as well)
    conservative_parties = ['Progressive Conservative Party', 'Conservative Party of Canada', 'PC', 'PCP']
    reform_parties = ['Reform Party of Canada', 'Canadian Reform Conservative Alliance']
    liberal_party = ['Liberal Party of Canada', 'L', 'OLP', 'LIB']
    ndp_party = ['New Democratic Party', 'ND', 'NDP']
    
    # Compute party vote percentages
    def get_party_percentage(group, party_names):
        party_votes = group[group['Political_Affiliation'].isin(party_names)]
        if len(party_votes) > 1:
            party_votes = party_votes.loc[party_votes['Votes'].idxmax()]
        return party_votes['Votes'].sum() if not party_votes.empty else 0
    
    def compute_percentages(group):
        total = group['Votes'].sum()
        cons1_votes = get_party_percentage(group, conservative_parties)
        cons2_votes = get_party_percentage(group, reform_parties)
        lib_votes = get_party_percentage(group, liberal_party)
        ndp_votes = get_party_percentage(group, ndp_party)
        oth_votes = total - (cons1_votes + cons2_votes + lib_votes + ndp_votes)
        
        return pd.Series({
            'constituency': group['Constituency'].iloc[0],
            'cons1_pct': (cons1_votes / total) * 100,
            'cons2_pct': (cons2_votes / total) * 100 if cons2_votes > 0 else np.nan,
            'lib_pct': (lib_votes / total) * 100,
            'ndp_pct': (ndp_votes / total) * 100,
            'oth_pct': (oth_votes / total) * 100,
            'cons1_votes': cons1_votes,
            'cons2_votes': cons2_votes,
            'lib_votes': lib_votes,
            'ndp_votes': ndp_votes,
            'oth_votes': oth_votes,
        })
    
    # Apply the function to group by constituency
    df_results = (df.groupby('Constituency')
                    .apply(compute_percentages)  
                    .reset_index(drop=True))
    
    return df_results

In [20]:
def create_dummy_constituency_name(df_col, year=None, is_federal=False):
    # Federal overrides for specific years
    federal_overrides = {
        1972: {'High Park': 'highparkhumbervalley', 'Lakeshore': 'torontolakeshore'},
        1974: {'High Park': 'highparkhumbervalley', 'Lakeshore': 'torontolakeshore', 'Peel South': 'mississauga'},
        1988: {'Bramalea-Gore-Malton': 'bramptonmalton', 'Markham-Whitchurch-Stouffville': 'markham'},
        1997: {'Bramalea-Gore-Malton-Springdale': 'bramaleagoremalton', 'Toronto-Danforth': 'broadviewgreenwood'}
    }
    
    # Apply overrides if applicable
    if is_federal and year in federal_overrides:
        df_col = df_col.replace(federal_overrides[year])

    # General processing
    return (df_col
            .str.replace(r'\([^)]*\)', '', regex=True)  # Remove content within brackets
            .str.split('/').str[0]                     # Keep only content before slash
            .str.strip()                                # Remove leading/trailing whitespace
            .str.lower()                                # Convert to lowercase
            .str.replace(r'[^a-z]', '', regex=True))   # Remove non-letter characters


In [21]:
def standardize_names(row):
    row['geoname'] = row['geoname'].replace('--', '—')
    row['geoname'] = row['geoname'].replace('-', '—')
    row['geoname'] = row['geoname'].replace('9', '—')
    row['geoname'] = row['geoname'].split("/")[0]
    return row

In [22]:
for year in tqdm(ONTELXN_YEARS):
    df_elections = pd.read_csv(f'../data/elections/{year}_ont-elxn/{year}_results.csv')
    df_elections = process_election_results(df_elections, year)

    gdf_onted_stats = gpd.read_file(f'../data/elections/{year}_ont-elxn/ont-ed_stats_{year}.gpkg')
    
    # Create standardized name columns for joining
    df_elections['dummy_name'] = create_dummy_constituency_name(df_elections['constituency'], year=year, is_federal=False)
    gdf_onted_stats['dummy_name'] = create_dummy_constituency_name(gdf_onted_stats['geoname'], year=year, is_federal=False)
    
    # Perform left merge (keep only rows from gdf_onted_stats)
    merged_df = pd.merge(
        gdf_onted_stats, 
        df_elections.drop(columns=['constituency']), 
        on='dummy_name', 
        how='left',
        indicator=True
    )
    
    # Check for unmatched rows from gdf_onted_stats
    # unmatched = merged_df[merged_df['_merge'] == 'left_only']
    # if len(unmatched) > 0:
    #     print(year)
    #     print(f"\nWarning: {len(unmatched)} electoral districts have no matching election results:")
    #     print(unmatched[['dummy_name', 'geoname']])
    
    # Clean up temporary columns and names
    merged_df = merged_df[merged_df['_merge'] == 'both'].drop(columns=['_merge', 'dummy_name'])
    gdf_onted_stats = merged_df
    gdf_onted_stats = gdf_onted_stats.apply(standardize_names, axis=1)

    gdf_onted_stats.to_file(f'../data/elections/{year}_ont-elxn/ont-ed_stats_{year}.gpkg')

for year in tqdm(FELXN_YEARS):
    df_elections = pd.read_csv(f'../data/elections/{year}_felxn/{year}_results.csv')
    df_elections = process_election_results(df_elections, year)

    gdf_fed_stats = gpd.read_file(f'../data/elections/{year}_felxn/fed_stats_{year}.gpkg')

    # Create standardized name columns for joining
    df_elections['dummy_name'] = create_dummy_constituency_name(df_elections['constituency'], year=year, is_federal=True)
    gdf_fed_stats['dummy_name'] = create_dummy_constituency_name(gdf_fed_stats['geoname'], year=year, is_federal=True)
    
    # Perform left merge (keep only rows from gdf_fed_stats)
    merged_df = pd.merge(
        gdf_fed_stats, 
        df_elections.drop(columns=['constituency']), 
        on='dummy_name', 
        how='left',
        indicator=True
    )
    
    # Check for unmatched rows from gdf_fed_stats
    # unmatched = merged_df[merged_df['_merge'] == 'left_only']
    # if len(unmatched) > 0:
    #     print(year)
    #     print(f"\nWarning: {len(unmatched)} electoral districts have no matching election results:")
    #     print(unmatched[['dummy_name', 'geoname']])
    
    # Clean up temporary columns and names
    merged_df = merged_df[merged_df['_merge'] == 'both'].drop(columns=['_merge', 'dummy_name'])
    gdf_fed_stats = merged_df
    gdf_fed_stats = gdf_fed_stats.apply(standardize_names, axis=1)

    gdf_fed_stats.to_file(f'../data/elections/{year}_felxn/fed_stats_{year}.gpkg')

  df_elections = pd.read_csv(f'../data/elections/{year}_felxn/{year}_results.csv')
  'cons1_pct': (cons1_votes / total) * 100,
  'lib_pct': (lib_votes / total) * 100,
  'ndp_pct': (ndp_votes / total) * 100,
  'oth_pct': (oth_votes / total) * 100,
100%|██████████| 1/1 [00:01<00:00,  1.23s/it]


In [17]:
gdf_fed_stats = gpd.read_file('../data/elections/2025_felxn/fed_stats_2025.gpkg')
df_pct_change = pd.read_csv('../data/elections/2025_felxn/fed_results_pct_change.csv')
df_pct_change = df_pct_change[df_pct_change['party_name'] == 'Conservative Party of Canada'][['FED_NUM', 'pct_vote_change']]
df_pct_change = df_pct_change.rename(columns={'pct_vote_change': 'cons_pct_change'})

gdf_fed_stats = gdf_fed_stats.merge(df_pct_change, on='FED_NUM')
gdf_fed_stats.to_file(f'../data/elections/2025_felxn/fed_stats_2025.gpkg')

In [18]:
for year in tqdm(ONTELXN_YEARS):
    gdf_onted_stats = gpd.read_file(f'../data/elections/{year}_ont-elxn/ont-ed_stats_{year}.gpkg')
    gdf_onted_stats.to_file(f'../static/data/elections/ont-ed_stats_{year}.geojson')

for year in tqdm(FELXN_YEARS):
    gdf_fed_stats = gpd.read_file(f'../data/elections/{year}_felxn/fed_stats_{year}.gpkg')
    gdf_fed_stats.to_file(f'../static/data/elections/fed_stats_{year}.geojson')

  0%|          | 0/1 [00:00<?, ?it/s]

100%|██████████| 1/1 [00:00<00:00,  2.03it/s]
