## Bronze

In [0]:
import pyspark.pipelines as dp
import pyspark.sql.functions as sf
import re

@dp.table()
def cpl1_bronze():
    """Load the raw crime CSV and normalise its column names.

    The file is read in batch mode as CSV with a header row and inferred
    column types. Each column name then has every run of whitespace replaced
    by a single underscore and is lower-cased.
    """
    raw = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("/Volumes/workspace/damg7370_crimes/crimes_input/Crime_Data_from_2020_to_Present_20251117.csv")
    )
    # Rename every column positionally in a single projection.
    normalised_names = [re.sub(r'\s+', '_', name).lower() for name in raw.columns]
    return raw.toDF(*normalised_names)


## Silver

In [0]:
@dp.table()
@dp.expect_or_drop("crm_cd_nn", "crm_cd is not null")
@dp.expect_or_drop("rpt_dist_nn", "rpt_dist_no is not null")
def cpl2_silver():
    """Cleaned crime records, one output row per crime code on an incident.

    Reads ``cpl1_bronze`` as a stream and then:

    * parses ``date_rptd`` and ``date_occ`` into dates,
    * emits one row per non-null code among ``crm_cd_1`` .. ``crm_cd_4`` in a
      new ``crime_code`` column (``crm_cd`` stands in for a missing
      ``crm_cd_1``),
    * collapses runs of whitespace in ``location`` and ``cross_street``,
    * stamps each row with ``last_updated``,
    * replaces NULLs with sentinel values in the lookup/descriptor columns.

    Rows whose ``crm_cd`` or ``rpt_dist_no`` is NULL are dropped by the
    expectations declared on this table.

    NOTE(review): ``cpl1_bronze`` is defined with a batch ``spark.read``;
    confirm that streaming from it keeps working across pipeline updates.
    """
    # Both date columns are parsed with the same pattern.
    date_pattern = "yyyy MMM dd hh:mm:ss a"

    df = spark.readStream.table('cpl1_bronze')

    df = df.withColumn('date_rptd', sf.to_date(sf.col('date_rptd'), date_pattern))
    df = df.withColumn('date_occ', sf.to_date(sf.col('date_occ'), date_pattern))

    # explode() emits no row at all for an empty array, so an incident whose
    # crm_cd_1..4 are all NULL would vanish silently (and never be counted by
    # the expectations above) even though crm_cd is populated. Falling back to
    # crm_cd for the first slot keeps such incidents in the output.
    crime_codes = sf.array_compact(
        sf.array(
            sf.coalesce(sf.col("crm_cd_1"), sf.col("crm_cd")),
            sf.col("crm_cd_2"),
            sf.col("crm_cd_3"),
            sf.col("crm_cd_4"),
        )
    )
    df = df.withColumn('crime_code', sf.explode(crime_codes))

    # Collapse any run of whitespace to a single space.
    df = df.withColumn('location', sf.regexp_replace('location', r'\s+', ' '))
    df = df.withColumn('cross_street', sf.regexp_replace('cross_street', r'\s+', ' '))

    df = df.withColumn('last_updated', sf.current_timestamp())

    # Sentinel values instead of NULLs: the fact flow equality-joins on
    # weapon_used_cd, status, location and cross_street, and NULLs would
    # never match there.
    df = df.fillna(-1, ["weapon_used_cd"])
    df = df.fillna("UNK", ["status", "location", "cross_street"])
    df = df.fillna("X", ["vict_sex", "vict_descent"])

    return df

## Gold

### Crime codes

