# Using Medallion Architecture

In [0]:
df = spark.table("samples.tpch.orders")
df.printSchema()

root
 |-- o_orderkey: long (nullable = true)
 |-- o_custkey: long (nullable = true)
 |-- o_orderstatus: string (nullable = true)
 |-- o_totalprice: decimal(18,2) (nullable = true)
 |-- o_orderdate: date (nullable = true)
 |-- o_orderpriority: string (nullable = true)
 |-- o_clerk: string (nullable = true)
 |-- o_shippriority: integer (nullable = true)
 |-- o_comment: string (nullable = true)



In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BronzeLayer") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

df = spark.table("samples.tpch.orders")

# Create Bronze table
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_bronze (
        o_orderkey LONG,
        o_custkey LONG,
        o_orderstatus STRING,
        o_totalprice DECIMAL(18,2),
        o_orderdate DATE,
        o_orderpriority STRING,
        o_clerk STRING,
        o_shippriority INTEGER,
        o_comment STRING,
        ingest_time TIMESTAMP
    )
    USING DELTA
    COMMENT 'Raw order data with ingestion timestamp.';
""")

# Insert data into Bronze table
bronze_orders = df.withColumn("ingest_time", F.current_timestamp())
bronze_orders.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_bronze")

# Expected result for one record
bronze_result = spark.sql("SELECT * FROM retailsc_medallion.orders_bronze LIMIT 1").collect()[0]
print("Bronze Layer Result (One Record):")
print(bronze_result)

Bronze Layer Result (One Record):
Row(o_orderkey=13710944, o_custkey=227285, o_orderstatus='O', o_totalprice=Decimal('162169.66'), o_orderdate=datetime.date(1995, 10, 11), o_orderpriority='1-URGENT', o_clerk='Clerk#000000432', o_shippriority=0, o_comment='accounts. ruthlessly regular accounts alongside of the car', ingest_time=datetime.datetime(2025, 2, 15, 11, 29, 25, 534388))


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SilverLayer") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# Create Silver table
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_silver (
        o_orderkey LONG,
        o_custkey LONG,
        order_status STRING,
        o_totalprice DECIMAL(18,2),
        o_orderdate DATE,
        o_orderpriority STRING,
        o_clerk STRING,
        o_shippriority INTEGER,
        o_comment STRING,
        ingest_time TIMESTAMP
    )
    USING DELTA
    COMMENT 'Cleaned and normalized order data.';
""")

# Clean and transform data for Silver layer
silver_orders = spark.table("retailsc_medallion.orders_bronze") \
    .filter(F.col("o_totalprice") > 0) \
    .withColumn("order_status", 
        F.when(F.col("o_orderstatus") == "F", "Fulfilled")
         .when(F.col("o_orderstatus") == "O", "Open")
         .otherwise("Unknown")
    )

# Insert data into Silver table
silver_orders.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_silver")

# Expected result for one record
silver_result = spark.sql("SELECT * FROM retailsc_medallion.orders_silver LIMIT 1").collect()[0]
print("Silver Layer Result (One Record):")
print(silver_result)

Silver Layer Result (One Record):
Row(o_orderkey=11396166, o_custkey=179329, order_status='Open', o_totalprice=Decimal('187683.10'), o_orderdate=datetime.date(1996, 6, 22), o_orderpriority='4-NOT SPECIFIED', o_clerk='Clerk#000004689', o_shippriority=0, o_comment='ep fluffily regular packages. regular, final courts ag', ingest_time=datetime.datetime(2025, 2, 15, 11, 29, 25, 534388), o_orderstatus='O')


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CopperLayer") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# Create Copper table
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_copper (
        order_key LONG,
        customer_key LONG,
        order_status STRING,
        total_price DECIMAL(18,2),
        order_date DATE,
        order_priority STRING,
        clerk STRING,
        ship_priority INTEGER,
        comment STRING,
        ingest_time TIMESTAMP,
        order_year INTEGER,
        order_month INTEGER,
        order_day INTEGER,
        order_value_segment STRING
    )
    USING DELTA
    COMMENT 'Advanced transformations including derived fields for time-based analysis and segmentation.';
