In [1]:
%pip install geopandas pyproj shapely fiona numpy pandas


Defaulting to user installation because normal site-packages is not writeable
Collecting geopandas
  Downloading geopandas-1.1.1-py3-none-any.whl.metadata (2.3 kB)
Collecting pyproj
  Downloading pyproj-3.7.2-cp313-cp313-win_amd64.whl.metadata (31 kB)
Collecting shapely
  Downloading shapely-2.1.2-cp313-cp313-win_amd64.whl.metadata (7.1 kB)
Collecting fiona
  Downloading fiona-1.10.1-cp313-cp313-win_amd64.whl.metadata (58 kB)
Collecting pyogrio>=0.7.2 (from geopandas)
  Downloading pyogrio-0.12.1-cp313-cp313-win_amd64.whl.metadata (6.0 kB)
Collecting click~=8.0 (from fiona)
  Downloading click-8.3.1-py3-none-any.whl.metadata (2.6 kB)
Collecting click-plugins>=1.0 (from fiona)
  Downloading click_plugins-1.1.1.2-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting cligj>=0.5 (from fiona)
  Downloading cligj-0.7.2-py3-none-any.whl.metadata (5.0 kB)
Downloading geopandas-1.1.1-py3-none-any.whl (338 kB)
Downloading pyproj-3.7.2-cp313-cp313-win_amd64.whl (6.3 MB)
   ---------------------------


