In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, length

# Define data and schema
CollisionRecords = [
    "20160924_CollisionRecords.txt",
    "20170112_CollisionRecords.txt",
    "20180925_CollisionRecords.txt",
    "20201024_CollisionRecords.txt"
]

collision_schema = StructType([
    StructField("CASE_ID", StringType(), True),
    StructField("ACCIDENT_YEAR", IntegerType(), True),
    StructField("PROC_DATE", StringType(), True),
    StructField("JURIS", StringType(), True),
    StructField("COLLISION_DATE", StringType(), True),
    StructField("COLLISION_TIME", StringType(), True),
    StructField("OFFICER_ID", StringType(), True),
    StructField("REPORTING_DISTRICT", StringType(), True),
    StructField("DAY_OF_WEEK", IntegerType(), True),
    StructField("CHP_SHIFT", StringType(), True),
    StructField("POPULATION", IntegerType(), True),
    StructField("CNTY_CITY_LOC", StringType(), True),
    StructField("SPECIAL_COND", StringType(), True),
    StructField("BEAT_TYPE", StringType(), True),
    StructField("CHP_BEAT_TYPE", StringType(), True),
    StructField("CITY_DIVISION_LAPD", StringType(), True),
    StructField("CHP_BEAT_CLASS", StringType(), True),
    StructField("BEAT_NUMBER", StringType(), True),
    StructField("PRIMARY_RD", StringType(), True),
    StructField("SECONDARY_RD", StringType(), True),
    StructField("DISTANCE", IntegerType(), True),
    StructField("DIRECTION", StringType(), True),
    StructField("INTERSECTION", StringType(), True),
    StructField("LATITUDE", DoubleType(), True),
    StructField("LONGITUDE", DoubleType(), True),
])

# Start Spark session
spark = SparkSession.builder.appName("CollisionRecords").getOrCreate()

# Load the data
collision_df = spark.read.csv(CollisionRecords, schema=collision_schema, header=False).cache()

# Drop rows where CASE_ID matches the header
header = collision_df.first().asDict()
collision_df = collision_df.filter(col("CASE_ID") != header["CASE_ID"])

# Step 1: Display non-numerical columns to the user
non_numerical_columns = [f.name for f in collision_schema.fields if isinstance(f.dataType, StringType)]
print("Non-Numerical Columns: ", non_numerical_columns)

# Step 2: Manually specify columns to drop (modify this list for your needs)
columns_to_drop = ["CHP_SHIFT", "CITY_DIVISION_LAPD", "SPECIAL_COND", "CITY_DIVISION_LAPD", "CHP_BEAT_CLASS", "BEAT_NUMBER" ]  # Example input

# Drop the specified columns
collision_df = collision_df.drop(*columns_to_drop)

# Step 3: Drop columns with all null values
columns_with_all_nulls = [col_name for col_name in collision_df.columns if collision_df.filter(col(col_name).isNotNull()).count() == 0]
collision_df = collision_df.drop(*columns_with_all_nulls)

# Handle nulls in critical columns
collision_df = collision_df.fillna({
    "ACCIDENT_YEAR": 0,
    "DAY_OF_WEEK": 0,
    "POPULATION": 0,
    "DISTANCE": 0
})

# Ensure DAY_OF_WEEK values are valid
collision_df = collision_df.filter((col("DAY_OF_WEEK") >= 1) & (col("DAY_OF_WEEK") <= 7))

# Debug: Check for corrupted data
collision_df.filter(col("CASE_ID").rlike("[^a-zA-Z0-9]")).show(5)
collision_df.filter(length(col("CASE_ID")) > 50).show(5)

# Debug: Reduce dataset for testing
collision_df = collision_df.limit(10)

# Show cleaned data
print("Cleaned Data Info:")
collision_df.printSchema()
collision_df.show(10)