""")

# Transform data for Copper layer
copper_orders = spark.table("retailsc_medallion.orders_silver") \
    .withColumnRenamed("o_orderkey", "order_key") \
    .withColumnRenamed("o_custkey", "customer_key") \
    .withColumnRenamed("o_totalprice", "total_price") \
    .withColumnRenamed("o_orderdate", "order_date") \
    .withColumnRenamed("o_orderpriority", "order_priority") \
    .withColumnRenamed("o_clerk", "clerk") \
    .withColumnRenamed("o_shippriority", "ship_priority") \
    .withColumnRenamed("o_comment", "comment") \
    .withColumn("order_year", F.year("order_date")) \
    .withColumn("order_month", F.month("order_date")) \
    .withColumn("order_day", F.dayofweek("order_date")) \
    .withColumn("order_value_segment", 
        F.when(F.col("total_price") > 500, "High")
         .when(F.col("total_price") > 100, "Medium")
         .otherwise("Low")
    )

# Insert data into Copper table
copper_orders.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_copper")

# Expected result for one record
copper_result = spark.sql("SELECT * FROM retailsc_medallion.orders_copper LIMIT 1").collect()[0]
print("Copper Layer Result (One Record):")
print(copper_result)

Copper Layer Result (One Record):
Row(order_key=5611649, customer_key=687736, order_status='Open', total_price=Decimal('51905.72'), order_date=datetime.date(1996, 6, 4), order_priority='5-LOW', clerk='Clerk#000002954', ship_priority=0, comment='ls! bold, regular accounts ', ingest_time=datetime.datetime(2025, 2, 15, 11, 29, 25, 534388), order_year=1996, order_month=6, order_day=3, order_value_segment='High', o_orderstatus='O')


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GoldLayer") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# Daily Sales Aggregation
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_gold_daily_sales (
        order_year INTEGER,
        order_month INTEGER,
        order_day INTEGER,
        total_sales DECIMAL(18,2),
        order_count BIGINT
    )
    USING DELTA
    COMMENT 'Aggregates daily sales for BI reporting.';
""")

gold_daily_sales = spark.table("retailsc_medallion.orders_copper").groupBy("order_year", "order_month", "order_day") \
    .agg(
        F.sum("total_price").cast("decimal(18,2)").alias("total_sales"),
        F.count("order_key").cast("bigint").alias("order_count")
    )

gold_daily_sales.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_gold_daily_sales")

# Customer Segmentation
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_gold_customer_segment (
        customer_key LONG,
        order_year INTEGER,
        order_month INTEGER,
        order_value_segment STRING,
        orders_per_segment BIGINT,
        avg_order_value DECIMAL(18,2)
    )
    USING DELTA
    COMMENT 'Aggregates customer orders for segmentation analysis.';
""")

gold_customer_segment = spark.table("retailsc_medallion.orders_copper").groupBy("customer_key", "order_year", "order_month", "order_value_segment") \
    .agg(
        F.count("order_key").cast("bigint").alias("orders_per_segment"),
        F.avg("total_price").cast("decimal(18,2)").alias("avg_order_value")
    )

gold_customer_segment.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_gold_customer_segment")

# Expected results for one record each
gold_daily_result = spark.sql("SELECT * FROM retailsc_medallion.orders_gold_daily_sales LIMIT 1").collect()[0]
gold_customer_result = spark.sql("SELECT * FROM retailsc_medallion.orders_gold_customer_segment LIMIT 1").collect()[0]

print("Gold Layer Daily Sales Result (One Record):")
print(gold_daily_result)
print("Gold Layer Customer Segmentation Result (One Record):")
print(gold_customer_result)

Gold Layer Daily Sales Result (One Record):
Row(order_year=1995, order_month=12, order_day=4, total_sales=Decimal('1880689027.98'), order_count=12450)
Gold Layer Customer Segmentation Result (One Record):
Row(customer_key=183637, order_year=1994, order_month=1, order_value_segment='High', orders_per_segment=1, avg_order_value=Decimal('163460.83'))


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PlatinumLayer") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# If the focus is on customer behavior or order attributes without product specifics:
spark.sql("""
    CREATE TABLE IF NOT EXISTS retailsc_medallion.orders_platinum_customer_insights (
        customer_key LONG,
        order_year INTEGER,
        order_month INTEGER,
        order_value_segment STRING,
        orders_per_segment BIGINT,
        avg_order_value DECIMAL(18,2),
        total_purchase_value DECIMAL(18,2)
    )
    USING DELTA
    COMMENT 'Specialized dataset for customer insights, focusing on behavior and purchase patterns.';
""")

# Explicitly cast the total_purchase_value to ensure type consistency
platinum_customer_insights = spark.table("retailsc_medallion.orders_gold_customer_segment") \
    .withColumn("total_purchase_value", (F.col("avg_order_value") * F.col("orders_per_segment")).cast("decimal(18,2)"))

# Ensure all columns match the schema before writing
platinum_customer_insights = platinum_customer_insights.select(
    F.col("customer_key"),
    F.col("order_year"),
    F.col("order_month"),
    F.col("order_value_segment"),
    F.col("orders_per_segment").cast("bigint"),
    F.col("avg_order_value").cast("decimal(18,2)"),
    F.col("total_purchase_value").cast("decimal(18,2)")
)

platinum_customer_insights.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("retailsc_medallion.orders_platinum_customer_insights")

# Expected result for one record
platinum_result = spark.sql("SELECT * FROM retailsc_medallion.orders_platinum_customer_insights LIMIT 1").collect()[0]
print("Platinum Layer Customer Insights Result (One Record):")
print(platinum_result)

Platinum Layer Customer Insights Result (One Record):
Row(customer_key=183637, order_year=1994, order_month=1, order_value_segment='High', orders_per_segment=1, avg_order_value=Decimal('163460.83'), total_purchase_value=Decimal('163460.83'))
