In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Step 1: Spark Session
# getOrCreate() reuses an already-running session if there is one.
spark = SparkSession.builder \
    .appName("US Accidents Big Data Analysis") \
    .getOrCreate()

# Step 2: Load Dataset
# header=True takes column names from the first row; inferSchema=True asks
# Spark to derive column types from the data instead of reading all strings.
df = spark.read.csv("/content/US_Accidents_March23.csv", header=True, inferSchema=True)

# Step 3: Clean and Transform
# Drop rows that are missing any field used by the analyses below.
df = df.dropna(subset=["Start_Time", "End_Time", "City", "State", "Severity"])
# Duration_Minutes = (End_Time - Start_Time) in epoch seconds, divided by 60.
df = df.withColumn("Start_Time", to_timestamp("Start_Time")) \
       .withColumn("End_Time", to_timestamp("End_Time")) \
       .withColumn("Duration_Minutes",
                   (unix_timestamp("End_Time") - unix_timestamp("Start_Time"))/60)

# Step 4: Filter extreme durations
# Keep only strictly positive durations shorter than 600 minutes (10 hours).
df = df.filter((col("Duration_Minutes") > 0) & (col("Duration_Minutes") < 600))

# Step 5: Group by State for accident count
state_accidents = df.groupBy("State") \
                    .agg(count("*").alias("Accident_Count")) \
                    .orderBy(desc("Accident_Count"))

# Step 6: Average duration by Severity
severity_duration = df.groupBy("Severity") \
                      .agg(avg("Duration_Minutes").alias("Avg_Duration_Min")) \
                      .orderBy("Severity")

# Step 7: Top cities with most accidents
# NOTE(review): grouping on City alone pools same-named cities from different
# states into one row -- confirm that is intended.
top_cities = df.groupBy("City").count().orderBy(desc("count")).limit(10)

# Step 8: Time-of-day analysis
df = df.withColumn("Hour", hour("Start_Time"))
hourly_accidents = df.groupBy("Hour").count().orderBy("Hour")

# Show results
# show() prints at most 20 rows unless told otherwise, so the state table
# below is the top 20 states by accident count.
state_accidents.show()
severity_duration.show()
top_cities.show()
# There can be up to 24 hour-of-day rows (0-23); the default 20-row limit
# would silently cut off hours 20-23, so request all 24 explicitly.
hourly_accidents.show(24)

# Stop Spark
spark.stop()


+-----+--------------+
|State|Accident_Count|
+-----+--------------+
|   CA|         73858|
|   FL|         22028|
|   TX|         13475|
|   VA|         11412|
|   NY|         10717|
|   PA|         10562|
|   NC|          8982|
|   MN|          8948|
|   SC|          8498|
|   GA|          8362|
|   AZ|          6080|
|   OR|          5812|
|   OH|          5017|
|   MI|          4964|
|   NJ|          4710|
|   IL|          4379|
|   CO|          4270|
|   MD|          3162|
|   LA|          2845|
|   CT|          2463|
+-----+--------------+
only showing top 20 rows

+--------+------------------+
|Severity|  Avg_Duration_Min|
+--------+------------------+
|       2|111.89258248336485|
|       4|132.02096805515075|
+--------+------------------+

+-----------+-----+
|       City|count|
+-----------+-----+
|Los Angeles| 5696|
|      Miami| 5227|
|     Dallas| 3260|
|    Atlanta| 3132|
|  San Diego| 2758|
| Sacramento| 2740|
|     Austin| 2729|
|  Charlotte| 2275|
|    Orlando| 2246|
|

In [3]:
# Big Data Analysis using PySpark on US Accidents Dataset

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Step 1: Create Spark Session
# getOrCreate() reuses an already-running session if there is one.
spark = SparkSession.builder \
    .appName("US Accidents Big Data Analysis") \
    .getOrCreate()

# Step 2: Load Dataset
# The CSV (downloaded from Kaggle) must exist at the absolute path below.
# header=True takes column names from the first row; inferSchema=True asks
# Spark to derive column types from the data instead of reading all strings.
df = spark.read.csv("/content/US_Accidents_March23.csv", header=True, inferSchema=True)

# Step 3: Initial Exploration
# Row count and schema of the raw data, before any cleaning.
print("Total Rows:", df.count())
print("Schema:")
df.printSchema()

# Step 4: Data Cleaning and Feature Engineering
# Drop rows that are missing any field used by the analyses below.
df = df.dropna(subset=["Start_Time", "End_Time", "City", "State", "Severity"])
# Duration_Minutes = (End_Time - Start_Time) in epoch seconds, divided by 60.
df = df.withColumn("Start_Time", to_timestamp("Start_Time")) \
       .withColumn("End_Time", to_timestamp("End_Time")) \
       .withColumn("Duration_Minutes",
                   (unix_timestamp("End_Time") - unix_timestamp("Start_Time"))/60)

# Keep only strictly positive durations shorter than 600 minutes (10 hours).
df = df.filter((col("Duration_Minutes") > 0) & (col("Duration_Minutes") < 600))

# Step 5: Analysis - Accidents per State
# count("ID") counts only rows whose ID is non-null.
accidents_by_state = df.groupBy("State") \
    .agg(count("ID").alias("Accident_Count")) \
    .orderBy(desc("Accident_Count"))

# Step 6: Analysis - Severity vs Duration
severity_duration = df.groupBy("Severity") \
    .agg(avg("Duration_Minutes").alias("Avg_Duration")) \
    .orderBy("Severity")

# Step 7: Analysis - Top Cities by Accident Count
# NOTE(review): grouping on City alone pools same-named cities from different
# states into one row -- confirm that is intended.
top_cities = df.groupBy("City") \
    .agg(count("ID").alias("City_Accidents")) \
    .orderBy(desc("City_Accidents")) \
    .limit(10)

# Step 8: Time-of-Day Analysis
df = df.withColumn("Hour", hour("Start_Time"))
hourly_accidents = df.groupBy("Hour") \
    .agg(count("ID").alias("Accident_Count")) \
    .orderBy("Hour")

# Step 9: Show Results
# show() prints at most 20 rows unless told otherwise, so the state table
# below is the top 20 states by accident count.
print("\nAccidents by State:")
accidents_by_state.show()

print("\nAverage Duration by Severity:")
severity_duration.show()

print("\nTop 10 Cities with Most Accidents:")
top_cities.show()

# There can be up to 24 hour-of-day rows (0-23); the default 20-row limit
# would silently cut off hours 20-23, so request all 24 explicitly.
print("\nAccidents by Hour of Day:")
hourly_accidents.show(24)

# Stop Spark
spark.stop()

Total Rows: 246633
Schema:
root
 |-- _c0: integer (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature_Range(F): string (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double