## Configuration


In [0]:
dbutils.widgets.text("catalog", "dev")
dbutils.widgets.text("silver_schema", "02_silver")
dbutils.widgets.text("gold_schema", "03_gold_supply")

In [0]:
catalog = dbutils.widgets.get("catalog")
silver_schema = dbutils.widgets.get("silver_schema")
gold_schema = dbutils.widgets.get("gold_schema")

## Create Gold Schema

In [0]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{gold_schema}")

## B2C Overall KPIs


In [0]:
spark.sql(f"""
CREATE OR REPLACE VIEW {catalog}.{gold_schema}.b2c_overall_kpis AS
WITH base AS (
    SELECT
        coi.order_item_id,
        coi.product_id,
        coi.quantity,
        coi.unit_price AS selling_price,
        coi.unit_price - p.unit_price AS margin,
        coi.unit_price * coi.quantity AS total_amount,
        ((coi.unit_price - p.unit_price) / coi.unit_price) * 100 AS profit_margin,
        coi.order_id AS s_oid,
        p.product_name,
        p.department,
        p.category,
        p.brand,
        p.retail_price,
        p.unit_price AS cost_price,
        p.release_date,
        co.order_id AS c_oid,
        co.order_date AS ordered_date,
        co.order_status,
        co.order_total_amount AS total_amount_order,
        co.payment_method AS payment_method_usage,
        c.consumer_id,
        c.name AS consumer_name,
        c.gender,
        c.age,
        c.registration_date,
        c.is_active,
        c.city,
        c.state,
        c.country,
        i.location_name,
        i.inventory_id,
        i.quantity_on_hand,
        i.inventory_status,
        i.reorder_level
    FROM {catalog}.{silver_schema}.consumer_orders_mv coi
    LEFT JOIN {catalog}.{silver_schema}.consumer_orders_mv co ON coi.order_id = co.order_id
    LEFT JOIN {catalog}.{silver_schema}.consumer_mv c ON co.consumer_id = c.consumer_id
    LEFT JOIN {catalog}.{silver_schema}.product_mv p ON coi.product_id = p.product_id
    LEFT JOIN {catalog}.{silver_schema}.inventory_mv i ON p.product_id = i.product_id
) ,
total_metrics AS (
    SELECT
        COALESCE(SUM(order_total_amount), 0) AS total_consumer_sales,
        COALESCE(AVG(order_total_amount), 0) AS average_order_value,
        COALESCE(COUNT(DISTINCT order_id), 0) AS total_order_count
    FROM (
        SELECT DISTINCT order_id, order_total_amount
        FROM {catalog}.{silver_schema}.consumer_orders_mv
    ) distinct_orders
) ,
order_frequency AS (
    SELECT COALESCE(AVG(order_count), 0) AS order_frequency
    FROM (
        SELECT consumer_id, COUNT(DISTINCT order_id) AS order_count
        FROM {catalog}.{silver_schema}.consumer_orders_mv
        GROUP BY consumer_id
    ) sub
) ,
active_consumers AS (
    SELECT COALESCE(COUNT(DISTINCT consumer_id), 0) AS active_consumer_count
    FROM {catalog}.{silver_schema}.consumer_mv
    WHERE is_active = TRUE
) ,
orders_sales AS (
    SELECT
        consumer_id,
        COUNT(DISTINCT c_oid) AS total_orders,
        SUM(total_amount) AS total_sales_revenue,
        ROUND(SUM(total_amount) / NULLIF(COUNT(DISTINCT c_oid), 0)) AS avg_order_value
    FROM base
    GROUP BY consumer_id
) ,
sales_category AS (
    SELECT consumer_id, category, SUM(total_amount) AS sales_by_category
    FROM base
    GROUP BY consumer_id, category
) ,
sales_brand AS (
    SELECT consumer_id, brand, SUM(total_amount) AS sales_by_brand
    FROM base
    GROUP BY consumer_id, brand
) ,
sales_product AS (
    SELECT consumer_id, product_id, SUM(total_amount) AS sales_by_product
    FROM base
    GROUP BY consumer_id, product_id
) ,
top_selling AS (
    SELECT
        consumer_id,
        product_id,
        SUM(quantity) AS total_qty,
        RANK() OVER (PARTITION BY consumer_id ORDER BY SUM(quantity) DESC) AS rnk
    FROM base
    GROUP BY consumer_id, product_id
) ,
top_selling_product AS (
    SELECT
        t.consumer_id,
        t.product_id AS top_product_id,
        p.product_name AS top_product_name
    FROM top_selling t
    JOIN {catalog}.{silver_schema}.product_mv p ON t.product_id = p.product_id
    WHERE rnk = 1
) ,
returned_products AS (
    SELECT DISTINCT consumer_id, product_id
    FROM base
    WHERE c_oid IN (
        SELECT order_id
        FROM {catalog}.{silver_schema}.consumer_orders_mv
        WHERE order_status = 'Returned'
    )
) ,
low_stock AS (
    SELECT DISTINCT product_id
    FROM {catalog}.{silver_schema}.inventory_mv
    WHERE quantity_on_hand < reorder_level
) ,
payment_method_usage AS (
    SELECT category, payment_method_usage
    FROM base
    GROUP BY category, payment_method_usage
) ,
monthly_sales AS (
    SELECT DATE_TRUNC('month', ordered_date) AS month, SUM(total_amount) AS monthly_sales
    FROM base
    GROUP BY DATE_TRUNC('month', ordered_date)
) ,
clv AS (
    SELECT consumer_id, SUM(total_amount) AS customer_lifetime_value
    FROM base
    GROUP BY consumer_id
) ,
repeat_purchase AS (
    SELECT 
        COUNT(DISTINCT consumer_id) FILTER (WHERE order_count > 1)::decimal / 
        NULLIF(COUNT(DISTINCT consumer_id), 0) AS repeat_purchase_rate
    FROM (
        SELECT consumer_id, COUNT(DISTINCT order_id) AS order_count
        FROM {catalog}.{silver_schema}.consumer_orders_mv
        GROUP BY consumer_id
    ) sub
) ,
new_customers AS (
    SELECT 
        DATE_TRUNC('month', registration_date) AS month,
        COUNT(DISTINCT consumer_id) AS new_customers
    FROM {catalog}.{silver_schema}.consumer_mv
    GROUP BY DATE_TRUNC('month', registration_date)
) ,
fulfillment AS (
    SELECT 
        COUNT(*) FILTER (WHERE order_status = 'Delivered')::decimal / 
        NULLIF(COUNT(*),0) AS fulfillment_rate
    FROM {catalog}.{silver_schema}.consumer_orders_mv
) ,
orders_per_payment AS (
    SELECT payment_method, COUNT(order_id) AS orders_per_payment
    FROM {catalog}.{silver_schema}.consumer_orders_mv
    GROUP BY payment_method
)
SELECT
    p.order_item_id,
    p.product_id,
    p.quantity,
    p.selling_price,
    p.margin,
    p.total_amount,
    p.profit_margin,
    p.s_oid,
    p.product_name,
    p.department,
    p.category,
    p.brand,
    p.retail_price,
    p.cost_price,
    p.release_date,
    p.c_oid,
    p.ordered_date,
    p.order_status,
    p.total_amount_order,
    p.payment_method_usage,
    p.consumer_id,
    p.consumer_name,
    p.gender,
    p.age,
    p.registration_date,
    p.is_active,
    p.city,
    p.state,
    p.country,
    p.location_name,
    p.inventory_id,
    p.quantity_on_hand,
    p.inventory_status,
    p.reorder_level,
    o.total_orders,
    o.total_sales_revenue,
    o.avg_order_value,
    sc.sales_by_category,
    sb.sales_by_brand,
    sp.sales_by_product,
    ts.top_product_id,
    ts.top_product_name,
    rp.repeat_purchase_rate,
    nc.new_customers,
    f.fulfillment_rate,
    op.orders_per_payment,
    CASE WHEN rp.product_id IS NOT NULL THEN 1 ELSE 0 END AS is_returned_product,
    CASE WHEN ls.product_id IS NOT NULL THEN 1 ELSE 0 END AS is_low_stock,
    pmu.payment_method_usage AS payment_method_category_usage,
    ms.month,
    ms.monthly_sales,
    c.customer_lifetime_value,
    tm.total_consumer_sales,
    ROUND(tm.average_order_value) AS global_average_order_value,
    tm.total_order_count,
    ROUND(of.order_frequency) AS avg_order_frequency_per_consumer,
    ac.active_consumer_count
FROM base p
LEFT JOIN repeat_purchase rp ON 1=1
LEFT JOIN new_customers nc ON DATE_TRUNC('month', p.ordered_date) = nc.month
LEFT JOIN fulfillment f ON 1=1
LEFT JOIN orders_per_payment op ON p.payment_method_usage = op.payment_method
LEFT JOIN orders_sales o ON p.consumer_id = o.consumer_id
LEFT JOIN sales_category sc ON p.consumer_id = sc.consumer_id AND p.category = sc.category
LEFT JOIN sales_brand sb ON p.consumer_id = sb.consumer_id AND p.brand = sb.brand
LEFT JOIN sales_product sp ON p.consumer_id = sp.consumer_id AND p.product_id = sp.product_id
LEFT JOIN top_selling_product ts ON p.consumer_id = ts.consumer_id
LEFT JOIN returned_products rp ON p.consumer_id = rp.consumer_id AND p.product_id = rp.product_id
LEFT JOIN low_stock ls ON p.product_id = ls.product_id
LEFT JOIN payment_method_usage pmu ON p.category = pmu.category AND p.payment_method_usage = pmu.payment_method_usage
LEFT JOIN monthly_sales ms ON DATE_TRUNC('month', p.ordered_date) = ms.month
LEFT JOIN clv c ON p.consumer_id = c.consumer_id
CROSS JOIN total_metrics tm
CROSS JOIN order_frequency of
CROSS JOIN active_consumers ac;
""")

