In [None]:
import math
import numpy as np
import geopandas as gpd
import logging

# Module-level logger; create_buckets_from_tracts below emits a debug message
# through it. NOTE(review): no handler or level is configured in the visible
# cells, so debug output presumably stays hidden unless logging is configured
# elsewhere — confirm.
logger = logging.getLogger(__name__)

: 

In [None]:

def create_buckets_from_tracts(
    initial_state_tracts: gpd.GeoDataFrame,
    geoid_field_name: str,
    target_score_field: str,
    high_low_zoom_threshold: int,
    number_of_buckets: int,
    homogeneity_threshold: int
):
    """
    Splits tracts into a high-zoom set and a score-bucketed low-zoom set.

    The first two characters of each GEOID are used as the state key. Tracts
    in states that have at most ``high_low_zoom_threshold`` tracts are
    returned unchanged. All remaining tracts are sorted by score (ascending)
    and assigned consecutive bucket ids in the column
    ``f"{target_score_field}_bucket"``.

    Args:
        initial_state_tracts (gpd.GeoDataFrame): GeoDataFrame containing the
            tracts. The index is reset before the GEOID lookup, so the GEOID
            field may also live in a named index.
        geoid_field_name (str): The name of the GEOID field in the GeoDataFrame.
        target_score_field (str): The name of the score field to use for bucketing.
        high_low_zoom_threshold (int): States with at most this many tracts
            are kept at high zoom (i.e. they are not bucketed).
        number_of_buckets (int): Initial number of buckets; it may be
            increased by the homogeneity adjustment.
        homogeneity_threshold (int): The bucket count is incremented while
            ``sum(scores) % bucket_size`` is greater than this value.

    Returns:
        tuple: A tuple containing:
            - state_tracts (gpd.GeoDataFrame): Low-zoom tracts sorted by
              score, with the bucket id column added.
            - high_zoom_tracts (gpd.GeoDataFrame): Tracts kept at high zoom.

    Raises:
        AssertionError: If every tract qualifies for high zoom (nothing would
            be aggregated) or if fewer than two tracts qualify for high zoom.
    """
    # Step 1: Identify states with no more tracts than the threshold
    tracts_with_state = initial_state_tracts.reset_index()
    tracts_with_state["state"] = tracts_with_state[geoid_field_name].str[:2]
    tracts_per_state = tracts_with_state.groupby("state")[
        geoid_field_name
    ].transform("count")

    # reset_index() keeps the row order, so a positional (NumPy) mask is used.
    # A label-indexed Series built on the reset frame would not align with
    # `initial_state_tracts` whenever its index is not the default RangeIndex.
    keep_high_zoom = (tracts_per_state <= high_low_zoom_threshold).to_numpy()
    high_zoom_count = int(keep_high_zoom.sum())

    # Ensure the split produces both a bucketed set and a high-zoom set
    assert high_zoom_count != initial_state_tracts.shape[0], \
        "Error: Cutoff is too high, nothing is aggregated"
    assert high_zoom_count > 1, "Error: Nothing is kept at high zoom"

    # Step 2: Separate the tracts to be bucketed and sort them by score
    state_tracts = initial_state_tracts[~keep_high_zoom].sort_values(
        target_score_field, ascending=True
    )

    # Step 3: Calculate the initial bucket size
    tract_count = len(state_tracts.index)
    bucket_size = math.ceil(tract_count / number_of_buckets)

    # Step 4: Adjust bucket size for homogeneity. The score total does not
    # change inside the loop, so it is computed once. bucket_size can never
    # drop below 1, so the loop stops there instead of spinning forever.
    score_total = state_tracts[target_score_field].sum()
    while bucket_size > 1 and score_total % bucket_size > homogeneity_threshold:
        number_of_buckets += 1
        bucket_size = math.ceil(tract_count / number_of_buckets)

    logger.debug(f"The number of buckets has increased to {number_of_buckets}")

    # Step 5: Assign tracts to buckets; row i (in score order) goes to bucket
    # floor(i / bucket_size)
    state_tracts[f"{target_score_field}_bucket"] = (
        np.arange(tract_count) // bucket_size
    )

    # Step 6: Return bucketed tracts and high zoom tracts
    return state_tracts, initial_state_tracts[keep_high_zoom]

In [None]:
import geopandas as gpd

# Read the tracts to bucket (replace the placeholder path with a real file)
initial_state_tracts = gpd.read_file("path_to_your_geojson_or_shapefile.geojson")