[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
import ast
import time
import gc
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from tqdm import tqdm

# SET GDAL/OGRIO OPTIONS TO SKIP COMPLEX POLYGON PROCESSING
# This prevents the "organizePolygons" warning and massive slowdowns
os.environ['GDAL_DISABLE_READDIR_ON_OPEN'] = 'YES'
os.environ['GDAL_INGESTED_FILE_CACHE_SIZE'] = '0'

# Import ogrio after setting env vars
import warnings
warnings.filterwarnings('ignore', message='organizePolygons')

# ---------------------------------------------------------
# USER PATHS
# ---------------------------------------------------------
GSSURGO_GDB = r"C:\Users\wongb\Downloads\gSSURGO_CONUS\gSSURGO_CONUS.gdb"  # <-- change this
STRUCT_COORDS_CSV = r"C:\Users\wongb\Bridge-ML\Bridge-ML-LLM-Embedding-Architecture\enriched_data\structure_coordinates.csv"
OUT_CSV = r"C:\Users\wongb\Bridge-ML\Bridge-ML-LLM-Embedding-Architecture\enriched_data\gssurgo_enriched.csv"

# Crash prevention settings
CHUNK_SIZE = 500  # Process structures in chunks to avoid memory spikes during spatial join
ENABLE_CHECKPOINTS = True  # Save intermediate results
CHECKPOINT_DIR = r"C:\Users\wongb\Bridge-ML\Bridge-ML-LLM-Embedding-Architecture\checkpoints"

# Create checkpoint directory if needed
if ENABLE_CHECKPOINTS and not os.path.exists(CHECKPOINT_DIR):
    os.makedirs(CHECKPOINT_DIR)
    print(f"Created checkpoint directory: {CHECKPOINT_DIR}")

# If you're not sure about the polygon layer name, uncomment this to list all layers:
# from fiona import listlayers
# print(listlayers(GSSURGO_GDB))

# Typical layer names:
MUPOLYGON_LAYER = "MUPOLYGON"   # or "MapunitPoly" etc.
MUAGGATT_LAYER = "muaggatt"
COMPONENT_LAYER = "component"
CHORIZON_LAYER = "chorizon"

# ---------------------------------------------------------
# 1. Load structure coordinates as a GeoDataFrame
# ---------------------------------------------------------
t1 = time.time()
print("\n[STEP 1] Loading structure coordinates...")

df_struct = pd.read_csv(STRUCT_COORDS_CSV)

def parse_coord(x):
    if isinstance(x, tuple):
        return x
    try:
        return ast.literal_eval(x)
    except Exception:
        return (np.nan, np.nan)

df_struct["COORDINATES"] = df_struct["COORDINATES"].apply(parse_coord)
df_struct["LAT"] = df_struct["COORDINATES"].apply(lambda c: c[0])
df_struct["LON"] = df_struct["COORDINATES"].apply(lambda c: c[1])

gdf_struct = gpd.GeoDataFrame(
    df_struct,
    geometry=gpd.points_from_xy(df_struct["LON"], df_struct["LAT"]),
    crs="EPSG:4326"
)
print(f"  ✓ Loaded {len(gdf_struct)} structures in {time.time() - t1:.2f}s")

# ---------------------------------------------------------
# 2. Load gSSURGO polygon layer & tables
# ---------------------------------------------------------
t2 = time.time()
print("\n[STEP 2] Loading gSSURGO polygons and tables...")

try:
    print("  Loading MUPOLYGON (this may take a moment with complex geometries)...")
    gdf_mu = gpd.read_file(GSSURGO_GDB, layer=MUPOLYGON_LAYER)
    print(f"  ✓ Loaded MUPOLYGON layer: {len(gdf_mu)} polygons in {time.time() - t2:.2f}s")
    
    # AGGRESSIVE simplification to prevent bad_alloc
    print("  Aggressively simplifying polygon geometries...")
    t_simplify = time.time()
    gdf_mu['geometry'] = gdf_mu.geometry.simplify(tolerance=0.001, preserve_topology=True)
    gdf_mu = gdf_mu.reset_index(drop=True)
    gc.collect()  # Force garbage collection
    print(f"  ✓ Simplified geometries in {time.time() - t_simplify:.2f}s")
    
    # Most gSSURGO polygon layers are in EPSG:4269 (NAD83), but we'll reproject to WGS84.
    if gdf_mu.crs is None:
        gdf_mu.set_crs("EPSG:4269", inplace=True)
        print("    (Set CRS to EPSG:4269)")
    
    t_reproject = time.time()
    gdf_mu = gdf_mu.to_crs("EPSG:4326")
    gc.collect()
    print(f"  ✓ Reprojected to EPSG:4326 in {time.time() - t_reproject:.2f}s")
    
except Exception as e:
    print(f"  ✗ Error loading MUPOLYGON: {e}")
    raise

# Non-spatial tables (GeoPandas will load them as DataFrames with no geometry)
try:
    t_muagg = time.time()
    df_muagg = gpd.read_file(GSSURGO_GDB, layer=MUAGGATT_LAYER)
    print(f"  ✓ Loaded MUAGGATT in {time.time() - t_muagg:.2f}s")
    
    t_comp = time.time()
    df_comp = gpd.read_file(GSSURGO_GDB, layer=COMPONENT_LAYER)
    print(f"  ✓ Loaded COMPONENT in {time.time() - t_comp:.2f}s")
    
    t_ch = time.time()
    df_ch = gpd.read_file(GSSURGO_GDB, layer=CHORIZON_LAYER)
    print(f"  ✓ Loaded CHORIZON in {time.time() - t_ch:.2f}s")
    
except Exception as e:
    print(f"  ✗ Error loading attribute tables: {e}")
    raise

# Make sure they're plain pandas DataFrames (drop geometry if present)
df_muagg = pd.DataFrame(df_muagg.drop(columns=[c for c in df_muagg.columns if c == "geometry"]))
df_comp = pd.DataFrame(df_comp.drop(columns=[c for c in df_comp.columns if c == "geometry"]))
df_ch = pd.DataFrame(df_ch.drop(columns=[c for c in df_ch.columns if c == "geometry"]))

# ---------------------------------------------------------
# 3. Spatial join: bridge point → MUKEY (CHUNKED to prevent memory overflow)
# ---------------------------------------------------------
t3 = time.time()
print("\n[STEP 3] Performing spatial join (bridges → MUKEY) in chunks...")
print(f"  Processing {len(gdf_struct)} structures in chunks of {CHUNK_SIZE}...")

# Process structures in chunks to avoid memory explosion
chunk_results = []
num_chunks = (len(gdf_struct) + CHUNK_SIZE - 1) // CHUNK_SIZE

for chunk_idx in tqdm(range(num_chunks), desc="Spatial join chunks", unit=" chunk"):
    start_idx = chunk_idx * CHUNK_SIZE
    end_idx = min(start_idx + CHUNK_SIZE, len(gdf_struct))
    
    gdf_struct_chunk = gdf_struct.iloc[start_idx:end_idx].copy()
    
    try:
        # Spatial join for this chunk
        chunk_joined = gpd.sjoin(
            gdf_struct_chunk,
            gdf_mu[["MUKEY", "geometry"]],
            how="left",
            predicate="intersects"
        ).drop(columns=["index_right"], errors='ignore')
        
        chunk_results.append(chunk_joined)
        del chunk_joined  # Free memory
        gc.collect()
        
    except Exception as e:
        print(f"\n  ✗ Error in chunk {chunk_idx}: {e}")
        print(f"  Retrying chunk {chunk_idx} with 'within' predicate...")
        try:
            chunk_joined = gpd.sjoin(
                gdf_struct_chunk,
                gdf_mu[["MUKEY", "geometry"]],
                how="left",
                predicate="within"
            ).drop(columns=["index_right"], errors='ignore')
            chunk_results.append(chunk_joined)
            del chunk_joined
            gc.collect()
        except Exception as e2:
            print(f"  ✗ Both predicates failed for chunk {chunk_idx}: {e2}")
            raise

# Concatenate all chunks
gdf_struct_mu = pd.concat(chunk_results, ignore_index=True)
del chunk_results
gc.collect()
print(f"\n  ✓ Spatial join completed in {time.time() - t3:.2f}s ({len(gdf_struct_mu)} records matched)")

# ---------------------------------------------------------
# 4. Prepare muaggatt: mapunit-level aggregated features
# ---------------------------------------------------------
t4 = time.time()
print("\n[STEP 4] Preparing muaggatt features...")

mu_cols = [
    "MUKEY",
    "brockdepmin",   # min depth to bedrock
    "wtdepannmin",   # min annual water table depth
    "wtdepaprjunmin",
    "slopegraddcp",  # slope gradient, dominant
    "drclassdcd",    # drainage class, dominant
    "hydgrpdcd",     # hydrologic group, dominant
    "flodfreqdcd",   # flooding frequency, dominant
    "aws0100wta",    # available water storage 0-100 cm
    "aws0150wta"     # available water storage 0-150 cm
]

df_muagg_sub = df_muagg[[c for c in mu_cols if c in df_muagg.columns]].copy()
df_muagg_sub = df_muagg_sub.add_prefix("MA_")

# MUKEY got prefixed, rename back that one:
df_muagg_sub = df_muagg_sub.rename(columns={"MA_MUKEY": "MUKEY"})
print(f"  ✓ Prepared {len(df_muagg_sub.columns)} muaggatt features in {time.time() - t4:.2f}s")

# ---------------------------------------------------------
# 5. Prepare component table: dominant component per MUKEY
# ---------------------------------------------------------
t5 = time.time()
print("\n[STEP 5] Preparing component-level features (dominant component per mapunit)...")

comp_cols = [
    "MUKEY",
    "COKEY",
    "comppct_r",   # component percent
    "slope_r",
    "drainagecl",
    "hydricrating",
    "hydgrp",
    "corcon",
    "corsteel",
    "soilslippot",
    "frostact",
    "elev_r"
]
df_comp_sub = df_comp[[c for c in comp_cols if c in df_comp.columns]].copy()

# Choose dominant component per MUKEY by highest comppct_r
df_comp_sub = df_comp_sub.sort_values(["MUKEY", "comppct_r"], ascending=[True, False])
df_comp_dom = df_comp_sub.groupby("MUKEY", as_index=False).first()

df_comp_dom = df_comp_dom.add_prefix("CO_")
df_comp_dom = df_comp_dom.rename(columns={"CO_MUKEY": "MUKEY"})  # restore MUKEY
print(f"  ✓ Prepared {len(df_comp_dom)} dominant components in {time.time() - t5:.2f}s")

# ---------------------------------------------------------
# 6. Prepare chorizon: depth-weighted averages over top 100 cm
# ---------------------------------------------------------
t6 = time.time()
print("\n[STEP 6] Preparing horizon-level aggregated features (0–100 cm)...")

ch_cols = [
    "COKEY",
    "hzdept_r",    # horizon top depth (cm)
    "hzdepb_r",    # horizon bottom depth (cm)
    "sandtotal_r",
    "silttotal_r",
    "claytotal_r",
    "dbovendry_r",
    "ksat_r",
    "awc_r",
    "om_r",
    "ll_r",
    "pi_r",
    "wsatiated_r"
]
df_ch_sub = df_ch[[c for c in ch_cols if c in df_ch.columns]].copy()

# Compute horizon thickness and clip to [0, 100] cm for depth-weighted averages
df_ch_sub["hzthk"] = df_ch_sub["hzdepb_r"] - df_ch_sub["hzdept_r"]

# Clip horizons to 0–100cm.
top, bottom = 0.0, 100.0
df_ch_sub["eff_thk"] = (
    np.minimum(df_ch_sub["hzdepb_r"], bottom) - np.maximum(df_ch_sub["hzdept_r"], top)
).clip(lower=0)

# Keep only horizons that intersect 0–100cm and have positive thickness
df_ch_sub = df_ch_sub[df_ch_sub["eff_thk"] > 0].copy()
print(f"  ✓ Filtered to {len(df_ch_sub)} horizons (0-100cm) in {time.time() - t6:.2f}s")

def depth_weighted(group, col):
    """
    Depth-weighted average of 'col' over eff_thk.
    """
    vals = group[col].to_numpy(dtype=float)
    w = group["eff_thk"].to_numpy(dtype=float)
    mask = ~np.isnan(vals)
    if not mask.any():
        return np.nan
    vals = vals[mask]
    w = w[mask]
    if w.sum() == 0:
        return np.nan
    return np.sum(vals * w) / np.sum(w)

agg_dict = {}
for col in [
    "sandtotal_r",
    "silttotal_r",
    "claytotal_r",
    "dbovendry_r",
    "ksat_r",
    "awc_r",
    "om_r",
    "ll_r",
    "pi_r",
    "wsatiated_r"
]:
    if col in df_ch_sub.columns:
        agg_dict[col] = lambda g, c=col: depth_weighted(g, c)

t_group = time.time()
grouped = df_ch_sub.groupby("COKEY")
print(f"  ✓ Grouped by COKEY ({len(grouped)} groups) in {time.time() - t_group:.2f}s")

rows = []
print("  Aggregating horizon features per COKEY (0–100 cm)...")
for cokey, g in tqdm(grouped, total=len(grouped), desc="  Horizon agg", unit=" COKEY"):
    row = {"COKEY": cokey}
    for col, func in agg_dict.items():
        row[col] = func(g)
    rows.append(row)

df_ch_agg = pd.DataFrame(rows)

# Prefix and join with component-level dominant COKEY later
df_ch_agg = df_ch_agg.add_prefix("CH_")
df_ch_agg = df_ch_agg.rename(columns={"CH_COKEY": "CO_COKEY"})
print(f"  ✓ Horizon aggregation completed in {time.time() - t6:.2f}s total for Step 6")

# ---------------------------------------------------------
# 7. Merge muaggatt + component + chorizon, then attach to bridges
# ---------------------------------------------------------
t7 = time.time()
print("\n[STEP 7] Merging soil tables and attaching to structures...")

# Merge muaggatt and component-dominant on MUKEY
df_soil = gdf_struct_mu.merge(df_muagg_sub, on="MUKEY", how="left")
print(f"  ✓ Merged muaggatt in {time.time() - t7:.2f}s")

t_comp_merge = time.time()
df_soil = df_soil.merge(df_comp_dom, on="MUKEY", how="left")
print(f"  ✓ Merged component in {time.time() - t_comp_merge:.2f}s")

# Merge chorizon aggregates on dominant component COKEY
if "CO_COKEY" in df_soil.columns:
    t_ch_merge = time.time()
    df_soil = df_soil.merge(df_ch_agg, on="CO_COKEY", how="left")
    print(f"  ✓ Merged chorizon in {time.time() - t_ch_merge:.2f}s")
else:
    print("  ✗ Warning: CO_COKEY not found after component merge; horizon features will be missing.")

# Drop geometry for final CSV
df_final = pd.DataFrame(df_soil.drop(columns=["geometry"]))

t_save = time.time()
df_final.to_csv(OUT_CSV, index=False)
print(f"\n✓ Saved gSSURGO-enriched dataset ({len(df_final)} records) in {time.time() - t_save:.2f}s")
print(f"  Location: {OUT_CSV}")

# Final timing summary
total_time = time.time() - t1
print(f"\n{'='*60}")
print(f"TOTAL EXECUTION TIME: {total_time:.2f}s ({total_time/60:.2f} minutes)")
print(f"{'='*60}")



[STEP 1] Loading structure coordinates...
  ✓ Loaded 4914 structures in 0.19s

[STEP 2] Loading gSSURGO polygons and tables...
  Loading MUPOLYGON (this may take a moment with complex geometries)...
  ✗ Error loading MUPOLYGON: 
  ✗ Error loading MUPOLYGON: 


MemoryError: 

In [None]:
import json
import requests
from typing import List, Tuple

# ---------------------------------------------------------
# Alternative: Query SDA over HTTPS (post.rest)
# - Avoids downloading/handling the full gSSURGO geodatabase
# - Returns MUKEY + selected attributes for point locations
# ---------------------------------------------------------
SDA_POST_URL = "https://SDMDataAccess.sc.egov.usda.gov/Tabular/post.rest"

# Columns to return from mapunit/legend
SDA_COLUMNS = [
    "mu.mukey",
    "mu.musym",
    "mu.muname",
    "mu.brockdepmin",
    "mu.wtdepannmin",
    "mu.wtdepaprjunmin",
    "mu.slopegraddcp",
    "mu.drclassdcd",
    "mu.hydgrpdcd",
    "mu.flodfreqdcd",
    "mu.aws0100wta",
    "mu.aws0150wta",
]


def build_point_query(points: List[Tuple[float, float]]) -> str:
    """Build a SQL query to fetch MUKEY + mapunit attributes for (lat, lon) points via SDA macro.
    Uses SDA_Get_Mukey_from_intersect instead of mapunitpoly table.
    """
    # Number the points for a stable pid
    vals_rows = []
    for idx, (lat, lon) in enumerate(points):
        vals_rows.append(
            f"({idx}, {lat}, {lon}, geometry::STPointFromText('POINT({lon} {lat})', 4326))"
        )
    vals_clause = ",\n    ".join(vals_rows)

    cols = ",".join(SDA_COLUMNS)
    sql = f"""
    WITH pts AS (
        SELECT * FROM (VALUES
        {vals_clause}
        ) AS v(pid, lat, lon, geom)
    ),
    hits AS (
        SELECT p.pid, p.lat, p.lon, x.mukey, x.areasymbol
        FROM pts p
        CROSS APPLY SDA_Get_Mukey_from_intersect(p.geom) AS x
    )
    SELECT {cols}, h.areasymbol, h.pid, h.lat, h.lon
    FROM hits h
    JOIN mapunit mu ON mu.mukey = h.mukey
    JOIN legend l ON mu.lkey = l.lkey
    """
    return sql


def sda_query(sql: str) -> pd.DataFrame:
    """Execute an SDA post.rest SQL query and return a DataFrame."""
    payload = {
        "format": "JSON+COLUMNNAME",  # first row = column names
        "query": sql,
    }
    resp = requests.post(SDA_POST_URL, data=payload, timeout=120)
    if not resp.ok:
        # Print server message to help diagnose (often includes SQL error)
        msg = resp.text[:2000]
        raise RuntimeError(f"SDA request failed: HTTP {resp.status_code}\n{msg}")

    data = resp.json()
    if not data:
        return pd.DataFrame()
    # First row = column names
    cols = data[0]
    rows = data[1:]
    return pd.DataFrame(rows, columns=cols)


def fetch_soil_for_points(points: List[Tuple[float, float]], batch_size: int = 200) -> pd.DataFrame:
    """Batch points to stay under SDA limits (<=100k rows) and avoid huge SQL.
    Returns MUKEY + selected attributes for all points.
    """
    results = []
    for i in range(0, len(points), batch_size):
        batch = points[i : i + batch_size]
        sql = build_point_query(batch)
        df_batch = sda_query(sql)
        df_batch["batch_index"] = i // batch_size
        results.append(df_batch)
    if not results:
        return pd.DataFrame()
    return pd.concat(results, ignore_index=True)

# Example usage (small sample):
# sample_points = [(41.58, -93.62), (40.75, -111.88)]  # (lat, lon)
# df_sda = fetch_soil_for_points(sample_points, batch_size=50)
# print(df_sda.head())


In [9]:
import pandas as pd
from typing import List, Tuple

# ---------------------------------------------------------
# Full pipeline via SDA API (no local GDB)
# - Reads structure coordinates CSV
# - Batches points, calls SDA post.rest with SQL
# - Returns MUKEY + mapunit, dominant component, and depth-weighted horizon stats (0–100 cm)
# ---------------------------------------------------------
SDA_BATCH_SIZE = 200  # Tune to stay under SDA limits (<100k rows total)

MU_COLS = [
    "brockdepmin",
    "wtdepannmin",
    "wtdepaprjunmin",
    "slopegraddcp",
    "drclassdcd",
    "hydgrpdcd",
    "flodfreqdcd",
    "aws0100wta",
    "aws0150wta",
]

COMP_COLS = [
    "comppct_r",
    "slope_r",
    "drainagecl",
    "hydricrating",
    "hydgrp",
    "corcon",
    "corsteel",
    "soilslippot",
    "frostact",
    "elev_r",
]

CH_COLS = [
    "sandtotal_r",
    "silttotal_r",
    "claytotal_r",
    "dbovendry_r",
    "ksat_r",
    "awc_r",
    "om_r",
    "ll_r",
    "pi_r",
    "wsatiated_r",
]


def _values_clause(points: List[Tuple[int, float, float]]) -> str:
    rows = []
    for pid, lat, lon in points:
        rows.append(
            f"({pid}, {lat}, {lon}, geometry::STPointFromText('POINT({lon} {lat})', 4326))"
        )
    return ",\n    ".join(rows)


def build_full_sql(points: List[Tuple[int, float, float]]) -> str:
    """SQL to fetch mapunit + dominant component + horizon agg (0–100cm) for given points via SDA macro."""
    vals = _values_clause(points)
    mu_select = ", ".join([f"mu.{c}" for c in MU_COLS])
    comp_select = ", ".join([f"cd.{c}" for c in COMP_COLS])
    ch_select = ", ".join([f"ch.{c} AS ch_{c}" for c in CH_COLS])
    sql = f"""
WITH pts AS (
    SELECT * FROM (VALUES
    {vals}
    ) AS v(pid, lat, lon, geom)
),
hits AS (
    SELECT p.pid, p.lat, p.lon, x.mukey
    FROM pts p
    CROSS APPLY SDA_Get_Mukey_from_intersect(p.geom) AS x
),
comp_dom AS (
    SELECT mukey, cokey, {', '.join(COMP_COLS)},
           ROW_NUMBER() OVER (PARTITION BY mukey ORDER BY comppct_r DESC) AS rn
    FROM component
),
comp1 AS (
    SELECT * FROM comp_dom WHERE rn = 1
),
ch AS (
    SELECT cokey,
           SUM(CASE WHEN eff_thk > 0 THEN sandtotal_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS sandtotal_r,
           SUM(CASE WHEN eff_thk > 0 THEN silttotal_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS silttotal_r,
           SUM(CASE WHEN eff_thk > 0 THEN claytotal_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS claytotal_r,
           SUM(CASE WHEN eff_thk > 0 THEN dbovendry_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS dbovendry_r,
           SUM(CASE WHEN eff_thk > 0 THEN ksat_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS ksat_r,
           SUM(CASE WHEN eff_thk > 0 THEN awc_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS awc_r,
           SUM(CASE WHEN eff_thk > 0 THEN om_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS om_r,
           SUM(CASE WHEN eff_thk > 0 THEN ll_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS ll_r,
           SUM(CASE WHEN eff_thk > 0 THEN pi_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS pi_r,
           SUM(CASE WHEN eff_thk > 0 THEN wsatiated_r * eff_thk END) / NULLIF(SUM(NULLIF(eff_thk,0)),0) AS wsatiated_r
    FROM (
        SELECT cokey,
               CASE
                 WHEN hzdepb_r > 100 THEN 100 ELSE hzdepb_r
               END - CASE
                 WHEN hzdept_r < 0 THEN 0 ELSE hzdept_r
               END AS eff_thk,
               sandtotal_r, silttotal_r, claytotal_r, dbovendry_r, ksat_r, awc_r, om_r, ll_r, pi_r, wsatiated_r
        FROM chorizon
    ) c
    WHERE eff_thk > 0
    GROUP BY cokey
)
SELECT h.pid, h.lat, h.lon, h.mukey,
       {mu_select},
       comp1.cokey, {comp_select},
       {ch_select}
FROM hits h
LEFT JOIN muaggatt mu ON mu.mukey = h.mukey
LEFT JOIN comp1 ON comp1.mukey = h.mukey
LEFT JOIN ch ON ch.cokey = comp1.cokey
"""
    return sql


def fetch_soil_via_sda(struct_csv: str, batch_size: int = SDA_BATCH_SIZE) -> pd.DataFrame:
    """Fetch soil attributes for all structures via SDA post.rest.
    - struct_csv: path to structure_coordinates.csv
    - batch_size: number of points per SQL call (adjust if you hit limits)
    """
    df_struct = pd.read_csv(struct_csv).copy()
    if "COORDINATES" in df_struct.columns:
        df_struct["COORDINATES"] = df_struct["COORDINATES"].apply(lambda x: x if isinstance(x, tuple) else ast.literal_eval(x))
        df_struct["LAT"] = df_struct["COORDINATES"].apply(lambda c: c[0])
        df_struct["LON"] = df_struct["COORDINATES"].apply(lambda c: c[1])
    elif {"LAT", "LON"}.issubset(df_struct.columns):
        pass
    else:
        raise ValueError("Expected COORDINATES tuple column or LAT/LON columns in struct CSV")

    df_struct = df_struct.reset_index().rename(columns={"index": "STRUCT_ID"})

    results = []
    for i in range(0, len(df_struct), batch_size):
        batch = df_struct.iloc[i : i + batch_size]
        pts = list(zip(batch["STRUCT_ID"], batch["LAT"], batch["LON"]))
        sql = build_full_sql(pts)
        df_batch = sda_query(sql)
        results.append(df_batch)

    if not results:
        return pd.DataFrame()

    df_soil = pd.concat(results, ignore_index=True)
    # Join back to original struct rows
    df_struct_out = df_struct.merge(df_soil, left_on="STRUCT_ID", right_on="pid", how="left")
    return df_struct_out

# Example run (small batch):
df_api = fetch_soil_via_sda(STRUCT_COORDS_CSV, batch_size=100)
df_api.to_csv("gssurgo_enriched_via_api.csv", index=False)

RuntimeError: SDA request failed: HTTP 400
<?xml version='1.0' encoding="UTF-8" standalone="no" ?>
<ServiceExceptionReport xmlns="http://www.opengis.net/ogc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/ogc http://schemas.opengis.net/wms/1.1.1/OGC-exception.xsd">
<ServiceException>
Invalid query: Invalid object name &#39;SDA_Get_Mukey_from_intersect&#39;.</ServiceException>
</ServiceExceptionReport>
