In [13]:
import os
import pandas as pd
from elections.Ballot import Ballot
from elections.Candidate import Candidate
from elections.CandidateScore import CandidateScore
from elections.Party import Party  
from elections.HeadToHeadElection import HeadToHeadElection
from elections.DiversityRunoff import DiversityRunoffElection
from elections.InstantRunoffElection import InstantRunoffElection
from CVRLoader import CVRLoader
from Contest import Contest


In [14]:

def load_xlsx(directory: str) -> list[Ballot]:
    """Load ranked ballots from every .xlsx file found in ``directory``.

    Each spreadsheet row becomes at most one ballot.  Columns from index 3
    onward are read, left to right, as that row's ranked choices; the first
    three columns are ignored (presumably ballot metadata -- TODO confirm
    against the source spreadsheets).

    A cell is normalized by dropping everything from the first "(" and
    stripping surrounding whitespace.  Cells that are not text (pandas reads
    empty cells as NaN), cells equal to "undervote" or "overvote", and repeat
    mentions of a candidate already seen on the same row are skipped.

    Args:
        directory: Folder to scan.  Files whose name starts with "~$"
            (Excel lock files) are ignored.

    Returns:
        One Ballot for every row that ranked at least one candidate.  The same
        Candidate object is reused for a given name across all files read by
        this call.
    """
    # os.listdir returns names in arbitrary order; sort so the ballot order and
    # the "Creating new candidate" log are reproducible between runs.
    xlsx_files = sorted(
        f for f in os.listdir(directory)
        if f.endswith('.xlsx') and not f.startswith('~$')
    )

    # Maps a normalized candidate name to its Candidate object.
    candidates = {}
    ballots = []
    for xlsx_file in xlsx_files:
        print(f"Processing file: '{xlsx_file}'")
        df = pd.read_excel(os.path.join(directory, xlsx_file))
        candidate_columns = range(3, len(df.columns))
        for _, row in df.iterrows():
            scores = []
            seen = set()
            for col in candidate_columns:
                cell = row.iloc[col]
                # Blank cells come back as NaN (a float); calling .split on
                # them would raise AttributeError, so skip anything non-text.
                if not isinstance(cell, str):
                    continue
                # Trim anything after a "(" and surrounding whitespace.
                candidate_name = cell.split('(')[0].strip()
                if candidate_name in seen or candidate_name in ("undervote", "overvote"):
                    continue
                if candidate_name not in candidates:
                    print(f"Creating new candidate: '{candidate_name}'")
                    candidates[candidate_name] = Candidate(candidate_name, Party("None", "none"))
                # Column 3 scores 4 and every later column scores one less.
                # NOTE(review): this reaches 0 at column 7 and goes negative
                # beyond it -- confirm the sheets never have more rank columns.
                score = 7 - col
                scores.append(CandidateScore(candidates[candidate_name], score))
                seen.add(candidate_name)
            if scores:
                ballots.append(Ballot(scores))

    return ballots


In [15]:
from dataclasses import dataclass
@dataclass
class Results:
    """Running tally of how often three election methods agree on the winner.

    All counters start at zero, so ``Results()`` gives an empty tally.  The
    counters are ordinary dataclass fields with defaults, so a tally can also
    be built from known values, e.g. ``Results(elections=4)``.
    """

    # Elections in which the IRV winner equals the Condorcet winner.
    irv_matches_condorcet: int = 0
    # Elections in which the IRV winner equals the diversity-runoff winner.
    irv_matches_diversity: int = 0
    # Elections in which the diversity-runoff winner equals the Condorcet winner.
    diversity_matches_condorcet: int = 0
    # Total number of elections recorded through add().
    elections: int = 0

    def add(self, irv_result: str, condorcet_result: str, diversity_result: str):
        """Record one election given the winner's name under each method.

        Args:
            irv_result: Name of the instant-runoff winner.
            condorcet_result: Name of the Condorcet (head-to-head) winner.
            diversity_result: Name of the diversity-runoff winner.
        """
        if irv_result == condorcet_result:
            self.irv_matches_condorcet += 1
        if irv_result == diversity_result:
            self.irv_matches_diversity += 1
        if diversity_result == condorcet_result:
            self.diversity_matches_condorcet += 1
        self.elections += 1

    def print(self):
        """Print the counters and, when any election was recorded, their percentages."""
        print("Results summary:")
        print(f"elections {self.elections}")
        if self.elections == 0:
            # Nothing recorded: the percentages below would divide by zero.
            print("No elections to analyze")
            return

        print(f"IRV matches Condorcet: {self.irv_matches_condorcet} {self.irv_matches_condorcet / self.elections * 100}%")
        print(f"IRV matches Diversity: {self.irv_matches_diversity} {self.irv_matches_diversity / self.elections * 100}%")
        print(f"Diversity matches Condorcet: {self.diversity_matches_condorcet} {self.diversity_matches_condorcet / self.elections * 100}%")

