In [None]:
from datetime import datetime
import itertools
import json
import math
import os

import geopandas
import numpy as np
import pandas

In [None]:
spas_df = geopandas.read_file("LCPS_data/PlanningZones_2017_2018.shp")
students_df = geopandas.read_file("LCPS_data/Students_2017_2018.shp")

In [None]:
weights = list(range(0, 11))
X_S = ["ES", "MS", "HS"]
shc_results = dict(zip(X_S, [dict() for S in X_S]))

for weight, school_level in itertools.product(weights, X_S):
    with open("results/SHC/run%d_%s_SHC.json" % (weight, school_level), "r") as results_file:
        shc_results[school_level][weight] = json.load(results_file)

In [None]:
# These 3 lines take ~2-3 minutes.
students_df = students_df[students_df.GRADE != 14]
loudoun_polygon = spas_df.unary_union
students_df = students_df[students_df.geometry.map(loudoun_polygon.contains)]

def group_students_by_spa(student):
    """GroupBy function for gathering student records to their SPAs.
    
    Args:
        student: integer row index of the student record
    
    Returns:
        A label to group this record into (the student's SPA).
    """
    spa_index = spas_df.geometry.map(lambda spa: spa.contains(students_df.loc[student].geometry))
    spa_index = spa_index[spa_index == True]
    if len(spa_index) == 0:
        return "NONE"
    assert(len(spa_index) == 1)
    return spas_df.loc[spa_index.index[0]].STDYAREA

groups_path = "data/spa_groups.json"
if os.path.exists(groups_path):
    with open(groups_path, "r") as fp:
        spa_groups = json.load(fp)
else:
    # This is O(k * n) overall for k SPAs and n students.
    # Expect ~2.25 hours on a laptop or 1.5 on a server.
    tic = datetime.now()
    spa_groups = students_df.groupby(by=group_students_by_spa).groups
    toc = datetime.now()
    print(toc - tic)

    def serialize_groups(obj):
        if isinstance(obj, pandas.Int64Index):
            return list(obj)
        else:
            raise Exception("obj was not an Int64Index as expected...")

    with open(groups_path, "w") as fp:
        json.dump(spa_groups, fp, default=serialize_groups)

In [None]:
# This has the form of Dict[school_level: str][weight: int] = List[trial: float]
# Utilization scores: lower is better
utilization_scores = dict(zip(X_S, [dict() for S in X_S]))

sigma = 5
for school_level in X_S:
    for weight in weights:
        final_partitions = [shc_results[school_level][weight][str(trial)]["info"]["Final"]["zones"] for trial in range(1, 26)]
        trial_utilizations = []

        for i, trial in enumerate(final_partitions, start=1):
            utilization = 0
            for school, school_district in trial.items():
                school_population = school_district["Population"]
                school_capacity = school_district["Capacity"]
                utilization += abs(math.atan(sigma * (1 - school_population / school_capacity)))
            utilization /= ((math.pi / 2) * len(trial.keys()))
            assert(utilization >= 0 and utilization <= 1)
            trial_utilizations.append(utilization) 
        utilization_scores[school_level][weight] = trial_utilizations

In [None]:
# Socioeconomic scores: lower is better
socioeconomic_scores = dict(zip(X_S, [dict() for S in X_S]))

# FSI value of 1 or 2 indicates free/reduced meals.
county_farm_rate = len(students_df[students_df.FSI <= 2]) / len(students_df)

def count_school_farms(spa_name, school_level):
    try:
        spa_students = students_df.loc[spa_groups[spa_name]]
    except:
        spa = spas_df[spas_df.STDYAREA == spa_name].iloc[0]
        assert spa.TOTAL_KG_5 + spa.TOTAL_6_8 + spa.TOTAL_9_12 == 0
        return 0
    if school_level == "ES":
        spa_students = spa_students[(spa_students.GRADE <= 5) | (spa_students.GRADE == 13)]
    elif school_level == "MS":
        spa_students = spa_students[(spa_students.GRADE >= 6) & (spa_students.GRADE <= 8)]
    elif school_level == "HS":
        spa_students = spa_students[(spa_students.GRADE >= 9) & (spa_students.GRADE <= 12)]
    return len(spa_students[spa_students.FSI <= 2])

tic = datetime.now()
for school_level in X_S:
    for weight in weights:
        final_partitions = [shc_results[school_level][weight][str(trial)]["info"]["Final"]["zones"] for trial in range(1, 26)]
        trial_economic = []

        for i, trial in enumerate(final_partitions, start=1):
            economic = 0
            for school, school_district in trial.items():
                school_farm_students = sum([count_school_farms(spa, school_level) for spa in school_district["STATE"]])
                if school_district["Population"] == 0:
                    # There are some strange plans where nobody was assigned to a school...
                    economic += county_farm_rate
                else:
                    economic += abs(school_farm_students / school_district["Population"] - county_farm_rate)
            economic /= len(trial.keys())
            assert economic >= 0 and economic <= county_farm_rate, economic
            trial_economic.append(economic)
        utilization_scores[school_level][weight] = trial_economic
toc = datetime.now()
print(toc - tic)