In [0]:
# Declare the dim_crime_code streaming table: an identity-generated surrogate
# key (crime_code_key) plus crime_code, crime_code_desc and last_updated.
# It is the target of the create_auto_cdc_flow call for this dimension.
# NOTE(review): Databricks documents identity columns as unsupported on tables
# that are AUTO CDC targets — confirm crime_code_key behaves as intended.
dp.create_streaming_table(
  name="dim_crime_code",
  schema="""
    crime_code_key bigint generated always as identity (start with 1 increment by 1),
    crime_code int,
    crime_code_desc string,
    last_updated timestamp
  """,
  # Enable the Delta change data feed on this table.
  table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def cpl3_gold_crime_code():
  """Stream of (crime_code, crime_code_desc, last_updated) rows from silver.

  The description is taken from crm_cd_desc only when the row's crime_code
  equals crm_cd; for any other code it is NULL.
  """
  silver_stream = spark.readStream.table('cpl2_silver')

  projection = [
    "crime_code",
    "case when crime_code = crm_cd then crm_cd_desc else null END AS crime_code_desc",
    "last_updated",
  ]
  return silver_stream.selectExpr(*projection)


# Apply the rows of the cpl3_gold_crime_code view to dim_crime_code as CDC
# changes, keyed on crime_code and ordered by last_updated.
# NOTE(review): last_updated comes from current_timestamp() in silver, so rows
# for the same key can share a sequence value — confirm ties are acceptable.
dp.create_auto_cdc_flow(
  target="dim_crime_code",
  source="cpl3_gold_crime_code",
  keys=["crime_code"],
  sequence_by="last_updated",
  # The source view emits a NULL crime_code_desc whenever crime_code != crm_cd;
  # this flag is meant to keep such NULLs from overwriting a stored description
  # (per the AUTO CDC documentation — confirm).
  ignore_null_updates=True
)

### Status codes


In [0]:
# Declare the dim_status streaming table: an identity-generated surrogate key
# (status_key) plus status, status_desc and last_updated.
# It is the target of the create_auto_cdc_flow call for this dimension.
# NOTE(review): Databricks documents identity columns as unsupported on tables
# that are AUTO CDC targets — confirm status_key behaves as intended.
dp.create_streaming_table(
  name="dim_status",
  schema="""
    status_key bigint generated always as identity (start with 1 increment by 1),
    status string,
    status_desc string,
    last_updated timestamp
  """,
  # Enable the Delta change data feed on this table.
  table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def cpl3_gold_status():
  """Stream of (status, status_desc, last_updated) rows from silver."""
  silver_stream = spark.readStream.table('cpl2_silver')

  projection = [
    "status as status",
    "status_desc",
    "last_updated",
  ]
  return silver_stream.selectExpr(*projection)


# Apply the rows of the cpl3_gold_status view to dim_status as CDC changes,
# keyed on status and ordered by last_updated.
# NOTE(review): last_updated comes from current_timestamp() in silver, so rows
# for the same key can share a sequence value — confirm ties are acceptable.
dp.create_auto_cdc_flow(
  target="dim_status",
  source="cpl3_gold_status",
  keys=["status"],
  sequence_by="last_updated",
  # Meant to keep NULL columns in an update from overwriting stored values
  # (per the AUTO CDC documentation — confirm).
  ignore_null_updates=True
)

### Weapons

In [0]:
# Declare the dim_weapon streaming table: an identity-generated surrogate key
# (weapon_key) plus weapon_used_code, weapon_desc and last_updated.
# It is the target of the create_auto_cdc_flow call for this dimension.
# NOTE(review): Databricks documents identity columns as unsupported on tables
# that are AUTO CDC targets — confirm weapon_key behaves as intended.
dp.create_streaming_table(
  name="dim_weapon",
  schema="""
    weapon_key bigint generated always as identity (start with 1 increment by 1),
    weapon_used_code int,
    weapon_desc string,
    last_updated timestamp
  """,
  # Enable the Delta change data feed on this table.
  table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def cpl3_gold_weapon():
  """Stream of (weapon_used_code, weapon_desc, last_updated) rows from silver.

  weapon_used_code is silver's weapon_used_cd under the dimension's column name.
  """
  silver_stream = spark.readStream.table('cpl2_silver')

  projection = [
    "weapon_used_cd as weapon_used_code",
    "weapon_desc",
    "last_updated",
  ]
  return silver_stream.selectExpr(*projection)


# Apply the rows of the cpl3_gold_weapon view to dim_weapon as CDC changes,
# keyed on weapon_used_code and ordered by last_updated.
# NOTE(review): last_updated comes from current_timestamp() in silver, so rows
# for the same key can share a sequence value — confirm ties are acceptable.
dp.create_auto_cdc_flow(
  target="dim_weapon",
  source="cpl3_gold_weapon",
  keys=["weapon_used_code"],
  sequence_by="last_updated",
  # Meant to keep NULL columns in an update from overwriting stored values
  # (per the AUTO CDC documentation — confirm).
  ignore_null_updates=True
)

### Location

In [0]:
# Declare the dim_location streaming table: an identity-generated surrogate key
# (location_key) plus area_id, area_name, rpt_dist_no, location_text,
# cross_street and last_updated.
# It is the target of the create_auto_cdc_flow call for this dimension.
# NOTE(review): Databricks documents identity columns as unsupported on tables
# that are AUTO CDC targets — confirm location_key behaves as intended.
dp.create_streaming_table(
  name="dim_location",
  schema="""
    location_key bigint generated always as identity (start with 1 increment by 1),
    area_id int,
    area_name string,
    rpt_dist_no int,
    location_text string,
    cross_street string,
    last_updated timestamp
  """,
  # Enable the Delta change data feed on this table.
  table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def cpl3_gold_location():
  """Stream of location attributes from silver, renamed for dim_location.

  Silver's area becomes area_id and location becomes location_text; the
  remaining columns keep their names.
  """
  silver_stream = spark.readStream.table('cpl2_silver')

  projection = [
    "area as area_id",
    "area_name",
    "rpt_dist_no",
    "location as location_text",
    "cross_street",
    "last_updated",
  ]
  return silver_stream.selectExpr(*projection)


# Apply the rows of the cpl3_gold_location view to dim_location as CDC changes,
# keyed on (rpt_dist_no, location_text, cross_street) and ordered by
# last_updated. area_id and area_name are carried as non-key attributes.
# NOTE(review): last_updated comes from current_timestamp() in silver, so rows
# for the same key can share a sequence value — confirm ties are acceptable.
dp.create_auto_cdc_flow(
  target="dim_location",
  source="cpl3_gold_location",
  keys=["rpt_dist_no", "location_text", "cross_street"],
  sequence_by="last_updated",
  # Meant to keep NULL columns in an update from overwriting stored values
  # (per the AUTO CDC documentation — confirm).
  ignore_null_updates=True
)

### Fact

In [0]:
# Declare the fact_crime_incident streaming table: an identity-generated
# surrogate key (crime_incident_key), the incident number (dr_no), a date_key,
# the four dimension keys (location, status, crime code, weapon), victim
# attributes, coordinates and a created_at timestamp.
# It is populated by the fact_crime_incident_flow append flow.
dp.create_streaming_table(
  name="fact_crime_incident",
  schema="""
    crime_incident_key bigint generated always as identity (start with 1 increment by 1),
    dr_no int,
    date_key int,
    time_occ int,
    location_key bigint,
    status_key bigint,
    crime_code_key bigint,
    weapon_key bigint,
    vict_age int,
    vict_sex string,
    vict_descent string,
    lat double,
    lon double,
    created_at timestamp
  """,
  # Enable the Delta change data feed on this table.
  table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.append_flow(target="fact_crime_incident", name="fact_crime_incident_flow")
def cpl3_gold_fact_crime_incident():
  """Build fact rows by resolving dimension keys for each silver record.

  The silver stream is left-joined to dim_location, dim_status, dim_crime_code
  and dim_weapon to look up their surrogate keys, so a record with no matching
  dimension row keeps a NULL key. date_key is date_occ formatted as yyyyMMdd
  and cast to int; created_at is silver's last_updated.
  """
  fact_query = """
    SELECT
        s.dr_no,
        cast(date_format(s.date_occ, 'yyyyMMdd') as int) as date_key,
        s.time_occ,
        l.location_key,
        st.status_key,
        cc.crime_code_key,
        w.weapon_key,
        s.vict_age,
        s.vict_sex,
        s.vict_descent,
        s.lat,
        s.lon,
        s.last_updated as created_at
    FROM STREAM(cpl2_silver) s
    LEFT JOIN dim_location l ON s.rpt_dist_no = l.rpt_dist_no AND s.location = l.location_text AND s.cross_street = l.cross_street
    LEFT JOIN dim_status st ON s.status = st.status
    LEFT JOIN dim_crime_code cc ON s.crime_code = cc.crime_code
    LEFT JOIN dim_weapon w ON s.weapon_used_cd = w.weapon_used_code
  """
  return spark.sql(fact_query)