In [0]:
-- Make data_modeling the current catalog, so the silver.* and gold.* references
-- in the cells below resolve inside it.
use catalog data_modeling;

In [0]:
%python

from pyspark.sql.functions import (
    lit, col, count, sum, avg, round, max, min, stddev,
    countDistinct, datediff, when, current_timestamp,
    desc, row_number
)
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from datetime import datetime

In [0]:
-- create or replace table gold.dim_customer(
--     key bigint GENERATED ALWAYS AS IDENTITY,
--     UserId int,
--     PhoneNumber string,
--     CustName string,
--     Gender string,
--     Age int,
--     MaritalStatus string,
--     State string,
--     Zone string,
--     Occupation string,
--     StartDate date,
--     EndDate date,
--     ActiveFlag string
-- )
-- using delta;

-- SCD Type 2 load of gold.dim_customer from silver.customer.
--
-- Staging view: every source row appears once with mergkey = UserId. Rows whose
-- attributes differ from the currently active dimension row appear a second
-- time with a NULL mergkey. A NULL mergkey can never satisfy the MERGE ON
-- clause, so those copies always land in WHEN NOT MATCHED and are inserted as
-- the new active version, while the keyed copy closes the old version.
-- (A literal 0 was used before; it would wrongly match a customer with UserId 0.)
--
-- Comparisons use the null-safe operator (<=>): a plain <> evaluates to NULL
-- when either side is NULL, which silently hid NULL -> value (and value -> NULL)
-- changes.
create or replace temporary view silver_customer as
select *, UserId as mergkey
from silver.customer
union all
select src.*, NULL as mergkey
from silver.customer as src
inner join gold.dim_customer as tgt
    on src.UserId = tgt.UserId
   and tgt.ActiveFlag = 'Y'
where NOT (src.PhoneNumber <=> tgt.PhoneNumber)
   OR NOT (src.CustName <=> tgt.CustName)
   OR NOT (src.Gender <=> tgt.Gender)
   OR NOT (src.Age <=> tgt.Age)
   OR NOT (src.MaritalStatus <=> tgt.MaritalStatus)
   OR NOT (src.State <=> tgt.State)
   OR NOT (src.Occupation <=> tgt.Occupation)
   OR NOT (src.Zone <=> tgt.Zone);

-- Close the active version of changed customers and insert new versions /
-- brand-new customers. The MATCHED predicate mirrors the view's change test so
-- unchanged customers are left untouched.
MERGE INTO gold.dim_customer AS tgt
USING silver_customer AS src
ON tgt.UserId = src.mergkey
AND tgt.ActiveFlag = 'Y'
WHEN MATCHED AND (
    NOT (src.CustName <=> tgt.CustName)
    OR NOT (src.PhoneNumber <=> tgt.PhoneNumber)
    OR NOT (src.Gender <=> tgt.Gender)
    OR NOT (src.Age <=> tgt.Age)
    OR NOT (src.MaritalStatus <=> tgt.MaritalStatus)
    OR NOT (src.State <=> tgt.State)
    OR NOT (src.Occupation <=> tgt.Occupation)
    OR NOT (src.Zone <=> tgt.Zone)
)
THEN UPDATE SET
    EndDate = current_date(),
    ActiveFlag = 'N'
WHEN NOT MATCHED
THEN INSERT (
    UserId,
    PhoneNumber,
    CustName,
    Gender,
    Age,
    MaritalStatus,
    State,
    Occupation,
    Zone,
    StartDate,
    EndDate,
    ActiveFlag
)
VALUES (
    src.UserId,
    src.PhoneNumber,
    src.CustName,
    src.Gender,
    src.Age,
    src.MaritalStatus,
    src.State,
    src.Occupation,
    src.Zone,
    current_date(),
    date('9999-12-31'),  -- open-ended EndDate for the active version
    'Y'
);

-- select * from gold.dim_customer ;

