In [305]:
import numpy as np
import pandas as pd
import geopandas as gpd

from shapely import wkb
from shapely.geometry import Point
from sklearn.cluster import DBSCAN
from sklearn.neighbors import BallTree
from datetime import datetime
from tqdm import tqdm

# load data

In [389]:
fire = pd.read_parquet("data/fire_episodes_2020_2025.parquet")
smoke = pd.read_parquet("data/ca_smoke_polygons_2020_2024.parquet")
data = pd.read_parquet("data/pm25_hourly")

smoke["geometry"] = smoke["geometry"].apply(wkb.loads)
smoke_gdf = gpd.GeoDataFrame(smoke, geometry="geometry", crs="EPSG:4326")
smoke_gdf["smoke_id"] = smoke_gdf["index"]
smoke_gdf["start_time"] = pd.to_datetime(smoke_gdf["Start Time"])
smoke_gdf["end_time"]   = pd.to_datetime(smoke_gdf["End Time"])

fire["geometry"] = fire["geometry"].apply(wkb.loads)
fire_gdf = gpd.GeoDataFrame(fire, geometry="geometry", crs="EPSG:4326")
fire["timestamp"] = pd.to_datetime(fire["start_time"])


# floor smoke start time + centroid trees for smoke

In [307]:
smoke_proj = smoke_gdf.to_crs(3310)
smoke_proj["centroid"] = smoke_proj.geometry.centroid
smoke_gdf["centroid"] = smoke_proj["centroid"].to_crs(4326)

smoke_gdf["hour_start"] = smoke_gdf["start_time"].dt.floor("H")
smoke_gdf["hour_end"]   = smoke_gdf["end_time"].dt.floor("H")

smoke_gdf["hour_bucket"] = smoke_gdf["hour_start"]

smoke_trees = {}
smoke_hour_groups = {}

for hour, group in smoke_gdf.groupby("hour_bucket"):
    smoke_hour_groups[hour] = group

    # building BallTree on centroids
    coords = np.vstack([
        group["centroid"].y.values,
        group["centroid"].x.values
    ]).T
    coords_rad = np.radians(coords)
    tree = BallTree(coords_rad, metric="haversine")
    smoke_trees[hour] = tree

  smoke_gdf["hour_start"] = smoke_gdf["start_time"].dt.floor("H")
  smoke_gdf["hour_end"]   = smoke_gdf["end_time"].dt.floor("H")


# ST-DBSCAN for fire episode prediction + fire centroid tree for fire

In [388]:
coords_rad = np.radians(fire[["lat", "lon"]])
t0 = fire["timestamp"].min()
fire["time_hours"] = (fire["timestamp"] - t0).dt.total_seconds() / 3600.0

temp_scale = 6.0   # hours scale
space_scale_km = 5 # 5 km spatial radius

EARTH_RADIUS = 6371.0
coords_km = coords_rad.copy()
coords_km["lat"] = EARTH_RADIUS * coords_rad["lat"]
coords_km["lon"] = EARTH_RADIUS * np.cos(coords_rad["lat"]) * coords_rad["lon"]

X = np.column_stack([
    coords_km["lat"],
    coords_km["lon"],
    fire["time_hours"] / temp_scale  # normalize time dimension
])

db = DBSCAN(
    eps=space_scale_km,
    min_samples=3,
    metric="euclidean",
    n_jobs=-1
)

fire["episode_key"] = db.fit_predict(X)
fire_clean = fire[fire["episode_key"] != -1].copy()

episodes = (fire_clean
            .groupby("episode_key")
            .agg(
                start_time=("timestamp", "min"),
                end_time=("timestamp", "max"),
                lat_mean=("lat", "mean"),
                lon_mean=("lon", "mean"),
                count_pixels=("lat", "count"),
            )
            .reset_index()
)

fire_ep = episodes.copy()
fire_ep["geometry"] = fire_ep.apply(
    lambda r: Point(r.lon_mean, r.lat_mean),
    axis=1
)
fire_ep = gpd.GeoDataFrame(fire_ep, geometry="geometry", crs="EPSG:4326")