In [16]:

def analyze_election(results: Results, ballots: list[Ballot], diversity_threshold=.035, diversity_depth=2, debug=False):
    """Run three election methods on ``ballots`` and record whether they agree.

    Builds a DiversityRunoffElection, a HeadToHeadElection (reported as the
    Condorcet winner) and an InstantRunoffElection over the candidates that
    appear on the ballots, prints each winner's name, and adds the three names
    to ``results``.

    Args:
        results: Tally updated in place through ``results.add``.
        ballots: Ballots of one contest; each exposes ``ordered_candidates``.
        diversity_threshold: Passed through to DiversityRunoffElection.
        diversity_depth: Passed through to DiversityRunoffElection.
        debug: Passed through to DiversityRunoffElection.

    Returns:
        None.  When fewer than three distinct candidates appear, a message is
        printed and ``results`` is left untouched.
    """
    # Every distinct candidate that appears on any ballot.
    candidates = {score.candidate for ballot in ballots for score in ballot.ordered_candidates}

    if len(candidates) < 3:
        print("There are not enough candidates for analysis.")
        return

    # Each election receives its own copy of the candidate set.
    diversity_runoff = DiversityRunoffElection(ballots,
                                               set(candidates),
                                               diversity_threshold=diversity_threshold,
                                               diversity_depth=diversity_depth,
                                               debug=debug)
    condorcet = HeadToHeadElection(ballots, set(candidates))
    irv = InstantRunoffElection(ballots, set(candidates))

    # Resolve each winner exactly once (the result was previously evaluated a
    # second time when recording), keeping the original print order.
    condorcet_winner = condorcet.result().winner().name
    print(f"Condorcet Winner: {condorcet_winner}")
    irv_winner = irv.result().winner().name
    print(f"IRV Winner: {irv_winner}")
    diversity_winner = diversity_runoff.result().winner().name
    print(f"Diversity Runoff Winner: {diversity_winner}")

    results.add(irv_winner, condorcet_winner, diversity_winner)


In [17]:
# CVRLoader instances already built in this kernel, keyed by directory.
cvr_cache = {}
def load_cached_cvr(directory: str) -> CVRLoader:
    """Return the CVRLoader for ``directory``, constructing it on first use.

    A cache miss prints a "Loading CVR" message and stores the new loader in
    ``cvr_cache``; later calls with the same directory return that same object.
    """
    try:
        return cvr_cache[directory]
    except KeyError:
        pass  # not loaded yet -- fall through and build it
    print(f"Loading CVR from {directory}")
    loader = CVRLoader(directory)
    cvr_cache[directory] = loader
    return loader

# Ballot lists already loaded in this kernel, keyed by directory.
csv_cache = {}
def load_cached_csv(directory: str) -> list[Ballot]:
    """Return the ballots loaded from the .xlsx files in ``directory``, cached.

    Despite the name, the data comes from ``load_xlsx``; the result is the list
    of Ballot objects that function returns (not a Contest).  A cache miss
    prints a "Loading xlsx" message and stores the list in ``csv_cache``; later
    calls with the same directory string return that same list object.
    """
    if directory not in csv_cache:
        print(f"Loading xlsx from {directory}")
        csv_cache[directory] = load_xlsx(directory)
    return csv_cache[directory]

In [18]:
# Notebook-wide settings handed to analyze_election by the helpers below.
diversity_threshold = 0.035
diversity_depth = 2
debug = True

