In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create the SparkSession for this notebook (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName("USTrafficAccidents")
    .getOrCreate()
)

In [2]:
# Load the raw accidents CSV: first row is the header, column types are inferred by Spark.
file_path = "dataset/US_Accidents.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

In [3]:
# Inspect the inferred schema, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibilit

In [4]:
def _null_flag(name):
    """1 where column `name` is NULL, 0 otherwise (summed per column below)."""
    return when(col(name).isNull(), 1).otherwise(0)


# Per-column NULL counts. `sum` is the PySpark aggregate pulled in by the star import.
null_counts = df.select([sum(_null_flag(c)).alias(c) for c in df.columns])

# Show the result.
null_counts.show()

+---+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twil

In [5]:
# Drop the End_Lat and End_Lng columns.
# The names are spelled exactly as in the schema: drop() is a silent no-op for names it
# cannot resolve, so a mis-cased name would leave the columns in place whenever
# spark.sql.caseSensitive is enabled.
df = df.drop("End_Lat", "End_Lng")

# Show the DataFrame after dropping the columns.
df.show()

+---------+-------+--------+----------------+----------------+------------------+------------+------------------+--------------------+----------------+------------+----------------+-----+----------+-------+-----------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|      Start_Time|        End_Time|         Start_Lat|   Start_Lng|      Distance(mi)|         Description|          Street|        City|          County|State|   Zipcode|Country|   Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|J

In [6]:
# Remove rows whose Weather_Timestamp is NULL. A single pass is enough; the previous
# filter(isNotNull) followed by na.drop on the same column was redundant.
df = df.na.drop(subset=["Weather_Timestamp"])

# Show the DataFrame after removing those rows.
df.show()

+---------+-------+--------+----------------+----------------+------------------+------------+------------------+--------------------+----------------+------------+----------------+-----+----------+-------+-----------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|      Start_Time|        End_Time|         Start_Lat|   Start_Lng|      Distance(mi)|         Description|          Street|        City|          County|State|   Zipcode|Country|   Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|J

In [7]:
# Re-count NULLs per column after dropping rows without a Weather_Timestamp.
null_count_exprs = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
null_counts = df.select(*null_count_exprs)

# Show the result.
null_counts.show()

+---+------+--------+----------+--------+---------+---------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---

In [8]:
# Parse the Weather_Timestamp strings with the pattern "M/d/yyyy H:mm" into a timestamp column.
weather_ts = to_timestamp(col("Weather_Timestamp"), "M/d/yyyy H:mm")
df = df.withColumn("Weather_Timestamp", weather_ts)

