In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # 01 — Data Ingestion (TPC-DS SF1000)
# MAGIC 
# MAGIC This notebook ingests selected TPC-DS SF1000 tables into managed Delta tables.
# MAGIC 
# MAGIC **Goals**
# MAGIC - Load raw TPC-DS tables from the Databricks `samples` catalog (`samples.tpcds_sf1000`)
# MAGIC - Write them into your project schema as Delta (Bronze layer)
# MAGIC - Apply basic validation (reject empty source tables) and log table metadata to MLflow
# MAGIC - Prepare data for downstream feature engineering
# MAGIC 
# MAGIC **Tables used**
# MAGIC - `store_sales`
# MAGIC - `customer`
# MAGIC - `item`
# MAGIC - `date_dim`
# MAGIC 
# MAGIC These tables support customer-level spend prediction.

# COMMAND ----------


In [0]:
# Session setup: ensure the project schema exists, then make it the current
# catalog/schema so unqualified table names resolve inside it.
catalog, schema = "workspace", "ml_tpcds"

for setup_sql in (
    f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}",
    f"USE CATALOG {catalog}",
):
    spark.sql(setup_sql)
spark.sql(f"USE SCHEMA {schema}")


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

import mlflow
from pyspark.sql import DataFrame

# Project namespace (Unity Catalog or Hive Metastore) that receives the Bronze tables.
# NOTE(review): these values repeat the ones assigned in the setup cell near the
# top of the notebook; keep the two in sync (or keep only one definition).
catalog, schema = "workspace", "ml_tpcds"
target_namespace = f"{catalog}.{schema}"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_namespace}")
print(f"Using schema: {target_namespace}")


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Helper: Load TPC-DS table and write as Delta

# COMMAND ----------

def ingest_table(source_table: str, target_table: str) -> "DataFrame":
    """
    Copy one TPC-DS source table into a managed Delta table (Bronze layer).

    Args:
        source_table: Fully qualified name of the table to read,
            e.g. ``samples.tpcds_sf1000.store_sales``.
        target_table: Fully qualified name of the Delta table to create or
            overwrite.

    Returns:
        A DataFrame over ``target_table`` (the freshly written Bronze table),
        so downstream cells read the copy instead of re-scanning the source.

    Raises:
        ValueError: If ``source_table`` has no rows; nothing is written.
    """
    print(f"📥 Ingesting {source_table} → {target_table}")

    source_df = spark.read.table(source_table)

    # Basic validation. Fetching a single row is enough to detect an empty
    # table; a full count() would scan the entire source just to compare to 0.
    if not source_df.head(1):
        raise ValueError(f"Source table {source_table} is empty.")

    (
        source_df.write
        .format("delta")
        .mode("overwrite")
        # Let the write replace the target's schema if the table already
        # exists with a different one.
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )

    # Count the written table exactly once and reuse the number, rather than
    # running another count() over the source.
    bronze_df = spark.read.table(target_table)
    row_count = bronze_df.count()
    print(f"✅ Wrote {row_count:,} rows to {target_table}")
    return bronze_df


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Ingest Raw TPC-DS Tables (Bronze Layer)

# COMMAND ----------

# Source schema of the Databricks-hosted TPC-DS SF1000 sample and the base
# table names copied from it; each lands in the project schema with a
# "_bronze" suffix.
tpcds_source_schema = "samples.tpcds_sf1000"
tpcds_table_names = ["store_sales", "customer", "item", "date_dim"]

tables_to_ingest = {
    f"{tpcds_source_schema}.{base_name}": f"{catalog}.{schema}.{base_name}_bronze"
    for base_name in tpcds_table_names
}

# Bronze table name -> DataFrame returned by ingest_table().
bronze_tables = {}
for src_name, bronze_name in tables_to_ingest.items():
    bronze_tables[bronze_name] = ingest_table(src_name, bronze_name)


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## Quick Profiling

# COMMAND ----------

# Print each Bronze table's schema and display a five-row sample as a quick sanity check.
for table_name, df in bronze_tables.items():
    print(f"\n📊 {table_name}")
    df.printSchema()
    display(df.limit(5))


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC ## MLflow Logging

# COMMAND ----------

# Record the row and column counts of every Bronze table, plus the target
# catalog/schema, as parameters of a single MLflow run.
# NOTE(review): each count() runs a Spark job; row counts might fit better as
# MLflow metrics, but they are kept as params here to preserve existing run data.
with mlflow.start_run(run_name="data_ingestion_tpcds"):
    for bronze_name, bronze_df in bronze_tables.items():
        row_total = bronze_df.count()
        mlflow.log_param(f"{bronze_name}_rows", row_total)
        column_total = len(bronze_df.columns)
        mlflow.log_param(f"{bronze_name}_columns", column_total)

    for param_key, param_value in (("catalog", catalog), ("schema", schema)):
        mlflow.log_param(param_key, param_value)

print("Ingestion metadata logged to MLflow.")


In [0]:
# COMMAND ----------
# MAGIC %md
# MAGIC # Ingestion Complete
# MAGIC 
# MAGIC Your Bronze tables are now ready for feature engineering in the next notebook.
