In [0]:
%sql
-- Ad-hoc look at the shipments rows in the multiplexed bronze Delta table.
SELECT *
FROM delta.`/mnt/bronze/multiplexbronzetable`
WHERE source = 'shipments';

In [0]:
# Open the multiplexed bronze Delta table as a stream and keep only the rows
# whose `source` column tags them as shipments.
df = (
    spark.readStream
    .format('delta')
    .load("/mnt/bronze/multiplexbronzetable")
    .where("source=='shipments'")
)

In [0]:
from pyspark.sql.types import *

# Expected JSON layout of a shipments payload, used with from_json below.
# Field order matters for `payload.*` column order; every field is declared
# nullable (third StructField argument is True).
shipment_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("shipment_id", StringType()),
            ("order_id", StringType()),
            ("shipment_date", TimestampType()),
            ("expected_delivery_date", TimestampType()),
            ("carrier", StringType()),
            ("tracking_number", StringType()),
            ("shipment_status", StringType()),
            ("warehouse_location", StringType()),
            ("shipping_cost", DoubleType()),
            ("delivery_type", StringType()),
        )
    ]
)

In [0]:
from pyspark.sql.functions import from_json,cast,col

# Cast `raw_payload` to a string, parse it as JSON with shipment_schema, and
# flatten the parsed struct next to the bronze ingest timestamp.
payload_struct = from_json(col('raw_payload').cast('String'), shipment_schema)
parsed_df = (
    df
    .withColumn('payload', payload_struct)
    .select('payload.*', 'ingesttimestamp')
)

# Emit at most one row per shipment_id.
# NOTE(review): the watermark column (ingesttimestamp) is not one of the dedup
# keys, so per the Structured Streaming guide the 10-hour watermark does not
# evict dedup state here -- state for every shipment_id seen is retained.
# If duplicates only arrive within ~10 hours, dropDuplicatesWithinWatermark
# (Spark 3.5+) bounds state -- TODO confirm runtime version and intent.
deduped_df = (
    parsed_df
    .withWatermark("ingesttimestamp", "10 hours")
    .dropDuplicates(["shipment_id"])
)


In [0]:
# Interactive preview of the de-duplicated shipments. deduped_df derives from
# spark.readStream, so this is a streaming display.
# NOTE(review): on Databricks a streaming display presumably runs its own query
# until cancelled -- stop it once inspected.
display(deduped_df)

In [0]:
%sql
-- Silver-layer Delta table for parsed, de-duplicated shipments, stored at an
-- explicit external location. Columns mirror shipment_schema plus the bronze
-- ingest timestamp.
-- NOTE(review): partitioning on a raw TIMESTAMP likely yields one partition per
-- distinct ingest time (many small files); consider a derived date column --
-- the writeStream .partitionBy('ingesttimestamp') would need to change in step.
create table if not exists shipments_silver (
  shipment_id            string,
  order_id               string,
  shipment_date          timestamp,
  expected_delivery_date timestamp,
  carrier                string,
  tracking_number        string,
  shipment_status        string,
  warehouse_location     string,
  shipping_cost          double,
  delivery_type          string,
  ingesttimestamp        timestamp
)
using delta
partitioned by (ingesttimestamp)
location '/mnt/silver/shipments_silver';

In [0]:
# Continuously append the de-duplicated shipments into the silver Delta table.
# DataStreamWriter exposes `toTable` (`table` exists only on DataStreamReader);
# it starts the query and returns a StreamingQuery handle, kept here so the
# stream can be inspected or stopped later (e.g. shipments_silver_query.stop()).
# NOTE(review): partitioning by the raw ingesttimestamp matches the table DDL but
# likely creates one partition per distinct timestamp; changing it requires
# updating the DDL as well.
shipments_silver_query = (
    deduped_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/silver/shipments")
    .option("mergeSchema", 'true')
    .partitionBy('ingesttimestamp')
    .toTable('shipments_silver')
)

In [0]:
%sql
-- Inspect what the silver stream has written so far.
SELECT *
FROM shipments_silver;