df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Direction: string (nullable = tru

In [9]:
# Time-only value: 1-2 digits, ':', 2 digits, optional fractional part (e.g. "46:00.0").
# Raw strings keep the regex backslashes without Python's invalid-escape warning.
TIME_ONLY_PATTERN = r"^[0-9]{1,2}:[0-9]{2}(\.[0-9]+)?$"
# Trailing ':SS' / ':SS.f' part of a time-only value; removing it leaves the leading number.
TRAILING_SECONDS_PATTERN = r":[0-9]{2}(\.[0-9]+)?$"
# Pattern used to parse the (possibly completed) Start_Time / End_Time strings.
DATETIME_PATTERN = "M/d/yyyy H:mm"


def _complete_time_only(time_expr, date_expr):
    """Return date + " 00:" + leading number for time-only values, else the value unchanged.

    Args:
        time_expr: Column holding the raw time string.
        date_expr: Column holding the date string ("M/d/yyyy") to prepend.
    """
    rebuilt = concat(
        date_expr,
        lit(" 00:"),
        regexp_replace(time_expr, TRAILING_SECONDS_PATTERN, ""),
    )
    return when(time_expr.rlike(TIME_ONLY_PATTERN), rebuilt).otherwise(time_expr)


def fill_date_for_missing(df):
    """Complete time-only Start_Time / End_Time values and parse both as timestamps.

    Values matching TIME_ONLY_PATTERN (e.g. "46:00.0") carry no date. For those rows:
    - the date is taken from Weather_Timestamp;
    - the hour is set to 00;
    - the leading number is used as the minutes, giving "M/d/yyyy 00:NN".
    All values are then parsed with DATETIME_PATTERN.

    NOTE(review): the hour is not recoverable from these values, so 00 is only a
    placeholder; hour-based features derived from Start_Time are unreliable for them.

    Args:
        df: DataFrame with string columns Start_Time and End_Time and a timestamp
            column Weather_Timestamp.

    Returns:
        DataFrame with Start_Time and End_Time converted to timestamps. No helper
        columns are added.
    """
    # Calendar date of the weather observation, used as the date for time-only rows.
    weather_date = date_format(col("Weather_Timestamp"), "M/d/yyyy")
    for time_col in ("Start_Time", "End_Time"):
        completed = _complete_time_only(col(time_col), weather_date)
        df = df.withColumn(time_col, to_timestamp(completed, DATETIME_PATTERN))
    return df


# Apply to the DataFrame. The function adds no helper columns, so nothing needs dropping.
df = fill_date_for_missing(df)


In [10]:
# Preview a few rows after completing Start_Time / End_Time.
df.show(n=5)

+---------+-------+--------+-------------------+-------------------+-----------+------------+------------+--------------------+----------------+-----------+----------------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|       ID| Source|Severity|         Start_Time|           End_Time|  Start_Lat|   Start_Lng|Distance(mi)|         Description|          Street|       City|          County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exi

In [11]:
# NULL count per column after parsing Start_Time / End_Time.
null_counts = df.select(*(
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in df.columns
))

# Show the result.
null_counts.show()

+---+------+--------+----------+--------+---------+---------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---

In [12]:
# Fill missing values in the listed string columns with placeholder text.
df = df.fillna({
    "Description": "No Description",
    "Street": "Unknown",
    "City": "Unknown",
    "Weather_Condition": "Unknown",
    "Wind_Direction": "Unknown"
})

# Fill missing values in the boolean columns with False.
boolean_columns = [
    "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit",
    "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming",
    "Traffic_Signal", "Turning_Loop"
]
for col_name in boolean_columns:
    df = df.withColumn(col_name, coalesce(col(col_name), lit(False)))

# Fill missing values in the numeric columns with each column's mean.
numeric_columns = [
    "Distance(mi)", "Temperature(F)", "Wind_Chill(F)",
    "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)",
    "Precipitation(in)"
]
# Compute all means in one aggregation pass instead of one Spark job per column.
# Imputing one column never changes another column's mean, so the results are unchanged.
column_means = df.agg(*[mean(col(c)).alias(c) for c in numeric_columns]).first()
for col_name in numeric_columns:
    df = df.withColumn(col_name, coalesce(col(col_name), lit(column_means[col_name])))


In [13]:
# Fill MISSING day/night labels from the hour of Start_Time.
# Existing values are kept: only NULL entries get the hour-based estimate. Previously
# every row was overwritten, which discarded the values present in the source data.
# A phase is "Day" when the hour lies in [first_hour, last_hour] (inclusive), else "Night".
day_hour_ranges = {
    "Sunrise_Sunset": (6, 18),
    "Civil_Twilight": (5, 19),
    "Nautical_Twilight": (4, 20),
    "Astronomical_Twilight": (3, 21),
}
for col_name, (first_hour, last_hour) in day_hour_ranges.items():
    estimated = when(hour("Start_Time").between(first_hour, last_hour), "Day").otherwise("Night")
    df = df.withColumn(col_name, coalesce(col(col_name), estimated))

# NULL count per column after all imputation steps.
null_count_exprs = []
for name in df.columns:
    null_count_exprs.append(sum(when(col(name).isNull(), 1).otherwise(0)).alias(name))
null_counts = df.select(null_count_exprs)

# Show the result.
null_counts.show()

+---+------+--------+----------+--------+---------+---------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---

In [14]:
# Render the timestamp columns as "yyyy-MM-dd HH:mm:ss" strings for the CSV export.
for ts_col in ("Start_Time", "End_Time", "Weather_Timestamp"):
    df = df.withColumn(ts_col, date_format(ts_col, "yyyy-MM-dd HH:mm:ss"))

In [15]:
output_path = "dataset/CleanData"

# Export the cleaned DataFrame as CSV with a header row, replacing any previous output.
# coalesce(1) merges everything into a single partition, so Spark writes one part-file.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)

In [16]:
# Shut down the SparkSession (stops its underlying SparkContext); `spark` is unusable afterwards.
spark.stop()