In [1]:
import pandas as pd
import numpy as np

import copy
import itertools
import os
import re

# Erroneous input (two policies ranked the same) is resolved randomly.
# Set the seed to prevent flaky voting.
np.random.seed(42)

In [2]:
working_dir = '/home/rgiordan/Documents/git_repos/Presentations/CCC_covid_vote'

In [3]:
responses = pd.read_csv(os.path.join(working_dir, 
    'Children\'s_Community_Center_Out2022-11-07_13_38_27.csv'))

In [38]:
# Compile which polcies are indicated as unacceptable.
print(responses.columns[7])
unacceptables = responses[responses.columns[7]].fillna('').to_numpy()

num_unacceptable = []
for policy in [ 'A', 'B', 'C', 'D' ]:
    num_unacceptable.append(
        np.sum([ re.match(f'Policy {policy}', v) is not None for v in unacceptables ])
    )
print(num_unacceptable)

Please indicate if any of the above policies are unacceptable to your family.  Though this will not be formally part of the non-binding ranked choice vote, the board will take these responses into consideration.  The board is particularly interested if your family has distinctive needs with regard to COVID or masking.
[4, 3, 0, 1]


In [4]:
# Select the columns that compile the votes

a_col = responses.columns[3]
b_col = responses.columns[4]
c_col = responses.columns[5]
d_col = responses.columns[6]
# print('\n'.join([a_col, b_col, c_col, d_col]))

# Assert that I have selected the columns correctly
assert(re.search("Policy A", a_col))
assert(re.search("Policy B", b_col))
assert(re.search("Policy C", c_col))
assert(re.search("Policy D", d_col))

In [12]:
# Load the CSV and convert the votes to numeric ranks.

def ConvertToNumericVote(pd_col):
    # Empty rows (not voted on) are storted as nan, not strings.   Replace them 
    # with zeros so we can handle them gracefully in numpy.
    vote_str = pd_col.fillna('0').to_numpy()
    
    # Keep only the numbers from the response text.
    vote_str = np.array([ re.sub(r'[^0-9]', '', v) for v in vote_str ])
    
    return vote_str.astype('int')

original_votes = np.vstack([
    ConvertToNumericVote(responses[col]) for col in [ a_col, b_col, c_col, d_col ]
])
print(original_votes)
print(f'There were {original_votes.shape[1]} voters.')

# Assert that the A vote is the first row as expected as a sanity check.
assert(np.all(ConvertToNumericVote(responses[a_col]) == original_votes[0, :]))

[[3 3 3 4 3 3 4 4 4 2 4 4 4 0 4 3 4 4 4 4 2 4 4 3 2 4 3 2]
 [2 2 4 3 4 1 3 1 1 3 3 3 3 0 2 4 3 3 2 1 1 3 2 4 1 3 4 3]
 [1 1 2 2 2 2 2 2 2 1 2 2 2 2 1 2 2 2 3 2 3 2 3 3 3 2 2 1]
 [4 4 1 1 1 4 1 3 3 4 1 1 1 1 3 1 1 1 1 3 4 1 1 1 4 1 1 4]]
There were 28 voters.


In [6]:
def AssertEquivalentVotes(old_vote, new_vote, exclude=np.array([])):
    # Assert that the strict pairwise ranks of the new_vote matches the old_vote,
    # excluding the indices in exclude.  This makes sure that I drop votes
    # or modify ill-formed votes correctly.
    
    # Though computatoinally wasteful, I'll use this in place of unit tests when
    # modifying the votes for ranked choice and resolving errors.
    
    assert(len(old_vote) == len(new_vote))

    num_policies = len(old_vote)
    if (len(exclude) > 0):
        assert(np.max(exclude) < num_policies)
        assert(np.min(exclude) >= 0)
    
    policies = np.setdiff1d(np.arange(num_policies), exclude)
    
    for i, j in itertools.product(policies, policies):
        if old_vote[i] < old_vote[j]:
            assert(new_vote[i] < new_vote[j])


In [7]:
#vote = votes[:, 0]
vote = np.array([0, 2, 2, 4])