In [0]:
-- create or replace table gold.dim_product(
--     key bigint GENERATED ALWAYS AS IDENTITY,
--     ProductId int,
--     ProductCode string,
--     ProductDesc string,
--     ProductCategory string,
--     ProductSubCategory string,
--     ProductCost double,
--     StartDate date,
--     EndDate date,
--     ActiveFlag string
-- )
-- using delta;

-- SCD Type 2 load of gold.dim_product from silver.product.
--
-- Staging view: every source row appears once with mergkey = ProductId. Rows
-- whose attributes differ from the currently active dimension row appear a
-- second time with a NULL mergkey, which can never satisfy the MERGE ON clause
-- and therefore is always inserted as the new active version.
--
-- Comparisons use the null-safe operator (<=>): a plain <> evaluates to NULL
-- when either side is NULL, which silently hid NULL -> value (and value -> NULL)
-- changes.
create or replace temporary view silver_product as
select *, ProductId as mergkey
from silver.product
union all
select src.*, NULL as mergkey
from silver.product as src
inner join gold.dim_product as tgt
    on src.ProductId = tgt.ProductId
   and tgt.ActiveFlag = 'Y'
where NOT (src.ProductCode <=> tgt.ProductCode)
   OR NOT (src.ProductDesc <=> tgt.ProductDesc)
   OR NOT (src.ProductCategory <=> tgt.ProductCategory)
   OR NOT (src.ProductSubCategory <=> tgt.ProductSubCategory)
   OR NOT (src.ProductCost <=> tgt.ProductCost);

-- Close the active version of changed products and insert new versions /
-- brand-new products. The MATCHED predicate mirrors the view's change test so
-- unchanged products are left untouched.
MERGE INTO gold.dim_product AS tgt
USING silver_product AS src
ON tgt.ProductId = src.mergkey
AND tgt.ActiveFlag = 'Y'
WHEN MATCHED AND (
    NOT (src.ProductDesc <=> tgt.ProductDesc)
    OR NOT (src.ProductCode <=> tgt.ProductCode)
    OR NOT (src.ProductCategory <=> tgt.ProductCategory)
    OR NOT (src.ProductSubCategory <=> tgt.ProductSubCategory)
    OR NOT (src.ProductCost <=> tgt.ProductCost)
)
THEN UPDATE SET
    EndDate = current_date(),
    ActiveFlag = 'N'
WHEN NOT MATCHED
THEN INSERT (
    ProductId,
    ProductCode,
    ProductDesc,
    ProductCategory,
    ProductSubCategory,
    ProductCost,
    StartDate,
    EndDate,
    ActiveFlag
)
VALUES (
    src.ProductId,
    src.ProductCode,
    src.ProductDesc,
    src.ProductCategory,
    src.ProductSubCategory,
    src.ProductCost,
    current_date(),
    date('9999-12-31'),  -- open-ended EndDate for the active version
    'Y'
);

-- select * from gold.dim_product ;

In [0]:
-- create or replace table gold.dim_location(
--     key bigint GENERATED ALWAYS AS IDENTITY,
--     StoreID int,
--     State string,
--     Zone string,
--     Country string,
--     Region string,
--     StartDate date,
--     EndDate date,
--     ActiveFlag string
-- )
-- using delta;

-- SCD Type 2 load of gold.dim_location from silver.loc.
--
-- Staging view: every source row appears once with mergkey = StoreID. Rows
-- whose attributes differ from the currently active dimension row appear a
-- second time with a NULL mergkey. A NULL mergkey can never satisfy the MERGE
-- ON clause, so those copies are always inserted as the new active version.
-- (A literal 0 was used before; it would wrongly match a store with StoreID 0.)
--
-- Comparisons use the null-safe operator (<=>): a plain <> evaluates to NULL
-- when either side is NULL, which silently hid NULL -> value (and value -> NULL)
-- changes.
create or replace temporary view silver_loc as
select *, StoreID as mergkey
from silver.loc
union all
select src.*, NULL as mergkey
from silver.loc as src
inner join gold.dim_location as tgt
    on src.StoreID = tgt.StoreID
   AND tgt.ActiveFlag = 'Y'
