In [1]:
import geopandas as gpd
import pandas as pd
import numpy as np


In [None]:
def compute_violation_areas(
    objects_gdf: gpd.GeoDataFrame,
    nogozones_gdf: gpd.GeoDataFrame
) -> pd.DataFrame:
    """
    Computes violation area per date × no-go zone.

    Returns DataFrame:
    date | zone_id | violation_area_ha | confidence
    """
    # Define expected columns to prevent KeyErrors on empty data
    expected_cols = ["date", "zone_id", "violation_area_ha", "confidence"]

    # 1. Handle completely empty input
    if objects_gdf.empty:
        return pd.DataFrame(columns=expected_cols)

    # CRS safety
    if objects_gdf.crs != nogozones_gdf.crs:
        nogozones_gdf = nogozones_gdf.to_crs(objects_gdf.crs)

    records = []

    # 2. Check if "date" exists in objects_gdf before grouping
    if "date" not in objects_gdf.columns:
        raise KeyError("The input objects_gdf must contain a 'date' column.")

    for date, objs_t in objects_gdf.groupby("date"):
        for _, zone in nogozones_gdf.iterrows():
            zone_geom = zone.geometry
            zone_id = zone.zone_id

            # Spatial check
            intersecting = objs_t[objs_t.intersects(zone_geom)]

            if intersecting.empty:
                area_ha = 0.0
                conf = 0.0
            else:
                # Calculate the actual intersection geometry
                inter_geom = intersecting.geometry.intersection(zone_geom)
                
                # NOTE: Ensure your CRS is in meters (UTM) for .area to represent m²
                # If CRS is EPSG:4326, .area is in square degrees!
                area_ha = inter_geom.area.sum() / 10_000 
                
                if "confidence" in intersecting.columns:
                    conf = intersecting["confidence"].mean()
                else:
                    conf = 80.0

            records.append({
                "date": date,
                "zone_id": zone_id,
                "violation_area_ha": area_ha,
                "confidence": conf
            })

    # 3. Final construction with column safety
    if not records:
        return pd.DataFrame(columns=expected_cols)
        
    df = pd.DataFrame(records)
    df["date"] = pd.to_datetime(df["date"])

    return df

In [3]:
def detect_events_for_zone(
    zone_df: pd.DataFrame,
    area_eps: float = 0.05,
    stable_steps: int = 3
) -> list:
    """
    Detects regulatory events for a single no-go zone.
    """

    events = []

    prev_area = 0.0
    stable_count = 0
    first_violation_seen = False

    for _, row in zone_df.iterrows():

        date = row.date
        area = row.violation_area_ha
        conf = row.confidence

        # No violation
        if area <= area_eps:
            prev_area = area
            stable_count = 0
            continue

        # First violation
        if not first_violation_seen:
            events.append({
                "date": date,
                "zone_id": row.zone_id,
                "violation_area_ha": area,
                "event_type": "first_violation",
                "confidence": conf
            })
            first_violation_seen = True
            prev_area = area
            stable_count = 0
            continue

        # Expansion
        if area > prev_area + area_eps:
            events.append({
                "date": date,
                "zone_id": row.zone_id,
                "violation_area_ha": area,
                "event_type": "expansion",
                "confidence": conf
            })
            stable_count = 0

        # Stabilization
        else:
            stable_count += 1
            if stable_count == stable_steps:
                events.append({
                    "date": date,
                    "zone_id": row.zone_id,
                    "violation_area_ha": area,
                    "event_type": "stabilized",
                    "confidence": conf
                })

        prev_area = area

    return events


In [4]:
def generate_no_go_zone_alerts(
    objects_gdf: gpd.GeoDataFrame,
    nogozones_gdf: gpd.GeoDataFrame,
    mine_id: str,
    area_eps: float = 0.05,
    stable_steps: int = 3
):
    """
    Full STEP 7 pipeline.

    Returns:
    --------
    alerts_df     : Regulatory alert log
    violation_df  : Time series per zone
    """

    # Step 1 — spatial violation areas
    violation_df = compute_violation_areas(
        objects_gdf=objects_gdf,
        nogozones_gdf=nogozones_gdf
    )

    # Step 2 — event detection
    alert_records = []

    for zone_id, zdf in violation_df.groupby("zone_id"):
        zdf = zdf.sort_values("date")
        alert_records.extend(
            detect_events_for_zone(
                zdf,
                area_eps=area_eps,
                stable_steps=stable_steps
            )
        )

    alerts_df = pd.DataFrame(alert_records)

    if not alerts_df.empty:
        alerts_df["mine_id"] = mine_id
        alerts_df = alerts_df[
            ["date", "mine_id", "zone_id",
             "violation_area_ha", "event_type", "confidence"]
        ]

    return alerts_df, violation_df


In [None]:
# Example Usage 
alerts_df, violation_df = generate_no_go_zone_alerts(
    objects_gdf=objects_gdf,
    nogozones_gdf=nogozones_gdf,
    mine_id="mine_001",
    area_eps=0.05,
    stable_steps=3
)

alerts_df.head()


In [None]:
# Optinal export

def export_no_go_outputs(
    alerts_df: pd.DataFrame,
    violation_df: pd.DataFrame,
    out_dir: str = "./results"
):
    import os
    os.makedirs(out_dir, exist_ok=True)

    alerts_df.to_csv(f"{out_dir}/no_go_zone_alert_log.csv", index=False)
    violation_df.to_csv(f"{out_dir}/no_go_zone_violation_timeseries.csv", index=False)