Non-Numerical Columns:  ['CASE_ID', 'PROC_DATE', 'JURIS', 'COLLISION_DATE', 'COLLISION_TIME', 'OFFICER_ID', 'REPORTING_DISTRICT', 'CHP_SHIFT', 'CNTY_CITY_LOC', 'SPECIAL_COND', 'BEAT_TYPE', 'CHP_BEAT_TYPE', 'CITY_DIVISION_LAPD', 'CHP_BEAT_CLASS', 'BEAT_NUMBER', 'PRIMARY_RD', 'SECONDARY_RD', 'DIRECTION', 'INTERSECTION']
+-------+-------------+---------+-----+--------------+--------------+----------+------------------+-----------+----------+-------------+---------+-------------+----------+------------+--------+---------+------------+
|CASE_ID|ACCIDENT_YEAR|PROC_DATE|JURIS|COLLISION_DATE|COLLISION_TIME|OFFICER_ID|REPORTING_DISTRICT|DAY_OF_WEEK|POPULATION|CNTY_CITY_LOC|BEAT_TYPE|CHP_BEAT_TYPE|PRIMARY_RD|SECONDARY_RD|DISTANCE|DIRECTION|INTERSECTION|
+-------+-------------+---------+-----+--------------+--------------+----------+------------------+-----------+----------+-------------+---------+-------------+----------+------------+--------+---------+------------+
| 449906|         2002| 20030

In [3]:


# Fix negative distance values 
collision_df = collision_df.filter(col("DISTANCE") >= 0)
collision_df.describe().show()



+-------+--------------------+------------------+--------------------+------------------+--------------------+------------------+----------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+------------------+------------------+------------------+-----------+--------------------+--------------------+------------------+---------+------------+--------+---------+
|summary|             CASE_ID|     ACCIDENT_YEAR|           PROC_DATE|             JURIS|      COLLISION_DATE|    COLLISION_TIME|OFFICER_ID|REPORTING_DISTRICT|       DAY_OF_WEEK|         CHP_SHIFT|        POPULATION|     CNTY_CITY_LOC|        SPECIAL_COND|         BEAT_TYPE|     CHP_BEAT_TYPE|CITY_DIVISION_LAPD|    CHP_BEAT_CLASS|BEAT_NUMBER|          PRIMARY_RD|        SECONDARY_RD|          DISTANCE|DIRECTION|INTERSECTION|LATITUDE|LONGITUDE|
+-------+--------------------+------------------+--------------------+------------------+---------------

In [4]:
# Filter rows where PRIMARY_RD or SECONDARY_RD contains "..."
invalid_rd_cases = collision_df.filter(
    (col("PRIMARY_RD").contains("...")) | (col("SECONDARY_RD").contains("..."))
)

invalid_rd_cases.show(truncate=False)



+-------+-------------+---------+-----+--------------+--------------+----------+------------------+-----------+---------+----------+-------------+------------+---------+-------------+------------------+--------------+-----------+----------+------------+--------+---------+------------+--------+---------+
|CASE_ID|ACCIDENT_YEAR|PROC_DATE|JURIS|COLLISION_DATE|COLLISION_TIME|OFFICER_ID|REPORTING_DISTRICT|DAY_OF_WEEK|CHP_SHIFT|POPULATION|CNTY_CITY_LOC|SPECIAL_COND|BEAT_TYPE|CHP_BEAT_TYPE|CITY_DIVISION_LAPD|CHP_BEAT_CLASS|BEAT_NUMBER|PRIMARY_RD|SECONDARY_RD|DISTANCE|DIRECTION|INTERSECTION|LATITUDE|LONGITUDE|
+-------+-------------+---------+-----+--------------+--------------+----------+------------------+-----------+---------+----------+-------------+------------+---------+-------------+------------------+--------------+-----------+----------+------------+--------+---------+------------+--------+---------+
+-------+-------------+---------+-----+--------------+--------------+----------+-----

In [5]:


# remove all "not stated - values" and null
filterCondition = (col(collision_df.columns[0]) != "-")
filterCondition = filterCondition | (col(collision_df.columns[0]) != None)


