### Load KSI file

In [1]:
from pathlib import Path
import pandas as pd
import geopandas as gpd

# Resolve project folders. The repo root is the parent of the working directory
# when the kernel was started inside notebooks/, otherwise the working directory.
REPO_ROOT = Path.cwd().parents[0] if Path.cwd().name == "notebooks" else Path.cwd()
DATA_DIR = REPO_ROOT / "data"
RAW_DIR = DATA_DIR / "raw"
INTERIM_DIR = DATA_DIR / "interim"
INTERIM_DIR.mkdir(parents=True, exist_ok=True)  # created up front so later writes cannot fail on a missing folder

ksi_dir = RAW_DIR / "ksi"
if not ksi_dir.exists():
    raise FileNotFoundError(f"Create folder and place KSI file here: {ksi_dir}")

# Extensions this cell can read, split by the reader that handles them.
PANDAS_SUFFIXES = {".parquet", ".csv"}
GEO_SUFFIXES = {".geojson", ".json", ".shp"}

# glob("*") yields entries in arbitrary order and also returns sub-folders and
# hidden files, so taking its first element blindly can pick something
# unreadable. Keep regular files only, sorted by path for a deterministic pick.
entries = sorted(p for p in ksi_dir.glob("*") if p.is_file())
if len(entries) == 0:
    raise FileNotFoundError("No files found in data/raw/ksi/")

files = [p for p in entries if p.suffix.lower() in PANDAS_SUFFIXES | GEO_SUFFIXES]
print("Found files:", [f.name for f in files])

if len(files) == 0:
    # The folder is not empty, but it holds nothing this cell knows how to read.
    raise ValueError(f"Unsupported file type: {entries[0].suffix}")

ksi_path = files[0]
print("Using:", ksi_path)

# Read based on extension
suffix = ksi_path.suffix.lower()
if suffix == ".parquet":
    df = pd.read_parquet(ksi_path)
elif suffix == ".csv":
    df = pd.read_csv(ksi_path)
else:
    # Only GEO_SUFFIXES can reach this branch because `files` was filtered above.
    # The geometry column is dropped so the rest of the notebook works on a
    # plain pandas DataFrame.
    gdf = gpd.read_file(ksi_path)
    df = pd.DataFrame(gdf.drop(columns="geometry"))

print("KSI shape:", df.shape)
df.head(2)


Found files: ['total_ksi.geojson']
Using: C:\code\pyspark-playground\Covercheck-Toronto\data\raw\ksi\total_ksi.geojson
KSI shape: (18957, 52)


Unnamed: 0,OBJECTID,INDEX_,ACCNUM,DATE,TIME,STREET1,STREET2,OFFSET,ROAD_CLASS,DISTRICT,...,SPEEDING,AG_DRIV,REDLIGHT,ALCOHOL,DISABILITY,HOOD_158,NEIGHBOURHOOD_158,HOOD_140,NEIGHBOURHOOD_140,DIVISION
0,1,3389067,893184,"Sun, 01 Jan 2006 10:00:00 GMT",236,WOODBINE AVE,O CONNOR DR,,Major Arterial,Toronto and East York,...,Yes,Yes,,Yes,,60,Woodbine-Lumsden,60,Woodbine-Lumsden (60),D55
1,2,3389068,893184,"Sun, 01 Jan 2006 10:00:00 GMT",236,WOODBINE AVE,O CONNOR DR,,Major Arterial,Toronto and East York,...,Yes,Yes,,Yes,,60,Woodbine-Lumsden,60,Woodbine-Lumsden (60),D55


### Clean, parse date and build flags

In [2]:
# Clean the person-level KSI rows: derive a calendar date, a numeric
# neighbourhood id and injury-severity flags. Rows that cannot be dated or
# mapped to a neighbourhood are dropped, and every drop is reported.

# Parse DATE (e.g. "Sun, 01 Jan 2006 10:00:00 GMT") into a UTC timestamp and keep
# only the calendar date. Values that fail to parse become missing (errors="coerce").
df["date"] = pd.to_datetime(df["DATE"], utc=True, errors="coerce").dt.date

# Drop rows where the date failed to parse. Report the count so this loss is as
# visible as the neighbourhood drop below instead of happening silently.
invalid_dates = df["date"].isna().sum()
print(f"Dropping {invalid_dates} rows without a parseable DATE.")
df = df.dropna(subset=["date"]).copy()

# Standardize the neighbourhood id. Non-numeric HOOD_158 values such as "NSA"
# become NaN here because of errors="coerce".
df["nbhd_id"] = pd.to_numeric(df["HOOD_158"], errors="coerce")

# Drop the unmappable rows BEFORE the assert and the integer cast below;
# astype(int) cannot represent NaN.
invalid_hoods = df["nbhd_id"].isna().sum()
print(f"Dropping {invalid_hoods} rows without a valid neighbourhood (e.g., 'NSA').")
df = df.dropna(subset=["nbhd_id"]).copy()

# Now it is safe to assert and convert to integer
assert df["nbhd_id"].isna().sum() == 0, "HOOD_158 still has null values"
df["nbhd_id"] = df["nbhd_id"].astype(int)

