# Running the gerrychain to find out the stats for the approved congressional districting plan

@authors: vcle, bpuhani

In [1]:
import json
from functools import partial

import geopandas as gpd
from gerrychain import Graph, Partition, constraints, MarkovChain, Election
from gerrychain.accept import always_accept
from gerrychain.metrics import efficiency_gap  # get the efficiency gap directly from gerrychain
from gerrychain.proposals import recom
from gerrychain.tree import bipartition_tree
from gerrychain.updaters import cut_edges, Tally
from shapely.ops import unary_union

import utilities as util

## Loading the needed data.
This notebook assumes that you have already run the following notebooks:
* `0_IL_import_and_explore_data.ipynb`
* `B_2_IL_clean_maup_with_congress.ipynb`

In [2]:
# The GeoDataFrame and the graph are both loaded from the same shapefile path.
IL_CONGRESS_SHAPEFILE = "il_data/IL_congress.shp"

il_df: gpd.GeoDataFrame = util.load_shapefile(IL_CONGRESS_SHAPEFILE)
il_graph: Graph = util.load_graph(IL_CONGRESS_SHAPEFILE)

Loading shapefile from il_data/IL_congress.shp...
Shapefile data loaded from cache.
Loading shapefile graph from il_data/IL_congress.shp...
Shapefile data loaded from cache.


## Setup Updaters

In [3]:
def has_holes(partition, district) -> bool:
    """Report whether the merged geometry of ``district`` has interior rings.

    The ``"geometry"`` attribute of every graph node assigned to ``district``
    is unioned into one shape, which is then passed through ``buffer(0)`` as an
    attempt to repair invalid geometry before the interiors are inspected.

    Args:
        partition: Partition whose ``graph`` nodes carry a ``"geometry"`` attribute.
        district: Key into ``partition.parts`` selecting the district to check.

    Returns:
        True if the repaired shape (or any polygon of a multi-polygon) has at
        least one interior ring. False otherwise, including when the repaired
        shape is neither a Polygon nor a MultiPolygon (a message is printed then).
    """
    node_shapes = [partition.graph.nodes[node]["geometry"]
                   for node in partition.parts[district]]
    repaired = unary_union(node_shapes).buffer(0)
    shape_kind = repaired.geom_type

    # Normalise both supported cases to "a sequence of polygons".
    if shape_kind == "Polygon":
        polygons = [repaired]
    elif shape_kind == "MultiPolygon":
        polygons = repaired.geoms
    else:
        # Anything else is reported and treated as "no holes".
        print(f"Not a polygon geometry: {shape_kind}")
        return False

    return any(len(polygon.interiors) > 0 for polygon in polygons)

In [4]:
# Updaters evaluated for every partition of the chain.
# (A Tally over the "HISP" column was considered but is not needed here.)
il_updaters = dict(
    total_population=Tally("TOTPOP", alias="total_population"),
    cut_edges=cut_edges,
    # One 0/1 flag per district: 1 if the district's merged geometry has a hole.
    district_has_holes=lambda partition: [
        int(has_holes(partition, district)) for district in partition.parts
    ],
)

In [5]:
# (election name, Democratic vote column, Republican vote column)
election_columns = (
    ("PRE20", "G20PRED", "G20PRER"),
    ("USS20", "G20USSD", "G20USSR"),
)
elections = [
    Election(name, {"Dem": dem_column, "Rep": rep_column})
    for name, dem_column, rep_column in election_columns
]

In [6]:
# Register every election as an updater, keyed by the election's name.
election_updaters = dict((election.name, election) for election in elections)
il_updaters.update(election_updaters)

In [7]:
# Initial partition: districts are read from the "CONGD" assignment column and
# tracked with the updaters collected in il_updaters.
initial_partition = Partition(il_graph, assignment="CONGD", updaters=il_updaters)

In [8]:
# Ideal population = total population divided evenly over all districts.
district_count = len(initial_partition)
population_by_district = initial_partition["total_population"]
ideal_population = sum(population_by_district.values()) / district_count

print("Nr of districts:", district_count)
print("Ideal population:", ideal_population)

Nr of districts: 17
Ideal population: 753676.9411764706


In [9]:
# Tree-splitting method handed to recom: bipartition_tree with a capped number
# of attempts and pair reselection enabled.
tree_method = partial(bipartition_tree, max_attempts=100, allow_pair_reselection=True)

# The recom proposal, targeting the ideal district population within epsilon.
proposal = partial(
    recom,
    pop_col="TOTPOP",
    pop_target=ideal_population,
    epsilon=0.02,
    method=tree_method,
)

In [10]:
# One list per tracked statistic; each list receives one entry per recorded partition.

# Number of cut edges.
list_of_nr_of_cut_edges = []

# Number of districts won by "Dem", per election.
list_of_dem_won_districts_pre20, list_of_dem_won_districts_uss20 = [], []

# Efficiency gap, per election.
list_of_eg_pre20, list_of_eg_uss20 = [], []

# Sorted "Dem" vote shares of all districts, per election.
list_of_dem_percents_pre20, list_of_dem_percents_uss20 = [], []