WHERE NOT (src.State <=> tgt.State)
   OR NOT (src.Zone <=> tgt.Zone)
   OR NOT (src.Country <=> tgt.Country)
   OR NOT (src.Region <=> tgt.Region)
;

-- Close the active version of changed stores and insert new versions /
-- brand-new stores. The MATCHED predicate mirrors the view's change test so
-- unchanged stores are left untouched.
MERGE INTO gold.dim_location AS tgt
USING silver_loc AS src
ON tgt.StoreID = src.mergkey
AND tgt.ActiveFlag = 'Y'
WHEN MATCHED AND (
    NOT (src.State <=> tgt.State)
    OR NOT (src.Zone <=> tgt.Zone)
    OR NOT (src.Country <=> tgt.Country)
    OR NOT (src.Region <=> tgt.Region)
)
THEN UPDATE SET
    EndDate = current_date(),
    ActiveFlag = 'N'
WHEN NOT MATCHED
THEN INSERT (
    StoreID,
    State,
    Zone,
    Country,
    Region,
    StartDate,
    EndDate,
    ActiveFlag
)
VALUES (
    src.StoreID,
    src.State,
    src.Zone,
    src.Country,
    src.Region,
    current_date(),
    date('9999-12-31'),  -- open-ended EndDate for the active version
    'Y'
);

-- select * from gold.dim_location ;

In [0]:
-- Fact table for sales transactions, loaded incrementally per RunID.
-- Created only when missing: the load step deletes the current RunID and
-- appends, so replacing the table on every run would throw away all earlier
-- runs and make that delete pointless.
create table if not exists gold.fact_sales(
    RunID string,
    TransactionId long,
    UserId long,
    ProductId long,
    StoreID long,
    OrderTms timestamp,
    OrderDate date,
    OrderCount double,
    TotalAmount double,
    CreatedDate timestamp
)
using delta;

-- select * from silver.sales;

In [0]:
%python

# Source batch for this run; also reused by the KPI cells below.
sales_df = spark.table("silver.sales")

# Identify the run being loaded. first() returns None on an empty table, so
# fail with an explicit message instead of a TypeError on subscripting None.
# NOTE(review): if silver.sales ever holds more than one RunId, only one of
# them is picked here — confirm batches are single-run.
first_run_row = sales_df.select("RunId").distinct().first()
if first_run_row is None:
    raise ValueError("silver.sales is empty: no RunId available to load into gold.fact_sales")
runid = first_run_row["RunId"]

# Idempotent reload: drop rows of this run, then append the batch.
# RunID is a string column, so the value must be quoted (as every KPI delete
# in this notebook does); unquoted, a non-numeric run id is invalid SQL.
spark.sql(f"delete from gold.fact_sales where RunId = '{runid}'")

sales_df.write.mode("append").saveAsTable("gold.fact_sales")

In [0]:
%python

# KPI TABLE 1: gold.kpi_store
# Create schema
# spark.sql("""
# create or replace table gold.kpi_store(
#     RunID string,
#     StoreID long,
#     TransactionCount long,
#     TotalOrders double,
#     TotalRevenue double,
#     AvgTransactionValue double,
#     MaxTransaction double,
#     MinTransaction double,
#     StdDevRevenue double,
#     CreatedDate timestamp
# )
# using delta;
# """)

# Per-store KPIs: one output row per StoreID, stamped with the current run.
store_metrics = [
    count("TransactionId").alias("TransactionCount"),
    sum("OrderCount").alias("TotalOrders"),
    sum("TotalAmount").alias("TotalRevenue"),
    round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
    max("TotalAmount").alias("MaxTransaction"),
    min("TotalAmount").alias("MinTransaction"),
    round(stddev("TotalAmount"), 2).alias("StdDevRevenue"),
]

