In [None]:
import os
import re
import json
import time
import math
import dill
import pandas as pd

from urllib.request import urlopen
import numpy as np

from sqlalchemy import (
    create_engine, Column, Integer, String, Date, MetaData, event, Table, text,
    LargeBinary,  BigInteger, SmallInteger, ForeignKey, Float, inspect
)
import ast
from sqlalchemy.dialects.postgresql import JSON, JSONB  # if not already imported

from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.engine import Engine
from sqlalchemy.exc import ProgrammingError

# If your project structure matches the original, keep these:
from src.helpers import *
from src.dbutils import *
from src.ORMutils import *
from src.models import *
from src.geo import *
from src.pdfutils import *

# Geo / GIS helpers used later
import fiona
import geopandas as gpd
from shapely import wkb, from_wkt  # Shapely 2.x
from geoalchemy2.shape import from_shape

import pprint


In [None]:
# Restore the variables saved by the previous notebook (presumably including
# dataset_info_dict, which is used below) into this notebook's global namespace.
# NOTE(review): dill.load can execute arbitrary code while unpickling — only load
# files this project produced itself.
with open("environment_data/table_dicts.pkl", "rb") as f:
    env = dill.load(f)
globals().update(env)

In [None]:
# Inspect the per-dataset metadata restored from the pickle in the previous cell.
pprint.pprint(dataset_info_dict)


* ### Create the database engine that will be used throughout the rest of the notebook.

In [None]:
import getpass
from urllib.parse import quote_plus

# PostgreSQL connection settings (peer authentication over the local Unix socket).
# your Linux login must match a PostgreSQL role with rights to create DBs on first run
PGUSER = "jam"  # or: getpass.getuser()
PGPASSWORD = None  # not used with peer auth; make_url below does not read it
PGHOST = "/run/postgresql"  # socket directory on Arch/EndeavourOS; Debian/RHEL often /var/run/postgresql
PGPORT = "5432"   # ignored for sockets but harmless; make_url below does not read it
PGDATABASE = "nyc_data"

# Force socket usage via query param (URL-encode the path just to be safe)
_socket = quote_plus(PGHOST)
POSTGRES_URL = None  # leave None so we can build URLs per DB below (make_url builds them)

def make_url(dbname: str) -> str:
    """
    Build the SQLAlchemy URL for database `dbname`.

    The URL uses the psycopg2 driver and connects as PGUSER without a password.
    It has no TCP host; the socket directory is passed as the `host` query parameter (`_socket`).
    """
    authority = f"{PGUSER}@"
    socket_query = f"?host={_socket}"
    return f"postgresql+psycopg2://{authority}/{dbname}{socket_query}"

In [None]:

# admin connection (to create DB if missing) — uses the maintenance DB "postgres"
admin_engine = create_engine(make_url("postgres"), future=True)
# target DB connection
# NOTE(review): this `engine` is replaced by the one created again after the
# CREATE DATABASE cell; nothing in between uses it.
engine = create_engine(make_url(PGDATABASE), future=True)

In [None]:
import re
from sqlalchemy import text

# Only plain lowercase Postgres identifiers are accepted, because the name is
# interpolated into DDL below (DDL identifiers cannot be bound as parameters).
SAFE_DB_RE = re.compile(r"^[a-z_][a-z0-9_]*$")  # postgres-friendly
if not SAFE_DB_RE.fullmatch(PGDATABASE):
    raise ValueError(f"Unsafe database name: {PGDATABASE!r}")

# Look the target database up from the maintenance database; it may not exist yet.
with admin_engine.connect() as conn:
    exists = conn.execute(
        text("SELECT 1 FROM pg_database WHERE datname = :d"),
        {"d": PGDATABASE},
    ).scalar()

if not exists:
    # autocommit required for CREATE DATABASE (it cannot run inside a transaction block).
    # The identifier is double-quoted so that names which pass SAFE_DB_RE but are
    # SQL reserved words (e.g. "user", "table") still work. SAFE_DB_RE only admits
    # lowercase, so quoting does not change the resulting database name.
    with admin_engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        conn.execute(text(f'CREATE DATABASE "{PGDATABASE}"'))


In [None]:

# Engine for the target database, used by every later cell. It rebinds the
# module-level `engine` now that the database is known to exist.
# Plain executemany behaviour; no PG-specific fast paths are enabled.
engine = create_engine(make_url(PGDATABASE), future=True, echo=False)

# Session factory: no autoflush; callers commit explicitly.
SessionLocal = sessionmaker(
    bind=engine,
    future=True,
    autocommit=False,
    autoflush=False,
)

# PostGIS has to be installed in the target database before spatial types are used.
with engine.begin() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS postgis"))

* #### Load (reflect) the database tables created in previous notebooks

In [None]:
# NOTE(review): `metadata` is not used by any visible cell; reflection below goes
# into Base.metadata (Base comes from the src.* star imports).
metadata = MetaData()
# Load definitions of tables that already exist in the database into
# Base.metadata, presumably so the class declared later (extend_existing=True)
# can merge with them.
Base.metadata.reflect(bind=engine)

* ### Create lookup tables for variables that were identified as categorical and whose definitions were extracted from the metadata in the previous notebook.

* There are borough codes in the PLUTO dataset, but, annoyingly, unlike most other datasets, the borough code is a two-letter initial such as "BK" or "BX". Also in the PLUTO dataset, "Sanitation Borough" does use the standard numeric codes that most other NYC OpenData datasets use. All this is to say that borough codes require special handling, separate from my system for extracting categories and creating lookup tables for them programmatically.

In [None]:
# Column-name prefixes of PLUTO fields that repeat with numeric suffixes
# (e.g. zoning_district_1..4); the value is the number of repetitions.
multicolumns = {'zoning_district': 4, 'commercial_overlay': 2, 'special_purpose_district': 3}

# Give every column customization a new_name, defaulting to its short_name.
# This is independent of `multicolumns`, so it runs once per dataset rather than
# once per multicolumn prefix. The discarded prefix/"_1"-suffix groupings that
# used to follow had no effect and were dropped.
for dataset in dataset_info_dict.values():
    for customization in dataset.col_customizations.values():
        if customization.new_name is None:
            customization.new_name = customization.short_name

## Import the MapPLUTO data:
* List the layers in the file
* In this case there is only one layer, so it isn't strictly necessary to know and specify which one to import, but the listing is included anyway for future reference.

In [None]:
# MapPLUTO release to load; also used to build the layer and table names.
pluto_version = "25v2_1"
gdb_path = f"{PROJECT_DATA}/files_to_use/MapPLUTO{pluto_version}.gdb"

# Read every layer of the file geodatabase into `geodata` (layer name -> frame).
# Only the clipped MapPLUTO layer is used below.
geodata = {}
layers = fiona.listlayers(gdb_path)
for layer in layers:
    gdf_layer = gpd.read_file(gdb_path, layer=layer)
    # Keep a WKB (bytes) copy of each geometry. Falsy geometries (None; presumably
    # also empty ones under Shapely 2) become None. A layer without a 'geometry'
    # column raises KeyError and is stored unchanged.
    try:
        gdf_layer['wkb'] = gdf_layer['geometry'].apply(lambda geom: geom.wkb if geom else None)
    except KeyError:
        pass
    geodata[layer] = gdf_layer

gdf = geodata[f'MapPLUTO_{pluto_version}_clipped']

