In [1]:
from geopy.distance import geodesic
import requests
import re
from thefuzz import process, fuzz
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import math
import osmnx as ox
import networkx as nx
import numpy as np
from shapely.geometry import Point

In [2]:
# Attendance-zone boundary layers, each reprojected to EPSG:4326.
# Order matters: get_school_names() picks the name columns by list position
# (0: ELEMENTARY/MIDDLE/HIGH, 1: Elementary/Middle/High, 2: DDP_ES_Nam,
#  3: DDP_MS_Name, 4: DDP_HS_Nam).
school_boundary_gdfs = [
    gpd.read_file(boundary_path).to_crs("EPSG:4326")
    for boundary_path in (
        "../../data/raw/shapefiles/quality_education/Administrative.geojson",
        "../../data/raw/shapefiles/quality_education/APSBoundaries.json",
        "../../data/raw/shapefiles/quality_education/DKE.json",
        "../../data/raw/shapefiles/quality_education/DKM.json",
        "../../data/raw/shapefiles/quality_education/DKBHS.json",
    )
]

In [3]:
# Per-school table used for matching and eligibility checks. Later cells read
# the columns "School Name", "Grade Cluster", "2019 BTO Designation",
# "YoY Average", "Average score" and "Applicable 25th Percentile".
school_df = pd.read_csv(
    "../../data/processed/scoring_indicators/quality_education_areas/Option_C_Scores_Eligibility_with_BTO.csv"
)

In [4]:
# Reference averages per school level, keyed by integer year; qualifies_by_A()
# compares a school's own yearly values against these.
# NOTE(review): presumably state-wide average scores - source/units not shown here.
state_avg_by_year = {
    "elementary": {2018: 77.8, 2019: 79.9},
    "middle": {2018: 76.2, 2019: 77},
    "high": {2018: 75.3, 2019: 78.8},
}

In [5]:
def get_school_names(point, school_boundary_gdfs):
    """Look up the school names whose boundary polygons contain ``point``.

    Args:
        point: Geometry tested with ``gdf.contains``; ``None`` yields no names.
        school_boundary_gdfs: Ordered sequence of boundary layers. The position
            of a layer decides which name columns are read from it; ``None``
            entries and positions beyond 4 contribute nothing.

    Returns:
        Tuple ``(elementary, middle, high)`` of lists of non-null names, in
        layer order.
    """
    # Layer position -> (column to read, school level it feeds).
    columns_by_position = {
        0: (("ELEMENTARY", "elementary"), ("MIDDLE", "middle"), ("HIGH", "high")),
        1: (("Elementary", "elementary"), ("Middle", "middle"), ("High", "high")),
        2: (("DDP_ES_Nam", "elementary"),),
        3: (("DDP_MS_Name", "middle"),),
        4: (("DDP_HS_Nam", "high"),),
    }
    names_by_level = {"elementary": [], "middle": [], "high": []}

    for position, layer in enumerate(school_boundary_gdfs):
        if layer is None or point is None:
            continue
        # Containment is evaluated in EPSG:4326; reproject layers that differ.
        if layer.crs != "EPSG:4326":
            layer = layer.to_crs("EPSG:4326")
        hits = layer[layer.contains(point)]
        if hits.empty:
            continue
        for column, level in columns_by_position.get(position, ()):
            names_by_level[level].extend(hits[column].dropna().tolist())

    return names_by_level["elementary"], names_by_level["middle"], names_by_level["high"]

In [6]:
def preprocess_school_name(name):
    """Normalise a school name for fuzzy comparison.

    The value is converted to ``str``, lower-cased, stripped of every
    character that is neither a word character nor whitespace, and then
    split on whitespace; generic tokens (school-level words and common
    abbreviations) are dropped.

    Args:
        name: Any value; it is passed through ``str()`` first.

    Returns:
        The remaining tokens joined by single spaces (possibly empty).
    """
    generic_tokens = {
        "elementary", "middle", "high", "school", "academy",
        "jr", "sr", "dr", "es", "ms", "hs",
    }
    lowered = str(name).lower()
    without_punctuation = re.sub(r'[^\w\s]', '', lowered)
    kept_tokens = filter(
        lambda token: token not in generic_tokens,
        without_punctuation.split(),
    )
    return " ".join(kept_tokens).strip()