# building BallTree of fire episode centroids
coords_fire = np.radians(
    np.column_stack([fire_ep["lat_mean"], fire_ep["lon_mean"]])
)
fire_tree = BallTree(coords_fire, metric="haversine")

# PM data

In [309]:
def build_pm_gdf(
    data,
    lat_col="latitude",
    lon_col="longitude",
    date_col="date_local",
    time_col="time_local",
):
    dt = pd.to_datetime(
        data[date_col].astype(str) + " " + data[time_col].astype(str),
        errors="coerce",
    )

    data = data.copy()
    data["time_utc"] = dt

    geometry = [Point(xy) for xy in zip(data[lon_col], data[lat_col])]
    pm_gdf = gpd.GeoDataFrame(data, geometry=geometry, crs="EPSG:4326")
    return pm_gdf

In [310]:
pm_gdf = build_pm_gdf(pm_raw)

pm_gdf["timestamp"] = pd.to_datetime(
    pm_gdf["date_local"].astype(str) + " " + pm_gdf["time_local"].astype(str),
    errors="coerce"
)

pm_gdf["hour_bucket"] = pm_gdf["timestamp"].dt.floor("H")

  pm_gdf["hour_bucket"] = pm_gdf["timestamp"].dt.floor("H")


# Given a PM measure, find k nearest active fire + smoke episodes

In [311]:
def nearest_fire_features(pm_row, fire_ep, k=3):
    ts = pm_row.timestamp
    lat, lon = pm_row.geometry.y, pm_row.geometry.x

    active = fire_ep[
        (fire_ep["start_time"] <= ts) &
        (fire_ep["end_time"] >= ts)
    ]

    if active.empty:
        return {
            **{f"fire_id_{i}": np.nan for i in range(1, k+1)},
            **{f"fire_dist_{i}": np.nan for i in range(1, k+1)}
        }

    coords = np.column_stack([active["lat_mean"], active["lon_mean"]])
    tree = BallTree(np.radians(coords), metric="haversine")

    pm_rad = np.radians([[lat, lon]])
    k_use = min(k, len(active))

    dists, idxs = tree.query(pm_rad, k=k_use)
    dists = dists[0] * 6371.0
    idxs = idxs[0]

    active_reset = active.reset_index(drop=True)
    out = {}

    for rank, i in enumerate(idxs, start=1):
        row = active_reset.iloc[i]
        out[f"fire_id_{rank}"] = row["episode_key"]
        out[f"fire_dist_{rank}"] = dists[rank-1]

    for rank in range(len(idxs)+1, k+1):
        out[f"fire_id_{rank}"] = np.nan
        out[f"fire_dist_{rank}"] = np.nan

    return out


def nearest_smoke_features(pm_row, smoke_gdf, k=3):
    ts = pm_row.timestamp
    lat, lon = pm_row.geometry.y, pm_row.geometry.x

    active = smoke_gdf[
        (smoke_gdf["start_time"] <= ts) &
        (smoke_gdf["end_time"] >= ts)
    ]

    if active.empty:
        return {
            **{f"smoke_id_{i}": np.nan for i in range(1, k+1)},
            **{f"smoke_dist_{i}": np.nan for i in range(1, k+1)}
        }

    coords = np.vstack([active["centroid"].y.values,
                        active["centroid"].x.values]).T
    tree = BallTree(np.radians(coords), metric="haversine")

    pm_rad = np.radians([[lat, lon]])
    k_use = min(k, len(active))

    dists, idxs = tree.query(pm_rad, k=k_use)
    dists = dists[0] * 6371.0
    idxs = idxs[0]

    active_reset = active.reset_index(drop=True)
    out = {}

    for rank, i in enumerate(idxs, start=1):
        row = active_reset.iloc[i]
        out[f"smoke_id_{rank}"] = row["smoke_id"]
        out[f"smoke_dist_{rank}"] = dists[rank-1]

    for rank in range(len(idxs)+1, k+1):
        out[f"smoke_id_{rank}"] = np.nan
        out[f"smoke_dist_{rank}"] = np.nan

    return out