def analyze_Maine(results: Results, debug=False):
    """Analyze the three Maine House-2 contests and add them to ``results``.

    Ballots come from ``load_cached_csv``.  The ``debug`` parameter shadows the
    module-level ``debug`` setting, so debugging is off here unless the caller
    passes it explicitly.
    """
    # Swap in ("testData/",) for a quick local run.
    maine_paths = (
        "MaineData/2018/House-2",
        "MaineData/2022/House-2",
        "MaineData/2024/House-2",
    )
    for path in maine_paths:
        ballots = load_cached_csv(path)
        print(f"analyzing {path} Loaded {len(ballots)} ballots")
        analyze_election(results, ballots, diversity_threshold, diversity_depth, debug)

def analyze_Alaska(results: Results):
    """Analyze every qualifying Alaska contest and add it to ``results``.

    Scans ``AlaskaData`` for entries whose name starts with "CVR", loads each
    through ``load_cached_cvr``, and runs ``analyze_election`` on every contest
    that has at least one ballot and more than two ranks.

    NOTE(review): unlike analyze_Maine, this reads the module-level ``debug``
    setting and takes no ``debug`` parameter -- confirm that is intended.
    """
    base_dir = "AlaskaData"
    # os.listdir order is arbitrary; sort so contests are processed (and
    # printed) in the same order on every run.
    cvr_dirs = sorted(name for name in os.listdir(base_dir) if name.startswith("CVR"))
    for cvr_dir in cvr_dirs:
        cvr = load_cached_cvr(os.path.join(base_dir, cvr_dir))
        print(f"{cvr_dir}: Loaded {len(cvr.elections)} elections")
        for contest in cvr.elections.values():
            # Skip empty contests and those with too few ranks to analyze.
            if len(contest.ballots) == 0 or contest.number_of_ranks <= 2:
                continue
            print(f"analyzing contest {contest.name}")
            analyze_election(results, contest.ballots, diversity_threshold, diversity_depth, debug)

In [19]:

# results = Results()
# analyze_Maine(results)
# analyze_Alaska(results)
# results.print()


In [28]:
from elections.ElectionResult import ElectionResult
from elections.InstantRunoffElection import InstantRunoffElection, InstantRunoffResult
from elections.HeadToHeadElection import HeadToHeadElection, HeadToHeadResult
from elections.Ballot import Ballot
from elections.Candidate import Candidate