In [None]:
def is_whole_number_series(s: pd.Series) -> bool:
    """
    Return True when every non-null value in `s` is a whole number.

    Nulls are ignored, so a float column that is float only because of missing
    values (e.g. [1.0, NaN, 3.0]) qualifies. The function returns False in three cases:
    - the series has no non-null values;
    - a non-null value has a fractional part or is infinite;
    - the values do not support `% 1` (e.g. strings).
    """
    # Your helper might already implement this; fallback:
    try:
        values = s.dropna()
        if values.empty:
            return False
        # inf % 1 is NaN, which compares unequal to 0, so infinities are rejected.
        return bool(((values % 1) == 0).all())
    except Exception:
        return False

# Cast float columns whose values are all whole numbers (per is_whole_number_series)
# to pandas' nullable Int64.
for col in gdf.columns:
    if gdf[col].dtype == float and is_whole_number_series(gdf[col]):
        gdf[col] = gdf[col].astype('Int64')

# NOTE(review): `inspector` is not used by any visible cell.
inspector = inspect(engine)

# Rename source columns (short_name) to the project's column names (new_name).
col_customization_dict = dataset_info_dict['mapPLUTO'].col_customizations
rename_mappings = {v.short_name: v.new_name for v in col_customization_dict.values()}
gdf = gdf.rename(columns=rename_mappings)

# Additional source-column renames applied after rename_mappings.
# NOTE(review): two targets start with a digit. Unless col_customization_dict also
# defines them as new_names, they will not be attributes of the ORM class built
# below, and passing them as keyword arguments would fail — confirm.
more_mappings = {
    "HealthCenterDistrict": "health_center_district",
    "SanitDistrict": "sanitation_district_number",
    "Sanitboro": "sanitation_district_boro",
    "FIRM07_FLAG": "2007_flood_insurance_rate_map_indicator",
    "PFIRM15_FLAG": "2015_preliminary_flood_insurance_rate_map",
}
gdf = gdf.rename(columns=more_mappings)

# after you load dataset_info_dict and before create_dynamic_table_class() / create_all()
# Force version_number to be declared as a String column.
for k, v in col_customization_dict.items():
    if v.new_name == "version_number":
        v.dtype = "String"   # ensure TEXT in Postgres

In [None]:
def map_custom_dtype(dtype):
    """
    Translate a logical dtype name from the column customizations into a SQLAlchemy type class.

    Supported names: 'Integer', 'String', 'Float', 'Date', 'LargeBinary'.
    Any other value raises ValueError.
    """
    supported = (
        ('Integer', Integer),
        ('String', String),
        ('Float', Float),
        ('Date', Date),
        ('LargeBinary', LargeBinary),
    )
    for label, sql_type in supported:
        if dtype == label:
            return sql_type
    raise ValueError(f"Unsupported dtype: {dtype}")

def create_dynamic_table_class(table_name, col_customization_dict):
    """
    Build a declarative ORM class (subclass of Base) for a MapPLUTO table.

    Fixed columns:
    - `id`: autoincrement integer primary key;
    - `geometry` and `wkb`: LargeBinary (WKB bytes);
    - `Shape_Leng` and `Shape_Area`: Float.

    Each entry of `col_customization_dict` adds one more column, named `v.new_name`
    and typed by map_custom_dtype(v.dtype).

    Args:
        table_name: database table name; also used as the class name.
        col_customization_dict: mapping whose values expose `.new_name` and `.dtype`.

    Returns:
        The new mapped class. `extend_existing` lets it merge with a Table of the
        same name already registered in Base.metadata (e.g. via reflection).

    Raises:
        ValueError: from map_custom_dtype for an unsupported dtype.
    """
    # IMPORTANT: Use LargeBinary for 'geometry' because you actually store WKB bytes there.
    attrs = {
        '__tablename__': table_name,
        'id': Column(Integer, primary_key=True, autoincrement=True),
        'geometry': Column(LargeBinary),  # store WKB bytes (BYTEA)
        'wkb': Column(LargeBinary),       # kept for parity with original schema
        'Shape_Leng': Column(Float),
        'Shape_Area': Column(Float),
        '__table_args__': {'extend_existing': True}
    }
    # Column names come solely from new_name. The repeated multi-valued fields
    # (zoning_district_1..4 etc.) therefore stay as separate columns; the
    # suffix-stripped key that used to be computed here was never used.
    for v in col_customization_dict.values():
        attrs[v.new_name] = Column(map_custom_dtype(v.dtype))
    return type(table_name, (Base,), attrs)

# Declare the ORM class for the clipped MapPLUTO layer, then create every table
# registered on Base.metadata that does not exist yet (including this one).
MapPLUTO_Clipped = create_dynamic_table_class(f'MapPLUTO_{pluto_version}_clipped', col_customization_dict)
Base.metadata.create_all(engine)

In [None]:
import math, re

def normalize_zip(v):
    """
    Normalise a ZIP-code value to a string (or None).

    - None, scalar missing markers (NaN, pd.NA, NaT), non-finite floats and blank
      strings -> None
    - ints / floats (10013, 10013.0, 801) -> zero-padded digits ("10013", "00801")
    - numeric strings ("10013.0", " 801 ") -> zero-padded digits
    - "10013-1234" / "10013 1234" -> "10013-1234"
    - anything else -> the stripped string, unchanged
    """
    if v is None:
        return None
    # NaN/NA/NaT would otherwise be stringified to "nan"/"<NA>"/"NaT".
    if pd.api.types.is_scalar(v) and pd.isna(v):
        return None
    # floats/ints -> 5-digit string
    if isinstance(v, (int, float)):
        # int(inf) raises OverflowError; an infinite ZIP is just missing data.
        if isinstance(v, float) and not math.isfinite(v):
            return None
        return f"{int(v):05d}"
    s = str(v).strip()
    if s == "":
        return None
    # '10013.0' -> '10013'
    if re.fullmatch(r"\d+(?:\.0+)?", s):
        return f"{int(float(s)):05d}"
    # '10013-1234' or '10013 1234' -> '10013-1234'
    m = re.fullmatch(r"(\d{5})[-\s]?(\d{4})", s)
    if m:
        return f"{m.group(1)}-{m.group(2)}"
    # anything else: leave as-is (rare)
    return s


# Insert the MapPLUTO rows through the ORM, committing once per `batch_size` rows.
batch_size = 100000
with SessionLocal() as session:
    for start in range(0, len(gdf), batch_size):
        batch = gdf.iloc[start:start + batch_size]
        for idx, row in batch.iterrows():
            try:
                # Parse the apportionment date only when a value is present.
                # NaN is truthy, so it must not be handed to parseDateString.
                if 'apportionment_date' in row:
                    raw_date = row['apportionment_date']
                    if not (pd.api.types.is_scalar(raw_date) and pd.isna(raw_date)) and raw_date:
                        row['apportionment_date'] = parseDateString(raw_date)
                for col in gdf.columns:
                    val = row[col]
                    # Duplicate column labels make row[col] a Series; keep its first value
                    # and continue with that scalar, not the Series.
                    if isinstance(val, pd.Series):
                        try:
                            val = val.iloc[0]
                            row[col] = val
                        except Exception as e:
                            print(f"Error processing Series in column {col} at row {idx}: {e}")
                    # pandas missing markers (NaN, NA, NaT) are stored as SQL NULL.
                    # Non-scalars (geometries, lists) are never treated as missing.
                    if pd.api.types.is_scalar(val) and pd.isna(val):
                        row[col] = None
                geometry_wkb = row['geometry'].wkb if row['geometry'] is not None else None
                if 'zip_code' in row:
                    row['zip_code'] = normalize_zip(row['zip_code'])
                pluto_entry = MapPLUTO_Clipped(
                    geometry=geometry_wkb,
                    **{col: row[col] for col in gdf.columns if col != 'geometry'}
                )
                session.add(pluto_entry)
            except Exception:
                # Dump the failing row to make the bad value easy to spot, then re-raise.
                print(f"Error at row index {idx}")
                for col in gdf.columns:
                    try:
                        print(f"Column: {col}, Value: {row[col]}, Type: {type(row[col])}")
                    except Exception as sub_e:
                        print(f"Error printing column {col}: {sub_e}")
                raise
        session.commit()