def RepairVote(old_vote, verbose=False):
    # Check for valid input and correct voting errors, following the office hours document.
    # Returns a new set of rankings which is in order and has no gaps or duplicates.
    # For a description of what's going on, set verbose=True.
    
    def VerbosePrint(s):
        if verbose:
            print(s)

    vote = copy.copy(old_vote)
    VerbosePrint(f'Original vote: {old_vote}')
    num_votes = sum(vote > 0)
    rank = 1
    while rank <= num_votes:
        VerbosePrint(f'Rank {rank}.  Current vote: {vote}')
        if not np.any(vote == rank):
            # There is no policy with this rank; decrement the other votes and try again.
            VerbosePrint(f'Rank {rank} missing, decrementing other votes')
            dec_inds = np.logical_and(vote > rank, vote > 0) 
            vote[dec_inds] = vote[dec_inds] - 1
        else:
            rank_inds = np.argwhere(vote == rank).flatten()
            if len(rank_inds) > 1:
                # This rank was duplicated.  Randomly split up this rank among the
                # policies.
                VerbosePrint(f'Rank {rank} duplicated, randomly splitting indices {rank_inds}')

                num_dups = len(rank_inds)
                new_ranks = np.random.choice(num_dups, num_dups, replace=False) + rank

                # Increment other votes to make room for num_dups - 1 extra votes
                dec_inds = np.logical_and(vote > rank, vote > 0) 
                vote[dec_inds] = vote[dec_inds] + num_dups - 1
                vote[rank_inds] = new_ranks
                rank += num_dups - 1
            else:
                # This rank is okay, go to the next one.
                rank += 1

    VerbosePrint(f'Final vote: {vote}')
    AssertEquivalentVotes(old_vote, vote)
    return vote

votes = copy.copy(original_votes)
for voter in range(votes.shape[1]):
    votes[:, voter] = RepairVote(votes[:, voter])

In [8]:
# Report on any votes that were modified.
for voter in range(votes.shape[1]):
    if np.any(votes[:, voter] != original_votes[:, voter]):
        print(f'\nThe vote for voter {voter} was repaired:')
        print(original_votes[:, voter], '->', votes[:, voter])
        print('The process was as follows:')
        RepairVote(original_votes[:, voter], verbose=True)


The vote for voter 23 was repaired:
[3 4 3 1] -> [3 4 2 1]
The process was as follows:
Original vote: [3 4 3 1]
Rank 1.  Current vote: [3 4 3 1]
Rank 2.  Current vote: [3 4 3 1]
Rank 2 missing, decrementing other votes
Rank 2.  Current vote: [2 3 2 1]
Rank 2 duplicated, randomly splitting indices [0 2]
Rank 3.  Current vote: [2 4 3 1]
Rank 4.  Current vote: [2 4 3 1]
Final vote: [2 4 3 1]


In [13]:
def FindWinnerAndLoser(votes, verbose=False):
    # Identify a winner (if there is one) and the losers among the votes array.
    # Return (winner, loser), where both may be arrays in the case of a tie.

    def VerbosePrint(s):
        if verbose:
            print(s)

    num_voters = votes.shape[1]
    num_votes = np.sum(votes == 1)
    assert(num_votes <= num_voters)
    if num_votes != num_voters:
        VerbosePrint(f'This many voters did not vote this round: {num_voters - num_votes}')

    # If we drop a proposal that has no votes, it cannot possibly change the outcome.
    # Drop the proposal that has the fewest first choice votes and has at least some
    # votes.  Note that this is necessary because we are representing removing a vote
    # as non-voting.
    has_votes = np.sum(votes > 1, axis=1) > 0
    vote_count = np.sum(votes == 1, axis=1)
    VerbosePrint(f'Vote count: {vote_count}')
    min_count = np.min(vote_count[has_votes])
    loser = np.argwhere(
        np.logical_and(vote_count == min_count, has_votes)).flatten()

    if len(loser) > 1:
        VerbosePrint(f'There was a tie among losers: {loser}')
    vote_prop = vote_count / num_votes
    VerbosePrint(f'The proportion of first votes was {vote_prop}')
    majority = vote_prop >= 0.5
    if np.any(majority):
        winner = np.argwhere(majority).flatten()
        if (len(winner) > 1):
            VerbosePrint(f'There was a tie among winners: {winner}.')
        else:
            VerbosePrint(f'The winner was {winner}')
        return (winner, loser)
    else:
        VerbosePrint('There was no majority')
        return ([], loser)
    