class SinglePeakedElection:
    """Last-place and incomplete-ballot statistics for one ranked contest.

    On construction the class:
      * selects the top candidates by summed ballot scores (a Borda-style
        count), dropping trailing ones whose total is below ``borda_pct`` of
        the leader's;
      * reduces every ballot to its ordering of those top candidates and
        counts how many ballots share each ordering;
      * runs a head-to-head (Condorcet) and an instant-runoff election over
        all candidates;
      * derives the last-place statistics reported by ``print_summary``.
    """

    def __init__(self, election_name: str, ballots: list[Ballot], n_top_candidates: int, borda_pct: float):
        """Compute every statistic for the contest.

        Args:
            election_name: Label printed in the summaries.
            ballots: Ballots of the contest; each exposes ``ordered_candidates``.
            n_top_candidates: Maximum number of top candidates to keep.
            borda_pct: Minimum fraction of the leader's score total a trailing
                top candidate needs in order to be kept.
        """
        self.election_name = election_name
        self.ballots = ballots
        # Names of the top candidates by summed ballot score.
        self.top_candidates = self.find_top_candidates(n_top_candidates, borda_pct)
        self.ballot_orders = self.compute_ballot_orders()
        self.all_candidates = self.find_all_candidates()
        self.condorcet_result = self.compute_condorcet_result()
        self.irv_result = self.compute_irv_result()
        self.exhausted_ballots_pct = self.compute_irv_exhausted_ballots()
        self.extra_irv_rankings = self.compute_extra_irv_rankings()
        self.all_candidate_names = self.find_all_candidate_names()
        # Placeholders; compute_single_peaked_stats overwrites them whenever
        # there is at least one usable ballot ordering.
        self.min_last_place_votes = 0
        self.min_implicit_last_place_votes = 0
        self.single_peaked_min_candidate = "none"
        self.bullet_votes = 0
        # Tiny non-zero value so the summary ratios stay defined (as 0%) when
        # no ordering was counted.
        self.single_peaked_votes = 1e-10
        self.single_peaked_valid = False
        self.weak_single_peaked_pct = 0
        self.compute_single_peaked_stats()

    @staticmethod
    def _percent(numerator, denominator) -> float:
        """Return ``numerator / denominator * 100``, or NaN for a zero denominator."""
        if denominator == 0:
            return float("nan")
        return numerator / denominator * 100

    def find_all_candidates(self) -> set[Candidate]:
        """Return every candidate that appears on at least one ballot."""
        candidates = set()
        for ballot in self.ballots:
            for score in ballot.ordered_candidates:
                candidates.add(score.candidate)
        return candidates

    def find_all_candidate_names(self) -> set[str]:
        """Return the names of every candidate that appears on a ballot."""
        candidates = self.find_all_candidates()
        return {candidate.name for candidate in candidates}

    def find_top_candidates(self, top_n: int, pct: float) -> set:
        """Return the names of the strongest candidates by summed ballot score.

        At most ``top_n`` names are kept.  The weakest kept candidate is then
        dropped repeatedly while its total is below ``pct`` times the leader's
        total.  Returns an empty set when no ballot ranks anyone.
        """
        borda_counts = {name: 0 for name in self.find_all_candidate_names()}

        for ballot in self.ballots:
            for score in ballot.ordered_candidates:
                borda_counts[score.candidate.name] += score.score

        # Highest total first; ties are broken by name so the selection does
        # not depend on set iteration order.
        top_candidates = sorted(borda_counts.items(), key=lambda item: (-item[1], item[0]))[:top_n]

        while len(top_candidates) > 1:
            leader_total = top_candidates[0][1]
            if leader_total == 0:
                # The ratio is undefined; keep the current selection instead
                # of dividing by zero.
                break
            last_place_pct = top_candidates[-1][1] / leader_total
            if last_place_pct < pct:
                top_candidates = top_candidates[:-1]
            else:
                break

        return {name for name, _ in top_candidates}

    def compute_ballot_orders(self) -> dict:
        """Count ballots by their ordering of the top candidates.

        Keys are the top candidates' names in ballot order joined with ";".
        When a ballot ranks every top candidate the final name is dropped, so
        a key never holds more than ``len(top_candidates) - 1`` names.
        Ballots that rank no top candidate (after that truncation) are not
        counted.
        """
        ballot_orders = {}
        for ballot in self.ballots:
            names = [score.candidate.name for score in ballot.ordered_candidates if score.candidate.name in self.top_candidates]
            if len(names) == len(self.top_candidates):
                names = names[:-1]
            if len(names) == 0:
                continue
            order = ";".join(names)
            ballot_orders[order] = ballot_orders.get(order, 0) + 1
        return ballot_orders

    def compute_condorcet_result(self) -> HeadToHeadResult:
        """Run a head-to-head election over all candidates and return its result."""
        condorcet = HeadToHeadElection(self.ballots, self.all_candidates)
        return condorcet.result()

    def compute_irv_result(self) -> InstantRunoffResult:
        """Run an instant-runoff election over all candidates and return its result."""
        irv = InstantRunoffElection(self.ballots, self.all_candidates)
        return irv.result()

    def compute_single_peaked_stats(self) -> None:
        """Derive the last-place statistics from ``ballot_orders``.

        An ordering that names all but one top candidate is a full ranking and
        gives one last-place vote to the candidate it leaves out.  Any shorter
        ordering is counted in ``bullet_votes`` (an incomplete ballot).  Every
        ordering also gives an implicit last-place vote to each top candidate
        it leaves out.

        Sets ``min_last_place_votes``, ``single_peaked_min_candidate``,
        ``min_implicit_last_place_votes``, ``bullet_votes``,
        ``single_peaked_votes``, ``single_peaked_valid`` and
        ``weak_single_peaked_pct``.  When no ordering was counted the
        placeholders assigned in ``__init__`` are kept and
        ``single_peaked_valid`` stays False.

        NOTE(review): with only two top candidates every ordering has one
        name, so a truncated full ranking and a one-name ballot cannot be told
        apart; both are counted as full rankings here.
        """
        total_votes = sum(self.ballot_orders.values())
        if total_votes == 0:
            return

        # Use the selected top candidates rather than the names that happen to
        # occur in the orderings, so a candidate who is only ever left out is
        # still available as the last-place candidate.
        candidates = set(self.top_candidates)
        full_ranking_length = len(candidates) - 1

        # Start every candidate at zero so one who is never placed last counts
        # towards the minimum.
        last_place_votes = {name: 0 for name in candidates}
        implicit_last_place_votes = {name: 0 for name in candidates}
        bullet_votes = 0
        for candidate_order, votes in self.ballot_orders.items():
            ranked = candidate_order.split(";")
            unranked = candidates.difference(ranked)
            for name in unranked:
                implicit_last_place_votes[name] += votes
            if len(ranked) == full_ranking_length and len(unranked) == 1:
                (last_place_candidate,) = unranked
                last_place_votes[last_place_candidate] += votes
            else:
                bullet_votes += votes

        min_votes = total_votes
        # Sorted iteration keeps the tie-break for the minimum deterministic.
        for candidate in sorted(last_place_votes):
            if last_place_votes[candidate] < min_votes:
                min_votes = last_place_votes[candidate]
                self.single_peaked_min_candidate = candidate

        min_implicit_last_place_votes = min(total_votes, *implicit_last_place_votes.values())

        self.min_last_place_votes = min_votes
        self.min_implicit_last_place_votes = min_implicit_last_place_votes
        self.bullet_votes = bullet_votes
        self.single_peaked_votes = total_votes
        self.single_peaked_valid = True
        self.weak_single_peaked_pct = bullet_votes / total_votes

    def compute_irv_exhausted_ballots(self) -> float:
        """Return the fraction of first-round IRV votes missing from the last round.

        Returns 0.0 when the first round has no votes.
        """
        first_round_votes = self.irv_result.rounds[0].total_votes
        last_round_votes = self.irv_result.rounds[-1].total_votes
        if first_round_votes == 0:
            return 0.0
        return (first_round_votes - last_round_votes) / first_round_votes

    def compute_extra_irv_rankings(self) -> int:
        """Count ballot positions that hold a final-round IRV candidate early.

        For every ballot, each position ``i`` whose candidate is among the
        keys of the final round's ``vote_totals`` adds one to the count when
        ``i < len(final_candidates) - 1``.

        NOTE(review): the loop does not stop at a ballot's first matching
        position, so one ballot can contribute more than once when the final
        round has more than two candidates -- confirm that is intended.
        """
        final_candidates = self.irv_result.rounds[-1].vote_totals.keys()
        extra_rankings = 0
        for ballot in self.ballots:
            for i, score in enumerate(ballot.ordered_candidates):
                if score.candidate in final_candidates:
                    if (i < len(final_candidates) - 1):
                        extra_rankings += 1
        return extra_rankings

    def print_ballot_orders(self):
        """Print every counted ordering, sorted by key, with its vote count and share."""
        total_votes = sum(self.ballot_orders.values())
        for order, votes in sorted(self.ballot_orders.items(), key=lambda x: x[0], reverse=False):
            percentage = (votes / total_votes) * 100
            print(f"{order:70} {votes:7d} {percentage:5.2f}%")

    def print_long_summary(self):
        """Print the one-line summary, the ordering table and the head-to-head matrix."""
        self.print_summary()
        self.print_ballot_orders()
        self.condorcet_result.print_matrix()
        print("")

    def print_summary(self):
        """Print a one-line summary of the contest.

        The IRV winner is shown only when it differs from the Condorcet
        winner.  A percentage whose denominator is zero (for example
        ``nsp_full_ballots`` when every counted ballot is incomplete) prints
        as ``nan`` instead of raising ZeroDivisionError.
        """
        full_ballots = self.single_peaked_votes - self.bullet_votes
        print(f"{self.election_name:30s}", end=" ")
        print(f"Condorcet Winner: {self.condorcet_result.winner().name:30s}", end=" ")
        if (self.irv_result.winner().name != self.condorcet_result.winner().name):
            print(f"IRV Winner: {self.irv_result.winner().name:30s}", end=" ")
        print(f"nsp_all_ballots : {self._percent(self.min_last_place_votes, self.single_peaked_votes):5.2f}%", end=" ")
        print(f"nsp_full_ballots : {self._percent(self.min_last_place_votes, full_ballots):5.2f}%", end=" ")
        print(f"incomplete_ballots: {self._percent(self.bullet_votes, self.single_peaked_votes):5.2f}%", end=" ")
        print(f"IRV exhausted: {self.exhausted_ballots_pct * 100:5.2f}%")