# Injury flags, matched case-insensitively on the INJURY text.
# The displayed sample codes injuries as "Major"/"Minor", so "major" is matched
# alongside "serious" via the regex alternation.
inj = df["INJURY"].astype(str).str.lower()
df["is_fatal"] = inj.str.contains("fatal", na=False)
df["is_serious"] = inj.str.contains("serious|major", regex=True, na=False)

# FATAL_NO as an integer; missing or non-numeric values become 0.
df["fatal_no"] = pd.to_numeric(df["FATAL_NO"], errors="coerce").fillna(0).astype(int)

print("Cleaned DataFrame shape:", df.shape)
df[["ACCNUM", "date", "nbhd_id", "INJURY", "is_fatal", "is_serious", "fatal_no"]].head(5)

Dropping 128 rows without a valid neighbourhood (e.g., 'NSA').
Cleaned DataFrame shape: (18829, 57)


Unnamed: 0,ACCNUM,date,nbhd_id,INJURY,is_fatal,is_serious,fatal_no
0,893184,2006-01-01,60,Major,False,True,0
1,893184,2006-01-01,60,Minor,False,False,0
2,893184,2006-01-01,60,Minor,False,False,0
3,893184,2006-01-01,60,Minor,False,False,0
4,893184,2006-01-01,60,Minor,False,False,0


### Collapse to accident level

In [3]:
# Collapse the person-level rows to one row per collision, keyed by
# (ACCNUM, date, nbhd_id).

# groupby() discards rows whose grouping key is missing, so person rows without
# an ACCNUM would vanish from every downstream count without any signal.
# Surface how many rows are affected instead of losing them silently.
missing_accnum = df["ACCNUM"].isna().sum()
if missing_accnum:
    print(
        f"Warning: {missing_accnum} rows have no ACCNUM and are excluded "
        "from the accident-level table."
    )

acc_level = (
    df.groupby(["ACCNUM", "date", "nbhd_id"], as_index=False)
      .agg(
          # True when at least one person row in the collision carries the flag.
          fatal_collision=("is_fatal", "max"),
          serious_collision=("is_serious", "max"),
          # Counts person rows flagged fatal. This replaced a sum over fatal_no,
          # which produced mismatched numbers in the final output.
          fatal_victims=("is_fatal", "sum"),
          # Number of person rows (non-null OBJECTID) recorded for the collision.
          victim_count=("OBJECTID", "count")
      )
)

print("Accident-level rows:", acc_level.shape)
acc_level.head()


Accident-level rows: (4919, 7)


Unnamed: 0,ACCNUM,date,nbhd_id,fatal_collision,serious_collision,fatal_victims,victim_count
0,1000002228,2021-01-01,155,True,True,1,6
1,1000028,2007-11-01,92,False,True,0,5
2,1000041217,2021-01-07,85,False,True,0,2
3,1000055,2007-10-24,73,False,True,0,3
4,1000067206,2021-01-11,139,False,True,0,5


### Aggregate daily per neighbourhood

In [4]:
# Roll the accident-level table up to one row per (date, neighbourhood).
daily_groups = acc_level.groupby(["date", "nbhd_id"], as_index=False)
ksi_daily = daily_groups.agg(
    ksi_collisions=pd.NamedAgg(column="ACCNUM", aggfunc="nunique"),
    ksi_fatal_collisions=pd.NamedAgg(column="fatal_collision", aggfunc="sum"),
    ksi_serious_collisions=pd.NamedAgg(column="serious_collision", aggfunc="sum"),
    ksi_fatal_victims=pd.NamedAgg(column="fatal_victims", aggfunc="sum"),
    ksi_victim_count=pd.NamedAgg(column="victim_count", aggfunc="sum"),
)

# Severity-weighted score: each fatal collision counts 3, each serious one 2.
ksi_daily = ksi_daily.assign(
    ksi_weighted_score=lambda daily: (
        daily["ksi_fatal_collisions"].mul(3)
        + daily["ksi_serious_collisions"].mul(2)
    )
)

# Guard against schema drift: fatal victims can never outnumber all victims.
fatal_within_total = ksi_daily["ksi_fatal_victims"].le(ksi_daily["ksi_victim_count"])
assert fatal_within_total.all(), \
    "Fatal victims exceed victim count — something is wrong."


print("KSI daily shape:", ksi_daily.shape)
ksi_daily.head()


KSI daily shape: (4879, 8)


Unnamed: 0,date,nbhd_id,ksi_collisions,ksi_fatal_collisions,ksi_serious_collisions,ksi_fatal_victims,ksi_victim_count,ksi_weighted_score
0,2006-01-01,60,1,0,1,0,8,2
1,2006-01-01,64,1,0,1,0,5,2
2,2006-01-02,78,1,0,1,0,2,2
3,2006-01-04,83,1,0,1,0,2,2
4,2006-01-06,47,1,0,1,0,2,2


### Save

In [5]:
# Write the daily neighbourhood table to the interim folder as Parquet
# (without the index) and report where it was saved.
out_path = INTERIM_DIR.joinpath("ksi_nbhd_daily.parquet")
ksi_daily.to_parquet(out_path, index=False)
print(f"Saved: {out_path}")


Saved: C:\code\pyspark-playground\Covercheck-Toronto\data\interim\ksi_nbhd_daily.parquet