In [None]:
# Debug: show every row in which any column equals the string "E-61",
# with all columns displayed.
pd.set_option('display.max_columns', None)
print(gdf[gdf.eq("E-61").any(axis=1)])


In [None]:
from shapely import wkb
import pandas as pd

# Read back only the columns needed for the ZIP-code map. The table name is built
# from pluto_version so it stays in sync with the table created above; the
# identifier is double-quoted to preserve its mixed case.
df = pd.read_sql(f'SELECT zip_code, geometry FROM "MapPLUTO_{pluto_version}_clipped"', engine)

def to_geom(val):
    """
    Decode a geometry value read back from PostgreSQL into a Shapely geometry.

    Accepted inputs:
    - None, returned as None;
    - WKB held in a memoryview, bytes or bytearray;
    - a "\\x"-prefixed hex string.

    Any other input raises TypeError.
    """
    if val is None:
        return None
    # psycopg2 often returns memoryview for bytea
    if isinstance(val, memoryview):
        raw = val.tobytes()
    elif isinstance(val, (bytes, bytearray)):
        raw = bytes(val)
    elif isinstance(val, str) and val.startswith("\\x"):
        # bytea rendered as hex text: strip the "\x" marker and decode the rest
        raw = bytes.fromhex(val[2:])
    else:
        raise TypeError(f"Unexpected geometry value type: {type(val)}")
    return wkb.loads(raw)

# Decode the bytea geometries into Shapely objects and wrap the rows as a GeoDataFrame.
# NOTE(review): no CRS is set; the table stores plain WKB bytes (LargeBinary), so
# the source layer's CRS is not carried through this round-trip. gdf_map is also
# not used by the following cell, which builds its own frame from `df`.
df["geometry"] = df["geometry"].apply(to_geom)
gdf_map = gpd.GeoDataFrame(df, geometry="geometry")


In [None]:
# merged_gdf = gdf_map.dissolve(by='zip_code', aggfunc={'zip_code': 'first'})

In [None]:
import geopandas as gpd
import pandas as pd
from shapely import wkb
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import networkx as nx
from sqlalchemy import create_engine, event, text

# Build a ZIP-code map: dissolve tax lots by ZIP, colour adjacent ZIP areas
# differently via greedy graph colouring, and save the figure as a PDF.
print("DataFrame columns:", df.columns)

# NOTE(review): this rebinds `gdf`, replacing the MapPLUTO GeoDataFrame loaded
# earlier. Re-running the insert cell after this one would use this frame instead.
gdf = gpd.GeoDataFrame(df, geometry='geometry')
print(gdf.head())

# Merge all lots sharing a ZIP code into one shape per ZIP code.
merged_gdf = gdf.dissolve(by='zip_code', aggfunc={'zip_code': 'first'})  # Explicit aggregation of zip_code
print(merged_gdf.columns)  # Should include 'zip_code'

# Adjacency graph: one node per merged shape, an edge between shapes that touch.
# All shapes are added as nodes up front, so a shape with no touching neighbour
# still receives a colour (otherwise it would map to NaN below).
G = nx.Graph()
G.add_nodes_from(merged_gdf.index)
merged_geoms = merged_gdf.geometry
zip_labels = merged_gdf.index
for pos, geom in enumerate(merged_geoms):
    if geom is None or geom.is_empty:
        continue
    # The spatial index prefilters by bounding box before the exact `touches`
    # test, avoiding the all-pairs comparison.
    for other_pos in merged_geoms.sindex.query(geom, predicate="touches"):
        if other_pos != pos:
            G.add_edge(zip_labels[pos], zip_labels[other_pos])

# Perform graph coloring to ensure adjacent shapes don't share the same color
color_map = nx.coloring.greedy_color(G, strategy="largest_first")

fig, ax = plt.subplots(1, 1, figsize=(10, 10))

# Scale the colour indices produced by the greedy colouring onto the tab20 palette
norm = mcolors.Normalize(vmin=min(color_map.values()), vmax=max(color_map.values()))
sm = plt.cm.ScalarMappable(cmap=plt.cm.tab20, norm=norm)

merged_gdf['color'] = merged_gdf.index.map(color_map)
merged_gdf.plot(ax=ax, color=[sm.to_rgba(i) for i in merged_gdf['color']], edgecolor='black', linewidth=0, legend=False)

# Label each merged shape at its centroid; empty shapes have no usable centroid
for _, row in merged_gdf.iterrows():
    if row.geometry is None or row.geometry.is_empty:
        continue
    centroid = row.geometry.centroid
    ax.text(centroid.x, centroid.y, str(row['zip_code']), fontsize=2, ha='center', va='center')

cbar = fig.colorbar(sm, ax=ax)
cbar.set_label('Color Range (Graph Coloring)', rotation=270, labelpad=20)

# Make sure the output directory exists before saving
figure_path = f"{PROJECT_DATA}/figures/map_output_zip_shuffled2.pdf"
os.makedirs(os.path.dirname(figure_path), exist_ok=True)
plt.savefig(figure_path, format="pdf")

plt.show()

In [None]:
def _pg_int_bounds(sqlatype):
    """
    Return the (min, max) value range of the PostgreSQL integer type `sqlatype`.

    SmallInteger maps to the int16 range and BigInteger to the int64 range.
    Any other type is treated as plain INTEGER (int32 range).
    """
    if isinstance(sqlatype, SmallInteger):
        limits = np.iinfo(np.int16)
    elif isinstance(sqlatype, BigInteger):
        limits = np.iinfo(np.int64)
    else:
        # Default Integer in Postgres is 32-bit
        limits = np.iinfo(np.int32)
    return limits.min, limits.max

def enforce_integer_bounds(df, DynamicTable):
    """
    For each INTEGER-like column in DynamicTable, coerce df[col] to numeric and
    replace out-of-range values with None. Logs offenders for inspection.
    """
    for col in df.columns:
        if col not in DynamicTable.__table__.columns:
            continue
        col_type = DynamicTable.__table__.columns[col].type

        # Only care about integer-ish SQL column types
        if not isinstance(col_type, (SmallInteger, Integer, BigInteger)):
            continue

        lo, hi = _pg_int_bounds(col_type)

        # Coerce to numeric safely; keep strings like "7" working
        s = pd.to_numeric(df[col], errors="coerce")  # floats OK here
        # If column is conceptually integer, store as pandas nullable Int64 to preserve NULLs
        # but do comparisons on the coerced float/int series 's'
        mask_oob = (s.notna()) & ((s < lo) | (s > hi))

        if mask_oob.any():
            bad = df.loc[mask_oob, col]
            # Show a small sample for debugging
            sample_vals = bad.head(5).tolist()
            print(
                f"❗ Out-of-range detected in integer column '{col}' "
                f"({len(bad)} rows). Bounds [{lo}, {hi}]. Sample: {sample_vals}"
            )
            # Null them out so insert won't fail
            df.loc[mask_oob, col] = None

        # Final cast to pandas nullable integer if column is nullable; else leave as numeric
        # (Your pipeline already handles nullable detection; this is safe either way.)
        try:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
        except Exception:
            # If the column had non-integer strings, keep as object; the DB will cast or fail elsewhere
            pass