In [11]:
# Bundle all tracking lists in one dictionary so they can be checkpointed together.
# The values are the list objects themselves, not copies.
checkpoint_dict = dict(
    list_of_nr_of_cut_edges=list_of_nr_of_cut_edges,
    list_of_dem_won_districts_pre20=list_of_dem_won_districts_pre20,
    list_of_dem_won_districts_uss20=list_of_dem_won_districts_uss20,
    list_of_eg_pre20=list_of_eg_pre20,
    list_of_eg_uss20=list_of_eg_uss20,
    list_of_dem_percents_pre20=list_of_dem_percents_pre20,
    list_of_dem_percents_uss20=list_of_dem_percents_uss20,
)

In [12]:
def run_the_chain(nr_of_total_steps: int, start_partition: Partition, offset: int = 0) -> Partition:
    """Run the Markov chain and record per-partition statistics in ``checkpoint_dict``.

    The chain uses the module-level ``proposal`` and always accepts proposed
    partitions. Its constraints are defined relative to the module-level
    ``initial_partition``, not to ``start_partition``. For every partition the
    chain yields, one entry is appended to each list in the module-level
    ``checkpoint_dict``.

    Args:
        nr_of_total_steps: Total number of chain steps.
        start_partition: Partition used as the chain's initial state.
        offset: Subtracted from ``nr_of_total_steps`` to obtain the ``total_steps``
            passed to the chain. The value is not validated.

    Returns:
        The last partition yielded by the chain, or ``start_partition`` if the
        chain yields nothing.
    """
    # (updater name of the election, suffix used in the checkpoint_dict keys)
    tracked_elections = (("PRE20", "pre20"), ("USS20", "uss20"))

    # Set up the chain
    chain = MarkovChain(
        proposal=proposal,
        constraints=[
            # Compactness constraint: at most twice the cut edges of initial_partition
            constraints.UpperBound(lambda p: len(p["cut_edges"]), 2 * len(initial_partition["cut_edges"])),
            # Population constraint
            constraints.within_percent_of_ideal_population(initial_partition, 0.02, "total_population"),
            # Hole constraint intentionally disabled because the map will have holes.
            # With bounds (0, 0) on the 0/1 flags it would forbid holes in every district:
            # constraints.Bounds(lambda p: p["district_has_holes"], (0, 0))
        ],
        accept=always_accept,
        initial_state=start_partition,
        total_steps=nr_of_total_steps - offset
    )
    last_partition: Partition = start_partition

    for partition in chain.with_progress_bar():
        last_partition = partition

        # Compute every statistic for this partition before touching the lists,
        # so a failing metric cannot leave the lists with different lengths.
        step_stats = {"list_of_nr_of_cut_edges": len(partition["cut_edges"])}
        for election_name, suffix in tracked_elections:
            results = partition[election_name]
            step_stats[f"list_of_eg_{suffix}"] = efficiency_gap(results)
            step_stats[f"list_of_dem_percents_{suffix}"] = sorted(results.percents("Dem"))
            step_stats[f"list_of_dem_won_districts_{suffix}"] = results.wins("Dem")

        for key, value in step_stats.items():
            checkpoint_dict[key].append(value)

    return last_partition

## RUN CHAIN ONLY ONCE

In [13]:
# Run the chain for a single step, starting from initial_partition.
run_the_chain(nr_of_total_steps=1, start_partition=initial_partition)

  0%|          | 0/1 [00:00<?, ?it/s]

<Partition [17 parts]>

### Save the result

In [14]:
# Hand the collected statistics to the checkpoint helper under the name
# "IL_approved_congress_stats" and keep whatever it returns.
# NOTE(review): presumably util.checkpoint loads an existing checkpoint and
# otherwise saves the passed data — confirm in utilities.
checkpoint_dict = util.checkpoint("IL_approved_congress_stats", checkpoint_dict)

Checkpoint: IL_approved_congress_stats
Saving data...
Data saved successfully to checkpoints/IL_approved_congress_stats.pkl.


In [24]:
# Show the recorded statistics as indented JSON.
checkpoint_json = json.dumps(checkpoint_dict, indent=2)
print(checkpoint_json)

{
  "list_of_nr_of_cut_edges": [
    2729
  ],
  "list_of_dem_won_districts_pre20": [
    14
  ],
  "list_of_dem_won_districts_uss20": [
    14
  ],
  "list_of_eg_pre20": [
    0.13131178822215156
  ],
  "list_of_eg_uss20": [
    0.12667910980924485
  ],
  "list_of_dem_percents_pre20": [
    [
      0.2829432463800301,
      0.30215946585723386,
      0.39093753420891175,
      0.5391122864167559,
      0.5560699417152373,
      0.5568248180380874,
      0.5578379199798942,
      0.5786117038717272,
      0.5788307899118587,
      0.6317107555474051,
      0.7034732037467641,
      0.7055919546195603,
      0.70941553234033,
      0.710698346202289,
      0.7140061275445119,
      0.7369181346715099,
      0.8706399665343582
    ]
  ],
  "list_of_dem_percents_uss20": [
    [
      0.3181500844265489,
      0.3321918351588489,
      0.39583023587624777,
      0.5464606907784858,
      0.5607279129399929,
      0.5623922246994512,
      0.5626044754496623,
      0.5656581273373278,
     