In [8]:
import duckdb, pandas as pd, numpy as np
from pathlib import Path

con = duckdb.connect()
con.execute("PRAGMA threads=8;")

CRASH = "../data/interim/crashes.parquet"
TRIPS = "../data/interim/tripdata_2013_2025.parquet"
GEO = "../data/raw/borough_boundaries.geojson"

OUT_DIR = Path("../data/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

In [2]:
def describe_parquet(path):
    return con.execute(f"DESCRIBE SELECT * FROM read_parquet('{path}')").df()

print("Crashes rows:", con.execute(f"SELECT COUNT(*) FROM read_parquet('{CRASH}')").fetchone()[0])
describe_parquet(CRASH).head(30)


Crashes rows: 2232809


Unnamed: 0,column_name,column_type,null,key,default,extra
0,crash_date,TIMESTAMP,YES,,,
1,crash_time,VARCHAR,YES,,,
2,borough,VARCHAR,YES,,,
3,zip_code,VARCHAR,YES,,,
4,latitude,DOUBLE,YES,,,
5,longitude,DOUBLE,YES,,,
6,on_street_name,VARCHAR,YES,,,
7,cross_street_name,VARCHAR,YES,,,
8,number_of_persons_injured,INTEGER,YES,,,
9,number_of_persons_killed,INTEGER,YES,,,


In [4]:
con.execute(f"""
SELECT *
FROM read_parquet('../data/raw/weather_hourly_openmeteo/**/*.parquet')
WHERE timestamp >= TIMESTAMP '2020-01-01' AND timestamp < TIMESTAMP '2026-01-01';
""").df()


Unnamed: 0,timestamp,temp,prcp,snow,wspd,year,month,day,hour
0,2020-01-01 00:00:00+01:00,4.4,0.0,0.0,14.3,2019,12,31,23
1,2020-01-01 01:00:00+01:00,4.3,0.0,0.0,13.4,2020,1,1,0
2,2020-01-01 02:00:00+01:00,4.9,0.1,0.0,17.4,2020,1,1,1
3,2020-01-01 03:00:00+01:00,4.7,0.1,0.0,16.6,2020,1,1,2
4,2020-01-01 04:00:00+01:00,4.6,0.0,0.0,13.7,2020,1,1,3
...,...,...,...,...,...,...,...,...,...
52556,2025-12-29 20:00:00+01:00,9.3,0.1,0.0,18.2,2025,12,29,19
52557,2025-12-29 21:00:00+01:00,9.9,0.0,0.0,16.2,2025,12,29,20
52558,2025-12-29 22:00:00+01:00,8.4,0.0,0.0,19.2,2025,12,29,21
52559,2025-12-29 23:00:00+01:00,5.2,0.0,0.0,27.0,2025,12,29,22


In [3]:
print("Trips rows:", con.execute(f"SELECT COUNT(*) FROM read_parquet('{TRIPS}')").fetchone()[0])
describe_parquet(TRIPS).head(30)

Trips rows: 314728567


Unnamed: 0,column_name,column_type,null,key,default,extra
0,ride_id,VARCHAR,YES,,,
1,rideable_type,VARCHAR,YES,,,
2,started_at,TIMESTAMP,YES,,,
3,ended_at,TIMESTAMP,YES,,,
4,duration_sec,DOUBLE,YES,,,
5,start_station_id,VARCHAR,YES,,,
6,start_station_name,VARCHAR,YES,,,
7,start_lat,DOUBLE,YES,,,
8,start_lng,DOUBLE,YES,,,
9,end_station_id,VARCHAR,YES,,,


In [5]:
# Missingness important fields
con.execute(f"""
SELECT
  COUNT(*) AS n,
  ROUND(100.0 * SUM(start_lat IS NULL OR start_lng IS NULL) / COUNT(*), 2) AS miss_coord_start_pct,
  ROUND(100.0 * SUM(end_lat IS NULL OR end_lng IS NULL) / COUNT(*), 2) AS miss_coord_end_pct,
  ROUND(100.0 * SUM(duration_sec IS NULL OR ended_at IS NULL) / COUNT(*), 2) AS miss_date_pct,
  ROUND(100.0 * SUM(duration_sec IS NULL) / COUNT(*), 2) AS miss_duration
FROM read_parquet('{TRIPS}')
""").df()


Unnamed: 0,n,miss_coord_start_pct,miss_coord_end_pct,miss_date_pct,miss_duration
0,314728567,0.02,0.17,0.0,0.0


In [6]:
con.execute(f"""
SELECT *
FROM read_parquet('{TRIPS}')
WHERE end_lat IS NULL OR end_lng IS NULL
LIMIT 100
""").df()

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,duration_sec,start_station_id,start_station_name,start_lat,start_lng,end_station_id,end_station_name,end_lat,end_lng,member_casual
0,legacy_56,classic_bike,2013-06-01 00:11:04,2013-06-01 00:20:11,547.0,432,E 7 St & Avenue A,40.726218,-73.983799,,,,,member
1,legacy_102,classic_bike,2013-06-01 00:40:27,2013-06-01 00:49:24,537.0,482,W 15 St & 7 Ave,40.739355,-73.999318,,,,,member
2,legacy_120,classic_bike,2013-06-01 00:47:51,2013-06-01 00:55:43,472.0,528,2 Ave & E 31 St,40.742909,-73.977061,,,,,member
3,legacy_211,classic_bike,2013-06-01 01:32:55,2013-06-01 01:35:28,153.0,284,Greenwich Ave & 8 Ave,40.739017,-74.002638,,,,,member
4,legacy_289,classic_bike,2013-06-01 02:28:10,2013-06-01 02:42:11,841.0,509,9 Ave & W 22 St,40.745497,-74.001971,,,,,casual
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,legacy_2866,classic_bike,2013-06-01 13:43:26,2013-06-01 13:52:18,532.0,502,Henry St & Grand St,40.714215,-73.981346,,,,,member
96,legacy_2877,classic_bike,2013-06-01 13:44:08,2013-06-01 13:51:54,466.0,462,W 22 St & 10 Ave,40.746920,-74.004519,,,,,member
97,legacy_2881,classic_bike,2013-06-01 13:44:18,2013-06-01 14:08:53,1475.0,471,Grand St & Havemeyer St,40.712868,-73.956981,,,,,member
98,legacy_2911,classic_bike,2013-06-01 13:46:35,2013-06-01 14:11:54,1519.0,521,8 Ave & W 31 St N,40.750967,-73.994442,,,,,member


In [11]:
# Missingness important fields
con.execute(f"""
WITH base AS (
  SELECT
    CASE
      WHEN COALESCE(number_of_cyclist_injured,0) > 0
        OR COALESCE(number_of_cyclist_killed,0) > 0
      THEN 'cyclist_involved'
      ELSE 'all_other'
    END AS grp,
    borough,
    latitude,
    longitude,
    crash_date
FROM read_parquet('{CRASH}')
)
SELECT
  grp,
  COUNT(*) AS n,
  100.0 * SUM(borough IS NULL OR TRIM(CAST(borough AS VARCHAR))='') / COUNT(*) AS pct_miss_borough,
  100.0 * SUM(latitude IS NULL OR longitude IS NULL) / COUNT(*) AS pct_miss_coord,
  100.0 * SUM(crash_date IS NULL) / COUNT(*) AS pct_miss_date
FROM base
GROUP BY grp;


""").df()


Unnamed: 0,grp,n,pct_miss_borough,pct_miss_coord,pct_miss_date
0,all_other,2167981,30.85908,10.893915,0.0
1,cyclist_involved,64828,21.085025,6.637564,0.0


In [12]:
# Missingness important fields
con.execute(f"""
SELECT
  EXTRACT(year FROM crash_date) AS year,
  COUNT(*) AS n,
  100.0 * SUM(latitude IS NULL OR longitude IS NULL) / COUNT(*) AS pct_miss_coord,
  100.0 * SUM(borough IS NULL OR TRIM(CAST(borough AS VARCHAR))='') / COUNT(*) AS pct_miss_borough
FROM read_parquet('{CRASH}')
WHERE
  COALESCE(number_of_cyclist_injured, 0) > 0
  OR COALESCE(number_of_cyclist_killed, 0) > 0
GROUP BY 1
ORDER BY 1;

""").df()



Unnamed: 0,year,n,pct_miss_coord,pct_miss_borough
0,2012,2202,8.49228,8.673933
1,2013,4062,8.591827,8.6903
2,2014,4000,7.95,8.125
3,2015,4268,9.23149,9.723524
4,2016,4945,13.447927,26.309403
5,2017,4865,6.14594,31.243577
6,2018,4694,5.581594,28.291436
7,2019,4964,5.398872,28.142627
8,2020,5482,5.344765,27.544692
9,2021,4901,5.264232,27.729035


In [13]:
# Trip quality Check
con.execute(f"""
SELECT
  COUNT(*) AS n,
  SUM(started_at IS NULL) AS miss_started_at,
  SUM(duration_sec IS NULL) AS miss_duration,
  SUM(duration_sec < 0) AS neg_duration,
  SUM(duration_sec < 60) AS too_short,
  SUM(duration_sec > 6*3600) AS too_long
FROM read_parquet('{TRIPS}')
""").df()


Unnamed: 0,n,miss_started_at,miss_duration,neg_duration,too_short,too_long
0,314728567,0.0,0.0,1760.0,2331.0,397365.0


In [14]:
# Gefilterten Datensatz erstellen
con.execute(f"""
    COPY (
        SELECT * FROM read_parquet('{TRIPS}')
        WHERE duration_sec IS NOT NULL
          AND duration_sec >= 60
          AND duration_sec <= 6 * 3600
          -- Geographic filter for NYC area
          AND start_lat BETWEEN 40.50 AND 41.00
          AND start_lng BETWEEN -74.30 AND -73.50
          AND end_lat BETWEEN 40.50 AND 41.00
          AND end_lng BETWEEN -74.30 AND -73.50
    ) TO '../data/interim/tripdata_2013_2025_clean.parquet' (FORMAT PARQUET)
""")

<_duckdb.DuckDBPyConnection at 0x10fae4eb0>

In [15]:
con.execute(f"""
SELECT UPPER(TRIM(CAST(borough AS VARCHAR))) AS borough, COUNT(*) cnt
FROM read_parquet('{CRASH}')
GROUP BY 1 ORDER BY cnt DESC LIMIT 20
""").df()


Unnamed: 0,borough,cnt
0,,682688
1,BROOKLYN,497140
2,QUEENS,415330
3,MANHATTAN,343192
4,BRONX,229556
5,STATEN ISLAND,64903


In [16]:
con.execute(f"""
WITH cyclist_crashes AS (
  SELECT *
  FROM read_parquet('{CRASH}')
  WHERE
    vehicle_type_code1 = 'BICYCLE'
    OR vehicle_type_code2 = 'BICYCLE'
    OR COALESCE(number_of_cyclist_injured, 0) > 0
    OR COALESCE(number_of_cyclist_killed, 0) > 0
),
factors AS (
  SELECT contributing_factor_vehicle_1 AS factor
  FROM cyclist_crashes
  WHERE contributing_factor_vehicle_1 IS NOT NULL

  UNION ALL

  SELECT contributing_factor_vehicle_2 AS factor
  FROM cyclist_crashes
  WHERE contributing_factor_vehicle_2 IS NOT NULL
)
SELECT
  LOWER(TRIM(factor)) AS factor,
  COUNT(*) AS n
FROM factors
GROUP BY 1
ORDER BY n DESC;


""").df()

Unnamed: 0,factor,n
0,unspecified,68171
1,driver inattention/distraction,22916
2,failure to yield right-of-way,8666
3,pedestrian/bicyclist/other pedestrian error/co...,6655
4,,4886
5,traffic control disregarded,3589
6,passenger distraction,2970
7,passing or lane usage improper,2914
8,other vehicular,2573
9,following too closely,1764


In [17]:
con.execute(f"""
SELECT
  bike_factor,
  COUNT(*) AS n
FROM (
  SELECT
    CASE
      WHEN vehicle_type_code1 = 'BICYCLE' THEN LOWER(TRIM(contributing_factor_vehicle_1))
      WHEN vehicle_type_code2 = 'BICYCLE' THEN LOWER(TRIM(contributing_factor_vehicle_2))
    END AS bike_factor
  FROM read_parquet('{CRASH}')
  WHERE
    vehicle_type_code1 = 'BICYCLE'
    OR vehicle_type_code2 = 'BICYCLE'
)
GROUP BY 1
ORDER BY n DESC;


""").df()

Unnamed: 0,bike_factor,n
0,unspecified,15432
1,passenger distraction,752
2,driver inattention/distraction,675
3,,636
4,other vehicular,429
5,failure to yield right-of-way,221
6,lost consciousness,165
7,physical disability,152
8,fatigued/drowsy,96
9,traffic control disregarded,82


In [18]:

CRASH = "../data/interim/crashes.parquet"
OUT = "../data/processed/crashes_bike_liability.parquet"


con.execute(f"""
COPY (
  WITH base AS (
    SELECT
      *,
      LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) AS vtype1_norm,
      LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) AS vtype2_norm,
      LOWER(TRIM(COALESCE(contributing_factor_vehicle_1, ''))) AS factor1_norm,
      LOWER(TRIM(COALESCE(contributing_factor_vehicle_2, ''))) AS factor2_norm
    FROM read_parquet('{CRASH}')
  ),

  typed AS (
    SELECT
      *,
      -- Bike / E-Bike detection (robust), scooters explicitly excluded
      CASE
        WHEN (
          (vtype1_norm LIKE '%bicy%' OR vtype1_norm LIKE '%bike%')
          AND vtype1_norm NOT LIKE '%scoot%'
        )
        THEN 1 ELSE 0
      END AS is_bike_v1,

      CASE
        WHEN (
          (vtype2_norm LIKE '%bicy%' OR vtype2_norm LIKE '%bike%')
          AND vtype2_norm NOT LIKE '%scoot%'
        )
        THEN 1 ELSE 0
      END AS is_bike_v2
    FROM base
  ),

  bike_only AS (
    SELECT
      *,
      1 AS cyclist_involved,
      CASE
        WHEN is_bike_v1 = 1 THEN factor1_norm
        WHEN is_bike_v2 = 1 THEN factor2_norm
        ELSE ''
      END AS bike_factor_norm
    FROM typed
    WHERE is_bike_v1 = 1 OR is_bike_v2 = 1
  )

  SELECT
    -- ORIGINAL COLUMNS
    crash_date,
    crash_time,
    borough,
    zip_code,
    latitude,
    longitude,
    on_street_name,
    cross_street_name,
    number_of_persons_injured,
    number_of_persons_killed,
    number_of_pedestrians_injured,
    number_of_pedestrians_killed,
    number_of_cyclist_injured,
    number_of_cyclist_killed,
    number_of_motorist_injured,
    number_of_motorist_killed,
    contributing_factor_vehicle_1,
    contributing_factor_vehicle_2,
    vehicle_type_code1,
    vehicle_type_code2,
    collision_id,

    -- DERIVED
    cyclist_involved,
    bike_factor_norm,

    -- Observed liability (documented, plausible bike defects)
    CASE
      WHEN bike_factor_norm IN (
        'brakes defective',
        'steering failure',
        'tire failure/inadequate',
        'headlights defective',
        'other lighting defects'
      )
      THEN 1 ELSE 0
    END AS is_observed_liability,

    -- Potential liability (pricing-conservative)
    CASE
      WHEN bike_factor_norm IN (
        'brakes defective',
        'steering failure',
        'tire failure/inadequate',
        'headlights defective',
        'other lighting defects'
      )
      OR bike_factor_norm = 'unspecified'
      OR bike_factor_norm = ''
      THEN 1 ELSE 0
    END AS potential_liability

  FROM bike_only
)
TO '{OUT}'
(FORMAT PARQUET);
""")

print("Saved:", OUT)


Saved: ../data/processed/crashes_bike_liability.parquet


In [19]:
con.execute("""
SELECT
  COUNT(*) AS n,
  SUM(is_observed_liability) AS observed_liability,
  SUM(potential_liability) AS potential_liability
FROM read_parquet('../data/processed/crashes_bike_liability.parquet');
""").fetchdf()

Unnamed: 0,n,observed_liability,potential_liability
0,88367,190.0,56063.0


In [20]:
import duckdb

duckdb.sql("""
    SELECT 
        YEAR(started_at) as year,
        COUNT(*) as total_trips,
        SUM(CASE WHEN end_lat IS NULL OR end_lng IS NULL OR ended_at IS NULL THEN 1 ELSE 0 END) as incomplete,
        ROUND(100.0 * SUM(CASE WHEN end_lat IS NULL OR end_lng IS NULL OR ended_at IS NULL THEN 1 ELSE 0 END) / COUNT(*), 2) as pct
    FROM read_parquet('../data/interim/tripdata_2013_2025_clean.parquet')
    GROUP BY YEAR(started_at)
    ORDER BY year
""").show()


┌───────┬─────────────┬────────────┬────────┐
│ year  │ total_trips │ incomplete │  pct   │
│ int64 │    int64    │   int128   │ double │
├───────┼─────────────┼────────────┼────────┤
│  2013 │    11182372 │          0 │    0.0 │
│  2014 │     8078329 │          0 │    0.0 │
│  2015 │     9924014 │          0 │    0.0 │
│  2016 │    13830135 │          0 │    0.0 │
│  2017 │    16350982 │          0 │    0.0 │
│  2018 │    35069874 │          0 │    0.0 │
│  2019 │    20534621 │          0 │    0.0 │
│  2020 │    19486751 │          0 │    0.0 │
│  2021 │    27034048 │          0 │    0.0 │
│  2022 │    29759321 │          0 │    0.0 │
│  2023 │    34973540 │          0 │    0.0 │
│  2024 │    44148813 │          0 │    0.0 │
│  2025 │    43525267 │          0 │    0.0 │
├───────┴─────────────┴────────────┴────────┤
│ 13 rows                         4 columns │
└───────────────────────────────────────────┘



In [21]:

print("="*70)
print("CRASH COORDINATES ANALYSIS")
print("="*70)

missing_coords = con.execute(f"""
    SELECT
        COUNT(*) as total_crashes,
        SUM(CASE WHEN latitude IS NULL OR longitude IS NULL THEN 1 ELSE 0 END) as missing_coords,
        SUM(CASE WHEN latitude IS NULL OR longitude IS NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as pct_missing,
        
        -- Of those with missing coords, how many have borough?
        SUM(CASE 
            WHEN (latitude IS NULL OR longitude IS NULL) 
            AND borough IS NOT NULL 
            AND TRIM(borough) != '' 
            THEN 1 ELSE 0 
        END) as missing_coords_have_borough,
        
        -- How many have zip?
        SUM(CASE 
            WHEN (latitude IS NULL OR longitude IS NULL) 
            AND zip_code IS NOT NULL 
            AND TRIM(CAST(zip_code AS VARCHAR)) != '' 
            THEN 1 ELSE 0 
        END) as missing_coords_have_zip,
        
        -- How many have street names?
        SUM(CASE 
            WHEN (latitude IS NULL OR longitude IS NULL) 
            AND (on_street_name IS NOT NULL OR cross_street_name IS NOT NULL)
            THEN 1 ELSE 0 
        END) as missing_coords_have_streets
        
    FROM read_parquet('{CRASH}')
    WHERE 
        -- Apply bike filter first
        (
            LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) LIKE '%bicy%' OR 
            LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) LIKE '%bike%' OR
            LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) LIKE '%bicy%' OR
            LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) LIKE '%bike%'
        )
""").fetchdf()

print(missing_coords)

print("\n" + "="*70)

# Check outlier coordinates
outliers = con.execute(f"""
    SELECT
        COUNT(*) as n_outliers,
        MIN(latitude) as min_lat,
        MAX(latitude) as max_lat,
        MIN(longitude) as min_lng,
        MAX(longitude) as max_lng
    FROM read_parquet('{CRASH}')
    WHERE latitude IS NOT NULL 
        AND longitude IS NOT NULL
        AND (
            latitude NOT BETWEEN 40.50 AND 41.00 OR
            longitude NOT BETWEEN -74.30 AND -73.50
        )
        AND (
            LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) LIKE '%bicy%' OR 
            LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) LIKE '%bike%' OR
            LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) LIKE '%bicy%' OR
            LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) LIKE '%bike%'
        )
""").fetchdf()

print("Outlier coordinates:")
print(outliers)

CRASH COORDINATES ANALYSIS
   total_crashes  missing_coords  pct_missing  missing_coords_have_borough  \
0          88367          5730.0     6.484321                       1495.0   

   missing_coords_have_zip  missing_coords_have_streets  
0                   1495.0                       5730.0  

Outlier coordinates:
   n_outliers  min_lat  max_lat  min_lng  max_lng
0         579      0.0      0.0      0.0      0.0


In [22]:
CRASH = "../data/interim/crashes.parquet"
OUT = "../data/processed/crashes_bike_liability.parquet"

print("="*70)
print("CREATING CRASH DATASET WITH SMART COORDINATE IMPUTATION")
print("="*70)

# Step 1: Load all bike crashes
crashes = con.execute(f"""
    WITH base AS (
        SELECT
            *,
            LOWER(TRIM(COALESCE(vehicle_type_code1, ''))) AS vtype1_norm,
            LOWER(TRIM(COALESCE(vehicle_type_code2, ''))) AS vtype2_norm
        FROM read_parquet('{CRASH}')
    )
    SELECT *
    FROM base
    WHERE (
        (vtype1_norm LIKE '%bicy%' OR vtype1_norm LIKE '%bike%')
        AND vtype1_norm NOT LIKE '%scoot%'
    ) OR (
        (vtype2_norm LIKE '%bicy%' OR vtype2_norm LIKE '%bike%')
        AND vtype2_norm NOT LIKE '%scoot%'
    )
""").fetchdf()

print(f"Total bike crashes: {len(crashes):,}")

# Step 2: Classify coordinates
import numpy as np

def classify_coords(row):
    """Classify coordinate quality"""
    lat, lng = row['latitude'], row['longitude']
    
    # Valid coordinates
    if (pd.notna(lat) and pd.notna(lng) and 
        lat != 0.0 and lng != 0.0 and
        40.50 <= lat <= 41.00 and -74.30 <= lng <= -73.50):
        return 'valid', lat, lng
    
    # Has borough info for imputation
    borough = str(row['borough']).strip().upper()
    if borough in ['MANHATTAN', 'BROOKLYN', 'QUEENS', 'BRONX', 'STATEN ISLAND']:
        return 'impute_borough', None, None
    
    # No info - needs citywide imputation
    return 'impute_citywide', None, None

crashes[['coord_type', 'lat_clean', 'lng_clean']] = crashes.apply(
    classify_coords, axis=1, result_type='expand'
)

print("\nCoordinate classification:")
print(f"  Valid:            {(crashes['coord_type']=='valid').sum():,} ({(crashes['coord_type']=='valid').sum()/len(crashes)*100:.1f}%)")
print(f"  Borough impute:   {(crashes['coord_type']=='impute_borough').sum():,} ({(crashes['coord_type']=='impute_borough').sum()/len(crashes)*100:.1f}%)")
print(f"  Citywide impute:  {(crashes['coord_type']=='impute_citywide').sum():,} ({(crashes['coord_type']=='impute_citywide').sum()/len(crashes)*100:.1f}%)")

# Step 3: Borough-based imputation
BOROUGH_CENTROIDS = {
    'MANHATTAN':     (40.7831, -73.9712, 0.08, 0.05),  # (lat, lng, lat_radius, lng_radius)
    'BROOKLYN':      (40.6782, -73.9442, 0.10, 0.08),
    'QUEENS':        (40.7282, -73.7949, 0.12, 0.10),
    'BRONX':         (40.8448, -73.8648, 0.08, 0.06),
    'STATEN ISLAND': (40.5795, -74.1502, 0.10, 0.08)
}

def impute_borough_coords(row):
    """Impute based on borough"""
    borough = str(row['borough']).strip().upper()
    
    if borough in BOROUGH_CENTROIDS:
        lat_c, lng_c, lat_r, lng_r = BOROUGH_CENTROIDS[borough]
        
        # Uniform random within borough bounds
        lat = np.random.uniform(lat_c - lat_r, lat_c + lat_r)
        lng = np.random.uniform(lng_c - lng_r, lng_c + lng_r)
        
        # Clip to NYC bounds
        lat = np.clip(lat, 40.50, 41.00)
        lng = np.clip(lng, -74.30, -73.50)
        
        return lat, lng
    
    return None, None

# Step 4: Citywide proportional imputation
valid_crashes = crashes[crashes['coord_type'] == 'valid'].copy()

def impute_citywide_coords(row):
    """Sample from actual crash distribution"""
    sample = valid_crashes.sample(n=1)
    return sample['lat_clean'].iloc[0], sample['lng_clean'].iloc[0]

# Apply imputations
print("\nApplying imputation...")

for idx, row in crashes.iterrows():
    if row['coord_type'] == 'valid':
        continue
    elif row['coord_type'] == 'impute_borough':
        lat, lng = impute_borough_coords(row)
        if lat is not None:
            crashes.at[idx, 'lat_clean'] = lat
            crashes.at[idx, 'lng_clean'] = lng
        else:
            lat, lng = impute_citywide_coords(row)
            crashes.at[idx, 'lat_clean'] = lat
            crashes.at[idx, 'lng_clean'] = lng
    else:  # impute_citywide
        lat, lng = impute_citywide_coords(row)
        crashes.at[idx, 'lat_clean'] = lat
        crashes.at[idx, 'lng_clean'] = lng

# Verify no missing coordinates
missing = crashes[['lat_clean', 'lng_clean']].isna().any(axis=1).sum()
print(f"Missing after imputation: {missing} (should be 0!)")

# Step 5: Select final columns (NO LIABILITY COLUMNS!)
final_crashes = crashes[[
    'crash_date', 'crash_time', 'borough', 'zip_code',
    'lat_clean', 'lng_clean',
    'on_street_name', 'cross_street_name',
    'number_of_persons_injured', 'number_of_persons_killed',
    'number_of_pedestrians_injured', 'number_of_pedestrians_killed',
    'number_of_cyclist_injured', 'number_of_cyclist_killed',
    'number_of_motorist_injured', 'number_of_motorist_killed',
    'contributing_factor_vehicle_1', 'contributing_factor_vehicle_2',
    'vehicle_type_code1', 'vehicle_type_code2', 'collision_id',
    'coord_type'
]].rename(columns={'lat_clean': 'latitude', 'lng_clean': 'longitude'})

# Save
final_crashes.to_parquet(OUT, index=False)

print(f"\n Saved: {OUT}")
print(f"   Total crashes: {len(final_crashes):,}")

# Stats
print("\n" + "="*70)
print("FINAL STATISTICS")
print("="*70)
print(f"Total crashes:          {len(final_crashes):,}")
print(f"Valid coordinates:      {(final_crashes['coord_type']=='valid').sum():,} ({(final_crashes['coord_type']=='valid').sum()/len(final_crashes)*100:.1f}%)")
print(f"Borough imputed:        {(final_crashes['coord_type']=='impute_borough').sum():,} ({(final_crashes['coord_type']=='impute_borough').sum()/len(final_crashes)*100:.1f}%)")
print(f"Citywide imputed:       {(final_crashes['coord_type']=='impute_citywide').sum():,} ({(final_crashes['coord_type']=='impute_citywide').sum()/len(final_crashes)*100:.1f}%)")
print(f"Lat range:              [{final_crashes['latitude'].min():.6f}, {final_crashes['latitude'].max():.6f}]")
print(f"Lng range:              [{final_crashes['longitude'].min():.6f}, {final_crashes['longitude'].max():.6f}]")
print("="*70)

CREATING CRASH DATASET WITH SMART COORDINATE IMPUTATION
Total bike crashes: 88,367

Coordinate classification:
  Valid:            82,058 (92.9%)
  Borough impute:   1,985 (2.2%)
  Citywide impute:  4,324 (4.9%)

Applying imputation...
Missing after imputation: 0 (should be 0!)

 Saved: ../data/processed/crashes_bike_liability.parquet
   Total crashes: 88,367

FINAL STATISTICS
Total crashes:          88,367
Valid coordinates:      82,058 (92.9%)
Borough imputed:        1,985 (2.2%)
Citywide imputed:       4,324 (4.9%)
Lat range:              [40.500000, 40.924491]
Lng range:              [-74.250270, -73.696074]