In [312]:
results = []

for pm_row in tqdm(pm_gdf.itertuples(), total=len(pm_gdf)):
    sm = nearest_smoke_features(pm_row, smoke_gdf, k=3)
    fr = nearest_fire_features(pm_row, fire_ep, k=3)

    combined = {**sm, **fr}
    results.append(combined)

df_features = pd.DataFrame(results)
gdf_pm = pd.concat([pm_gdf.reset_index(drop=True), df_features], axis=1)

100%|██████████| 24419/24419 [00:58<00:00, 420.76it/s]


In [313]:
gdf_pm["smoke_dist_3"].count()

np.int64(3008)

# code sanity check

In [322]:
pm_with_smoke = gdf_pm[gdf_pm["smoke_id_1"].notna()]

if len(pm_with_smoke) == 0:
    print("No PM rows matched to smoke polygons.")
else:
    # randomly pick 1 row
    sample_pm = pm_with_smoke.sample(1).iloc[0]

    sid = int(sample_pm["smoke_id_1"])
    smoke_row = smoke_gdf.loc[smoke_gdf["smoke_id"] == sid].iloc[0]

    print("Smoke start_time:", smoke_row.start_time)
    print("Smoke end_time:", smoke_row.end_time)

    print("\nMatched Smoke ID:", sid)
    print("PM timestamp:", sample_pm.timestamp)

    print("\nOther data:")
    print("PM geometry:", sample_pm.geometry)
    print("Smoke centroid:", smoke_row.centroid)
    print("Original geometry bounds:", smoke_row.geometry.bounds)

    if smoke_row.start_time <= sample_pm.timestamp <= smoke_row.end_time:
        print("\nPM timestamp is INSIDE smoke duration")
    else:
        print("\nPM timestamp OUTSIDE smoke duration")


Smoke start_time: 2020-10-24 17:30:00
Smoke end_time: 2020-10-24 23:00:00

Matched Smoke ID: 30156
PM timestamp: 2020-10-24 20:00:00

Other data:
PM geometry: POINT (-122.302741 37.864767)
Smoke centroid: POINT (-121.90438127417501 39.04237026837368)
Original geometry bounds: (-122.01939, 38.930085, -121.79014, 39.164657)

PM timestamp is INSIDE smoke duration


In [387]:
pm_with_fire = gdf_pm[gdf_pm["fire_id_3"].notna()]

if len(pm_with_fire) == 0:
    print("No PM rows matched to fire episodes.")
else:
    sample_pm_fire = pm_with_fire.sample(1).iloc[0]

    fid = int(sample_pm_fire["fire_id_1"])
    fire_row = fire_ep.loc[fire_ep["episode_key"] == fid].iloc[0]

    print("Fire start_time:", fire_row.start_time)
    print("Fire end_time:", fire_row.end_time)

    print("\nMatched Fire Episode:", fid)
    print("PM timestamp:", sample_pm_fire.timestamp)

    print("\nOther data:")
    print("PM geometry:", sample_pm_fire.geometry)
    print("Fire centroid:", fire_row.geometry)
    print("Fire duration (min):", (fire_row.end_time - fire_row.start_time))

    if fire_row.start_time <= sample_pm_fire.timestamp <= fire_row.end_time:
        print("\nPM timestamp is INSIDE fire episode window")
    else:
        print("\nM timestamp is OUTSIDE fire episode window")


Fire start_time: 2023-04-03 20:08:00
Fire end_time: 2023-04-06 21:39:00

Matched Fire Episode: 8611
PM timestamp: 2023-04-06 18:00:00

Other data:
PM geometry: POINT (-122.302741 37.864767)
Fire centroid: POINT (-121.654 37.869800000000005)
Fire duration (min): 3 days 01:31:00

PM timestamp is INSIDE fire episode window