# Bucketing configuration
geoid_field_name = "GEOID10"
target_score_field = "SCORE"
high_low_zoom_threshold = 150
number_of_buckets = 10
homogeneity_threshold = 200

# Split the tracts into the bucketed set and the high-zoom set
bucketed_tracts, high_zoom_tracts = create_buckets_from_tracts(
    initial_state_tracts=initial_state_tracts,
    geoid_field_name=geoid_field_name,
    target_score_field=target_score_field,
    high_low_zoom_threshold=high_low_zoom_threshold,
    number_of_buckets=number_of_buckets,
    homogeneity_threshold=homogeneity_threshold,
)

# Preview the first rows of each result
print("Bucketed Tracts:")
print(bucketed_tracts.head())

print("High Zoom Tracts:")
print(high_zoom_tracts.head())

In [None]:
def aggregate_buckets(
    state_tracts: gpd.GeoDataFrame,
    target_score_field: str,
    geometry_field_name: str,
    agg_func: str,
) -> gpd.GeoDataFrame:
    """
    Dissolves tracts that share a score bucket into one row per bucket.

    Args:
        state_tracts (gpd.GeoDataFrame): Tracts that carry the score, the
            geometry and a ``f"{target_score_field}_bucket"`` column.
        target_score_field (str): The name of the score field.
        geometry_field_name (str): The name of the geometry field.
        agg_func (str): Aggregation passed to ``dissolve`` as ``aggfunc``.

    Returns:
        gpd.GeoDataFrame: The result of dissolving the three kept columns by
        the bucket column.
    """
    bucket_field = f"{target_score_field}_bucket"
    keep_cols = [target_score_field, bucket_field, geometry_field_name]

    # Only the score, bucket and geometry columns take part in the dissolve
    return state_tracts[keep_cols].dissolve(by=bucket_field, aggfunc=agg_func)


def _aggregate_buckets(
    self, state_tracts: gpd.GeoDataFrame, agg_func: str
) -> gpd.GeoDataFrame:
    """
    Dissolves tracts by score bucket using the field names stored on ``self``.

    Reads ``self.TARGET_SCORE_RENAME_TO`` and ``self.GEOMETRY_FIELD_NAME`` and
    delegates to ``aggregate_buckets``.

    Args:
        state_tracts (gpd.GeoDataFrame): Tracts with score, bucket and
            geometry columns.
        agg_func (str): Aggregation passed to ``dissolve`` as ``aggfunc``.

    Returns:
        gpd.GeoDataFrame: The dissolved frame.
    """
    return aggregate_buckets(
        state_tracts,
        self.TARGET_SCORE_RENAME_TO,
        self.GEOMETRY_FIELD_NAME,
        agg_func,
    )

def _breakup_multipolygons(
    self, state_bucketed_df: gpd.GeoDataFrame, num_buckets: int
) -> list:
    """
    Breaks bucket geometries up into their individual parts.

    Rows are looked up with the keys ``0 .. num_buckets - 1``. Column names
    are read from ``self.TARGET_SCORE_RENAME_TO`` and
    ``self.GEOMETRY_FIELD_NAME``.

    Args:
        state_bucketed_df (gpd.GeoDataFrame): Frame holding one geometry and
            one score per bucket.
        num_buckets (int): Number of buckets to read.

    Returns:
        list: ``[score, geometry_part]`` pairs, in bucket order and, within a
        bucket, in the order of the geometry's parts.
    """
    compressed = []
    for bucket in range(num_buckets):
        geometry = state_bucketed_df[self.GEOMETRY_FIELD_NAME][bucket]
        # Multi-part geometries expose their parts through `.geoms`; a bucket
        # that dissolved into a single polygon has no such attribute and is
        # kept as its own only part instead of raising AttributeError.
        for part in getattr(geometry, "geoms", (geometry,)):
            compressed.append(
                [state_bucketed_df[self.TARGET_SCORE_RENAME_TO][bucket], part]
            )
    return compressed

