In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce

Bronze Layer: ingest the raw Yelp check-in JSON into a Delta table

In [0]:

# Bronze: load the raw check-in JSON unchanged and stamp each row with the
# load time (kept so later stages can dedup by recency if they need to).
checkin_bronze = spark.read.json(
    "/Volumes/workspace/default/yelp-reviews/yelp_academic_dataset_checkin.json"
).withColumn("_ingest_ts", current_timestamp())

# Persist as the managed Delta table `bronzecheckin`, replacing prior contents.
# The mergeSchema option is passed through to the Delta writer.
(
    checkin_bronze.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("bronzecheckin")
)

In [0]:
# Preview the bronze DataFrame built above.
# NOTE(review): checkin_bronze is not cached, so this presumably re-reads the
# JSON and re-evaluates current_timestamp(); the _ingest_ts shown here may
# differ from the value stored in `bronzecheckin` — confirm if that matters.
display(checkin_bronze)

Silver Layer: explode check-in timestamps and aggregate daily counts per business

In [0]:
from pyspark.sql.functions import (
    col,
    split,
    explode,
    to_timestamp,
    to_date,
    coalesce,
    lit,
    sum,
    year,
    month
)

# Silver: one row per (business_id, checkin_date) with the number of check-ins
# on that day, plus year/month columns derived from the date.
#
# Each bronze row holds every check-in of a business in a single
# comma-separated `date` string, so the string is split and exploded into one
# row per check-in before aggregating.
checkin_silver = (
    spark.table("bronzecheckin")
    .filter(col("business_id").isNotNull())
    .select(
        "business_id",
        # Split on the comma together with any surrounding whitespace so the
        # individual keys do not keep the blank that follows each separator.
        explode(split(col("date"), r"\s*,\s*")).alias("checkin_key"),
    )
    # An empty `date` string splits into a single "" element; drop it before
    # parsing so it can neither raise (ANSI mode) nor become a null date.
    .filter(col("checkin_key") != "")
    # The string -> timestamp cast accepts both "yyyy-MM-dd HH:mm:ss" and plain
    # "yyyy-MM-dd" values, so no explicit pattern is required.
    .withColumn("checkin_ts", col("checkin_key").cast("timestamp"))
    # Keys that could not be parsed would otherwise be counted under a null
    # checkin_date; exclude them from the aggregate.
    .filter(col("checkin_ts").isNotNull())
    # Derive the date from the parsed timestamp. Parsing the raw key with the
    # "yyyy-MM-dd" pattern does not match values that carry a time component
    # and, depending on parser policy / ANSI settings, yields null or an error.
    .withColumn("checkin_date", to_date(col("checkin_ts")))
    .groupBy("business_id", "checkin_date")
    # Every exploded row is exactly one check-in, so a row count is the total.
    .count()
    .withColumnRenamed("count", "checkin_count")
    .withColumn("checkin_year", year(col("checkin_date")))
    .withColumn("checkin_month", month(col("checkin_date")))
)

In [0]:
from pyspark.sql.functions import col, to_date

# Preview the silver aggregate. `checkin_date` is already a DATE column, so no
# further conversion is needed; checkin_silver has no `timestamp_column`, and
# referencing one makes this cell fail with an unresolved-column error.
display(checkin_silver)

In [0]:

# Persist the daily aggregate as the managed Delta table `silvercheckin`,
# replacing whatever the table held before.
(
    checkin_silver.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("silvercheckin")
)