def analyze_single_peaked_elections():
    """Build and print SinglePeakedElection statistics for Maine and Alaska.

    The three Maine contests are loaded through ``load_cached_csv``.  Alaska
    contests come from every "CVR*" entry under ``AlaskaData``; only contests
    with at least three ranks and at least one ballot are analyzed, and each
    prints its summary line as it is computed.  Afterwards the function prints
    per-path averages, then every summary, then every long summary, with the
    last two listings ordered alphabetically by path.
    """
    borda_pct = .01
    n_top_candidates = 3
    results_by_path = {}
    # NOTE(review): these paths differ in letter case from the ones used by
    # analyze_Maine ("MaineData/2018/House-2", ...).  On a case-sensitive file
    # system only one spelling can exist, and the two spellings are cached as
    # separate entries -- confirm which matches the directory on disk.
    for path in ["mainedata/2018/house-2", "mainedata/2022/house-2", "mainedata/2024/house-2"]:
        ballots = load_cached_csv(path)
        single_peaked_election = SinglePeakedElection(path, ballots, n_top_candidates, borda_pct)
        results_by_path[path] = [single_peaked_election]

    base_dir = "AlaskaData"
    # os.listdir order is arbitrary; sort for reproducible output.
    cvr_dirs = sorted(name for name in os.listdir(base_dir) if name.startswith("CVR"))
    for cvr_dir in cvr_dirs:
        cvr = load_cached_cvr(os.path.join(base_dir, cvr_dir))
        print(f"{cvr_dir}: Loaded {len(cvr.elections)} elections")
        results_by_path[cvr_dir] = []

        for contest in cvr.elections.values():
            # Same filter as analyze_Alaska: a contest needs ballots and at
            # least three ranks to be analyzed.
            if contest.number_of_ranks < 3 or len(contest.ballots) == 0:
                continue
            single_peaked_election = SinglePeakedElection(contest.name, contest.ballots, n_top_candidates, borda_pct)
            single_peaked_election.print_summary()
            results_by_path[cvr_dir].append(single_peaked_election)

    for path, results in results_by_path.items():
        if not results:
            # No qualifying contest under this path; an average would divide
            # by zero.
            print(f"Path: {path} no contests analyzed")
            continue
        wsp = sum(r.weak_single_peaked_pct for r in results) / len(results)
        irv_exhausted = sum(r.exhausted_ballots_pct for r in results) / len(results)
        print(f"Path: {path} Weak Single Peaked: {wsp * 100:5.2f}% IRV Exhausted: {irv_exhausted * 100:5.2f}%")

    # One-line summaries, with the paths in alphabetical order.
    for path in sorted(results_by_path):
        print(f"Path: {path}")
        for result in results_by_path[path]:
            result.print_summary()

    # Long summaries, with the paths in alphabetical order.
    for path in sorted(results_by_path):
        print(f"Path: {path}")
        for result in results_by_path[path]:
            result.print_long_summary()


analyze_single_peaked_elections()

CVR_Export_20241130154411: Loaded 73 elections
U.S. President / Vice President Condorcet Winner: Trump/Vance                    nsp_all_ballots :  2.95% nsp_full_ballots : 12.31% incomplete_ballots: 76.04% IRV exhausted:  1.36%
U.S. Representative            Condorcet Winner: Begich, Nick                   nsp_all_ballots :  6.16% nsp_full_ballots : 22.58% incomplete_ballots: 72.70% IRV exhausted:  2.47%
House District 5               Condorcet Winner: Stutes, Louise B.              nsp_all_ballots :  0.34% nsp_full_ballots :  1.79% incomplete_ballots: 80.99% IRV exhausted:  0.45%
House District 21              Condorcet Winner: Mears, Donna C.                nsp_all_ballots :  0.89% nsp_full_ballots :  4.65% incomplete_ballots: 80.87% IRV exhausted:  0.17%
House District 13              Condorcet Winner: Josephson, Andrew L. "Andy"    nsp_all_ballots :  0.77% nsp_full_ballots :  3.58% incomplete_ballots: 78.37% IRV exhausted:  0.10%
House District 14              Condorcet Winner: Gal