def RemovePolicy(votes, drop_index):
    # Drop a policy from consideration, and decrement other so that they remain
    # valid votes.  The dropped vote will be ranked zero, as if the voter did
    # not vote on it in the survey.

    new_votes = copy.copy(votes)
    increment_cols = np.argwhere(new_votes[drop_index, :] == 1).flatten()
    new_votes[:, increment_cols] = votes[:, increment_cols] - 1
    new_votes[new_votes < 0] = 0
    new_votes[drop_index, :] = 0
    
    # Sanity check that we have not changed any preferences.
    for voter in range(votes.shape[1]):
        AssertEquivalentVotes(votes[:, voter], new_votes[:, voter], exclude=[drop_index])

    return new_votes


In [14]:
def PerformRankedChoiceVoting(votes, verbose=False):
    # Perform ranked-choice voting on an array of votes.

    done = False
    votes_copy = copy.copy(votes)
    while not done:
        print('Checking for a majority.')
        winner, loser = FindWinnerAndLoser(votes_copy, verbose=verbose)

        if len(winner) > 0:
            print(f'The winner is {winner}')
            done = True
        else:
            if len(loser) > 1:
                print(f'Randomly dropping a loser from {loser}')
                loser_ind = np.random.choice(len(loser), 1)
                loser = np.array([ loser[loser_ind] ])
            print(f'Removing {loser[0]}')
            votes_copy = RemovePolicy(votes, loser[0])
        
    return winner

winners = []

votes_copy = copy.copy(votes)
for round_ind in range(4):
    print(votes_copy)
    print(f'\nRound {round_ind}:')
    round_winners = PerformRankedChoiceVoting(votes_copy, verbose=True) 
    winners.append(round_winners)
    for winner in round_winners:
        votes_copy = RemovePolicy(votes_copy, winner)
        
print(winners)

[[3 3 3 4 3 3 4 4 4 2 4 4 4 0 4 3 4 4 4 4 2 4 4 3 2 4 3 2]
 [2 2 4 3 4 1 3 1 1 3 3 3 3 0 2 4 3 3 2 1 1 3 2 4 1 3 4 3]
 [1 1 2 2 2 2 2 2 2 1 2 2 2 2 1 2 2 2 3 2 3 2 3 2 3 2 2 1]
 [4 4 1 1 1 4 1 3 3 4 1 1 1 1 3 1 1 1 1 3 4 1 1 1 4 1 1 4]]

Round 0:
Checking for a majority.
Vote count: [ 0  6  5 17]
The proportion of first votes was [0.         0.21428571 0.17857143 0.60714286]
The winner was [3]
The winner is [3]
[[3 3 2 3 2 3 3 4 4 2 3 3 3 0 4 2 3 3 3 4 2 3 3 2 2 3 2 2]
 [2 2 3 2 3 1 2 1 1 3 2 2 2 0 2 3 2 2 1 1 1 2 1 3 1 2 3 3]
 [1 1 1 1 1 2 1 2 2 1 1 1 1 1 1 1 1 1 2 2 3 1 2 1 3 1 1 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]]

Round 1:
Checking for a majority.
Vote count: [ 0  8 20  0]
The proportion of first votes was [0.         0.28571429 0.71428571 0.        ]
The winner was [2]
The winner is [2]
[[2 2 1 2 1 3 2 4 4 1 2 2 2 0 3 1 2 2 3 4 2 2 3 1 2 2 1 1]
 [1 1 2 1 2 1 1 1 1 2 1 1 1 0 1 2 1 1 1 1 1 1 1 2 1 1 2 2]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 

In [18]:
# The Borda count is a one-liner, haha.
# The lowest wins.  Note that missing votes are treated badly here,
# as a missing vote gives a lower vote than is possible otherwise.
# A real Borda count should not allow missing votes.
np.sum(votes, axis=1)

array([92, 69, 55, 57])