In [7]:
def find_best_match(school_df, school_names, school_type):
    """Return the row of ``school_df`` that best fuzzy-matches any given name.

    Only rows whose "Grade Cluster" equals the code for ``school_type``
    ("elementary" -> "E", "middle" -> "M", "high" -> "H") are considered.
    Names on both sides are normalised with ``preprocess_school_name`` and
    compared with ``fuzz.token_set_ratio``; a candidate is accepted only when
    its score is strictly above 80, and the highest-scoring one wins (the
    first name wins ties).

    Args:
        school_df: DataFrame with "Grade Cluster" and "School Name" columns.
        school_names: Iterable of raw names to look up; empty/None -> ``None``.
        school_type: "elementary", "middle" or "high" (case-insensitive).

    Returns:
        A ``pd.Series`` (one row of ``school_df``) or ``None`` when nothing
        qualifies. The row is selected by position, so a single Series is
        returned even if ``school_df`` has duplicate index labels.
    """
    if not school_names:
        return None

    grade_cluster = {"elementary": "E", "middle": "M", "high": "H"}.get(school_type.lower())
    filtered_df = school_df[school_df["Grade Cluster"] == grade_cluster]
    if filtered_df.empty:
        return None

    # The candidate list does not depend on the name being looked up, so it is
    # normalised once instead of once per input name.
    cleaned_names = filtered_df["School Name"].apply(preprocess_school_name).tolist()

    best_score = 0
    best_match_row = None

    for name in school_names:
        cleaned_input = preprocess_school_name(name)
        result = process.extractOne(cleaned_input, cleaned_names, scorer=fuzz.token_set_ratio)
        if result is None:
            # extractOne yields None when it has nothing to return.
            continue
        match, score = result[0], result[1]
        if score > best_score and score > 80:
            best_score = score
            # First candidate whose cleaned name equals the match; positional
            # lookup avoids .loc returning a DataFrame for duplicate labels.
            best_match_row = filtered_df.iloc[cleaned_names.index(match)]

    return best_match_row


In [8]:
def qualifies_by_A(school, state_avg_by_year):
    """Check whether a school's 2018/2019 average beats the reference average.

    The school's values for 2018 and 2019 are averaged over the years that
    are present and numeric, and compared with the mean of
    ``state_avg_by_year`` for the same school level and the same years.

    Year values are looked up under either the integer label (``2018``) or
    the string label (``"2018"``): a table loaded with ``pd.read_csv`` has
    string column labels, so an integer-only lookup never finds them.

    Args:
        school: ``pd.Series`` for one school with a "Grade Cluster" entry
            ("E", "M" or "H"; case/whitespace-insensitive) and year entries.
        state_avg_by_year: ``{"elementary"|"middle"|"high": {year: value}}``
            with integer year keys.

    Returns:
        ``True`` when the school's average is strictly greater than the
        reference average; ``False`` otherwise, including when the cluster is
        missing/unknown or no usable year value exists.
    """
    # str() keeps a missing (NaN) cluster from raising on .strip().
    grade_cluster = str(school.get("Grade Cluster", "")).strip().upper()
    cluster_key = {"E": "elementary", "M": "middle", "H": "high"}.get(grade_cluster)
    if not cluster_key or cluster_key not in state_avg_by_year:
        return False

    reference_by_year = state_avg_by_year[cluster_key]
    school_values = []
    reference_values = []
    for year in (2018, 2019):
        if year not in reference_by_year:
            continue  # nothing to compare against for this year
        label = next((lab for lab in (year, str(year)) if lab in school.index), None)
        if label is None:
            continue
        try:
            value = float(school.loc[label])
        except (TypeError, ValueError):
            continue  # non-numeric cell, treated like a missing year
        if pd.isna(value):
            continue
        school_values.append(value)
        reference_values.append(reference_by_year[year])

    if not school_values:
        return False
    school_avg = sum(school_values) / len(school_values)
    state_avg = sum(reference_values) / len(reference_values)
    return school_avg > state_avg

