In [0]:
# Databricks notebook: builds the GOLD table gold_sales_monthly (AdventureWorksDW2017, idempotent)

# --- Widgets: notebook parameters CATALOG, SILVER_SCHEMA and GOLD_SCHEMA ---
# Each entry is (widget name, default value, label), registered in this order.
for widget_name, default_value, widget_label in (
    ("CATALOG", "AdventureWorksDW2017", "Catalog Name"),
    ("SILVER_SCHEMA", "silver", "Silver Schema Name"),
    ("GOLD_SCHEMA", "gold", "Gold Schema Name"),
):
    dbutils.widgets.text(widget_name, default_value, widget_label)

In [0]:
# --- Params (set these via dbutils.widgets or manually) ---
def _widget_or_default(name, default):
    """Return the value of widget `name`, or `default` when it is unusable.

    Falls back to `default` if reading the widget raises for any reason, or if
    the widget value is empty / whitespace-only (a blank value would otherwise
    be interpolated into SQL and yield identifiers such as ".gold.table").
    """
    try:
        value = dbutils.widgets.get(name)
    except Exception:
        # Deliberate best-effort: the notebook still runs with defaults.
        return default
    return value if value and value.strip() else default


CATALOG = _widget_or_default("CATALOG", "AdventureWorksDW2017")
SILVER_SCHEMA = _widget_or_default("SILVER_SCHEMA", "silver")
GOLD_SCHEMA = _widget_or_default("GOLD_SCHEMA", "gold")

GOLD_TABLE = "gold_sales_monthly"
GOLD_PATH = f"{CATALOG}.{GOLD_SCHEMA}.{GOLD_TABLE}"

# --- Ensure schema exists ---
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{GOLD_SCHEMA}")

# --- Get available tables (from information_schema) ---
# The schema name is compared case-insensitively: information_schema holds
# identifiers as plain strings, so a widget value like "Silver" must still
# match. This mirrors the case-insensitive table-name check further down.
silver_tables = {
    row.table_name
    for row in spark.sql(
        f"SELECT table_name FROM {CATALOG}.information_schema.tables "
        f"WHERE lower(table_schema) = lower('{SILVER_SCHEMA}')"
    ).collect()
}


def _channel_sales_sql(fact_table, channel):
    """Build the SELECT that projects one fact table into the sales-base shape.

    Args:
        fact_table: Name of the fact table inside the silver schema.
        channel: Literal label emitted in the `channel` column.

    Returns:
        SQL text selecting yyyymm, channel, product_category,
        product_subcategory, SalesAmount, OrderQuantity, SalesOrderNumber and
        TotalProductCost. Dimensions are LEFT JOINed, so fact rows without a
        matching dimension row are kept with NULL attributes.
    """
    return f"""
      SELECT
        CAST(CONCAT(d.CalendarYear, LPAD(d.MonthNumberOfYear,2,'0')) AS STRING) AS yyyymm,
        '{channel}' AS channel,
        pc.EnglishProductCategoryName AS product_category,
        psc.EnglishProductSubcategoryName AS product_subcategory,
        fs.SalesAmount,
        fs.OrderQuantity,
        fs.SalesOrderNumber,
        fs.TotalProductCost
      FROM {CATALOG}.{SILVER_SCHEMA}.{fact_table} fs
      LEFT JOIN {CATALOG}.{SILVER_SCHEMA}.DimDate d ON fs.OrderDateKey = d.DateKey
      LEFT JOIN {CATALOG}.{SILVER_SCHEMA}.DimProduct p ON fs.ProductKey = p.ProductKey
      LEFT JOIN {CATALOG}.{SILVER_SCHEMA}.DimProductSubcategory psc ON p.ProductSubcategoryKey = psc.ProductSubcategoryKey
      LEFT JOIN {CATALOG}.{SILVER_SCHEMA}.DimProductCategory pc ON psc.ProductCategoryKey = pc.ProductCategoryKey
    """


