In [0]:
from pyspark.sql.types import *

# Expected structure of the JSON order payload, used with from_json to turn
# the raw `value` column into typed columns. Every field is nullable
# (StructType.add defaults to nullable=True, same as StructField).
# NOTE(review): order_amount is DoubleType; binary floating point cannot store
# most decimal currency amounts exactly. Consider DecimalType if exact sums
# matter, but that would change the downstream table's schema.
# NOTE(review): a later cell runs `from schemas.orders_schema import orders_schema`,
# which shadows this definition. Keep the two in sync, or keep only one of them.
orders_schema = (
    StructType()
    .add("order_id", StringType())
    .add("customer_id", StringType())
    .add("order_amount", DoubleType())
    .add("order_status", StringType())
    .add("event_time", TimestampType())
)

In [0]:
from pyspark.sql.functions import *
from schemas.orders_schema import orders_schema

# Bronze -> silver: parse the raw JSON payload into typed order columns,
# drop rows without an order_id, and de-duplicate with bounded state.
df_bronze = spark.readStream.table("prod.bronze.orders_raw")

df_silver = (
    df_bronze
    # from_json requires a string column. The cast is a no-op if `value` is
    # already a string, and it makes the parse work if the payload is stored as
    # binary (e.g. raw Kafka bytes). NOTE(review): confirm the bronze column type.
    .select(from_json(col("value").cast("string"), orders_schema).alias("data"))
    .select("data.*")
    # Rows whose order_id did not parse (presumably including malformed JSON)
    # are discarded.
    .filter("order_id IS NOT NULL")
    # Tolerate events arriving up to 10 minutes behind the latest event_time seen.
    .withWatermark("event_time", "10 minutes")
    # The watermark only evicts dedup state when the event-time column is part
    # of the dedup key. With ["order_id"] alone, every order_id seen stays in
    # state forever, so state grows without bound.
    # Caveats:
    #  - Records with the same order_id but a different event_time are now both
    #    kept. On Spark >= 3.5, dropDuplicatesWithinWatermark(["order_id"])
    #    de-duplicates on order_id alone with bounded state.
    #  - The dedup key change alters the operator's state schema. The query
    #    cannot resume from a checkpoint written with the old key; deploy it
    #    with a fresh checkpoint location.
    #  - NOTE(review): rows with a null event_time are likely never evicted from
    #    state; consider filtering them out if they can occur.
    .dropDuplicates(["order_id", "event_time"])
)

# Start a streaming query that appends the de-duplicated orders to the silver
# Delta table. toTable() starts the query and returns its StreamingQuery
# handle, which becomes this cell's displayed output.
# NOTE(review): ABFS URIs normally look like
# abfss://<container>@<account>.dfs.core.windows.net/<path>. This checkpoint
# URI has no storage-account authority; confirm the intended location.
df_silver.writeStream.toTable(
    "prod.silver.orders_clean",
    format="delta",
    outputMode="append",
    checkpointLocation="abfss://checkpoints/silver/orders",
)