In [9]:
def qualifies_by_B(school):
    """Return True when the school's 2019 BTO designation is "Beating the Odds".

    Args:
        school: Mapping-like record (``.get`` is used); a missing entry
            counts as not designated.
    """
    designation = school.get("2019 BTO Designation", "")
    return designation == "Beating the Odds"

def qualifies_by_C(school):
    """Check the year-over-year improvement criterion for one school.

    A school qualifies when its "YoY Average" is strictly positive and its
    "Average score" is at least its "Applicable 25th Percentile". Values are
    converted with ``float``; a missing key or a value that cannot be
    converted makes the school not qualify.

    Args:
        school: Record supporting ``school[key]`` lookups.

    Returns:
        bool
    """
    try:
        improving = float(school["YoY Average"]) > 0
        if not improving:
            # The remaining fields are only consulted once this holds.
            return False
        average = float(school["Average score"])
        threshold = float(school["Applicable 25th Percentile"])
        return average >= threshold
    except (ValueError, TypeError, KeyError):
        return False

def grade_cluster_to_grades(cluster):
    """Expand a grade-cluster code into the list of grade numbers it covers.

    Args:
        cluster: "E", "M" or "H" (any case, surrounding whitespace ignored);
            the value is passed through ``str()`` first.

    Returns:
        ``[0..5]`` for "E", ``[6..8]`` for "M", ``[9..12]`` for "H", and an
        empty list for anything else.
    """
    code = str(cluster).strip().upper()
    if code == 'E':
        return list(range(0, 6))
    if code == 'M':
        return list(range(6, 9))
    if code == 'H':
        return list(range(9, 13))
    return []

In [10]:
def calculate_quality_education_score(latitude, longitude, school_df, state_avg_by_year,
                                      school_boundary_gdfs, tenancy_type="family"):
    """Score a location by the grades covered by its qualifying zoned schools.

    The location's elementary, middle and high school names are taken from
    the boundary layers, each level is fuzzy-matched to a row of
    ``school_df``, and a matched school counts when it passes any of
    ``qualifies_by_A``, ``qualifies_by_B`` or ``qualifies_by_C``. The grades
    of all counting schools (via ``grade_cluster_to_grades``) are pooled and
    the number of distinct grades decides the score.

    Args:
        latitude: Latitude of the location.
        longitude: Longitude of the location.
        school_df: Per-school table passed to ``find_best_match``.
        state_avg_by_year: Reference averages passed to ``qualifies_by_A``.
        school_boundary_gdfs: Boundary layers passed to ``get_school_names``.
        tenancy_type: Only affects the full-coverage score; "family"
            (case-insensitive, the default) scores 3, anything else 2.

    Returns:
        0 for fewer than 3 (or more than 13) qualifying grades, 1 for 3-6,
        1.5 for 7-12, and 3 or 2 (see ``tenancy_type``) for all 13.
    """
    # Point takes (x, y), i.e. (longitude, latitude).
    point = Point(longitude, latitude)
    elementary, middle, high = get_school_names(point, school_boundary_gdfs)

    matched_schools = (
        find_best_match(school_df, elementary, "elementary"),
        find_best_match(school_df, middle, "middle"),
        find_best_match(school_df, high, "high"),
    )

    qualified_grades = set()
    for school in matched_schools:
        # Skips None (no match) and anything that is not a single row.
        if not isinstance(school, pd.Series):
            continue
        if (qualifies_by_A(school, state_avg_by_year) or
                qualifies_by_B(school) or
                qualifies_by_C(school)):
            qualified_grades.update(grade_cluster_to_grades(school.get("Grade Cluster", "")))

    grade_count = len(qualified_grades)
    if grade_count == 13:
        return 3 if tenancy_type.lower() == "family" else 2
    if 7 <= grade_count < 13:
        return 1.5
    if 3 <= grade_count < 7:
        return 1
    return 0


In [11]:
# Score every point in the input CSV (one row per location, with "latitude"
# and "longitude" columns) and write the rows back out with a "score" column.
df = pd.read_csv("education_score_metro_atl_point.csv")

scores = [
    calculate_quality_education_score(lat, lon, school_df, state_avg_by_year, school_boundary_gdfs)
    for lat, lon in zip(df["latitude"], df["longitude"])
]

df['score'] = scores
df.to_csv("education_score_metro_atl_point_with_scores.csv", index=False)