from sqlalchemy.exc import DataError

def insert_with_diagnosis(conn, insert_stmt, batch_rows, key_hint='sid'):
    """
    Execute `insert_stmt` for `batch_rows`; on DataError, bisect to the first bad row.

    The batch runs inside a SAVEPOINT, so a failure does not leave the caller's
    PostgreSQL transaction aborted. On DataError:
    1. The savepoint is rolled back.
    2. Prefixes of the batch are re-run to find the first row the database
       rejects. Each probe runs in its own savepoint, which is always rolled
       back, so no probe rows persist.
    3. That row is reported and the original DataError is re-raised.

    Args:
        conn: SQLAlchemy Connection (uses `execute` and `begin_nested`).
        insert_stmt: INSERT construct executed with a list of parameter dicts.
        batch_rows: list of row dicts; an empty list is a no-op.
        key_hint: key printed from the offending row, if present, to locate it.

    Raises:
        DataError: re-raised after diagnosis; none of the batch is inserted.
    """
    if not batch_rows:
        # Nothing to insert; also avoids handing execute() an empty parameter list.
        return
    savepoint = conn.begin_nested()
    try:
        conn.execute(insert_stmt, batch_rows)
    except DataError as e:
        savepoint.rollback()
        print("❌ Batch insert failed; diagnosing offending rows…", e)
        # Invariant: batch_rows[:lo] inserts cleanly, batch_rows[:hi] fails.
        lo, hi = 0, len(batch_rows)
        while lo + 1 < hi:
            mid = (lo + hi) // 2
            probe = conn.begin_nested()
            try:
                conn.execute(insert_stmt, batch_rows[:mid])
                lo = mid
            except DataError:
                hi = mid
            finally:
                # Undo every probe so diagnosis never leaves rows behind.
                probe.rollback()
        bad = batch_rows[lo:hi]  # first offending row (hi == lo + 1 once the search ends)
        print(f"🚩 Offending rows count: {len(bad)}")
        if key_hint and key_hint in bad[0]:
            print(f"Example offending {key_hint}: {bad[0][key_hint]}")
        # You can drop or sanitize and retry:
        # conn.execute(insert_stmt, batch_rows[:lo] + batch_rows[hi:])
        raise
    except Exception:
        # Any other failure: release the savepoint state before propagating.
        savepoint.rollback()
        raise
    savepoint.commit()


In [None]:
def enforce_integer_bounds(df, DynamicTable):
    """
    For each INTEGER-like column in DynamicTable, coerce df[col] to numeric and
    replace out-of-range values with None. Logs offenders for inspection.
    """
    for col in df.columns:
        if col not in DynamicTable.__table__.columns:
            continue
        col_type = DynamicTable.__table__.columns[col].type

        # Only care about integer-ish SQL column types
        if not isinstance(col_type, (SmallInteger, Integer, BigInteger)):
            continue

        lo, hi = _pg_int_bounds(col_type)

        # Coerce to numeric safely; keep strings like "7" working
        s = pd.to_numeric(df[col], errors="coerce")  # floats OK here
        # If column is conceptually integer, store as pandas nullable Int64 to preserve NULLs
        # but do comparisons on the coerced float/int series 's'
        mask_oob = (s.notna()) & ((s < lo) | (s > hi))

        if mask_oob.any():
            bad = df.loc[mask_oob, col]
            # Show a small sample for debugging
            sample_vals = bad.head(5).tolist()
            print(
                f"❗ Out-of-range detected in integer column '{col}' "
                f"({len(bad)} rows). Bounds [{lo}, {hi}]. Sample: {sample_vals}"
            )
            # Null them out so insert won't fail
            df.loc[mask_oob, col] = None

        # Final cast to pandas nullable integer if column is nullable; else leave as numeric
        # (Your pipeline already handles nullable detection; this is safe either way.)
        try:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
        except Exception:
            # If the column had non-integer strings, keep as object; the DB will cast or fail elsewhere
            pass


In [None]:
# import jsonlines
# import orjson

# Lookup from column data-type names to SQLAlchemy column types.
# NOTE(review): the keys look like Socrata dataTypeName values, and "point" is stored as String — confirm.
datatype_mappings = {"meta_data": String, "calendar_date": Date, "number": Float, "text": String, "point": String}

def split_address(row_data, address_col):
    """
    Split a street address in `row_data[address_col]` into building number and street.

    Behaviour depends on the address value:
    - key absent: `row_data` is returned untouched;
    - value None: building_num and street_name are both set to None;
    - value starting with a digit: it is split on the first space. "123 Main St"
      gives ("123", "Main St"); a value with no space gives (None, value);
    - any other value: no keys are added.

    Mutates and returns `row_data`.
    """
    if address_col not in row_data.keys():
        return row_data
    address = row_data[address_col]
    if address is None:
        row_data["building_num"], row_data["street_name"] = None, None
        return row_data
    if not (address and address[0].isdigit()):
        return row_data
    try:
        parts = address.split(" ", 1)
        if len(parts) == 1:
            parts = [None] + parts
        row_data["building_num"], row_data["street_name"] = parts
    except Exception as e:
        print(e, address)
    return row_data

def convert_wkt(rows_to_insert):
    """
    Convert each row's raw WKT into a GeoAlchemy2 geometry (SRID 4326), in place.

    For every row:
    - reads `row['_raw_geocoded_column']`;
    - writes the converted value to `row['geocoded_column']` (None when the WKT
      is missing or unparseable);
    - removes the raw key.

    All values are first parsed in one vectorised `from_wkt` call. If that call
    fails, each value is parsed individually, so one malformed WKT nulls only its
    own row instead of the whole batch.
    """
    def _parse_one(raw):
        """Parse a single WKT value, returning None when it cannot be parsed."""
        try:
            return from_wkt(raw)
        except Exception:
            return None

    raw_wkts = [r.get('_raw_geocoded_column') for r in rows_to_insert]
    try:
        shapely_geoms = from_wkt(raw_wkts)
    except Exception as e:
        print(f"Error converting batch geometry: {e}")
        shapely_geoms = [_parse_one(raw) for raw in raw_wkts]
    geoms = [from_shape(geom, srid=4326) if geom is not None else None for geom in shapely_geoms]
    for r, geom in zip(rows_to_insert, geoms):
        r['geocoded_column'] = geom
        r.pop('_raw_geocoded_column', None)