def _join_high_and_low_zoom_frames(
    self, compressed: list, keep_high_zoom_df: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Stacks the broken-up bucket polygons on top of the high-zoom tracts.

    Column names are read from ``self.TARGET_SCORE_RENAME_TO`` and
    ``self.GEOMETRY_FIELD_NAME``. The low-zoom frame is built from
    ``compressed`` with CRS ``EPSG:4326``; the high-zoom frame is reduced to
    the same two columns. The frames are concatenated without resetting the
    index.

    Args:
        compressed (list): Rows of ``[score, geometry]``.
        keep_high_zoom_df (gpd.GeoDataFrame): Tracts kept at high zoom.

    Returns:
        gpd.GeoDataFrame: Low-zoom rows followed by high-zoom rows.
    """
    score_column = self.TARGET_SCORE_RENAME_TO
    geometry_column = self.GEOMETRY_FIELD_NAME
    shared_columns = [score_column, geometry_column]

    low_zoom_frame = gpd.GeoDataFrame(
        compressed, columns=shared_columns, crs="EPSG:4326"
    )
    high_zoom_frame = keep_high_zoom_df[shared_columns]
    return pd.concat([low_zoom_frame, high_zoom_frame])

In [None]:
def breakup_multipolygons(
    state_bucketed_df: gpd.GeoDataFrame,
    target_score_field: str,
    geometry_field_name: str,
    num_buckets: int
) -> list:
    """
    Breaks up bucket geometries into their individual parts.

    Rows are looked up with the keys ``0 .. num_buckets - 1``.

    Args:
        state_bucketed_df (gpd.GeoDataFrame): GeoDataFrame containing bucketed geometries.
        target_score_field (str): The name of the score field.
        geometry_field_name (str): The name of the geometry field.
        num_buckets (int): Number of buckets to read.

    Returns:
        list: ``[score, geometry_part]`` pairs, in bucket order and, within a
        bucket, in the order of the geometry's parts.
    """
    compressed = []
    for bucket in range(num_buckets):
        geometry = state_bucketed_df[geometry_field_name][bucket]
        # Multi-part geometries expose their parts through `.geoms`; a bucket
        # that dissolved into a single polygon has no such attribute and is
        # kept as its own only part instead of raising AttributeError.
        for part in getattr(geometry, "geoms", (geometry,)):
            compressed.append(
                [state_bucketed_df[target_score_field][bucket], part]
            )
    return compressed

In [None]:
import pandas as pd

def join_high_and_low_zoom_frames(
    compressed: list,
    keep_high_zoom_df: gpd.GeoDataFrame,
    target_score_field: str,
    geometry_field_name: str
) -> gpd.GeoDataFrame:
    """
    Stacks the broken-up bucket polygons on top of the high-zoom tracts.

    The low-zoom frame is built from ``compressed`` with CRS ``EPSG:4326``;
    the high-zoom frame is reduced to the same two columns. The frames are
    concatenated without resetting the index.

    Args:
        compressed (list): Rows of ``[score, geometry]``.
        keep_high_zoom_df (gpd.GeoDataFrame): GeoDataFrame of high-zoom tracts.
        target_score_field (str): The name of the score field.
        geometry_field_name (str): The name of the geometry field.

    Returns:
        gpd.GeoDataFrame: Low-zoom rows followed by high-zoom rows.
    """
    shared_columns = [target_score_field, geometry_field_name]

    low_zoom_frame = gpd.GeoDataFrame(
        compressed, columns=shared_columns, crs="EPSG:4326"
    )
    high_zoom_frame = keep_high_zoom_df[shared_columns]
    return pd.concat([low_zoom_frame, high_zoom_frame])

In [None]:
# Example usage of the standalone functions
import geopandas as gpd

# Read the tracts (replace the placeholder path with a real file)
state_tracts = gpd.read_file("path_to_your_geojson_or_shapefile.geojson")

# Field names and aggregation settings
target_score_field = "SCORE"
geometry_field_name = "geometry"
agg_func = "mean"
num_buckets = 10

# Aggregate the tracts by bucket
aggregated = aggregate_buckets(
    state_tracts, target_score_field, geometry_field_name, agg_func
)

# Split every bucket geometry into its individual polygons
compressed = breakup_multipolygons(
    state_bucketed_df=aggregated,
    target_score_field=target_score_field,
    geometry_field_name=geometry_field_name,
    num_buckets=num_buckets,
)

# Stack the low-zoom polygons with the high-zoom tracts
keep_high_zoom_df = gpd.read_file("path_to_high_zoom_geojson.geojson")
combined = join_high_and_low_zoom_frames(
    compressed=compressed,
    keep_high_zoom_df=keep_high_zoom_df,
    target_score_field=target_score_field,
    geometry_field_name=geometry_field_name,
)

# Write the combined frame out as GeoJSON
combined.to_file("path_to_output.geojson", driver="GeoJSON")