# --- Detect SALES_BASE or build it inline ---
if "SALES_BASE" in {t.upper() for t in silver_tables}:
    # Assumes SALES_BASE exposes every column the aggregate below references
    # (yyyymm, channel, product_category, product_subcategory, SalesAmount,
    # OrderQuantity, SalesOrderNumber, TotalProductCost) — TODO confirm.
    sales_base_sql = f"SELECT * FROM {CATALOG}.{SILVER_SCHEMA}.SALES_BASE"
else:
    # Union of FactInternetSales and FactResellerSales, each joined to the
    # date and product dimensions and tagged with its channel label.
    sales_base_sql = "\n      UNION ALL\n".join(
        _channel_sales_sql(fact_table, channel)
        for fact_table, channel in (
            ("FactInternetSales", "Internet"),
            ("FactResellerSales", "Reseller"),
        )
    )

# --- Create or replace the GOLD Delta table ---
# CREATE OR REPLACE makes re-running this cell rebuild the table from scratch.
# NOTE(review): PARTITIONED BY (channel, yyyymm) creates one partition per
# channel-month for what is an aggregate table; confirm this is intended,
# since partitioning small tables is generally discouraged on Databricks.
spark.sql(f"""
CREATE OR REPLACE TABLE {GOLD_PATH}
COMMENT 'Monthly aggregate sales by channel, product_category, product_subcategory (grain: yyyymm, channel, product_category, product_subcategory). Metrics: revenue=sum SalesAmount, units=sum OrderQuantity, orders=count distinct SalesOrderNumber, gross_margin=sum(SalesAmount-TotalProductCost), avg_order_value=revenue/orders.'
TBLPROPERTIES (
  delta.enableChangeDataFeed = true,
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
PARTITIONED BY (channel, yyyymm)
AS
SELECT
  yyyymm,
  channel,
  product_category,
  product_subcategory,
  SUM(SalesAmount) AS revenue,
  SUM(OrderQuantity) AS units,
  COUNT(DISTINCT SalesOrderNumber) AS orders,
  SUM(SalesAmount - TotalProductCost) AS gross_margin,
  CASE WHEN COUNT(DISTINCT SalesOrderNumber) = 0 THEN NULL ELSE SUM(SalesAmount)/COUNT(DISTINCT SalesOrderNumber) END AS avg_order_value
FROM ({sales_base_sql})
GROUP BY yyyymm, channel, product_category, product_subcategory
""")

# --- Verification: Table exists, row count > 0, preview top 10 ---
show_df = spark.sql(f"SHOW TABLES IN {CATALOG}.{GOLD_SCHEMA} LIKE '{GOLD_TABLE}'")
assert show_df.count() == 1, f"Table {GOLD_PATH} not found after creation!"

row_count = spark.sql(f"SELECT COUNT(*) AS n FROM {GOLD_PATH}").collect()[0]['n']
assert row_count > 0, f"{GOLD_PATH} has no rows!"

display(spark.sql(f"SELECT * FROM {GOLD_PATH} ORDER BY yyyymm DESC, revenue DESC LIMIT 10"))

In [0]:
%sql
-- Per-customer summary of FactInternetSales: totals, distinct order count and
-- the range of order date keys, one row per CustomerKey.
--
-- The source table is resolved from the CATALOG and SILVER_SCHEMA notebook
-- widgets (instead of a hard-coded catalog) so this cell reads the same silver
-- layer as the rest of the notebook. Requires a runtime that supports named
-- parameter markers with IDENTIFIER() in SQL cells.
SELECT
  CustomerKey,
  SUM(SalesAmount) AS total_sales_amount,
  SUM(OrderQuantity) AS total_order_quantity,
  COUNT(DISTINCT SalesOrderNumber) AS orders,
  SUM(TotalProductCost) AS total_product_cost,
  -- NOTE(review): these are MIN/MAX of OrderDateKey (the key joined to
  -- DimDate.DateKey), presumably YYYYMMDD integers rather than DATE values — confirm.
  MIN(OrderDateKey) AS first_order_date,
  MAX(OrderDateKey) AS last_order_date
  -- Add more aggregations as needed
FROM IDENTIFIER(:CATALOG || '.' || :SILVER_SCHEMA || '.FactInternetSales')
GROUP BY CustomerKey
ORDER BY CustomerKey