## B2B Overall KPIs


In [0]:
spark.sql(f"""
CREATE OR REPLACE VIEW {catalog}.{gold_schema}.b2b_overall_kpis AS
WITH base AS (
    SELECT
        dso.item_no AS purchase_item_id,
        dso.product_id,
        dso.order_quantity AS quantity,
        dso.unit_price AS selling_price,
        dso.unit_price - p.unit_price AS margin,
        dso.item_total_amount AS total_amount,
        ((dso.unit_price - p.unit_price) / dso.unit_price) * 100 AS profit_margin,
        dso.order_id AS s_pid,
        p.product_name,
        p.department,
        p.category,
        p.brand,
        p.retail_price,
        p.unit_price AS cost_price,
        p.release_date,
        p.product_status,
        dso.order_id AS c_pid,
        dso.order_date AS ordered_date,
        NULL AS order_status,
        NULL AS total_amount_order,
        di.distributor_id,
        di.distributor_name,
        di.city,
        di.state,
        di.country,
        i.location_name,
        i.inventory_id,
        i.quantity_on_hand,
        i.inventory_status,
        i.reorder_level
    FROM {catalog}.{silver_schema}.distributor_sale_order_mv dso
    LEFT JOIN {catalog}.{silver_schema}.distributor_mv di ON dso.distributor_id = di.distributor_id
    LEFT JOIN {catalog}.{silver_schema}.product_mv p ON dso.product_id = p.product_id
    LEFT JOIN {catalog}.{silver_schema}.inventory_mv i ON p.product_id = i.product_id
) ,
total_metrics AS (
    SELECT
        COALESCE(SUM(order_total), 0) AS total_distributor_sales,
        COALESCE(AVG(order_total), 0) AS average_order_value,
        COALESCE(COUNT(DISTINCT order_id), 0) AS total_order_count
    FROM (
        SELECT order_id, SUM(item_total_amount) AS order_total
        FROM {catalog}.{silver_schema}.distributor_sale_order_mv
        GROUP BY order_id
    ) order_totals
) ,
order_frequency AS (
    SELECT COALESCE(AVG(order_count), 0) AS order_frequency
    FROM (
        SELECT distributor_id, COUNT(DISTINCT order_id) AS order_count
        FROM {catalog}.{silver_schema}.distributor_sale_order_mv
        GROUP BY distributor_id
    ) sub
) ,
active_distributors AS (
    SELECT COALESCE(COUNT(DISTINCT distributor_id), 0) AS active_distributor_count
    FROM {catalog}.{silver_schema}.distributor_mv
) ,
orders_sales AS (
    SELECT
        distributor_id,
        COUNT(DISTINCT c_pid) AS total_orders,
        SUM(total_amount) AS total_sales_revenue,
        ROUND(SUM(total_amount) / NULLIF(COUNT(DISTINCT c_pid), 0)) AS avg_order_value
    FROM base
    GROUP BY distributor_id
) ,
sales_category AS (
    SELECT distributor_id, category, SUM(total_amount) AS sales_by_category
    FROM base
    GROUP BY distributor_id, category
) ,
sales_brand AS (
    SELECT distributor_id, brand, SUM(total_amount) AS sales_by_brand
    FROM base
    GROUP BY distributor_id, brand
) ,
sales_product AS (
    SELECT distributor_id, product_id, SUM(total_amount) AS sales_by_product
    FROM base
    GROUP BY distributor_id, product_id
) ,
top_selling AS (
    SELECT
        distributor_id,
        product_id,
        SUM(quantity) AS total_qty,
        RANK() OVER (PARTITION BY distributor_id ORDER BY SUM(quantity) DESC) AS rnk
    FROM base
    GROUP BY distributor_id, product_id
) ,
top_selling_product AS (
    SELECT
        t.distributor_id,
        t.product_id AS top_product_id,
        p.product_name AS top_product_name
    FROM top_selling t
    JOIN {catalog}.{silver_schema}.product_mv p ON t.product_id = p.product_id
    WHERE rnk = 1
) ,
low_stock AS (
    SELECT DISTINCT product_id
    FROM {catalog}.{silver_schema}.inventory_mv
    WHERE quantity_on_hand < reorder_level
) ,
store_revenue AS (
    SELECT location_name, SUM(total_amount) AS revenue
    FROM base
    GROUP BY location_name
) ,
monthly_sales AS (
    SELECT DATE_TRUNC('month', ordered_date) AS month, SUM(total_amount) AS monthly_sales
    FROM base
    GROUP BY DATE_TRUNC('month', ordered_date)
) ,
clv AS (
    SELECT distributor_id, SUM(total_amount) AS customer_lifetime_value
    FROM base
    GROUP BY distributor_id
)
SELECT
    p.purchase_item_id,
    p.product_id,
    p.quantity,
    p.selling_price,
    p.total_amount,
    p.s_pid,
    p.product_name,
    p.department,
    p.category,
    p.brand,
    p.retail_price,
    p.cost_price,
    p.release_date,
    p.product_status,
    p.c_pid,
    p.ordered_date,
    p.order_status,
    p.total_amount_order,
    p.distributor_id,
    p.distributor_name,
    p.city,
    p.state,
    p.country,
    p.location_name,
    p.inventory_id,
    p.quantity_on_hand,
    p.inventory_status,
    p.reorder_level,
    o.total_orders,
    o.total_sales_revenue,
    o.avg_order_value,
    sc.sales_by_category,
    sb.sales_by_brand,
    sp.sales_by_product,
    ts.top_product_id,
    ts.top_product_name,
    CASE WHEN ls.product_id IS NOT NULL THEN 1 ELSE 0 END AS is_low_stock,
    sr.revenue AS store_revenue,
    ms.month,
    ms.monthly_sales,
    c.customer_lifetime_value,
    tm.total_distributor_sales,
    ROUND(tm.average_order_value) AS global_average_order_value,
    tm.total_order_count,
    ROUND(of.order_frequency) AS avg_order_frequency_per_distributor,
    ad.active_distributor_count
FROM base p
LEFT JOIN orders_sales o ON p.distributor_id = o.distributor_id
LEFT JOIN sales_category sc ON p.distributor_id = sc.distributor_id AND p.category = sc.category
LEFT JOIN sales_brand sb ON p.distributor_id = sb.distributor_id AND p.brand = sb.brand
LEFT JOIN sales_product sp ON p.distributor_id = sp.distributor_id AND p.product_id = sp.product_id
LEFT JOIN top_selling_product ts ON p.distributor_id = ts.distributor_id
LEFT JOIN low_stock ls ON p.product_id = ls.product_id
LEFT JOIN store_revenue sr ON p.location_name = sr.location_name
LEFT JOIN monthly_sales ms ON DATE_TRUNC('month', p.ordered_date) = ms.month
LEFT JOIN clv c ON p.distributor_id = c.distributor_id
CROSS JOIN total_metrics tm
CROSS JOIN order_frequency of
CROSS JOIN active_distributors ad;
""")