# Uploading Ohio Crash Statistics to Snowflake - PySpark Version

### Background

Ohio vehicle crash data can be obtained for free [here](https://ohtrafficdata.dps.ohio.gov/CrashStatistics/Home). The requested data is delivered by email as a zip file. This notebook shows a local process that uses Python to unzip the zip file and PySpark to read the relevant CSV file. The data is then cleaned up (removing unnecessary tab characters, properly formatting the dates, etc.) and uploaded as a Snowflake table.

**NOTE:** Like most cloud data warehouses, Snowflake does **NOT** enforce uniqueness constraints (`PRIMARY KEY`, `UNIQUE`); they can be declared, but they are not enforced. So beware of uploading the same data more than once: Snowflake will neither warn you nor prevent the duplicate rows from being inserted.

#### Python library imports

In [1]:
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, to_timestamp
from pyspark.sql.types import IntegerType, FloatType
import configparser
import os
import pandas as pd
import pyspark
import zipfile

#### Obtaining Snowflake credentials and starting a local PySpark session with specified JDBC driver and Snowflake-Spark driver jar file paths

It is assumed that the environment variable `CONFIG_PATH` has already been set to the file path of the `config.ini` text file.
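For reference, the keys this notebook reads from the `[snowflake]` section of `config.ini` can be laid out as below. This is a sketch with hypothetical placeholder values (only `externalbrowser`, mentioned later as an authenticator option, comes from this notebook); it writes the sample to a temporary file and reads it back with `configparser`, leaving your own `CONFIG_PATH` untouched.

```python
import configparser
import os
import tempfile

# Hypothetical placeholder values: replace them with your own settings.
# The key names are the ones this notebook reads from the [snowflake] section.
sample = configparser.ConfigParser()
sample["snowflake"] = {
    "jdbc_driver_path": "/path/to/snowflake-jdbc.jar",
    "spark_driver_path": "/path/to/spark-snowflake.jar",
    "account": "my_account",
    "username": "my_user",
    "database": "my_database",
    "schema": "my_schema",
    "role": "my_role",
    "warehouse": "my_warehouse",
    "authenticator": "externalbrowser",
}

# Write the sample to a throwaway temp file (CONFIG_PATH itself is not modified)
sample_path = os.path.join(tempfile.mkdtemp(), "config.ini")
with open(sample_path, "w") as f:
    sample.write(f)

# Read it back the same way the notebook does and list the keys that were found
check = configparser.ConfigParser()
files_read = check.read(sample_path)
print(files_read, sorted(check["snowflake"]))
```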

In [2]:
config_file = os.getenv("CONFIG_PATH")

In [3]:
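config = configparser.ConfigParser()
# ConfigParser.read() does not raise on a missing file; it returns the list of files it parsed
if not config_file or not config.read(config_file):
    raise FileNotFoundError(f"config.ini file not found (CONFIG_PATH={config_file!r})")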

To connect to Snowflake from Spark, you need two jar files: the Snowflake JDBC driver and the Snowflake-Spark driver (the Snowflake Connector for Spark). Both can be obtained [here](https://search.maven.org/search?q=g:net.snowflake). Pick the Spark connector build that matches your Spark and Scala versions.

In [4]:
sf_jdbc_driver = config['snowflake']['jdbc_driver_path']
sf_spark_driver = config['snowflake']['spark_driver_path']
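Optionally, the two jar paths can be validated before the Spark session is started, so that a typo in `config.ini` fails immediately instead of surfacing later (for example as an error that the `net.snowflake.spark.snowflake` data source cannot be found). The helper name `build_spark_jars` is hypothetical; it only checks that each path is an existing file and joins the paths with commas, which is the format `spark.jars` expects.

```python
from pathlib import Path

def build_spark_jars(*jar_paths):
    """Return a comma-separated spark.jars value, failing early if any jar file is missing."""
    missing = [str(p) for p in jar_paths if not Path(p).is_file()]
    if missing:
        raise FileNotFoundError(f"Jar file(s) not found: {missing}")
    # spark.jars takes a single comma-separated string of paths
    return ",".join(str(p) for p in jar_paths)
```

It could then be used in the session builder as `.config("spark.jars", build_spark_jars(sf_jdbc_driver, sf_spark_driver))`.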

In [5]:
spark = (
    SparkSession.builder.master("local[*]")
    .appName("Snowflake_JDBC")
    .config("spark.jars", f"{sf_jdbc_driver},{sf_spark_driver}")   # This is needed for "local" PySpark environment
    .getOrCreate()
)

#### Specifying the calendar month, unzipping the matching zip file, and reading in the crash statistics CSV file

In [16]:
MONTH = '06'

In [17]:
with zipfile.ZipFile(f"data/OH_2022-{MONTH}.zip", "r") as zip_ref:
    # extract all the contents of the zip file to the specified directory
    zip_ref.extractall(f"data/OH_2022-{MONTH}")
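If the zip file contains more than the crash statistics, a variant of the cell above could extract only the CSV that is actually read. This is a minimal sketch assuming `CrashStatistics.csv` sits at the root of the archive (which the read path below implies); `extract_crash_csv` is a hypothetical helper, not part of the original process.

```python
import zipfile
from pathlib import Path

def extract_crash_csv(zip_path, out_dir, member="CrashStatistics.csv"):
    """Extract a single CSV member from the zip file and return its path."""
    with zipfile.ZipFile(zip_path, "r") as zf:
        names = zf.namelist()
        if member not in names:
            raise KeyError(f"{member} not found in {zip_path}; members: {names}")
        # extract() creates out_dir if needed and returns the extracted file's path
        return Path(zf.extract(member, path=out_dir))
```

For example: `csv_path = extract_crash_csv(f"data/OH_2022-{MONTH}.zip", f"data/OH_2022-{MONTH}")`.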

In [18]:
crash_stats_df = spark.read.csv(f"data/OH_2022-{MONTH}/CrashStatistics.csv", header=True, sep=",")

#### Data Clean Up: Formatting dates and numeric columns, and removing unnecessary tab characters

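Per Spark's [datetime pattern documentation](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html), the pattern that matches the timestamps in this file is `"M/d/yyyy h:mm:ss a"`: month and day without zero padding, a four-digit year, a 12-hour clock hour, then minutes, seconds, and AM/PM.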
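As an analogy only (Spark does not use `strptime`), the Spark pattern maps roughly onto Python's `datetime.strptime` directives as shown below; the sample value is hypothetical. With Spark's default (non-ANSI) settings, values that do not match the pattern typically come back as `null` rather than raising an error, so it is worth checking for unexpected nulls after the conversion.

```python
from datetime import datetime

# Hypothetical sample value in the shape the Spark pattern "M/d/yyyy h:mm:ss a" expects
sample = "6/3/2022 4:05:09 PM"

# Rough strptime equivalent of the Spark pattern:
#   M -> %m, d -> %d, yyyy -> %Y, h -> %I (12-hour clock), mm -> %M, ss -> %S, a -> %p
parsed = datetime.strptime(sample, "%m/%d/%Y %I:%M:%S %p")
print(parsed)  # 2022-06-03 16:05:09
```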

In [19]:
TIMESTAMP_FORMAT = "M/d/yyyy h:mm:ss a"
timestamp_cols = ["CrashDateTime", "CrashReportedDateTime", "DispatchedDateTime",
                  "ArrivedDateTime", "SceneClearedDateTime"]
integer_cols = ["NumberOfUnits", "NumberOfApproaches"]
float_cols = ["Latitude", "Longitude"]

# Parse the text timestamps using the Spark datetime pattern above
for col_name in timestamp_cols:
    crash_stats_df = crash_stats_df.withColumn(col_name, to_timestamp(col_name, TIMESTAMP_FORMAT))

# Cast the count columns to integers and the coordinates to floats
for col_name in integer_cols:
    crash_stats_df = crash_stats_df.withColumn(col_name, crash_stats_df[col_name].cast(IntegerType()))
for col_name in float_cols:
    crash_stats_df = crash_stats_df.withColumn(col_name, crash_stats_df[col_name].cast(FloatType()))

Removing unnecessary tab characters from the `LocalReportNumber` and `ReportingAgencyNCIC` columns

In [20]:
crash_stats_df = crash_stats_df.withColumn("LocalReportNumber", regexp_replace("LocalReportNumber", "\t", ""))
crash_stats_df = crash_stats_df.withColumn("ReportingAgencyNCIC", regexp_replace("ReportingAgencyNCIC", "\t", ""))
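To check which columns actually contain stray tab characters before deciding what to clean, the raw CSV can be scanned with Python's standard `csv` module. This is a sketch: `columns_with_tabs` is a hypothetical helper, and the UTF-8 encoding is an assumption about the exported file.

```python
import csv
from collections import Counter

def columns_with_tabs(csv_path, encoding="utf-8"):
    """Count how many values in each column contain a tab character."""
    counts = Counter()
    with open(csv_path, newline="", encoding=encoding) as f:
        for row in csv.DictReader(f):
            for col, value in row.items():
                # Skip non-string entries (DictReader uses None or lists for ragged rows)
                if isinstance(value, str) and "\t" in value:
                    counts[col] += 1
    return counts
```

For example: `columns_with_tabs(f"data/OH_2022-{MONTH}/CrashStatistics.csv")`.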

#### Confirming column data types / schema

In [21]:
crash_stats_df.printSchema()

root
 |-- LocalReportNumber: string (nullable = true)
 |-- DocumentNumber: string (nullable = true)
 |-- CrashSeverity: string (nullable = true)
 |-- HitSkip: string (nullable = true)
 |-- LocalInformation: string (nullable = true)
 |-- SecondaryCrash: string (nullable = true)
 |-- PhotosTaken: string (nullable = true)
 |-- OH2: string (nullable = true)
 |-- OH3: string (nullable = true)
 |-- OH1P: string (nullable = true)
 |-- OHOther: string (nullable = true)
 |-- PrivateProperty: string (nullable = true)
 |-- ReportingAgencyNCIC: string (nullable = true)
 |-- NumberOfUnits: integer (nullable = true)
 |-- UnitInError: string (nullable = true)
 |-- County: string (nullable = true)
 |-- InCityVillageTownship: string (nullable = true)
 |-- CityVillageTownshipName: string (nullable = true)
 |-- CrashDateTime: timestamp (nullable = true)
 |-- FIPSPlaceCode: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- LocationRouteType: strin

#### Upload cleaned data as a Snowflake table

Obtain Snowflake credentials from config.ini file

In [22]:
sf_account = config['snowflake']['account']
sf_user = config['snowflake']['username']
sf_database = config['snowflake']['database']
sf_schema = config['snowflake']['schema']
sf_role = config['snowflake']['role']
sf_warehouse = config['snowflake']['warehouse']
sf_authenticator = config['snowflake']['authenticator']

In [23]:
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

In [24]:
# Snowflake connection parameters
sfparams = {
    "sfURL" : f"{sf_account}.snowflakecomputing.com",
    "sfUser" : sf_user,
    "sfPassword" : "your_password",  # Not applicable when using externalbrowser authenticator
    "sfDatabase" : sf_database,
    "sfSchema" : sf_schema,
    "sfRole" : sf_role,
    "sfWarehouse" : sf_warehouse,
    "sfAuthenticator" : sf_authenticator
}

Upload (write) the PySpark DataFrame as a Snowflake table. With `mode("append")`, rows are appended if the table already exists; otherwise the table is created.

In [25]:
(crash_stats_df
 .write.format(SNOWFLAKE_SOURCE_NAME)
 .options(**sfparams)
 .option("dbtable", "oh_crash_statistics")
 .mode("append")
 .save()
)
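Because Snowflake will not reject duplicate rows (see the NOTE at the top), one simple local safeguard is to record which monthly zip files have already been uploaded and check that record before writing. This is a minimal sketch under the assumption that each zip file should be loaded exactly once; the manifest path and the `already_uploaded` / `record_upload` helpers are hypothetical, and the check only covers uploads made from this machine.

```python
import json
from pathlib import Path

# Hypothetical manifest file listing zip files that were already uploaded
MANIFEST = Path("data/uploaded_files.json")

def already_uploaded(zip_name, manifest=MANIFEST):
    """Return True if zip_name is recorded in the manifest."""
    if not manifest.exists():
        return False
    return zip_name in json.loads(manifest.read_text())

def record_upload(zip_name, manifest=MANIFEST):
    """Add zip_name to the manifest (call this only after a successful upload)."""
    done = json.loads(manifest.read_text()) if manifest.exists() else []
    if zip_name not in done:
        done.append(zip_name)
    manifest.parent.mkdir(parents=True, exist_ok=True)
    manifest.write_text(json.dumps(sorted(done)))
```

For example, check `already_uploaded(f"OH_2022-{MONTH}.zip")` before the write cell, and call `record_upload(f"OH_2022-{MONTH}.zip")` once `.save()` has succeeded.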