kpi_store_df = (
    sales_df
    .groupBy("StoreID")
    .agg(*store_metrics)
    .select(
        lit(runid).alias("RunID"),
        "StoreID",
        "TransactionCount",
        "TotalOrders",
        "TotalRevenue",
        "AvgTransactionValue",
        "MaxTransaction",
        "MinTransaction",
        "StdDevRevenue",
        current_timestamp().alias("CreatedDate"),
    )
)

# Remove whatever this run wrote earlier, then append, so a re-run of the
# same RunID does not leave duplicate rows.
spark.sql(f"delete from gold.kpi_store where RunID = '{runid}'")
(
    kpi_store_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_store")
)

# KPI TABLE 2: gold.kpi_product

# spark.sql("""
# create or replace table gold.kpi_product(
#     RunID string,
#     ProductId long,
#     TransactionCount long,
#     TotalOrders double,
#     TotalRevenue double,
#     AvgTransactionValue double,
#     UniqueCustomers long,
#     StoresDistribution long,
#     CreatedDate timestamp
# )
# using delta;
# """)


# Per-product KPIs: one output row per ProductId, stamped with the current run.
product_metrics = [
    count("TransactionId").alias("TransactionCount"),
    sum("OrderCount").alias("TotalOrders"),
    sum("TotalAmount").alias("TotalRevenue"),
    round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
    countDistinct("UserId").alias("UniqueCustomers"),
    countDistinct("StoreID").alias("StoresDistribution"),
]

