In [0]:
## Loading Raw Tables

from pyspark.sql.functions import col, when, to_date, year, month, date_format, floor

df_raw = spark.table("crime_final_cluster.default.crime_data_2020_to_present")
df_raw.show(5)
df_raw.printSchema()


+---------+--------------------+--------------------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA|  AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|             Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|        Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+-----

In [0]:
## Rename columns to simpler names

df = (
    df_raw
    .withColumnRenamed("Date Rptd", "date_reported")
    .withColumnRenamed("DATE OCC", "date_occ")
    .withColumnRenamed("TIME OCC", "time_occ")
    .withColumnRenamed("AREA NAME", "area_name")
    .withColumnRenamed("Rpt Dist No", "rpt_dist_no")
    .withColumnRenamed("Part 1-2", "part_1_2")
    .withColumnRenamed("Crm Cd", "crm_cd")
    .withColumnRenamed("Crm Cd Desc", "crm_cd_desc")
    .withColumnRenamed("Vict Age", "vict_age")
    .withColumnRenamed("Vict Sex", "vict_sex")
    .withColumnRenamed("Vict Descent", "vict_descent")
    .withColumnRenamed("Premis Cd", "premis_cd")
    .withColumnRenamed("Premis Desc", "premis_desc")
    .withColumnRenamed("Weapon Used Cd", "weapon_used_cd")
    .withColumnRenamed("Weapon Desc", "weapon_desc")
    .withColumnRenamed("Status Desc", "status_desc")
    .withColumnRenamed("Crm Cd 1", "crm_cd_1")
    .withColumnRenamed("Crm Cd 2", "crm_cd_2")
    .withColumnRenamed("Crm Cd 3", "crm_cd_3")
    .withColumnRenamed("Crm Cd 4", "crm_cd_4")
    .withColumnRenamed("Cross Street", "cross_street")
)
df.show(5)
df.printSchema()


+---------+--------------------+--------------------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+
|    DR_NO|       date_reported|            date_occ|time_occ|AREA|  area_name|rpt_dist_no|part_1_2|crm_cd|         crm_cd_desc|             Mocodes|vict_age|vict_sex|vict_descent|premis_cd|         premis_desc|weapon_used_cd|         weapon_desc|Status| status_desc|crm_cd_1|crm_cd_2|crm_cd_3|crm_cd_4|            LOCATION|        cross_street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+-----------+-----------+--------+------+--------------------+--------------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+-----

In [0]:
from pyspark.sql.functions import (
    col,
    when,
    year,
    month,
    date_format,
    floor,
    try_to_date
)

df_clean = (
    df
    # parse dates safely
    .withColumn(
        "date_occ",
        try_to_date(
            col("date_occ"),
            "MM/dd/yyyy"
        )
    )
    .withColumn(
        "date_reported",
        try_to_date(
            col("date_reported"),
            "MM/dd/yyyy"
        )
    )
    # numeric casts
    .withColumn("time_occ", col("time_occ").cast("int"))
    .withColumn("AREA", col("AREA").cast("int"))
    .withColumn("rpt_dist_no", col("rpt_dist_no").cast("int"))
    .withColumn("crm_cd", col("crm_cd").cast("int"))
    .withColumn("vict_age", col("vict_age").cast("int"))
    .withColumn("premis_cd", col("premis_cd").cast("int"))
    .withColumn("weapon_used_cd", col("weapon_used_cd").cast("int"))
    .withColumn("LAT", col("LAT").cast("double"))
    .withColumn("LON", col("LON").cast("double"))
)

df_clean = df_clean.withColumn(
    "occ_hour",
    floor(col("time_occ") / 100)
)

df_clean = (
    df_clean
    .withColumn("occ_year", year(col("date_occ")))
    .withColumn("occ_month", month(col("date_occ")))
    .withColumn("occ_day_of_week", date_format(col("date_occ"), "E"))
)

df_clean = df_clean.withColumn(
    "age_group",
    when(col("vict_age").isNull(), "Unknown")
    .when(col("vict_age") < 0, "Unknown")
    .when(col("vict_age") < 18, "Child")
    .when(col("vict_age") < 30, "Young Adult")
    .when(col("vict_age") < 50, "Adult")
    .when(col("vict_age") < 70, "Middle-aged")
    .otherwise("Senior")
)

display(df_clean)
df_clean.printSchema()

