In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import * 

In [0]:
# Schema of one line item; every element of the `order_details` array follows it.
# All fields are declared nullable.
order_details_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("item_id", StringType()),
        ("item_name", StringType()),
        ("price", IntegerType()),
        ("quantity", IntegerType()),
        ("amount", IntegerType()),
    )
])

# Schema of the whole order payload; line items are nested as an array of
# `order_details_schema` structs. All fields are declared nullable.
order_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("order_id", StringType()),
        ("timestamp", TimestampType()),
        ("order_details", ArrayType(order_details_schema)),
        ("branch", StringType()),
        ("total_order_amount", IntegerType()),
        ("mode_of_payment", StringType()),
    )
])

In [0]:
# Open the bronze table as a streaming source. `spark` is not defined in this
# notebook; it is presumably the session injected by the Databricks runtime.
df_silver_source = spark.readStream.table("hj_orders.bronze.orders_raw")

In [0]:
# Decode the `value` column as a string, parse it as JSON against `order_schema`,
# then promote the parsed struct's fields to top-level columns.
# NOTE(review): per the PySpark docs from_json yields null for an unparseable
# string, so malformed payloads would surface as all-null rows here — confirm
# whether they should be routed to a quarantine table instead.
df_parsed = (
    df_silver_source
    .select(from_json(col("value").cast("string"), order_schema).alias("data"))
    .select("data.*")
)

In [0]:
# Produce one row per element of `order_details`, exposing the element as the
# struct column `item`. explode() emits no row for a null or empty array, so
# orders without line items do not appear downstream.
df_normalized = df_parsed.withColumn("item", explode("order_details"))

In [0]:
# Flatten to one row per (order, line item): order-level columns are repeated
# on every row and the fields of the exploded `item` struct are lifted to
# top-level columns with line-item-specific names.
df_final = df_normalized.select(
    col("order_id"),
    # Already a timestamp: `order_schema` declares this field as TimestampType,
    # so from_json has parsed it and no further cast is required.
    col("timestamp"),
    col("branch"),
    col("total_order_amount"),
    col("mode_of_payment"),
    col("item.item_id").alias("item_id"),
    col("item.item_name").alias("item_name"),
    col("item.price").alias("unit_price"),
    col("item.quantity").alias("quantity"),
    col("item.amount").alias("line_item_amount")
)

In [0]:
%sql
-- Make hj_orders the current catalog; the SQL cells that follow refer to the
-- `silver` schema without a catalog prefix.
USE CATALOG hj_orders;

In [0]:
%sql
-- Create the `silver` schema if it does not exist yet, with its managed
-- storage rooted at the given ADLS (abfss) path. Safe to re-run.
CREATE SCHEMA IF NOT EXISTS silver
MANAGED LOCATION 'abfss://processed@himalayanjavadl.dfs.core.windows.net/silver/';

In [0]:
%sql
-- Register the external volume `silver.checkpoints` over the given ADLS path
-- if it does not exist yet. The streaming checkpoint path used by the write
-- cell lives under /Volumes/hj_orders/silver/checkpoints/.
CREATE EXTERNAL VOLUME IF NOT EXISTS silver.checkpoints
LOCATION 'abfss://processed@himalayanjavadl.dfs.core.windows.net/checkpoints/silver_ingest/';

In [0]:
# Keep every flattened column and append `ingestion_timestamp`, populated from
# Spark's current_timestamp() when the row is processed.
df_silver_final = df_final.select(
    "*",
    current_timestamp().alias("ingestion_timestamp"),
)

In [0]:
# Checkpoint directory for this stream, under the silver.checkpoints volume.
checkpoint_silver = "/Volumes/hj_orders/silver/checkpoints/silver_normalization"

# Configure an append-mode Delta sink; mergeSchema lets new columns be added
# to the target table's schema.
silver_writer = (
    df_silver_final.writeStream
    .outputMode("append")
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkpoint_silver)
)

# toTable() starts the streaming query and returns its handle; it does not
# block, so the message below prints as soon as the query has been launched.
query = silver_writer.toTable("hj_orders.silver.orders_processed")

print("Ingestion Stream started! Waiting for first batch...")

In [0]:
# Preview the transformed stream.
# NOTE(review): df_silver_final is a streaming DataFrame (it derives from
# spark.readStream). On Databricks, display() of a streaming DataFrame is
# understood to launch its own streaming query, separate from `query` and
# without the checkpoint configured for it — confirm this is intended and stop
# it when finished, or preview the written silver table instead.
display(df_silver_final)