for c in collision_df.columns[1:]:
    filterCondition = filterCondition | (col(c) != "-")
    filterCondition = filterCondition | (col(c) != None)

filtered_df = collision_df.filter(filterCondition)
filtered_df.head(5)

[Row(CASE_ID='0100010101011401155', ACCIDENT_YEAR=2001, PROC_DATE='20010416', JURIS='0100', COLLISION_DATE='20010101', COLLISION_TIME='0114', OFFICER_ID='1155', REPORTING_DISTRICT='0', DAY_OF_WEEK=1, CHP_SHIFT='5', POPULATION=4, CNTY_CITY_LOC='0198', SPECIAL_COND='-', BEAT_TYPE='0', CHP_BEAT_TYPE='0', CITY_DIVISION_LAPD=' ', CHP_BEAT_CLASS='0', BEAT_NUMBER='073', PRIMARY_RD='DUBLIN BL', SECONDARY_RD='SCARLETT CT', DISTANCE=267, DIRECTION='W', INTERSECTION='N', LATITUDE=None, LONGITUDE=None),
 Row(CASE_ID='0100010103174503131', ACCIDENT_YEAR=2001, PROC_DATE='20010416', JURIS='0100', COLLISION_DATE='20010103', COLLISION_TIME='1745', OFFICER_ID='3131', REPORTING_DISTRICT='10', DAY_OF_WEEK=3, CHP_SHIFT='5', POPULATION=4, CNTY_CITY_LOC='0198', SPECIAL_COND='-', BEAT_TYPE='0', CHP_BEAT_TYPE='0', CITY_DIVISION_LAPD=' ', CHP_BEAT_CLASS='0', BEAT_NUMBER='073', PRIMARY_RD='DOUGHERTY RD', SECONDARY_RD='AMADOR VLY BL', DISTANCE=80, DIRECTION='N', INTERSECTION='N', LATITUDE=None, LONGITUDE=None),
 

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
encoded_and_index = ["INTERSECTION","SPECIAL_COND","CHP_BEAT_TYPE","BEAT_TYPE","DIRECTION","CHP_BEAT_CLASS","CHP_SHIFT"]

indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in encoded_and_index]
encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_vec") for c in encoded_and_index]

filtered_df2 = filtered_df

for col_name in encoded_and_index:
    
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index")
    print(indexer)
    print(col_name)
    indexedDf = indexer.fit(filtered_df2).transform(filtered_df2)
   
    encoder = OneHotEncoder(inputCol=col_name + "_index", outputCol=col_name + "_vec")
    encoded_df = encoder.fit(indexedDf).transform(indexedDf)

encoded_df.show(truncate=False)

StringIndexer_bfda5106ea14
INTERSECTION
StringIndexer_745817e33d76
SPECIAL_COND
StringIndexer_829359bc01de
CHP_BEAT_TYPE
StringIndexer_f08fbf8d7f65
BEAT_TYPE
StringIndexer_7e8a3c59aee0
DIRECTION
StringIndexer_77f47662b3fb
CHP_BEAT_CLASS
StringIndexer_a76943035ea8
CHP_SHIFT
+-------------------+-------------+---------+-----+--------------+--------------+----------+------------------+-----------+---------+----------+-------------+------------+---------+-------------+------------------+--------------+-----------+---------------+---------------+--------+---------+------------+--------+---------+---------------+-------------+
|CASE_ID            |ACCIDENT_YEAR|PROC_DATE|JURIS|COLLISION_DATE|COLLISION_TIME|OFFICER_ID|REPORTING_DISTRICT|DAY_OF_WEEK|CHP_SHIFT|POPULATION|CNTY_CITY_LOC|SPECIAL_COND|BEAT_TYPE|CHP_BEAT_TYPE|CITY_DIVISION_LAPD|CHP_BEAT_CLASS|BEAT_NUMBER|PRIMARY_RD     |SECONDARY_RD   |DISTANCE|DIRECTION|INTERSECTION|LATITUDE|LONGITUDE|CHP_SHIFT_index|CHP_SHIFT_vec|
+---------------