kpi_product_df = (
    sales_df
    .groupBy("ProductId")
    .agg(*product_metrics)
    .select(
        lit(runid).alias("RunID"),
        "ProductId",
        "TransactionCount",
        "TotalOrders",
        "TotalRevenue",
        "AvgTransactionValue",
        "UniqueCustomers",
        "StoresDistribution",
        current_timestamp().alias("CreatedDate"),
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_product where RunID = '{runid}'")
(
    kpi_product_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_product")
)

# KPI TABLE 3: gold.kpi_customer

# spark.sql("""
# create or replace table gold.kpi_customer(
#     RunID string,
#     UserId long,
#     TransactionCount long,
#     TotalOrders double,
#     TotalSpent double,
#     AvgTransactionValue double,
#     UniqueProductsBought long,
#     StoresVisited long,
#     FirstPurchaseDate date,
#     LastPurchaseDate date,
#     CreatedDate timestamp
# )
# using delta;
# """)

# Per-customer KPIs: one output row per UserId, stamped with the current run.
# (The former "Schema created" print was dropped: the DDL is commented out, so
# nothing is created here and the message was false.)
kpi_customer_df = (
    sales_df
    .groupBy("UserId")
    .agg(
        count("TransactionId").alias("TransactionCount"),
        sum("OrderCount").alias("TotalOrders"),
        sum("TotalAmount").alias("TotalSpent"),
        round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
        countDistinct("ProductId").alias("UniqueProductsBought"),
        countDistinct("StoreID").alias("StoresVisited"),
        min(col("OrderDate")).alias("FirstPurchaseDate"),
        max(col("OrderDate")).alias("LastPurchaseDate"),
    )
    .withColumn("RunID", lit(runid))
    .withColumn("CreatedDate", current_timestamp())
    .select(
        "RunID", "UserId", "TransactionCount", "TotalOrders", "TotalSpent",
        "AvgTransactionValue", "UniqueProductsBought", "StoresVisited",
        "FirstPurchaseDate", "LastPurchaseDate", "CreatedDate",
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_customer where RunID = '{runid}'")
kpi_customer_df.write.mode("append").option("mergeSchema", "true").saveAsTable("gold.kpi_customer")

# KPI TABLE 4: gold.kpi_store_product

# spark.sql("""
# create or replace table gold.kpi_store_product(
#     RunID string,
#     StoreID long,
#     ProductId long,
#     TransactionCount long,
#     TotalOrders double,
#     TotalRevenue double,
#     AvgTransactionValue double,
#     UniqueCustomers long,
#     CreatedDate timestamp
# )
# using delta;
# """)

# KPIs per (StoreID, ProductId) pair, stamped with the current run.
store_product_metrics = [
    count("TransactionId").alias("TransactionCount"),
    sum("OrderCount").alias("TotalOrders"),
    sum("TotalAmount").alias("TotalRevenue"),
    round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
    countDistinct("UserId").alias("UniqueCustomers"),
]

kpi_store_product_df = (
    sales_df
    .groupBy("StoreID", "ProductId")
    .agg(*store_product_metrics)
    .select(
        lit(runid).alias("RunID"),
        "StoreID",
        "ProductId",
        "TransactionCount",
        "TotalOrders",
        "TotalRevenue",
        "AvgTransactionValue",
        "UniqueCustomers",
        current_timestamp().alias("CreatedDate"),
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_store_product where RunID = '{runid}'")
(
    kpi_store_product_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_store_product")
)

# KPI TABLE 5: gold.kpi_customer_product

# spark.sql("""
# create or replace table gold.kpi_customer_product(
#     RunID string,
#     UserId long,
#     ProductId long,
#     PurchaseFrequency long,
#     TotalOrders double,
#     TotalSpent double,
#     AvgPurchaseValue double,
#     CreatedDate timestamp
# )
# using delta;
# """)

# KPIs per (UserId, ProductId) pair, stamped with the current run.
customer_product_metrics = [
    count("TransactionId").alias("PurchaseFrequency"),
    sum("OrderCount").alias("TotalOrders"),
    sum("TotalAmount").alias("TotalSpent"),
    round(avg("TotalAmount"), 2).alias("AvgPurchaseValue"),
]

kpi_customer_product_df = (
    sales_df
    .groupBy("UserId", "ProductId")
    .agg(*customer_product_metrics)
    .select(
        lit(runid).alias("RunID"),
        "UserId",
        "ProductId",
        "PurchaseFrequency",
        "TotalOrders",
        "TotalSpent",
        "AvgPurchaseValue",
        current_timestamp().alias("CreatedDate"),
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_customer_product where RunID = '{runid}'")
(
    kpi_customer_product_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_customer_product")
)

# KPI TABLE 6: gold.kpi_daily

# spark.sql("""
# create or replace table gold.kpi_daily(
#     RunID string,
#     OrderDate date,
#     TransactionCount long,
#     TotalOrders double,
#     DailyRevenue double,
#     AvgTransactionValue double,
#     UniqueCustomers long,
#     StoresActive long,
#     CreatedDate timestamp
# )
# using delta;
# """)

# Daily KPIs: one output row per OrderDate, stamped with the current run.
daily_metrics = [
    count("TransactionId").alias("TransactionCount"),
    sum("OrderCount").alias("TotalOrders"),
    sum("TotalAmount").alias("DailyRevenue"),
    round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
    countDistinct("UserId").alias("UniqueCustomers"),
    countDistinct("StoreID").alias("StoresActive"),
]

kpi_daily_df = (
    sales_df
    .groupBy("OrderDate")
    .agg(*daily_metrics)
    .select(
        lit(runid).alias("RunID"),
        "OrderDate",
        "TransactionCount",
        "TotalOrders",
        "DailyRevenue",
        "AvgTransactionValue",
        "UniqueCustomers",
        "StoresActive",
        current_timestamp().alias("CreatedDate"),
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_daily where RunID = '{runid}'")
(
    kpi_daily_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_daily")
)

# # KPI TABLE 7: gold.kpi_rfm
# spark.sql("""
# create or replace table gold.kpi_rfm(
#     RunID string,
#     UserId long,
#     Recency int,
#     Frequency long,
#     Monetary double,
#     CustomerSegment string,
#     CreatedDate timestamp
# )
# using delta;
# """)


# Reference date for recency: the latest OrderDate present in this batch.
max_date = sales_df.agg(max("OrderDate")).first()[0]

# Segment rules, evaluated top to bottom; the first rule that holds wins.
is_high_value = (col("Recency") <= 15) & (col("Frequency") >= 3) & (col("Monetary") >= 20000)
is_active = (col("Recency") <= 30) & (col("Frequency") >= 2)
is_at_risk = col("Recency") > 30
customer_segment = (
    when(is_high_value, "High Value")
    .when(is_active, "Active")
    .when(is_at_risk, "At Risk")
    .otherwise("Normal")
)

# Recency = days between the batch's latest order date and the customer's
# latest order; Frequency = transaction count; Monetary = total amount.
rfm_per_customer = (
    sales_df
    .groupBy("UserId")
    .agg(
        datediff(lit(max_date), max("OrderDate")).alias("Recency"),
        count("TransactionId").alias("Frequency"),
        sum("TotalAmount").alias("Monetary"),
    )
)

kpi_rfm_df = rfm_per_customer.select(
    lit(runid).alias("RunID"),
    "UserId",
    "Recency",
    "Frequency",
    "Monetary",
    customer_segment.alias("CustomerSegment"),
    current_timestamp().alias("CreatedDate"),
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_rfm where RunID = '{runid}'")
(
    kpi_rfm_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_rfm")
)

# # KPI TABLE 8: gold.kpi_product_ranking
# spark.sql("""
# create or replace table gold.kpi_product_ranking(
#     RunID string,
#     ProductId long,
#     TransactionCount long,
#     TotalUnits double,
#     TotalRevenue double,
#     AvgPrice double,
#     CustomerReach long,
#     UnitsPerCustomer double,
#     RevenueRank int,
#     VolumeRank int,
#     CreatedDate timestamp
# )
# using delta;
# """)

# Global (un-partitioned) orderings used to rank products against each other.
by_revenue_desc = Window.orderBy(desc("TotalRevenue"))
by_units_desc = Window.orderBy(desc("TotalUnits"))

# Per-product totals that the derived ratio and the ranks are computed from.
product_totals = (
    sales_df
    .groupBy("ProductId")
    .agg(
        count("TransactionId").alias("TransactionCount"),
        sum("OrderCount").alias("TotalUnits"),
        sum("TotalAmount").alias("TotalRevenue"),
        round(avg("TotalAmount"), 2).alias("AvgPrice"),
        countDistinct("UserId").alias("CustomerReach"),
    )
)

kpi_product_ranking_df = product_totals.select(
    lit(runid).alias("RunID"),
    "ProductId",
    "TransactionCount",
    "TotalUnits",
    "TotalRevenue",
    "AvgPrice",
    "CustomerReach",
    round(col("TotalUnits") / col("CustomerReach"), 2).alias("UnitsPerCustomer"),
    row_number().over(by_revenue_desc).alias("RevenueRank"),
    row_number().over(by_units_desc).alias("VolumeRank"),
    current_timestamp().alias("CreatedDate"),
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_product_ranking where RunID = '{runid}'")
(
    kpi_product_ranking_df.write
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable("gold.kpi_product_ranking")
)

# KPI TABLE 9: gold.kpi_store_performance

# spark.sql("""
# create or replace table gold.kpi_store_performance(
#     RunID string,
#     StoreID long,
#     TransactionCount long,
#     TotalRevenue double,
#     UniqueCustomers long,
#     AvgTransactionValue double,
#     RevenuePerCustomer double,
#     TransactionFrequency double,
#     PerformanceRank int,
#     CreatedDate timestamp
# )
# using delta;
# """)

# Store performance KPIs: per-store totals, per-customer ratios and a revenue
# rank across all stores, stamped with the current run.
# (The former "Schema created" print was dropped: the DDL is commented out, so
# nothing is created here and the message was false.)
kpi_store_performance_df = (
    sales_df
    .groupBy("StoreID")
    .agg(
        count("TransactionId").alias("TransactionCount"),
        sum("TotalAmount").alias("TotalRevenue"),
        countDistinct("UserId").alias("UniqueCustomers"),
        round(avg("TotalAmount"), 2).alias("AvgTransactionValue"),
    )
    .withColumn(
        "RevenuePerCustomer",
        round(col("TotalRevenue") / col("UniqueCustomers"), 2),
    )
    .withColumn(
        "TransactionFrequency",
        round(col("TransactionCount") / col("UniqueCustomers"), 2),
    )
    .withColumn(
        "PerformanceRank",
        # Un-partitioned window: stores are ranked against all other stores.
        row_number().over(Window.orderBy(desc("TotalRevenue"))),
    )
    .withColumn("RunID", lit(runid))
    .withColumn("CreatedDate", current_timestamp())
    .select(
        "RunID", "StoreID", "TransactionCount", "TotalRevenue", "UniqueCustomers",
        "AvgTransactionValue", "RevenuePerCustomer", "TransactionFrequency",
        "PerformanceRank", "CreatedDate",
    )
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_store_performance where RunID = '{runid}'")
kpi_store_performance_df.write.mode("append").option("mergeSchema", "true").saveAsTable("gold.kpi_store_performance")

# KPI TABLE 10: gold.kpi_summary

# spark.sql("""
# create or replace table gold.kpi_summary(
#     RunID string,
#     MetricName string,
#     MetricValue string,
#     MetricType string,
#     CreatedDate timestamp
# )
# using delta;
# """)


# Headline metrics for the run, stored as formatted strings (one row per metric).
# All totals come from a single aggregation pass over sales_df instead of six
# separate Spark jobs; count(lit(1)) counts every row, like DataFrame.count().
summary_totals = sales_df.agg(
    sum("TotalAmount").alias("total_revenue"),
    count(lit(1)).alias("total_transactions"),
    sum("OrderCount").alias("total_orders"),
    countDistinct("UserId").alias("unique_customers"),
    countDistinct("ProductId").alias("unique_products"),
    countDistinct("StoreID").alias("unique_stores"),
).first()

total_revenue = summary_totals["total_revenue"]
total_transactions = summary_totals["total_transactions"]
total_orders = summary_totals["total_orders"]
unique_customers = summary_totals["unique_customers"]
unique_products = summary_totals["unique_products"]
unique_stores = summary_totals["unique_stores"]

# One timestamp for the whole batch so every metric row of a run carries the
# same CreatedDate (previously datetime.now() was re-evaluated per row).
created_at = datetime.now()

summary_data = [
    (runid, "Total Revenue", f"{total_revenue:,.2f}", "currency", created_at),
    (runid, "Total Transactions", f"{total_transactions:,}", "count", created_at),
    (runid, "Total Orders", f"{total_orders:,}", "count", created_at),
    (runid, "Unique Customers", f"{unique_customers:,}", "count", created_at),
    (runid, "Unique Products", f"{unique_products:,}", "count", created_at),
    (runid, "Active Stores", f"{unique_stores:,}", "count", created_at),
    (runid, "Avg Transaction Value", f"{total_revenue/total_transactions:,.2f}", "currency", created_at),
    (runid, "Avg Orders per Transaction", f"{total_orders/total_transactions:.2f}", "numeric", created_at),
    (runid, "Revenue per Customer", f"{total_revenue/unique_customers:,.2f}", "currency", created_at),
    (runid, "Transactions per Customer", f"{total_transactions/unique_customers:.2f}", "numeric", created_at),
]

kpi_summary_df = spark.createDataFrame(
    summary_data,
    ["RunID", "MetricName", "MetricValue", "MetricType", "CreatedDate"]
)

# Delete-then-append keeps the table free of duplicates when a run is repeated.
spark.sql(f"delete from gold.kpi_summary where RunID = '{runid}'")
kpi_summary_df.write.mode("append").option("mergeSchema", "true").saveAsTable("gold.kpi_summary")