In [31]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("HadoopDataFetch") \
    .getOrCreate()

# Read the CSV files from Hadoop HDFS
crashes_df = spark.read.csv("hdfs://localhost:9000/user/input/crash_vehicles/crashes.csv", header=True, inferSchema=True)
vehicles_df = spark.read.csv("hdfs://localhost:9000/user/input/crash_vehicles/vehicles.csv", header=True, inferSchema=True)

# Display the schema of the dataframes
crashes_df.printSchema()
vehicles_df.printSchema()

# Show a sample of the data
crashes_df.show(5)
vehicles_df.show(5)


root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

In [32]:
# Rename the COLLISION_ID column in the vehicles_df
vehicles_df = vehicles_df.withColumnRenamed("COLLISION_ID", "VEH_COLLISION_ID")

# Join the DataFrames on COLLISION_ID and VEH_COLLISION_ID
joined_df = crashes_df.join(vehicles_df, crashes_df["COLLISION_ID"] == vehicles_df["VEH_COLLISION_ID"])

# Show the schema of the joined DataFrame
joined_df.printSchema()

# Show a sample of the joined data
joined_df.show(5)

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

In [33]:
#  Analyze the Top 5 Boroughs with the Highest Average Number of Persons Injured Per Crash

from pyspark.sql.functions import col, avg, desc

# Calculate the average number of persons injured per crash for each borough
avg_injuries_per_borough = joined_df.groupBy("BOROUGH") \
    .agg(avg(col("NUMBER OF PERSONS INJURED")).alias("avg_persons_injured")) \
    .orderBy(desc("avg_persons_injured")) \
    .limit(5)

avg_injuries_per_borough.show()



+-------------+-------------------+
|      BOROUGH|avg_persons_injured|
+-------------+-------------------+
|         NULL| 0.3718370738818704|
|     BROOKLYN| 0.3320651962752386|
|        BRONX| 0.3257748910630601|
|       QUEENS| 0.3019928305539296|
|STATEN ISLAND|0.29832209244941604|
+-------------+-------------------+



In [34]:
# Identify the Top 5 Vehicle Types with the Highest Frequency of Crashes Involving Fatalities

from pyspark.sql.functions import sum

# Filter the joined dataframe to include only crashes with fatalities
fatal_crashes = joined_df.filter((col("NUMBER OF PERSONS KILLED") > 0) | 
                                 (col("NUMBER OF PEDESTRIANS KILLED") > 0) |
                                 (col("NUMBER OF CYCLIST KILLED") > 0) |
                                 (col("NUMBER OF MOTORIST KILLED") > 0))

# Group by vehicle type and count the number of fatal crashes
top_fatal_vehicle_types = fatal_crashes.groupBy("VEHICLE_TYPE") \
    .count() \
    .orderBy(desc("count")) \
    .limit(5)

top_fatal_vehicle_types.show()


+--------------------+-----+
|        VEHICLE_TYPE|count|
+--------------------+-----+
|Station Wagon/Spo...| 1116|
|               Sedan| 1081|
|   PASSENGER VEHICLE|  584|
|SPORT UTILITY / S...|  360|
|          Motorcycle|  289|
+--------------------+-----+



In [35]:
# Determine the Top 5 Contributing Factors for Crashes Involving Pedestrians

from pyspark.sql.functions import count

# Filter the joined dataframe to include only crashes involving pedestrian injuries or fatalities
pedestrian_crashes = joined_df.filter((col("NUMBER OF PEDESTRIANS INJURED") > 0) | 
                                      (col("NUMBER OF PEDESTRIANS KILLED") > 0))

# Group by contributing factor and count the number of pedestrian crashes
top_contributing_factors_pedestrians = pedestrian_crashes.groupBy("CONTRIBUTING FACTOR VEHICLE 1") \
    .count() \
    .orderBy(desc("count")) \
    .limit(5)

top_contributing_factors_pedestrians.show()

+-----------------------------+-----+
|CONTRIBUTING FACTOR VEHICLE 1|count|
+-----------------------------+-----+
|                  Unspecified|44325|
|         Driver Inattentio...|24973|
|         Failure to Yield ...|22492|
|             Backing Unsafely| 5006|
|                         NULL| 4037|
+-----------------------------+-----+



In [None]:
# Find the Top 5 Days with the Highest Number of Crashes Involving Injuries

from pyspark.sql.functions import to_date

# Convert the CRASH DATE column to date type
joined_df = joined_df.withColumn("CRASH_DATE", to_date(col("CRASH DATE"), "MM/dd/yyyy"))

# Filter crashes involving injuries
injury_crashes = joined_df.filter(col("NUMBER OF PERSONS INJURED") > 0)

# Group by crash date and count the number of injury crashes
top_injury_crash_days = injury_crashes.groupBy("CRASH_DATE") \
    .count() \
    .orderBy(desc("count")) \
    .limit(5)

top_injury_crash_days.show()


In [None]:
# Analyze the Relationship Between Vehicle Occupants and the Number of Persons Injured

# Filter rows with non-null vehicle occupants and non-null number of persons injured
valid_data = joined_df.filter(col("VEHICLE_OCCUPANTS").isNotNull() & col("NUMBER OF PERSONS INJURED").isNotNull())

# Group by the number of vehicle occupants and calculate the average number of persons injured
occupants_injury_analysis = valid_data.groupBy("VEHICLE_OCCUPANTS") \
    .agg(avg(col("NUMBER OF PERSONS INJURED")).alias("avg_persons_injured")) \
    .orderBy(desc("avg_persons_injured"))

occupants_injury_analysis.show()


In [None]:
# Write the joined DataFrame to a CSV file locally
joined_df.repartition(1).write.csv("hadoop_joined.csv", header=True)

# Stop the Spark session
spark.stop()