In [None]:
import matplotlib.pyplot as plt

from Ballot import Ballot
from DefaultConfigOptions import *
from PartyPrimaryElection import PartyPrimaryElection
from ElectionResult import ElectionResult
from DistrictData import DistrictVotingRecord, DistrictData
from InstantRunoffElection import InstantRunoffElection
from HeadToHeadElection import HeadToHeadElection
from Population import Population
from NDPopulation import NDPopulation
from typing import List, Set, Callable
from Election import Election
from CandidateModel import CandidateModel
import tensorflow as tf
import tensorboard as tb
from Timings import Timings
import os.path
import pickle

In [None]:


class ElectionConstructor:
    def __init__(self, constructor: Callable[[List[Ballot], Set[Candidate]], Election], name: str):
        self.constructor = constructor
        self.name = name

    def run(self, ballots: List[Ballot], candidates: Set[Candidate]) -> ElectionResult:
        e = self.constructor(ballots, candidates)
        return e.result()


def construct_irv(ballots: List[Ballot], candidates: Set[Candidate]):
    return InstantRunoffElection(ballots, candidates)


def construct_h2h(ballots: List[Ballot], candidates: Set[Candidate]):
    return HeadToHeadElection(ballots, candidates)

In [None]:
class Sample:
    def __init__(self, opponents: List[Candidate], candidate: Candidate):
        self.opponents = opponents.copy()
        self.candidate = candidate

In [None]:
def create_model_and_population(ideology_bins: int, ideology_dim: int) -> (CandidateModel, NDPopulation):
    ideology_bins = 64
    hidden_ratio = 4
    n_hidden = hidden_ratio * ideology_bins * ideology_dim
    n_latent = ideology_bins * ideology_dim
    batch_size = 128
    learn_rate = .001

    model = CandidateModel(ideology_bins=ideology_bins,
                                     ideology_dim=ideology_dim,
                                     n_hidden=n_hidden,
                                     n_latent = n_latent,
                                     learn_rate= learn_rate)

    population_means = np.zeros(shape=(ideology_dim,))
    population_stddev = np.ones(shape=(ideology_dim,))
    pop = NDPopulation(population_means, population_stddev)
    return model, pop

In [None]:
class Stats:
    def __init__(self):
        self.model_count = 1e-5
        self.model_winners = 1e-5
        self.random_count = 1e-5
        self.random_winners = 1e-5
        self.model_winner_distance = 0
        self.random_winner_distance = 0
        self.results = []


    def reset(self):
        self.model_count = 1e-5
        self.model_winners = 1e-5
        self.random_count = 1e-5
        self.random_winners = 1e-5
        self.model_winner_distance = 0
        self.random_winner_distance = 0
        self.results = []

    def update(self, winner: Candidate, candidates: List[Candidate]):
        self.results += (winner, candidates)

        for c in candidates:
            if c.name[0] == 'r':
                self.add_random()
            else:
                self.add_model()

        if winner.name[0] == 'r':
            self.add_random_winner(winner)
        else:
            self.add_model_winner(winner)



    def add_random(self):
        self.random_count += 1

    def add_model(self):
        self.model_count += 1

    def add_random_winner(self, w: Candidate):
        self.random_winners += 1
        self.random_winner_distance += w.ideology.distance_from_o()

    def add_model_winner(self, w: Candidate):
        self.model_winners += 1
        self.model_winner_distance += w.ideology.distance_from_o()

    def print(self, label: str, global_step: int):
        print("%15s %6d, %5d " %
               (label,
                global_step,
               len(self.results)), end="")

        print("random %6d/%6d %5.2f%% O: %5.2f" %
              (self.random_count,
               self.random_winners,
               100 * self.random_winners / self.random_count,
               self.random_winner_distance / self.random_winners), end='')

        print(" model %6d/%6d %5.2f%% O: %5.2f" %
              (self.model_count,
               self.model_winners,
               100 * self.model_winners / self.model_count,
               self.model_winner_distance/ self.model_winners), end='')

        print(" chance of model_winner = %5.2f%%" % (
                100 * self.model_winners / (self.model_winners + self.random_winners))
              )


In [None]:
def run_sample_election(model: CandidateModel, process: ElectionConstructor, population: NDPopulation, timings: Timings):
    candidates = []
    model_entries = set(np.random.choice(range(6), 3, replace=False))
    for i in range(6):
        if i in model_entries and model.ready():
            ideology = Ideology(model.choose_ideology(candidates))
            c = Candidate("m-" + str(i), Independents, ideology, 0)
        else:
            ideology = population.unit_sample_voter().ideology
            c = Candidate("r-" + str(i), Independents, ideology, 0)

        candidates += [c]

    voters = population.generate_unit_voters(1000)
    ballots = [Ballot(v, candidates, unit_election_config) for v in voters]
    result = process.run(ballots, set(candidates))

    winner = result.winner()

    return winner, candidates

In [None]:
def train_candidate_model(model: CandidateModel, process: ElectionConstructor, population: NDPopulation):
    timings = Timings()
    stats = Stats()
    first = True
    while model.global_step < 1000:
        with timings.time_block("run_election"):
            winner, candidates = run_sample_election(model, process, population, timings)
        with timings.time_block("add_sample"):
            for i in range(len(candidates)):
                model.add_sample_from_candidates(candidates[i], candidates[0:i], winner)

        if model.ready():
            if first:
                print("starting to train")
                first = False

            stats.update(winner, candidates)
            with timings.time_block("model.train"):
                model.train(128)
            s = model.global_step
            if (s < 100 and s % 10 == 0) or (s < 1000 and s % 100 == 0) or s % 1000 == 0:
                stats.print(process.name, model.global_step)
                if model.global_step < 10000:
                    stats.reset()

    timings.print()

In [None]:
def check_stats(stats: Stats, model: CandidateModel, process: ElectionConstructor, population: NDPopulation):
    results=[]
    timings = Timings()
    for i in range(1000):
        winner, candidates = run_sample_election(model, process, population, timings)
        stats.update(winner, candidates)
        results.append((winner, candidates))

In [None]:
class ProcessResult:
    def __init__(self, process: ElectionConstructor, bins: int, dim: int, stats: Stats, model: CandidateModel):
        self.process = process
        self.dim = dim
        self.bins = bins
        self.stats = stats
        self.model = model
        self.label = "%15s ib%3d %dD" % (process.name, bins, dim)

    def print(self):
        self.stats.print(self.label, self.model.global_step)

In [None]:
def run_parameter_set(process: ElectionConstructor, ibins: int, dim: int):
    save_path = "models/cm-%s-%03d-%dD.p" % (process.name, ibins, dim)
    model, population = create_model_and_population(ibins, dim)
    if os.path.exists(save_path):
        with open(save_path, "rb") as f:
            model: CandidateModel = pickle.load(f)
    else:
        train_candidate_model(model, process, population)
        model.save_to_file(save_path)

    stats = Stats()
    check_stats(stats, model, process, population)
    return stats, model

In [None]:
dims = [1, 2, 3]
processes = [
    ElectionConstructor(constructor=construct_irv, name="Instant Runoff"),
    ElectionConstructor(constructor=construct_h2h, name="Head-to-Head")
]

results = []
for bins in [64, 128]:
    for process in processes:
        for dim in dims:
            stats, model = run_parameter_set(process, bins, dim)
            results.append(ProcessResult(process, bins, dim, stats, model))
            results[-1].print()

for r in results:
    r.print()