def insert_dataset(engine: Engine, dataset, jsonfile, columns, batch_size=100000):
    import io
    import jsonlines
    import orjson
    import pandas as pd
    from sqlalchemy import Date  # ensure Date is in scope
    # ensure textClean is imported from your helpers at module level
    from shapely.geometry import shape as shapely_from_mapping
    from shapely import from_wkt as shapely_from_wkt

    # --- create the target table once and reuse ---
    DynamicTable = create_table_for_dataset(
        columns=dataset.col_types,   # your dict of logical dtypes
        prefix=dataset.short_name,   # table name
        engine=engine
    )

    col_names = list(columns.keys())
    expected_width = len(col_names)
    rows_buffer = []
    geom_cols = []  # (col, srid)

    def custom_loads(s):
        return orjson.loads(s.encode("utf-8"))

    def normalize_rows(rows, col_names):
        out = []
        for r in rows:
            if isinstance(r, list):
                out.append({col_names[i]: (r[i] if i < len(col_names) else None) for i in range(len(col_names))})
            elif isinstance(r, dict):
                out.append(r)
            else:
                out.append(r)
        return out

    from geoalchemy2.types import Geometry as GA2Geometry  # add with other shapely imports

    def detect_geometry_columns(col_types):
        found = []
        for col, typ in col_types.items():
            # 1) Handle real Geometry types
            if isinstance(typ, GA2Geometry):
                found.append((col, getattr(typ, "srid", 4326) or 4326))
                continue
            # 2) Handle string-y declarations (e.g., "multipolygon")
            t = str(typ).lower()
            if t in ("multipolygon", "polygon", "point", "multilinestring", "linestring", "multipoint"):
                found.append((col, 4326))
        # 3) Fallback by common name if nothing detected (covers Socrata exports)
        if not found and "the_geom" in col_types:
            print("ℹ️ No declared geometry type found; treating 'the_geom' as geometry (SRID 4326).")
            found.append(("the_geom", 4326))
        return found


    # def geom_to_wkt(raw):
    #     if raw in (None, "", "NULL"):
    #         return None
    #     try:
    #         if isinstance(raw, dict) and "type" in raw and "coordinates" in raw:
    #             shp = shapely_from_mapping(raw)
    #         elif isinstance(raw, str):
    #             s = raw.strip()
    #             if not s:
    #                 return None
    #             if s.upper().startswith("SRID=") and ";" in s:
    #                 s = s.split(";", 1)[1]
    #             shp = shapely_from_wkt(s)
    #         else:
    #             return None
    #         return shp.wkt if shp is not None else None
    #     except Exception:
    #         return None

    def geom_to_wkt(raw):
        # Treat obvious numerics as "no geometry" to avoid parse errors
        if raw is None:
            return None
        if isinstance(raw, (int, float)):
            return None
        if isinstance(raw, str):
            s = raw.strip()
            if not s or s.upper() == "NULL":
                return None
            # Numeric-looking string? bail out.
            try:
                float(s)
                return None
            except Exception:
                pass
            # Strip optional SRID prefix
            if s.upper().startswith("SRID=") and ";" in s:
                s = s.split(";", 1)[1]
            try:
                shp = shapely_from_wkt(s)
                return f"SRID=4326;{shp.wkt}"
            except Exception:
                return None
        if isinstance(raw, dict) and "type" in raw and "coordinates" in raw:
            try:
                shp = shapely_from_mapping(raw)
                return f"SRID=4326;{shp.wkt}"
            except Exception:
                return None
        return None

    _WKT_RX = re.compile(
        r'(?i)(?:SRID=\d+;)?\s*(?:MULTI(?:POINT|LINESTRING|POLYGON)|POINT|LINESTRING|POLYGON)\s*\('
        )

    def looks_like_wkt_series(s: pd.Series) -> float:
        sample = s.dropna().astype(str).head(200)
        if sample.empty:
            return 0.0
        return sample.str.contains(_WKT_RX, na=False).mean()


    def looks_like_geojson_series(s: pd.Series) -> float:
        """Return ratio of values that look like GeoJSON dicts with type/coordinates."""
        vals = s.dropna().head(200).tolist()
        if not vals:
            return 0.0
        hits = 0
        for v in vals:
            if isinstance(v, dict) and "type" in v and "coordinates" in v:
                hits += 1
        return hits / len(vals)

    def is_mostly_numeric_series(s: pd.Series) -> float:
        sample = s.dropna().astype(str).head(200)
        if sample.empty:
            return 0.0
        def _num(x):
            try:
                float(x)
                return True
            except Exception:
                return False
        return sample.apply(_num).mean()

    def detect_json_columns(col_types):
        cols = []
        for col, typ in col_types.items():
            # SQLAlchemy types
            if isinstance(typ, (JSON, JSONB)):
                cols.append(col)
                continue
            # String-y declarations like "json"/"jsonb"
            t = str(typ).lower()
            if t in ("json", "jsonb"):
                cols.append(col)
        return cols

    def _coerce_socrata_location(value):
        """
        Socrata sometimes provides location as:
        [human_address_json_string, latitude, longitude, <unused>, needs_recoding_bool]
        Convert to a single dict.
        """
        if not isinstance(value, list):
            return value
        if len(value) >= 3:
            human = value[0]
            # human_address may itself be a JSON string
            if isinstance(human, str):
                try:
                    human = orjson.loads(human)
                except Exception:
                    pass
            out = {
                "human_address": human,
                "latitude": value[1],
                "longitude": value[2],
            }
            if len(value) >= 5:
                out["needs_recoding"] = bool(value[4])
            return out
        return value

    # add with your imports if not already present
    import ast
    from sqlalchemy.dialects.postgresql import JSON, JSONB  # you already used these in detect_json_columns

    def detect_json_like_columns_in_df(df: pd.DataFrame, declared_json_cols: list[str]) -> list[str]:
        """
        Return columns that either:
        - were declared JSON/JSONB, or
        - contain dict/list objects, or
        - are strings that look like JSON/Python-lists that we can parse.
        """
        cols = set(declared_json_cols)

        # Heuristic: common Socrata name
        if "location" in df.columns:
            cols.add("location")

        sample_n = 100
        for c in df.columns:
            s = df[c].dropna().head(sample_n)
            if s.empty:
                continue
            # already Python dict/list?
            if any(isinstance(v, (dict, list)) for v in s):
                cols.add(c)
                continue
            # string that looks like JSON / Python list?
            looks_like = s.astype(str).str.strip().str.startswith(("{", "[")).mean() > 0.2
            if looks_like:
                # prove we can parse at least one value
                ok = False
                for v in s.astype(str):
                    t = v.strip()
                    try:
                        orjson.loads(t)
                        ok = True
                        break
                    except Exception:
                        try:
                            _ = ast.literal_eval(t)
                            ok = True
                            break
                        except Exception:
                            pass
                if ok:
                    cols.add(c)
        return list(cols)


    def normalize_json_value(v):
        if v is None or (isinstance(v, str) and not v.strip()):
            return None
        py = v
        # Parse strings into Python values
        if isinstance(v, str):
            s = v.strip()
            try:
                py = orjson.loads(s)
            except Exception:
                try:
                    # Handle Python reprs like "['...', None, False]"
                    py = ast.literal_eval(s)
                except Exception:
                    # Leave as plain string
                    py = s
        # Fix Socrata location list -> dict
        py = _coerce_socrata_location(py)
        # Dump to proper JSON text (double quotes, true/false/null)
        try:
            return orjson.dumps(py).decode("utf-8")
        except Exception:
            return None


    def ensure_geometry_columns(df: pd.DataFrame, geom_cols: list[tuple[str,int]]) -> pd.DataFrame:
        """
        Guarantee that each declared geometry column contains EWKT or NULL.
        If a geometry column is numeric/malformed, try to source geometry from
        another column that looks like WKT/GeoJSON. Convert to EWKT via geom_to_wkt.
        """
        # 1) Try to fix obvious misalignment by copying from the best-looking geometry column
        candidates = []
        for c in df.columns:
            wkt_ratio = looks_like_wkt_series(df[c])
            gj_ratio  = looks_like_geojson_series(df[c])
            if (wkt_ratio >= 0.2) or (gj_ratio >= 0.2):
                candidates.append((c, wkt_ratio, gj_ratio))
        # Rank candidates by (geojson or wkt ratio), favor WKT a little
        candidates.sort(key=lambda x: (max(x[1], x[2]), x[1]), reverse=True)

        for gcol, _srid in geom_cols:
            if gcol not in df.columns:
                # If missing, create and fill from best candidate if any
                src = candidates[0][0] if candidates else None
                df[gcol] = df[src] if src else None
                if src:
                    print(f"🔧 geometry column '{gcol}' missing — copying from '{src}'")
                continue

            num_ratio = is_mostly_numeric_series(df[gcol])
            wkt_ratio = looks_like_wkt_series(df[gcol])
            gj_ratio  = looks_like_geojson_series(df[gcol])

            if num_ratio > 0.7 and max(wkt_ratio, gj_ratio) < 0.2:
                # very likely mis-mapped; try best candidate
                if candidates:
                    src = candidates[0][0]
                    if src != gcol:
                        print(f"🔧 geometry misalignment: mapping '{src}' → '{gcol}'")
                        df[gcol] = df[src]

            # Finally: convert whatever is in gcol to EWKT (or None)
            df[gcol] = df[gcol].apply(geom_to_wkt)

            # Fail-fast guard: no numeric leftovers
            after_num_ratio = is_mostly_numeric_series(df[gcol])
            if after_num_ratio > 0.0:
                # Nuke numeric stragglers to NULL so COPY can't choke
                def _clean_num_to_none(v):
                    if v is None:
                        return None
                    if isinstance(v, (int, float)):
                        return None
                    if isinstance(v, str):
                        try:
                            float(v.strip())
                            return None
                        except Exception:
                            return v
                    return v
                df[gcol] = df[gcol].apply(_clean_num_to_none)

        return df


    def relink_geometry_sources(
        df: pd.DataFrame,
        geom_cols: list[tuple[str, int]],
        numeric_threshold: float = 0.7,
    ) -> pd.DataFrame:
        """Repoint declared geometry columns that actually hold numbers at a real geometry column.

        For each ``(column_name, srid)`` pair in ``geom_cols`` whose column exists in
        ``df``, the column is treated as mis-mapped when more than
        ``numeric_threshold`` of its values parse as numbers AND it does not look
        like WKT or GeoJSON. In that case the first *other* column of ``df`` that
        looks like WKT or GeoJSON is copied into it (and a message is printed).

        Args:
            df: Batch DataFrame. Modified in place and also returned.
            geom_cols: ``(column_name, srid)`` pairs; the SRID is not used here.
            numeric_threshold: Fraction of numeric-parsable values above which a
                geometry column is considered mis-mapped (default 0.7).

        Returns:
            ``df`` (the same object), with any mis-mapped geometry columns replaced.
        """
        for gcol, _srid in geom_cols:
            if gcol not in df.columns:
                continue
            col = df[gcol]
            # Share of values that parse as numbers; an empty batch counts as 0.
            numeric_ratio = pd.to_numeric(col, errors="coerce").notna().mean() if len(col) else 0.0
            if numeric_ratio <= numeric_threshold:
                continue  # mostly non-numeric: nothing to repair, skip the costlier scans

            # NOTE(review): the looks_like_* results are used for truthiness here, whereas
            # ensure_geometry_columns compares them against 0.2 — confirm whether they
            # return ratios and whether a threshold is intended here as well.
            if looks_like_wkt_series(col) or looks_like_geojson_series(col):
                continue  # numeric-heavy but still carries real geometry

            # First other column that looks like WKT/GeoJSON (same pick as a full scan).
            src = next(
                (
                    c for c in df.columns
                    if c != gcol and (looks_like_wkt_series(df[c]) or looks_like_geojson_series(df[c]))
                ),
                None,
            )
            if src is not None:
                print(f"🔧 Detected geometry misalignment: mapping '{src}' → '{gcol}'")
                df[gcol] = df[src]
        return df


    # def process_batch(df, geom_cols):
    #     df = df.where(pd.notnull(df), None)

    #     # Sanitize strings
    #     for col in df.columns:
    #         if df[col].dtype == object:
    #             df[col] = df[col].apply(lambda x: textClean(x) if isinstance(x, str) else x)

    #     # Dates → date objects
    #     datetime_cols = [key for key, typ in columns.items() if typ is Date]
    #     for col in datetime_cols:
    #         if col in df.columns:
    #             df[col] = pd.to_datetime(df[col], errors="coerce")
    #             df[col] = df[col].apply(lambda x: x.date() if pd.notnull(x) else None)

    #     # Geometry → WKT (PostGIS can parse text into geometry columns)
    #     for col, _srid in geom_cols:
    #         if col in df.columns:
    #             df[col] = df[col].apply(geom_to_wkt)
    #     return df

    def process_batch(df, geom_cols, json_cols):
        """Clean one batch DataFrame before it is COPYed into Postgres.

        Steps, in order:
          1. Replace null values with ``None`` via ``DataFrame.where``.
          2. Object columns: if the name is lat/lon-like or >= 80% of the values
             parse as numbers, convert the column with ``pd.to_numeric``;
             otherwise run ``textClean`` on its string values. Declared ``Date``
             columns and JSON columns never take the numeric path. A
             ``"20230115"`` date string coerced to the float 20230115.0 would
             later be read by ``pd.to_datetime`` as nanoseconds since the epoch
             (a 1970 date). Numeric coercion would also turn missing JSON values
             into NaN before ``normalize_json_value`` sees them.
          3. Parse declared ``Date`` columns to ``datetime.date`` (unparseable -> ``None``).
          4. ``relink_geometry_sources``, then ``geom_to_wkt`` on each geometry column.
          5. ``normalize_json_value`` on the declared JSON columns, then on any
             columns reported by ``detect_json_like_columns_in_df``.

        Args:
            df: Raw batch DataFrame.
            geom_cols: ``(column_name, srid)`` pairs.
            json_cols: Names of the declared JSON columns.

        Returns:
            The cleaned DataFrame.

        Note:
            Reads ``columns`` from the enclosing scope. It is assumed to map column
            names to SQLAlchemy type classes (entries are compared with ``is Date``)
            — TODO confirm.
        """
        df = df.where(pd.notnull(df), None)

        # Declared Date columns (``columns`` comes from the enclosing scope).
        datetime_cols = [key for key, typ in columns.items() if typ is Date]

        # Columns that must keep their textual form through the numeric sniffing below.
        keep_textual = set(datetime_cols) | set(json_cols)

        # Sanitize strings
        NUMERICISH_COLS = {"latitude", "longitude", "lat", "lon", "lng", "x", "y"}

        for col in df.columns:
            if df[col].dtype != object:
                continue
            if col not in keep_textual:
                coerced = pd.to_numeric(df[col], errors="coerce")
                # Mostly numeric (or lat/lon-named): store as numbers and skip textClean.
                if str(col).lower() in NUMERICISH_COLS or coerced.notna().mean() >= 0.8:
                    df[col] = coerced
                    continue
            df[col] = df[col].apply(lambda x: textClean(x) if isinstance(x, str) else x)

        # Dates → date objects
        for col in datetime_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors="coerce")
                df[col] = df[col].apply(lambda x: x.date() if pd.notnull(x) else None)

        # 🔎 Try to fix obvious geometry mis-mapping before conversion
        df = relink_geometry_sources(df, geom_cols)

        # Geometry → text via geom_to_wkt (presumably EWKT; the SRID is not passed here)
        for col, _srid in geom_cols:
            if col in df.columns:
                df[col] = df[col].apply(geom_to_wkt)

        # Normalize JSON columns to valid JSON text
        for col in json_cols:
            if col in df.columns:
                df[col] = df[col].apply(normalize_json_value)

        # Also normalize undeclared columns that look like JSON
        json_like_cols = detect_json_like_columns_in_df(df, json_cols)
        for col in json_like_cols:
            if col in df.columns:
                df[col] = df[col].apply(normalize_json_value)

        return df

    def verify_batch(engine, table_like, sid_series, sid_col="sid", sample_missing=10):
        """Check that every SID in ``sid_series`` is present in the target table.

        The SIDs are COPYed into a session temp table and compared against the
        target inside the same transaction. A summary line is printed. A sample of
        up to ``sample_missing`` absent SIDs is also printed when some are missing.

        Args:
            engine: SQLAlchemy engine; its raw DBAPI connection is used for ``copy_expert``.
            table_like: Declarative class (with ``__table__``) or ``sqlalchemy.Table``.
            sid_series: Iterable of SIDs expected to be in the table.
            sid_col: Name of the SID column in the target table.
            sample_missing: Maximum number of missing SIDs to fetch for display.

        Returns:
            ``(expected, found, missing)``: the number of SIDs sent, how many of them
            exist in the target (never more than ``expected``), and a sample list of
            absent SIDs.
        """
        import io
        import pandas as pd

        table = getattr(table_like, "__table__", table_like)
        tgt_schema = getattr(table, "schema", None)
        tgt_table = f'"{tgt_schema}"."{table.name}"' if tgt_schema else f'"{table.name}"'

        # Build a one-column CSV buffer for fast COPY
        buf = io.StringIO()
        pd.Series(sid_series, name="sid").to_csv(buf, index=False, header=False)
        buf.seek(0)

        tmp = "tmp_expected_sids"

        conn = engine.raw_connection()
        cur = None
        try:
            cur = conn.cursor()

            # No "ON COMMIT DROP": the table is dropped explicitly before the commit below.
            cur.execute(f'CREATE TEMP TABLE {tmp} (sid text);')
            cur.copy_expert(f"COPY {tmp} (sid) FROM STDIN WITH (FORMAT CSV)", buf)

            cur.execute(f"SELECT COUNT(*) FROM {tmp};")
            expected = cur.fetchone()[0]

            # Count the expected SIDs that exist in the target. EXISTS (not a JOIN)
            # keeps found <= expected even when the target holds the same SID more
            # than once, e.g. after a batch was loaded twice.
            cur.execute(
                f'SELECT COUNT(*) FROM {tmp} x '
                f'WHERE EXISTS (SELECT 1 FROM {tgt_table} t WHERE t."{sid_col}" = x.sid);'
            )
            found = cur.fetchone()[0]

            missing = []
            if found != expected:
                cur.execute(
                    f'''
                    SELECT x.sid
                    FROM {tmp} x
                    LEFT JOIN {tgt_table} t ON t."{sid_col}" = x.sid
                    WHERE t."{sid_col}" IS NULL
                    LIMIT %s;
                    ''',
                    (sample_missing,)
                )
                missing = [r[0] for r in cur.fetchall()]

            print(f"🔍 Batch verify: expected={expected}, found={found}, missing={expected - found}")
            if missing:
                print("   e.g. missing SIDs:", missing)

            # Clean up and commit at the end
            cur.execute(f"DROP TABLE IF EXISTS {tmp};")
            conn.commit()
            return expected, found, missing

        except Exception:
            # Do not hand the connection back mid-transaction. The rollback also
            # discards the temp table created above.
            try:
                conn.rollback()
            except Exception:
                pass
            raise

        finally:
            if cur is not None:
                try:
                    cur.close()
                except Exception:
                    pass
            conn.close()


    def enforce_integer_compat(df, table_class):
        """Coerce whole-number floats/strings to Int64 for integer/bigint columns.
        If fractional values exist in an integer column, raise with examples.

        ``table_class`` may be a Declarative class (with ``__table__``) or a plain
        ``sqlalchemy.Table``, the same inputs ``copy_batch`` / ``verify_batch``
        accept. The ``id`` column is skipped. Values that do not parse as numbers
        become NULL (``pd.to_numeric(..., errors="coerce")``).

        Args:
            df: Batch DataFrame; modified in place.
            table_class: Declarative class or ``sqlalchemy.Table`` describing the target.

        Returns:
            ``df``, with every matching integer column converted to pandas ``Int64``.

        Raises:
            ValueError: if an integer column contains fractional values.
        """
        # Accept a Declarative class or a plain Table, like copy_batch/verify_batch.
        table = getattr(table_class, "__table__", table_class)
        int_cols = [
            c.name for c in table.columns
            if isinstance(getattr(c, "type", None), (Integer, BigInteger)) and c.name != "id"
        ]
        for c in int_cols:
            if c not in df.columns:
                continue
            s = pd.to_numeric(df[c], errors="coerce")

            # detect true fractional values
            frac_mask = s.notna() & ((s % 1) != 0)
            if frac_mask.any():
                examples = s[frac_mask].head(5).tolist()
                raise ValueError(
                    f"Column '{c}' is INTEGER/BIGINT but has fractional values, e.g. {examples}. "
                    f"Either clean your data or change the column to NUMERIC(14,2) (recommended for money-like fields)."
                )

            # coerce whole numbers (including '85392.00') to nullable int
            df[c] = s.astype("Int64")
        return df

    # def copy_batch(engine, df, table_class):
    #     # Build the column list from the actual table (exclude serial PKs like "id")
    #     copy_cols = [c.name for c in table_class.__table__.columns if c.name != "id"]

    #     # Ensure all COPY columns exist in the DF (add missing as None), then order
    #     for cname in copy_cols:
    #         if cname not in df.columns:
    #             df[cname] = None
    #     df = df[copy_cols]

    #     buf = io.StringIO()
    #     df.to_csv(buf, index=False, header=False, na_rep="\\N")
    #     buf.seek(0)

    #     conn = engine.raw_connection()  # not a context manager
    #     try:
    #         cur = conn.cursor()
    #         # Qualify table if you set a schema in __table_args__ on the class
    #         tname = table_class.__tablename__
    #         if getattr(table_class.__table__, "schema", None):
    #             tname = f'"{table_class.__table__.schema}"."{tname}"'
    #         else:
    #             tname = f'"{tname}"'

    #         collist_sql = ", ".join(f'"{c}"' for c in copy_cols)
    #         cur.copy_expert(
    #             f"COPY {tname} ({collist_sql}) FROM STDIN WITH (FORMAT CSV, NULL '\\N')",
    #             buf
    #         )
    #         conn.commit()
    #     finally:
    #         try:
    #             cur.close()
    #         except Exception:
    #             pass
    #         conn.close()

    def copy_batch(engine, df, table_like):
        """
        COPY df -> table_like. Accepts either a Declarative class (with __table__)
        or a plain sqlalchemy.Table.

        Every table column except ``id`` is sent, in table order. Table columns
        missing from ``df`` are sent as NULL, and ``df`` columns the table does not
        have are ignored. The caller's ``df`` is not modified. An empty batch is a
        no-op.

        Args:
            engine: SQLAlchemy engine; its raw DBAPI connection is used for ``copy_expert``.
            df: Batch to load.
            table_like: Declarative class or ``sqlalchemy.Table``.

        Raises:
            TypeError: if ``table_like`` does not resolve to an object with a ``name``.
        """
        import io

        # Resolve to a sqlalchemy.Table
        table = getattr(table_like, "__table__", table_like)
        if not hasattr(table, "name"):
            raise TypeError("copy_batch expected a Declarative class or sqlalchemy.Table")

        # Build fully-qualified table name
        name = table.name
        schema = getattr(table, "schema", None)
        fqtn = f'"{schema}"."{name}"' if schema else f'"{name}"'

        # Columns to COPY (skip autoincrement PKs like 'id' if present)
        copy_cols = [c.name for c in table.columns if c.name != "id"]

        # New frame in table order. Missing columns come back as NaN and are written
        # as \N below; unlike `df[col] = None`, this leaves the caller's frame untouched.
        out = df.reindex(columns=copy_cols)
        if len(out.index) == 0:
            return  # nothing to load; skip the round trip

        # Prepare CSV buffer (NULL -> \N for COPY)
        buf = io.StringIO()
        out.to_csv(buf, index=False, header=False, na_rep="\\N")
        buf.seek(0)

        collist_sql = ", ".join(f'"{c}"' for c in copy_cols)
        sql = f"COPY {fqtn} ({collist_sql}) FROM STDIN WITH (FORMAT CSV, NULL '\\N')"

        # Execute COPY
        conn = engine.raw_connection()
        cur = None
        try:
            cur = conn.cursor()
            cur.copy_expert(sql, buf)
            conn.commit()
        except Exception:
            # Do not hand the connection back inside a failed transaction.
            try:
                conn.rollback()
            except Exception:
                pass
            raise
        finally:
            if cur is not None:
                try:
                    cur.close()
                except Exception:
                    pass
            conn.close()


    # Detect geometry columns once
    # geom_cols holds (column_name, srid) pairs (see the ("the_geom", 4326) fallback below).
    geom_cols = detect_geometry_columns(dataset.col_types)
    # Detect JSON columns once
    json_cols = detect_json_columns(dataset.col_types)


    # --- Main loop ---
    # Stream the JSON-lines file and buffer rows. Every `batch_size` rows, the buffer
    # goes through normalize → clean → COPY → verify; leftovers are flushed after the loop.
    # NOTE(review): the per-batch pipeline is duplicated in the final flush below and
    # the two copies have already drifted (different debug prints). A shared helper
    # would keep them in sync.
    with jsonlines.open(jsonfile, mode="r", loads=custom_loads) as reader:
        batch_start = time.perf_counter()
        for idx, row in enumerate(reader):
            # List-shaped rows longer than expected_width are truncated; the extra
            # trailing fields are dropped without any message.
            if isinstance(row, list) and len(row) > expected_width:
                row = row[:expected_width]
            rows_buffer.append(row)

            if (idx + 1) % batch_size == 0:
                print(f"⏳ Processing batch ending at row {idx + 1}…", end=" ", flush=True)
                normed = normalize_rows(rows_buffer, col_names)
                df = pd.DataFrame(normed)
                # print(f"First df.head is {df.head().to_dict(orient='records')}")
                # Fallback: no geometry column detected from the declared types, but the
                # batch has "the_geom" -> treat it as geometry with SRID 4326 from now on.
                if not geom_cols and "the_geom" in df.columns:
                    geom_cols = [("the_geom", 4326)]
                df = process_batch(df, geom_cols, json_cols)
                # print(f"Second df.head is {df.head().to_dict(orient='records')}")
                df = ensure_geometry_columns(df, geom_cols)
                df = enforce_integer_compat(df, DynamicTable)
                # NOTE(review): debug output — prints the first five full records of every batch.
                print(f"Third df.head is {df.head().to_dict(orient='records')}")
                # Optional: quick peek to prove it's not numeric anymore
                print("Filtered dict: ", df.filter(items=["the_geom"]).head(3).to_dict(orient="records"))
                print("GEOM sample:",
                    df.filter(items=[c for c in ["new_georeferenced_column", "location", "the_geom"] if c in df.columns])
                        .head(2).to_dict(orient="records"))
                print("LAT/LON sample:",
                    df.filter(items=[c for c in ["latitude", "longitude", "lat", "lon"] if c in df.columns])
                        .head(2).to_dict(orient="records"))
                copy_batch(engine, df, DynamicTable)
                # Verify this batch by SID
                if "sid" in df.columns:
                    _exp, _found, _missing = verify_batch(engine, DynamicTable, df["sid"])
                else:
                    print("⚠️ No 'sid' column available to verify this batch.")
                # The elapsed time covers reading, cleaning, COPY and verification of the batch.
                t3 = time.perf_counter()
                print(f"📤 Inserted {len(df)} rows in {t3 - batch_start:.2f}s")
                rows_buffer.clear()
                batch_start = time.perf_counter()

        # Final flush
        # Same pipeline as above, for the rows left over after the last full batch.
        if rows_buffer:
            normed = normalize_rows(rows_buffer, col_names)
            df = pd.DataFrame(normed)
            if not geom_cols and "the_geom" in df.columns:
                geom_cols = [("the_geom", 4326)]
            df = process_batch(df, geom_cols, json_cols)
            df = ensure_geometry_columns(df, geom_cols)
            df = enforce_integer_compat(df, DynamicTable)
            # Optional: quick peek to prove it's not numeric anymore
            print("Filtered dict (last):", df.filter(items=["the_geom"]).head(3).to_dict(orient="records"))
            print("GEOM sample:",
                df.filter(items=[c for c in ["new_georeferenced_column", "location", "the_geom"] if c in df.columns])
                    .head(2).to_dict(orient="records"))

            print("LAT/LON sample:",
                df.filter(items=[c for c in ["latitude", "longitude", "lat", "lon"] if c in df.columns])
                    .head(2).to_dict(orient="records"))
            copy_batch(engine, df, DynamicTable)
            if "sid" in df.columns:
                _exp, _found, _missing = verify_batch(engine, DynamicTable, df["sid"])
            else:
                print("⚠️ No 'sid' column available to verify this batch.")
            print(f"✅ Final batch inserted ({len(df)} rows)")

    # Final sanity check: count source vs target rows
    # Count source rows (stream)
    # NOTE(review): this re-reads `dataset.dataset_path` with an orjson-based loader,
    # while the load above read `jsonfile` with `custom_loads`. The caller at the end of
    # this cell passes dataset.dataset_path as the third argument, so they presumably
    # match — confirm. `orjson` is not imported in the notebook's first import cell;
    # presumably it comes from one of the `src.*` star imports — verify.
    src_count = 0
    with jsonlines.open(dataset.dataset_path, mode="r", loads=lambda s: orjson.loads(s.encode("utf-8"))) as rdr:
        for _ in rdr:
            src_count += 1

    # Count target rows
    # NOTE(review): counts every row in the table (including rows from earlier runs)
    # and does not schema-qualify the name, unlike copy_batch/verify_batch.
    from sqlalchemy import text
    with engine.connect() as conn:
        tgt_count = conn.execute(text(f'SELECT COUNT(*) FROM "{DynamicTable.__table__.name}"')).scalar_one()

    print(f"Source rows: {src_count}  |  Target rows: {tgt_count}")


# Run insert_dataset for every dataset whose format is JSON.
for name, dataset in dataset_info_dict.items():
    if dataset.format != "json":
        continue
    print(f"Starting dataset {dataset.short_name}")
    insert_dataset(engine, dataset, dataset.dataset_path, dataset.col_types, batch_size=100_000)


In [None]:
# Show the dataset names (keys) available in dataset_info_dict.
dataset_info_dict.keys()

In [None]:

# for name, dataset in dataset_info_dict.items():
#     if dataset.format == 'json':
#         print(f'Starting dataset {dataset.short_name}')
#         if name == 'NTAs2020':
#             insert_dataset(engine, dataset, dataset.dataset_path, dataset.col_types, batch_size=100000)