## ETL Script

In [2]:
!pip install azure-storage-blob
!pip install snowflake-connector-python
!pip install snowflake-sqlalchemy




[notice] A new release of pip is available: 24.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip



Collecting snowflake-connector-python
  Downloading snowflake_connector_python-3.15.0-cp312-cp312-win_amd64.whl.metadata (72 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading boto3-1.38.12-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore>=1.24 (from snowflake-connector-python)
  Downloading botocore-1.38.12-py3-none-any.whl.metadata (5.7 kB)
Collecting pyOpenSSL<26.0.0,>=22.0.0 (from snowflake-connector-python)
  Downloading pyOpenSSL-25.0.0-py3-none-any.whl.metadata (16 kB)
Collecting pyjwt<3.0.0 (from snowflake-connector-python)
  Downloading PyJWT-2.10.1-py3-none-any.whl.metadata (4.0 kB)
Collecting sortedcontainers>=2.4.0 (from snowflake-connector-python)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.2-py3-


[notice] A new release of pip is available: 24.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Collecting snowflake-sqlalchemy


[notice] A new release of pip is available: 24.2 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip



  Downloading snowflake_sqlalchemy-1.7.3-py3-none-any.whl.metadata (28 kB)
Collecting sqlalchemy>=1.4.19 (from snowflake-sqlalchemy)
  Downloading sqlalchemy-2.0.40-cp312-cp312-win_amd64.whl.metadata (9.9 kB)
Collecting greenlet>=1 (from sqlalchemy>=1.4.19->snowflake-sqlalchemy)
  Downloading greenlet-3.2.1-cp312-cp312-win_amd64.whl.metadata (4.2 kB)
Downloading snowflake_sqlalchemy-1.7.3-py3-none-any.whl (70 kB)
Downloading sqlalchemy-2.0.40-cp312-cp312-win_amd64.whl (2.1 MB)
   ---------------------------------------- 0.0/2.1 MB ? eta -:--:--
   ---------------------------------------- 2.1/2.1 MB 57.7 MB/s eta 0:00:00
Downloading greenlet-3.2.1-cp312-cp312-win_amd64.whl (296 kB)
Installing collected packages: greenlet, sqlalchemy, snowflake-sqlalchemy
Successfully installed greenlet-3.2.1 snowflake-sqlalchemy-1.7.3 sqlalchemy-2.0.40


In [3]:
# Data handling
import pandas as pd
import numpy as np
import json
import io
import os
import requests

# Cloud storage
from azure.storage.blob import BlobServiceClient

# Snowflake
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# SQLAlchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

# General cleanup
import warnings
warnings.filterwarnings('ignore')


## Connection String Load

In [4]:
def load_config_azure(config_path="config.json"):
  """Load the Azure configuration parameters from the config.json file."""
  with open(config_path,"r", encoding="utf-8") as config_file:
    config = json.load(config_file)
  return config["AZURE_CONNECTION_STRING"], config["austin_container"]

def load_config_snowflake(config_path="config.json"):
    """Load the Azure configuration parameters from the config.json file."""
    with open(config_path,"r", encoding="utf-8") as config_file:
        config = json.load(config_file)
    return config["SNOWFLAKE_USER"], config["SNOWFLAKE_PASSWORD"], config["SNOWFLAKE_ACCOUNT"], config["SNOWFLAKE_DATABASE"], config["SNOWFLAKE_SCHEMA"], config["SNOWFLAKE_WAREHOUSE"]

AZURE_CONNECTION_STRING, austin_container = load_config_azure()
SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA, SNOWFLAKE_WAREHOUSE = load_config_snowflake()

blob_service_client = BlobServiceClient.from_connection_string(AZURE_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(austin_container)

connection_params = {
    "user": SNOWFLAKE_USER,
    "password": SNOWFLAKE_PASSWORD,
    "account": SNOWFLAKE_ACCOUNT,
    "warehouse": SNOWFLAKE_WAREHOUSE,
    "database": SNOWFLAKE_DATABASE,
    "schema": SNOWFLAKE_SCHEMA
}

engine = create_engine(URL(
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA))


## Data Extraction


In [5]:
# Define the path to your blob inside the Azure container
crash_blob = "austincrashdatafull.csv"

# Get a BlobClient for the CSV file
blob_client = blob_service_client.get_blob_client(container=austin_container, blob=crash_blob)

# Download the blob content
blob_data = blob_client.download_blob().readall()

# Read the blob content into a pandas DataFrame
crash_df = pd.read_csv(io.BytesIO(blob_data))

# Preview the first few rows
crash_df.head()


Unnamed: 0,id,cris_crash_id,crash_fatal_fl,case_id,address_primary,address_secondary,rpt_street_name,rpt_street_sfx,crash_speed_limit,road_constr_zone_fl,...,rpt_block_num,latitude,longitude,point,:@computed_region_e9j2_6w3z,:@computed_region_m2th_e4b7,:@computed_region_rxpj_nzrk,:@computed_region_8spj_utxs,:@computed_region_jcrc_4uuy,:@computed_region_q9nd_rr82
0,584,13647704.0,False,140090520,RIDGELINE BLVD BLVD,N N FM 620 RD RD,RIDGELINE BLVD,BLVD,-1.0,False,...,,,,,,,,,,
1,849,13653672.0,False,140101374,NOT REPORTED,AIRPORT BLVD BLVD,NOT REPORTED,,-1.0,False,...,,,,,,,,,,
2,1148,13663307.0,False,140151612,6200 NOT REPORTED HWY,6300 JOE TANNER LN,NOT REPORTED,HWY,45.0,False,...,6200.0,,,,,,,,,
3,1387,13668080.0,False,140081670,RESEARCH BLVD SVRD NB BLVD,STECK AVE,RESEARCH BLVD SVRD NB,BLVD,-1.0,False,...,,,,,,,,,,
4,1877,13679314.0,False,140380242,4400 NOT REPORTED,AIRPORT BLVD,NOT REPORTED,,60.0,False,...,4400.0,,,,,,,,,


##Data Reformatting

In [6]:
print(crash_df.shape)
print(crash_df.isnull().sum())
crash_df.rename(columns={"TOT_INJRY_CNT": "total_injury_count"}, inplace=True)

#Dropping unneeded columns

crash_df.drop([":@computed_region_e9j2_6w3z",":@computed_region_m2th_e4b7",":@computed_region_rxpj_nzrk",
               ":@computed_region_8spj_utxs",":@computed_region_jcrc_4uuy",":@computed_region_q9nd_rr82","is_deleted","is_temp_record"], axis=1, inplace=True)

(217394, 50)
id                                        0
cris_crash_id                             7
crash_fatal_fl                            0
case_id                                2920
address_primary                           0
address_secondary                         2
rpt_street_name                           1
rpt_street_sfx                        67395
crash_speed_limit                         6
road_constr_zone_fl                       7
crash_sev_id                              0
sus_serious_injry_cnt                     0
nonincap_injry_cnt                        0
poss_injry_cnt                            0
non_injry_cnt                             0
unkn_injry_cnt                            0
tot_injry_cnt                             0
death_cnt                                 0
units_involved                            0
motor_vehicle_death_count                 0
motor_vehicle_serious_injury_count        0
bicycle_death_count                       0
bicycle_serious_inj

In [7]:
crash_df.dropna(subset=["cris_crash_id", "latitude", "longitude"], inplace=True)

mode_speed = crash_df["crash_speed_limit"].mode()[0]
crash_df["crash_speed_limit"].fillna(mode_speed, inplace=True)

crash_df["road_constr_zone_fl"].fillna(False, inplace=True)

crash_df.dropna(subset=["rpt_street_name"], inplace=True)

crash_df.drop(columns=["point"], inplace=True)

crash_df.drop(columns=["case_id"], inplace=True)

crash_df.rename(columns={"id": "crash_id"}, inplace=True)

crash_df.reset_index(drop=True, inplace=True)

print(crash_df.isnull().sum())


crash_id                                  0
cris_crash_id                             0
crash_fatal_fl                            0
address_primary                           0
address_secondary                         2
rpt_street_name                           0
rpt_street_sfx                        65860
crash_speed_limit                         0
road_constr_zone_fl                       0
crash_sev_id                              0
sus_serious_injry_cnt                     0
nonincap_injry_cnt                        0
poss_injry_cnt                            0
non_injry_cnt                             0
unkn_injry_cnt                            0
tot_injry_cnt                             0
death_cnt                                 0
units_involved                            0
motor_vehicle_death_count                 0
motor_vehicle_serious_injury_count        0
bicycle_death_count                       0
bicycle_serious_injury_count              0
pedestrian_death_count          

In [8]:
crash_df["crash_timestamp"] = pd.to_datetime(crash_df["crash_timestamp"])
crash_df["crash_timestamp_ct"] = pd.to_datetime(crash_df["crash_timestamp_ct"])


## Data Transformation

In [9]:
# Create dim_calendar
dim_calendar = pd.DataFrame()
dim_calendar["crash_date"] = crash_df["crash_timestamp"].dt.date
dim_calendar["crash_year"] = crash_df["crash_timestamp"].dt.year
dim_calendar["crash_month"] = crash_df["crash_timestamp"].dt.month
dim_calendar["crash_day"] = crash_df["crash_timestamp"].dt.day
dim_calendar["crash_hour"] = crash_df["crash_timestamp"].dt.hour
dim_calendar["crash_weekofyear"] = crash_df["crash_timestamp"].dt.isocalendar().week
dim_calendar["crash_quarter"] = crash_df["crash_timestamp"].dt.quarter
dim_calendar["is_weekend"] = crash_df["crash_timestamp"].dt.dayofweek >= 5
dim_calendar["is_holiday"] = False  # optional custom flag

# Map to readable string values
dim_calendar["weekend_label"] = dim_calendar["is_weekend"].map({True: "Weekend", False: "Weekday"})
dim_calendar["holiday_label"] = dim_calendar["is_holiday"].map({True: "Holiday", False: "Non-Holiday"})

# Optional: Map month number to month name
month_map = {
    1: "January", 2: "February", 3: "March", 4: "April",
    5: "May", 6: "June", 7: "July", 8: "August",
    9: "September", 10: "October", 11: "November", 12: "December"
}
dim_calendar["month_label"] = dim_calendar["crash_month"].map(month_map)

# Create Date_ID
dim_calendar["Date_ID"] = dim_calendar["crash_date"].astype(str).str.replace("-", "").astype(int)

# Drop duplicates
dim_calendar = dim_calendar.drop_duplicates(subset="Date_ID").reset_index(drop=True)

# Time-of-day mapping
def map_time_of_day(hour):
    if 5 <= hour < 12:
        return "Morning"
    elif 12 <= hour < 17:
        return "Afternoon"
    elif 17 <= hour < 21:
        return "Evening"
    else:
        return "Night"

dim_calendar["time_of_day"] = dim_calendar["crash_hour"].apply(map_time_of_day)

# Preview
dim_calendar.head()


Unnamed: 0,crash_date,crash_year,crash_month,crash_day,crash_hour,crash_weekofyear,crash_quarter,is_weekend,is_holiday,weekend_label,holiday_label,month_label,Date_ID,time_of_day
0,2014-02-14,2014,2,14,23,7,1,False,False,Weekday,Non-Holiday,February,20140214,Night
1,2014-03-06,2014,3,6,2,10,1,False,False,Weekday,Non-Holiday,March,20140306,Night
2,2014-01-08,2014,1,8,19,2,1,False,False,Weekday,Non-Holiday,January,20140108,Evening
3,2014-01-19,2014,1,19,11,3,1,True,False,Weekend,Non-Holiday,January,20140119,Morning
4,2014-02-27,2014,2,27,14,9,1,False,False,Weekday,Non-Holiday,February,20140227,Afternoon


In [10]:
# Select relevant columns
dim_location = crash_df[[
    "address_primary",
    "address_secondary",
    "rpt_block_num",
    "rpt_street_name",
    "rpt_street_sfx",
    "latitude",
    "longitude"
]].copy()

# Create a Location_ID based on unique combinations
dim_location["Location_ID"] = dim_location.astype(str).agg("|".join, axis=1).factorize()[0] + 1  # Start at 1

# Reorder columns so Location_ID comes first
dim_location = dim_location[[
    "Location_ID",
    "address_primary",
    "address_secondary",
    "rpt_block_num",
    "rpt_street_name",
    "rpt_street_sfx",
    "latitude",
    "longitude"
]]

dim_location["rpt_block_num"] = dim_location["rpt_block_num"].astype(str)


# Drop duplicates
dim_location = dim_location.drop_duplicates(subset="Location_ID").reset_index(drop=True)

# Preview
dim_location.head()


Unnamed: 0,Location_ID,address_primary,address_secondary,rpt_block_num,rpt_street_name,rpt_street_sfx,latitude,longitude
0,1,2200 NOT REPORTED,OLTORF ST,2200,NOT REPORTED,,30.236824,-97.739375
1,2,8600 N US 183 NB HWY,9000 METRIC BLVD,8600,US 183 NB,HWY,30.360368,-97.716673
2,3,2800 N LAMAR BLVD,SAN GABRIEL ST,2800,LAMAR,BLVD,30.244344,-97.781309
3,4,1800 LAVACA ST,W 18TH ST,1800,LAVACA,ST,30.280295,-97.740912
4,5,7500 NOT REPORTED,CONVICT HILL RD,7500,NOT REPORTED,,30.216696,-97.848713


In [11]:
# Select and rename relevant columns
dim_txDot = crash_df[[
    "onsys_fl",
    "private_dr_fl",
    "road_constr_zone_fl"
]].copy()

# Rename columns for clarity
dim_txDot.rename(columns={
    "onsys_fl": "on_txDot_highway",
    "private_dr_fl": "crash_on_private_drive",
    "road_constr_zone_fl": "construction_zone_crash"
}, inplace=True)

# Create TxDot_ID based on unique combinations
dim_txDot["TxDot_ID"] = dim_txDot.astype(str).agg("|".join, axis=1).factorize()[0] + 1

# Reorder so ID comes first
dim_txDot = dim_txDot[[
    "TxDot_ID",
    "on_txDot_highway",
    "crash_on_private_drive",
    "construction_zone_crash"
]]

# Drop duplicates
dim_txDot = dim_txDot.drop_duplicates(subset="TxDot_ID").reset_index(drop=True)

# Preview
dim_txDot.head()

Unnamed: 0,TxDot_ID,on_txDot_highway,crash_on_private_drive,construction_zone_crash
0,1,True,False,False
1,2,False,False,False
2,3,False,False,True
3,4,True,False,True


In [12]:
# Select relevant columns
dim_severity = crash_df[["crash_sev_id", "crash_fatal_fl"]].copy()

# Ensure consistency: crash_sev_id 5 should always have fatal flag as True
dim_severity.loc[dim_severity["crash_sev_id"] == 5, "crash_fatal_fl"] = True

# Map severity labels
severity_labels = {
    0: "Unknown",
    1: "No Injury",
    2: "Possible Injury",
    3: "Non-Incapacitating Injury",
    4: "Incapacitating Injury",
    5: "Fatal Injury"
}

dim_severity["severity_label"] = dim_severity["crash_sev_id"].map(severity_labels)

# Drop duplicates based on crash_sev_id (acts as primary key)
dim_severity = dim_severity.drop_duplicates(subset="crash_sev_id").reset_index(drop=True)

# Reorder columns (if needed)
dim_severity = dim_severity[["crash_sev_id", "severity_label", "crash_fatal_fl"]]

# Preview
dim_severity.head()



Unnamed: 0,crash_sev_id,severity_label,crash_fatal_fl
0,2,Possible Injury,False
1,3,Non-Incapacitating Injury,False
2,0,Unknown,False
3,5,Fatal Injury,True
4,1,No Injury,False


In [None]:
# Select columns for fact table
fact_crashes = crash_df[[
    "crash_id",
    "tot_injry_cnt",
    "death_cnt",
    "crash_speed_limit",
    "sus_serious_injry_cnt",
    "nonincap_injry_cnt",
    "poss_injry_cnt",
    "non_injry_cnt",
    "unkn_injry_cnt",
    "motor_vehicle_death_count",
    "motor_vehicle_serious_injury_count",
    "bicycle_death_count",
    "bicycle_serious_injury_count",
    "pedestrian_death_count",
    "pedestrian_serious_injury_count",
    "motorcycle_death_count",
    "motorcycle_serious_injury_count",
    "micromobility_death_count",
    "micromobility_serious_injury_count",
    "other_death_count",
    "other_serious_injury_count",
    "est_comp_cost_crash_based",
    "est_total_person_comp_cost"
]].copy()

# Add foreign keys

fact_crashes["Date_ID"] = crash_df["crash_timestamp"].dt.date.astype(str).str.replace("-", "").astype(int)

fact_crashes["TxDot_ID"] = crash_df[["road_constr_zone_fl", "onsys_fl", "private_dr_fl"]].astype(str).agg("|".join, axis=1).factorize()[0] + 1

fact_crashes["crash_sev_id"] = crash_df["crash_sev_id"]


fact_crashes["Location_ID"] = crash_df[[
    "address_primary", "address_secondary", "rpt_block_num",
    "rpt_street_name", "rpt_street_sfx", "latitude", "longitude"
]].astype(str).agg("|".join, axis=1).factorize()[0] + 1

# Preview
fact_crashes.head()


Unnamed: 0,crash_id,tot_injry_cnt,death_cnt,crash_speed_limit,sus_serious_injry_cnt,nonincap_injry_cnt,poss_injry_cnt,non_injry_cnt,unkn_injry_cnt,motor_vehicle_death_count,...,micromobility_death_count,micromobility_serious_injury_count,other_death_count,other_serious_injury_count,est_comp_cost_crash_based,est_total_person_comp_cost,Date_ID,TxDot_ID,severity_ID,Location_ID
0,2455,2,0,-1.0,0,1,1,1,0,0,...,0,0,0,0,250000,470000,20140214,1,2,1
1,3819,3,0,65.0,0,1,2,0,0,0,...,0,0,0,0,250000,650000,20140306,1,2,2
2,1295,4,0,35.0,0,0,4,1,0,0,...,0,0,0,0,200000,820000,20140108,1,3,3
3,1662,0,0,30.0,0,0,0,0,1,0,...,0,0,0,0,20000,20000,20140119,2,0,4
4,187,0,0,65.0,0,0,0,3,0,0,...,0,0,0,0,20000,60000,20140108,1,5,5


## Loading

In [54]:
from sqlalchemy import text

# Set chunk size
chunksize = 50000

# Define table uploads using chunked loops for ALL tables
table_mapping = {
    "dim_calendar": dim_calendar,
    "dim_location": dim_location,
    "dim_txdot": dim_txDot,
    "dim_severity": dim_severity,
    "fact_crashes": fact_crashes
}

# Drop all existing tables first to avoid schema or duplicate issues
with engine.connect() as conn:
    for table_name in table_mapping:
        conn.execute(text(f"DROP TABLE IF EXISTS {table_name};"))
        print(f"🗑️ Dropped table '{table_name}' if it existed.")

# Upload each table in chunks
for table_name, df in table_mapping.items():
    for i in range(0, df.shape[0], chunksize):
        chunk = df.iloc[i:i+chunksize]
        chunk.to_sql(table_name, engine, if_exists='append', index=False, method='multi')
        print(f"✅ Uploaded chunk {i//chunksize + 1} of '{table_name}'")


🗑️ Dropped table 'dim_calendar' if it existed.
🗑️ Dropped table 'dim_location' if it existed.
🗑️ Dropped table 'dim_txdot' if it existed.
🗑️ Dropped table 'dim_severity' if it existed.
🗑️ Dropped table 'fact_crashes' if it existed.
✅ Uploaded chunk 1 of 'dim_calendar'
✅ Uploaded chunk 1 of 'dim_location'
✅ Uploaded chunk 2 of 'dim_location'
✅ Uploaded chunk 3 of 'dim_location'
✅ Uploaded chunk 4 of 'dim_location'
✅ Uploaded chunk 5 of 'dim_location'
✅ Uploaded chunk 1 of 'dim_txdot'
✅ Uploaded chunk 1 of 'dim_severity'
✅ Uploaded chunk 1 of 'fact_crashes'
✅ Uploaded chunk 2 of 'fact_crashes'
✅ Uploaded chunk 3 of 'fact_crashes'
✅ Uploaded chunk 4 of 'fact_crashes'
✅ Uploaded chunk 5 of 'fact_crashes'
