## NYC Traffic Safety Analysis Using Spark SQL 



This project analyzes New York City's motor vehicle collision data to evaluate whether traffic safety has improved over time.  
Using Spark SQL and Scala, we retrieve collision records from the NYC Open Data API and compute safety ratios such as:

- **Pedestrian injuries vs. pedestrian deaths**
- **Total injuries vs. total deaths**

We also investigate safety trends within each borough to understand regional differences.


#  Load Collision Data from NYC Open Data

We use the Socrata API to load collision data containing:
- borough  
- crash date  
- number of pedestrians injured/killed  
- number of persons injured/killed  

Only records where at least one person was injured or killed are included, and the request is capped at 500,000 rows.
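The query is sent as URL parameters (`$select`, `$where`, `$limit`), so the filter text has to be URL-safe. As a minimal sketch, assuming one prefers to encode the parameters programmatically rather than by hand, the URL could be assembled as follows. The helper names `encodeParam` and `buildSocrataUrl` are hypothetical and not part of the notebook; only `java.net.URLEncoder` from the standard library is used.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Percent-encode one query-string value. URLEncoder targets HTML form
// encoding and writes spaces as '+', so those are swapped for %20 afterwards.
def encodeParam(value: String): String =
  URLEncoder.encode(value, StandardCharsets.UTF_8.name()).replace("+", "%20")

// Hypothetical helper: assemble a query URL from its parts.
def buildSocrataUrl(base: String, select: Seq[String], where: String, limit: Int): String = {
  val params = Seq(
    "$select" -> select.mkString(","),
    "$where"  -> where,
    "$limit"  -> limit.toString
  )
  val query = params.map { case (k, v) => s"$k=${encodeParam(v)}" }.mkString("&")
  s"$base?$query"
}

// Usage with the dataset endpoint and filter described above.
val exampleUrl = buildSocrataUrl(
  "https://data.cityofnewyork.us/resource/h9gi-nx95.json",
  Seq("borough", "crash_date", "number_of_persons_injured", "number_of_persons_killed"),
  "number_of_persons_injured>0 OR number_of_persons_killed>0",
  500000
)
```

Encoding every value this way also covers characters such as `>` and `,`, which a hand-written `%20` substitution leaves untouched.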

#  Extract Year from Crash Date

The crash date is stored as a string (YYYY-MM-DD).  
We extract the year using a simple slicing UDF and create a new column `year`.
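The slicing step can be sketched in plain Scala as below. This is an illustration rather than the notebook's UDF: the function name `extractYear` is hypothetical, and the handling of null, short, or non-numeric values is an assumption added here, since a bare `slice` on a null string would likely fail.

```scala
// Return the four-character year prefix of a date string, or None when the
// value is null, shorter than four characters, or does not start with digits.
def extractYear(crashDate: String): Option[String] =
  Option(crashDate)
    .map(_.trim)
    .filter(_.length >= 4)
    .map(_.take(4))
    .filter(_.forall(_.isDigit))

val year = extractYear("2021-09-11")   // a well-formed date
val missing = extractYear(null)        // a missing date
```

Within Spark itself, the built-in `substring` function from `org.apache.spark.sql.functions` may be an alternative to a UDF for this kind of fixed-width slice.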

#  Citywide Safety Ratios

For each year, we compute:

- **pedestrian injuries / pedestrian deaths**  
- **total injuries / total deaths**  

When a year has no recorded deaths, the denominator is NULL rather than zero, so the ratio comes out as NULL instead of causing a division error.
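To make the arithmetic concrete, here is a minimal sketch of the same calculation on plain Scala collections, with `None` standing in for SQL NULL. The `Crash` case class, the function names, and the sample records are all made up for illustration; they are not taken from the dataset.

```scala
// One collision record, reduced to the fields the ratio needs.
final case class Crash(year: String, injured: Int, killed: Int)

// Injuries per death; None plays the role of SQL NULL when there are no deaths.
def injuryToDeathRatio(injured: Long, killed: Long): Option[Double] =
  if (killed == 0L) None else Some(injured.toDouble / killed)

// Sum injuries and deaths per year, then take the ratio of the two sums.
def ratiosByYear(crashes: Seq[Crash]): Map[String, Option[Double]] =
  crashes.groupBy(_.year).map { case (year, rows) =>
    val injured = rows.map(_.injured.toLong).sum
    val killed  = rows.map(_.killed.toLong).sum
    year -> injuryToDeathRatio(injured, killed)
  }

// Made-up sample records, for illustration only.
val sample = Seq(
  Crash("2012", injured = 3, killed = 0),
  Crash("2012", injured = 1, killed = 1),
  Crash("2013", injured = 2, killed = 0)
)
val ratios = ratiosByYear(sample)
```

Note that this is a ratio of yearly sums, not an average of per-crash ratios, so crashes with zero deaths still contribute their injuries to the numerator.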

#  Safety Ratios by Borough

To understand regional differences, we compute the same ratios for each borough:

- BRONX  
- MANHATTAN  
- QUEENS  
- BROOKLYN  
- STATEN ISLAND  

We filter out invalid borough names before aggregating.
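The filtering rule amounts to a membership test against the five names listed above, as in the sketch below. The helper `normalizeBorough` is hypothetical, and the trimming, upper-casing, and null handling are assumptions added for illustration; an exact-match filter would skip those steps.

```scala
val validBoroughs: Set[String] =
  Set("BRONX", "MANHATTAN", "QUEENS", "BROOKLYN", "STATEN ISLAND")

// Keep a borough name only if it is one of the five expected values.
// Trimming and upper-casing are extra steps assumed here, not required by
// an exact-match filter.
def normalizeBorough(raw: String): Option[String] =
  Option(raw).map(_.trim.toUpperCase).filter(validBoroughs.contains)

// Made-up raw values, for illustration only.
val rawValues = Seq("QUEENS", " bronx ", null, "Unspecified")
val cleaned = rawValues.flatMap(v => normalizeBorough(v))
```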

#  Final Results

We show:

- Citywide ratios by year  
- Ratios by year for each of the five boroughs  

These metrics help evaluate whether NYC's traffic safety initiatives (speed limits, road redesign, etc.) are effective.




// `spark` (the SparkSession) is assumed to be provided by the notebook kernel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def GetJsonWebData(url: String): DataFrame = {
    // Download the full response body, closing the connection afterwards
    val source = scala.io.Source.fromURL(url)
    val body = try source.mkString finally source.close()
    // Hand the JSON text to Spark as a single string element and parse it
    val rdd = spark.sparkContext.parallelize(body :: Nil)
    spark.read.json(rdd)
}


val selection =
  "borough,crash_date,number_of_pedestrians_injured," +
  "number_of_pedestrians_killed,number_of_persons_injured," +
  "number_of_persons_killed"

val where_clause =
  "number_of_persons_injured>0%20OR%20number_of_persons_killed>0"

val url =
  s"https://data.cityofnewyork.us/resource/h9gi-nx95.json?$$select=$selection&$$where=$where_clause&$$limit=500000"


val response = GetJsonWebData(url)



// UDF factory: the returned UDF keeps the first n characters of a string
def slice_col(n: Int) =
  udf((value: String) => value.slice(0, n))

// The first four characters of crash_date are the year
val df_year = response.withColumn("year", slice_col(4)($"crash_date"))



val df_proportions =
  df_year.groupBy("year")
    .agg(
      (sum("number_of_pedestrians_injured") /
        sum(when(col("number_of_pedestrians_killed") === 0, lit(null))
              .otherwise(col("number_of_pedestrians_killed")))
      ).alias("ped_inj_to_death"),

      (sum("number_of_persons_injured") /
        sum(when(col("number_of_persons_killed") === 0, lit(null))
              .otherwise(col("number_of_persons_killed")))
      ).alias("total_inj_to_death")
    )
    .orderBy("year")

val df_proportions_formatted =
  df_proportions.select(
    $"year",
    format_number($"ped_inj_to_death", 2).alias("ped_inj_to_death"),
    format_number($"total_inj_to_death", 2).alias("total_inj_to_death")
  )

// Calculate the same ratios by year for each borough
val borough_list = List("BRONX","MANHATTAN","STATEN ISLAND","QUEENS","BROOKLYN")

// Drop records whose borough is missing or not one of the five names above
val df_borough = df_year.filter($"borough".isin(borough_list: _*))

val df_borough_prop =
  df_borough.groupBy("borough", "year")
    .agg(
      (sum("number_of_pedestrians_injured") /
        sum(when(col("number_of_pedestrians_killed") === 0, lit(null))
              .otherwise(col("number_of_pedestrians_killed")))
      ).alias("ped_inj_to_death"),

      (sum("number_of_persons_injured") /
        sum(when(col("number_of_persons_killed") === 0, lit(null))
              .otherwise(col("number_of_persons_killed")))
      ).alias("total_inj_to_death")
    )
    .orderBy("borough", "year")

val df_borough_prop_formatted =
  df_borough_prop.select(
    $"borough",
    $"year",
    format_number($"ped_inj_to_death", 2).alias("ped_inj_to_death"),
    format_number($"total_inj_to_death", 2).alias("total_inj_to_death")
  )
// Extract each borough's rows into its own DataFrame
val manhattan     = df_borough_prop_formatted.filter($"borough" === "MANHATTAN")
val staten_island = df_borough_prop_formatted.filter($"borough" === "STATEN ISLAND")
val queens        = df_borough_prop_formatted.filter($"borough" === "QUEENS")
val bronx         = df_borough_prop_formatted.filter($"borough" === "BRONX")
val brooklyn      = df_borough_prop_formatted.filter($"borough" === "BROOKLYN")




// Citywide ratios by year
df_proportions_formatted.show

// Ratios by year for each borough

manhattan.show
staten_island.show
queens.show
bronx.show
brooklyn.show


+----+----------------+------------------+
|year|ped_inj_to_death|total_inj_to_death|
+----+----------------+------------------+
|2012|           82.03|            200.39|
|2013|           68.11|            185.62|
|2014|           82.98|            195.52|
|2015|           75.82|            211.35|
|2016|           74.42|            245.19|
|2017|           87.79|            236.94|
|2018|           90.41|            268.14|
|2019|           80.67|            251.60|
|2020|           66.25|            165.85|
|2021|           57.27|            174.94|
|2022|           66.96|            179.70|
|2023|           85.59|            197.20|
|2024|           75.67|            198.83|
|2025|          113.00|            283.89|
+----+----------------+------------------+

+---------+----+----------------+------------------+
|  borough|year|ped_inj_to_death|total_inj_to_death|
+---------+----+----------------+------------------+
|MANHATTAN|2012|           81.76|            184.19|
|MANHATTAN|20


# Summary

This Spark SQL project demonstrates:

- Retrieving and parsing API JSON data  
- Cleaning and filtering categorical fields  
- Creating UDFs for string manipulation  
- Aggregating metrics using Spark SQL functions  
- Formatting results for presentation  
- Comparing safety trends across NYC boroughs  

The results show year-over-year changes in pedestrian and total injury/death ratios, offering insight into NYC's traffic safety progress.
