In [1]:
import os
# Point the Hadoop tooling at the local installation under ~/hadoop.
HADOOP_HOME = os.path.expanduser("~/hadoop")
os.environ["HADOOP_HOME"] = HADOOP_HOME
os.environ["HADOOP_COMMON_HOME"] = HADOOP_HOME

# Put the Hadoop executables first on PATH. Any earlier occurrence of the same
# directories is removed first, so re-running this cell does not keep growing
# PATH with duplicate entries.
HADOOP_BIN_DIRS = [os.path.join(HADOOP_HOME, "bin"), os.path.join(HADOOP_HOME, "sbin")]
_current_path = os.environ.get("PATH")
_other_dirs = (
    []
    if _current_path is None
    else [d for d in _current_path.split(os.pathsep) if d not in HADOOP_BIN_DIRS]
)
os.environ["PATH"] = os.pathsep.join(HADOOP_BIN_DIRS + _other_dirs)

In [2]:
# Read data
!hdfs dfs -mkdir -p /data/us_accidents/raw 
!hdfs dfs -put US_Accidents_March23_sampled_500k.csv /data/us_accidents/raw/

put: `/data/us_accidents/raw/US_Accidents_March23_sampled_500k.csv': File exists


In [6]:
import csv
import json
import math
from typing import Dict, Any, Optional, List
from pyspark.sql import SparkSession

def clean_us_accidents_spark_core(
    spark: SparkSession,
    hdfs_input_csv: str,
    hdfs_output_dir: str,
    min_partitions: int = 8,
) -> None:
    """Clean a US-Accidents CSV with the Spark RDD API and write JSON Lines.

    Pipeline:
      1. Read the CSV as text and parse every line into a dict keyed by the
         header row.
      2. Keep one row per non-empty ``ID``; rows without an ``ID`` are kept
         as they are.
      3. Validate / normalise each row (see ``clean_row``); rows with a
         missing start time, an invalid severity or invalid coordinates are
         dropped.
      4. Replace missing weather readings with the per-column median of the
         cleaned data.
      5. Save one JSON object per line under ``hdfs_output_dir``.

    Args:
        spark: Active session; only its ``sparkContext`` is used.
        hdfs_input_csv: Path/URI of the input CSV; its first line must be the
            header.
        hdfs_output_dir: Directory handed to ``saveAsTextFile``.
        min_partitions: ``minPartitions`` hint passed to ``textFile``.

    Notes:
        The file is read line by line and each line is parsed on its own, so
        quoted fields that contain newlines are not supported.
    """
    sc = spark.sparkContext

    US_STATES = {
        "AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA","KS","KY","LA",
        "ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC","ND","OH","OK",
        "OR","PA","RI","SC","SD","TN","TX","UT","VT","VA","WA","WV","WI","WY","DC"
    }

    # Inclusive (low, high) range accepted for each weather column; readings
    # outside the range are treated as missing.
    WEATHER_BOUNDS = {
        "Temperature(F)": (-70.0, 130.0),
        "Wind_Chill(F)": (-50.0, 130.0),
        "Humidity(%)": (0.0, 100.0),
        "Pressure(in)": (15.0, 32.0),
        "Visibility(mi)": (0.0, 100.0),
        "Wind_Speed(mph)": (0.0, 200.0),
        "Precipitation(in)": (0.0, 50.0),
    }

    COLS_TO_DROP = {"ID", "Source", "Zipcode", "Timezone", "Airport_Code", "End_Lat", "End_Lng"}

    # Long-form spellings mapped to their abbreviations; any other value is
    # kept (upper-cased) as it is.
    WIND_DIR_MAP = {
        "VARIABLE": "VAR",
        "VAR": "VAR",
        "CALM": "CALM",
        "NORTH": "N",
        "SOUTH": "S",
        "EAST": "E",
        "WEST": "W",
    }

    TWILIGHT_COLS = ["Sunrise_Sunset","Civil_Twilight","Nautical_Twilight","Astronomical_Twilight"]

    BOOL_COLS = [
        "Amenity","Bump","Crossing","Give_Way","Junction","No_Exit","Railway",
        "Roundabout","Station","Stop","Traffic_Calming","Traffic_Signal","Turning_Loop"
    ]

    def strip(s: Any) -> Optional[str]:
        """Return ``str(s)`` without surrounding whitespace, or None if empty/None."""
        if s is None:
            return None
        s = str(s).strip()
        return s if s != "" else None

    def to_float(x: Any) -> Optional[float]:
        """Parse ``x`` as a finite float; return None when that is not possible.

        Unparsable text must not abort the whole Spark job, and NaN / infinity
        would slip through the range checks and end up as invalid JSON, so
        both are reported as missing.
        """
        x = strip(x)
        if x is None:
            return None
        try:
            f = float(x)
        except ValueError:
            return None
        return f if math.isfinite(f) else None

    def to_int_round(x: Any) -> Optional[int]:
        """Parse ``x`` as a number and round it to an int; None if missing/invalid."""
        f = to_float(x)
        if f is None:
            return None
        return int(round(f))

    def in_bounds(v: Optional[float], lo: float, hi: float) -> Optional[float]:
        """Return ``v`` if ``lo <= v <= hi``; otherwise (or if None) return None."""
        if v is None:
            return None
        if v < lo or v > hi:
            return None
        return v

    def normalise_twilight(v: Any) -> str:
        """Map a day/night flag to "Day", "Night" or "Unknown"."""
        v = strip(v)
        if v is None:
            return "Unknown"
        v = v.title()
        return v if v in ("Day", "Night") else "Unknown"

    def parse_bool(v: Any) -> Optional[bool]:
        """Parse "true"/"false" (any case) or a bool; None for anything else."""
        if v is None:
            return None
        if isinstance(v, bool):
            return v
        s = strip(v)
        if s is None:
            return None
        s = s.lower()
        if s == "true":
            return True
        if s == "false":
            return False
        return None

    def clean_row(row: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Validate and normalise one parsed row in place.

        Returns the same dict, or None when the row must be discarded
        (missing start time, invalid severity, invalid coordinates).
        """
        # Trim all strings
        for k, v in list(row.items()):
            if isinstance(v, str):
                row[k] = v.strip()

        # Remove rows with a missing start time. CSV values are strings, so an
        # absent value shows up as "" rather than None.
        if strip(row.get("Start_Time")) is None:
            return None

        # Severity must be an integer in 1..4
        if "Severity" in row:
            severity = to_int_round(row.get("Severity"))
            if severity is None or severity < 1 or severity > 4:
                return None
            row["Severity"] = severity

        # Remove rows whose start coordinates are missing or out of range
        lat = in_bounds(to_float(row.get("Start_Lat")), -90.0, 90.0)
        lng = in_bounds(to_float(row.get("Start_Lng")), -180.0, 180.0)
        if lat is None or lng is None:
            return None
        row["Start_Lat"] = lat
        row["Start_Lng"] = lng

        # Null out (but keep the row for) state codes not in US_STATES
        if "State" in row:
            st = strip(row.get("State"))
            st = st.upper() if st else None
            row["State"] = st if (st in US_STATES) else None

        # Out-of-range or unparsable weather readings become missing
        for c, (lo, hi) in WEATHER_BOUNDS.items():
            if c in row:
                row[c] = in_bounds(to_float(row.get(c)), lo, hi)

        # Standardise wind direction
        if "Wind_Direction" in row:
            wd = strip(row.get("Wind_Direction"))
            wd = wd.upper() if wd else None
            if wd is None:
                row["Wind_Direction"] = None
            else:
                row["Wind_Direction"] = WIND_DIR_MAP.get(wd, wd)

        # Normalise twilight cols
        for c in TWILIGHT_COLS:
            if c in row:
                row[c] = normalise_twilight(row.get(c))

        # Assume missing booleans as false
        for c in BOOL_COLS:
            if c in row:
                b = parse_bool(row.get(c))
                row[c] = False if b is None else b

        # Drop redundant columns
        for c in COLS_TO_DROP:
            row.pop(c, None)

        return row

    # Read raw CSV lines from HDFS
    lines = sc.textFile(hdfs_input_csv, minPartitions=min_partitions)
    header = lines.first()

    # CSV parsing helper
    def parse_csv_line(line: str) -> List[str]:
        return next(csv.reader([line]))

    header_cols = parse_csv_line(header)
    data_lines = lines.filter(lambda x: x != header)
    rows = data_lines.map(parse_csv_line).map(lambda vals: dict(zip(header_cols, vals)))

    # Drop duplicated IDs. Rows without an ID cannot be duplicates of each
    # other, so they bypass the reduce instead of collapsing into a single row
    # under a shared empty key.
    keyed = rows.map(lambda r: (strip(r.get("ID")), r))
    rows_without_id = keyed.filter(lambda kv: kv[0] is None).values()
    deduped = (
        keyed
        .filter(lambda kv: kv[0] is not None)
        .reduceByKey(lambda a, b: a)
        .values()
        .union(rows_without_id)
    )

    # Clean rows
    cleaned = deduped.map(clean_row).filter(lambda r: r is not None)

    weather_cols = [c for c in WEATHER_BOUNDS if c in header_cols]

    # Without any weather column there is nothing to impute, so the cleaned
    # rows are written as they are.
    output = cleaned

    if weather_cols:
        # The cleaned rows are consumed twice (median pass, then the write);
        # cache them so the read / parse / dedupe lineage is not recomputed.
        cleaned.cache()

        # Emit (col, value) for non-missing weather values
        def emit_weather_vals(r: Dict[str, Any]):
            for c in weather_cols:
                v = r.get(c)
                if v is not None:
                    yield (c, v)

        col_to_vals = (
            cleaned
            .flatMap(emit_weather_vals)
            .groupByKey()
            .mapValues(list)
            .collect()
        )

        # Compute medians on the driver. Columns without a single valid
        # reading get no entry and are therefore left unfilled.
        medians = {}
        for c, vals in col_to_vals:
            vals.sort()
            n = len(vals)
            mid = n // 2
            if n % 2 == 1:
                medians[c] = vals[mid]
            else:
                medians[c] = (vals[mid - 1] + vals[mid]) / 2.0

        medians_bc = sc.broadcast(medians)

        # Fill missing values with median
        def fill_weather_medians(r: Dict[str, Any]) -> Dict[str, Any]:
            m = medians_bc.value
            for c in weather_cols:
                if r.get(c) is None and m.get(c) is not None:
                    r[c] = float(m[c])
            return r

        output = cleaned.map(fill_weather_medians)

    # Write JSONL to HDFS
    try:
        output.map(lambda r: json.dumps(r, ensure_ascii=False)).saveAsTextFile(hdfs_output_dir)
    finally:
        if weather_cols:
            cleaned.unpersist()

In [None]:
from pyspark.sql import SparkSession

# Base URI of the HDFS NameNode. Set the HDFS_NAMENODE_URI environment variable
# to target another cluster instead of editing the paths below; the default is
# the machine this notebook was written on.
HDFS_NAMENODE_URI = os.environ.get("HDFS_NAMENODE_URI", "hdfs://LAPTOP-5DR87JF2:9900").rstrip("/")
RAW_CSV_URI = HDFS_NAMENODE_URI + "/data/us_accidents/raw/US_Accidents_March23_sampled_500k.csv"
CLEANED_JSONL_URI = HDFS_NAMENODE_URI + "/data/us_accidents/cleaned_dedup_jsonl"

# Local Spark session using every available core; Python workers are reused
# between tasks.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.python.worker.reuse", "true")
    .getOrCreate()
)

# Run the cleaning pipeline: raw CSV in, de-duplicated JSON Lines out.
clean_us_accidents_spark_core(
    spark,
    hdfs_input_csv=RAW_CSV_URI,
    hdfs_output_dir=CLEANED_JSONL_URI,
    min_partitions=8,
)

                                                                                