DR_NO,date_reported,date_occ,time_occ,AREA,area_name,rpt_dist_no,part_1_2,crm_cd,crm_cd_desc,Mocodes,vict_age,vict_sex,vict_descent,premis_cd,premis_desc,weapon_used_cd,weapon_desc,Status,status_desc,crm_cd_1,crm_cd_2,crm_cd_3,crm_cd_4,LOCATION,cross_street,LAT,LON,occ_hour,occ_year,occ_month,occ_day_of_week,age_group
190326475,,,2130,7,Wilshire,784,1,510,VEHICLE - STOLEN,,0,M,O,101,STREET,,,AA,Adult Arrest,510,998.0,,,1900 S LONGWOOD AV,,34.0375,-118.3506,21,,,,Child
200106753,,,1800,1,Central,182,1,330,BURGLARY FROM VEHICLE,1822 1402 0344,47,M,O,128,BUS STOP/LAYOVER (ALSO QUERY 124),,,IC,Invest Cont,330,998.0,,,1000 S FLOWER ST,,34.0444,-118.2628,18,,,,Adult
200320258,,,1700,3,Southwest,356,1,480,BIKE - STOLEN,0344 1251,19,X,X,502,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",,,IC,Invest Cont,480,,,,1400 W 37TH ST,,34.021,-118.3002,17,,,,Young Adult
200907217,,,2037,9,Van Nuys,964,1,343,SHOPLIFTING-GRAND THEFT ($950.01 & OVER),0325 1501,19,M,O,405,CLOTHING STORE,,,IC,Invest Cont,343,,,,14000 RIVERSIDE DR,,34.1576,-118.4387,20,,,,Young Adult
220614831,,,1200,6,Hollywood,666,2,354,THEFT OF IDENTITY,1822 1501 0930 2004,28,M,H,102,SIDEWALK,,,IC,Invest Cont,354,,,,1900 TRANSIENT,,34.0944,-118.3277,12,,,,Young Adult
231808869,,,2300,18,Southeast,1826,2,354,THEFT OF IDENTITY,1822 0100 0930 0929,41,M,H,501,SINGLE FAMILY DWELLING,,,IC,Invest Cont,354,,,,9900 COMPTON AV,,33.9467,-118.2463,23,,,,Adult
230110144,,,900,1,Central,182,2,354,THEFT OF IDENTITY,0930 0929,25,M,H,502,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",,,IC,Invest Cont,354,,,,1100 S GRAND AV,,34.0415,-118.262,9,,,,Young Adult
220314085,,,1110,3,Southwest,303,2,354,THEFT OF IDENTITY,0100,27,F,B,248,CELL PHONE STORE,,,IC,Invest Cont,354,,,,2500 S SYCAMORE AV,,34.0335,-118.3537,11,,,,Young Adult
231309864,,,1400,13,Newton,1375,2,354,THEFT OF IDENTITY,0100,24,F,B,750,CYBERSPACE,,,IC,Invest Cont,354,,,,1300 E 57TH ST,,33.9911,-118.2521,14,,,,Young Adult
211904005,,,1220,19,Mission,1974,2,624,BATTERY - SIMPLE ASSAULT,0416,26,M,H,502,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",IC,Invest Cont,624,,,,9000 CEDROS AV,,34.2336,-118.4535,12,,,,Young Adult


root
 |-- DR_NO: long (nullable = true)
 |-- date_reported: date (nullable = true)
 |-- date_occ: date (nullable = true)
 |-- time_occ: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- area_name: string (nullable = true)
 |-- rpt_dist_no: integer (nullable = true)
 |-- part_1_2: long (nullable = true)
 |-- crm_cd: integer (nullable = true)
 |-- crm_cd_desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- vict_age: integer (nullable = true)
 |-- vict_sex: string (nullable = true)
 |-- vict_descent: string (nullable = true)
 |-- premis_cd: integer (nullable = true)
 |-- premis_desc: string (nullable = true)
 |-- weapon_used_cd: integer (nullable = true)
 |-- weapon_desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- status_desc: string (nullable = true)
 |-- crm_cd_1: long (nullable = true)
 |-- crm_cd_2: long (nullable = true)
 |-- crm_cd_3: long (nullable = true)
 |-- crm_cd_4: long (nullable = true)
 |-- LOCATION: string (n

In [0]:
df_clean.write.mode("overwrite").saveAsTable("crime_final_cluster.default.clean_crime")