# Epic 1 – Data Foundation Platform

## Feature 1.1: Raw Data Ingestion (Sprint 1)


In [None]:
# CONFIG & INITIALIZATION
# Widgets allow parameterization when running as a Job
import re, pyspark.sql.functions as F
from pyspark.sql import DataFrame
from delta.tables import DeltaTable

# Unity Catalog catalog name (leave blank if not using UC)
dbutils.widgets.text("catalog", "", "Unity Catalog (optional)")
# Optional prefix to distinguish multiple student/dev environments
dbutils.widgets.text("env_prefix", "", "Env prefix (short)")
# Raw brand list (comma separated)
dbutils.widgets.text("brands", "contoso,eurostyle", "Brands CSV list")
# Overwrite mode flag (true/false) for idempotent reruns
dbutils.widgets.text("full_reload", "false", "Full reload?")

# Read every widget once and strip surrounding whitespace so that a stray
# blank typed into a Job parameter does not change its meaning.
catalog = dbutils.widgets.get("catalog").strip()
# NOTE(review): in the code visible here env_prefix is only echoed in the
# startup print; it is not applied to any schema or table name.
env_prefix = dbutils.widgets.get("env_prefix").strip()
# Strip before comparing: "true " or " TRUE" would otherwise read as False and
# silently select the MERGE path instead of a full reload.
full_reload = dbutils.widgets.get("full_reload").strip().lower() == "true"
# Lower-cased, non-empty brand names in the order they were given.
brands = [b.strip().lower() for b in dbutils.widgets.get("brands").split(",") if b.strip()]

# Schemas (databases) expected for minimal path (no UC: hive_metastore)
SCHEMAS = ["raw", "bronze", "silver", "gold", "monitor", "ref"]

# Build a qualified table name: catalog.schema.table when a catalog is set, else schema.table
def fq(schema: str, table: str) -> str:
    """Return the dotted name of *table* in *schema*.

    The module-level ``catalog`` value is prepended as a third, leading part
    only when it is non-empty.
    """
    name_parts = [schema, table]
    if catalog:
        name_parts.insert(0, catalog)
    return ".".join(name_parts)

# Make sure every schema listed in SCHEMAS exists; the catalog prefix is only
# added when a catalog was supplied (works with or without UC).
schema_prefix = f"{catalog}." if catalog else ""
for schema_name in SCHEMAS:
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_prefix}{schema_name}")

# Root DBFS folder for raw landing files. In later hardening this could be
# externalized to ABFSS mounts.
RAW_BASE = "/FileStore/retail/raw"

# NOTE(review): "auto" is presumably the Databricks auto-optimized shuffle
# setting; confirm the target runtime accepts it.
spark.conf.set("spark.sql.shuffle.partitions", "auto")

# Echo the effective configuration so a Job run log shows what was used.
catalog_label = catalog or 'N/A (hive_metastore)'
prefix_label = env_prefix or 'none'
print(f"Catalog: {catalog_label}  | Env prefix: {prefix_label}  | Full reload: {full_reload}")
print(f"Brands: {brands}")

# Generic CSV reader relying on schema inference (an explicit schema can replace it later)
def load_raw_csv(path: str) -> DataFrame:
    """Read the CSV data at *path* into a DataFrame.

    The first line of each file is treated as the header and column types are
    inferred by Spark.
    """
    reader = spark.read.option("header", True)
    reader = reader.option("inferSchema", True)
    return reader.csv(path)

# Add lineage columns & standardize column names (snake_case)
def _unique_snake_names(columns: list[str]) -> list[str]:
    """Return one normalized, unique name per entry of *columns*, in order.

    Each name is trimmed, runs of non-alphanumeric characters become a single
    underscore, the result is lower-cased and leading/trailing underscores are
    removed. A name that normalizes to the empty string falls back to
    ``col_<position>``; a name that collides with an earlier one receives a
    numeric suffix (``_2``, ``_3``, ...).
    """
    taken: set[str] = set()
    unique_names: list[str] = []
    for position, raw_name in enumerate(columns):
        base = re.sub(r"[^a-zA-Z0-9]+", "_", raw_name.strip()).lower().strip("_")
        if not base:
            # Header consisted only of symbols/whitespace; keep the column addressable.
            base = f"col_{position}"
        candidate = base
        suffix = 2
        while candidate in taken:
            candidate = f"{base}_{suffix}"
            suffix += 1
        taken.add(candidate)
        unique_names.append(candidate)
    return unique_names


def standardize(df: DataFrame, source_system: str) -> DataFrame:
    """Normalize column names and append lineage columns.

    Args:
        df: Input DataFrame whose columns are renamed.
        source_system: Label of the originating system; stored upper-cased.

    Returns:
        A DataFrame with normalized, unique column names plus ``ingest_ts``
        (current timestamp) and ``source_system`` columns.
    """
    # Rename positionally in one step: unlike renaming by name, this stays
    # unambiguous even when two source headers normalize to the same name.
    renamed = df.toDF(*_unique_snake_names(df.columns))
    return (renamed
            .withColumn("ingest_ts", F.current_timestamp())
            .withColumn("source_system", F.lit(source_system.upper())))

# Idempotent write (overwrite entire table or MERGE on set of keys)
def _overwrite_delta_table(df: DataFrame, target: str) -> None:
    """Replace the data and schema of the Delta table *target* with *df*."""
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")
       .saveAsTable(target))


def write_delta(df: DataFrame, schema: str, table: str, keys: list[str] | None = None) -> None:
    """Write *df* to the Delta table ``schema.table`` so that reruns are repeatable.

    The table is created when it does not exist. An existing table is
    overwritten (including its schema) when the module-level ``full_reload``
    flag is set or when no *keys* are given; otherwise *df* is merged into it.

    Args:
        df: Data to persist.
        schema: Target schema name; qualified through ``fq``.
        table: Target table name.
        keys: Columns used as the MERGE match condition. ``None`` or an empty
            list selects the overwrite path.
    """
    target = fq(schema, table)
    # Public catalog API instead of the private JVM session handle, which is
    # not available on Spark Connect sessions.
    table_exists = spark.catalog.tableExists(target)

    if not table_exists:
        _overwrite_delta_table(df, target)
        print(f"Created table {target}")
        return

    if full_reload or not keys:
        _overwrite_delta_table(df, target)
        print(f"Overwrote table {target}")
        return

    # MERGE logic
    # NOTE(review): Delta MERGE is expected to fail when several source rows
    # match the same target row; confirm the keys are unique in df.
    alias_src = "src"
    alias_tgt = "tgt"
    # "<=>" is null-safe equality, so rows whose key columns are NULL still match.
    cond = " AND ".join(f"{alias_src}.{k} <=> {alias_tgt}.{k}" for k in keys)
    # Every source column (keys included) is written on both update and insert.
    update_set = {c: f"{alias_src}.{c}" for c in df.columns}
    (DeltaTable.forName(spark, target).alias(alias_tgt)
        .merge(df.alias(alias_src), cond)
        .whenMatchedUpdate(set=update_set)
        .whenNotMatchedInsert(values=update_set)
        .execute())
    print(f"Merged into {target} on keys {keys}")

# Row-count persistence: create the Delta monitoring table (partitioned by run_date) if it is missing
dq_table_name = fq('monitor','dq_bronze_daily')
create_dq_table_sql = f"""
CREATE TABLE IF NOT EXISTS {dq_table_name} (
  run_date DATE,
  brand STRING,
  table_name STRING,
  row_count BIGINT,
  ingestion_ts TIMESTAMP
) USING delta PARTITIONED BY (run_date)
"""
spark.sql(create_dq_table_sql)
print("Initialized monitoring table dq_bronze_daily (if not exists).")

### 1) 🟥 Create raw landing folders in DBFS (`/FileStore/retail/raw/contoso/`, `/FileStore/retail/raw/eurostyle/`) and document paths in the runbook.  
[DBX-DE-Assoc][Medallion][Platform]


In [None]:
# Step 1: Make sure each configured brand has a landing folder under RAW_BASE
brand_dirs = [f"{RAW_BASE}/{brand}" for brand in brands]
for brand_dir in brand_dirs:
    dbutils.fs.mkdirs(brand_dir)
    print(f"Ensured {brand_dir}")

# List the raw root so the folders can be checked visually
display(dbutils.fs.ls(RAW_BASE))

In [0]:
%sql
-- Register the abfss:// URL below as the external location `loc_raw`
-- (skipped when it already exists), accessed through the storage
-- credential `ws_es_contoso_ma`.
CREATE EXTERNAL LOCATION IF NOT EXISTS loc_raw
  URL 'abfss://raw@stescontosoma.dfs.core.windows.net/'
  WITH (STORAGE CREDENTIAL `ws_es_contoso_ma`);

-- Show the location's metadata to confirm the registration.
DESCRIBE EXTERNAL LOCATION loc_raw;



In [0]:
# Create the two brand landing directories in DBFS
for landing_dir in ("/FileStore/retail/raw/contoso", "/FileStore/retail/raw/eurostyle"):
    dbutils.fs.mkdirs(landing_dir)

# List the parent folder to verify that they exist
raw_listing = dbutils.fs.ls("/FileStore/retail/raw")
display(raw_listing)

### 2. 🟥 Upload Contoso CSVs to the raw path; note file names, counts, and approximate sizes.
[DBX-DE-Assoc][Medallion] 

### 3. 🟥 Ingest Contoso to Delta Bronze with lineage columns (ingest_ts, source_system='CONTOSO') as bronze.sales_contoso
[DBX-DE-Assoc][Delta-Basics][Autoloader][CopyInto][Medallion]

In [0]:
# Step 3: Ingest Contoso raw CSV(s) to bronze
# The files are assumed to have been uploaded manually (UI) to contoso_path.
contoso_path = f"{RAW_BASE}/contoso"
contoso_df = load_raw_csv(contoso_path)
contoso_std = standardize(contoso_df, "CONTOSO")
contoso_row_total = contoso_std.count()
print(f"Loaded Contoso rows: {contoso_row_total}")

# Example business keys (adjust if schema differs); only those actually
# present in the standardized data are used.
candidate_keys = ["order_id", "sku", "customer_id", "order_date"]
contoso_keys = [key for key in candidate_keys if key in contoso_std.columns]
write_delta(contoso_std, "bronze", "sales_contoso", keys=contoso_keys)

# Document the table, then preview a few rows of the result.
contoso_table = fq('bronze','sales_contoso')
spark.sql(f"COMMENT ON TABLE {contoso_table} IS 'Raw ingested Contoso sales with lineage columns.'")

display(spark.table(contoso_table).limit(20))

### 4. 🟥 Create a BI-friendly Contoso view bronze.v_sales_contoso with trimmed/typed columns for Power BI DirectQuery.
[DBX-DA-Assoc][SQL-Basics][Dashboards]

In [0]:
# Step 4: Create BI view (typed columns) for DirectQuery
# Both conversions use TRY_CAST so that a single malformed value yields NULL
# instead of failing the whole query when ANSI mode is enabled on the engine
# serving the view.
spark.sql(f"""
CREATE OR REPLACE VIEW {fq('bronze','v_sales_contoso')} AS
SELECT
  order_id,
  customer_id,
  sku,
  TRY_CAST(order_date AS DATE) AS order_date,
  TRY_CAST(total_amount AS DOUBLE) AS total_amount,
  currency,
  source_system,
  ingest_ts
FROM {fq('bronze','sales_contoso')}
""")
print("Created/Updated view bronze.v_sales_contoso")

# Preview a few rows to confirm the view resolves and the casts behave.
display(spark.sql(f"SELECT * FROM {fq('bronze','v_sales_contoso')} LIMIT 10"))

### 5. 🟥 Register tables/views in the metastore (Unity Catalog or workspace) and add table comments.
[DBX-DE-Assoc][UC-Permissions]

In [0]:
# Step 5: (Optional) Additional comments / metadata tagging
# COMMENT ON has no VIEW target; a view is commented through the TABLE form.
spark.sql(f"COMMENT ON TABLE {fq('bronze','v_sales_contoso')} IS 'BI-friendly Contoso sales view for DirectQuery (trimmed & typed).'")

# List everything registered in the bronze schema (catalog-qualified when a catalog is set).
print("Tables & Views in bronze schema:")
bronze_items = spark.sql(f"SHOW TABLES IN {(catalog + '.bronze') if catalog else 'bronze'}")
display(bronze_items)

### 6. 🟥 Validate Contoso types (dates/numerics), address corrupt records if any, and record issues.
[DBX-DE-Assoc][Delta-Basics]

In [0]:
# Step 6: Validate types & basic quality
from pyspark.sql.functions import col, count, sum as Fsum
bronze_contoso = spark.table(fq('bronze','sales_contoso'))

# One aggregate per column: the sum of a 0/1 "is null" flag, named after the column.
null_flag_sums = []
for column_name in bronze_contoso.columns:
    is_null_flag = col(column_name).isNull().cast('int')
    null_flag_sums.append(Fsum(is_null_flag).alias(column_name))

row_count = bronze_contoso.count()
null_counts = bronze_contoso.select(null_flag_sums)
print(f"Row count: {row_count}")
print("Null counts per column:")
display(null_counts)

# Simple date sanity check if order_date present
if 'order_date' in bronze_contoso.columns:
    date_bounds = bronze_contoso.select(F.min('order_date'), F.max('order_date')).first()
    date_min, date_max = date_bounds
    print(f"Date range: {date_min} -> {date_max}")

### 7. 🟨 Perform a Power BI DirectQuery smoke test to bronze.v_sales_contoso; capture steps/screenshot in the README.
[DBX-DA-Assoc][Dashboards][MS-PL300][Visualize]

### 8) 🟥 Upload EuroStyle CSVs to the raw path and capture source metadata (provenance, obtained date). 
[DBX-DE-Assoc][Medallion]  

### 9) 🟥 Ingest EuroStyle to Delta Bronze with lineage columns (`ingest_ts`, `source_system='EUROSTYLE'`) as `bronze.sales_eurostyle`.  
[DBX-DE-Assoc][Delta-Basics][Autoloader][CopyInto][Medallion] 

### 10) 🟥 Create and check in `docs/column_mapping.csv` with `source_name, unified_name, target_type`.  
[DBX-DE-Prof][Modeling]


### 11) 🟥 Apply initial schema alignment across brands using the mapping and naming conventions (snake_case, consistent date/decimal types); update the runbook.  
[DBX-DE-Prof][Modeling]  



### 12) 🟥 Reconcile raw→Bronze row counts per brand (±1% tolerance or explained variance) and persist counts to `monitor.dq_bronze_daily`.  
[DBX-DE-Prof][Monitoring-Logs]  


### 13) 🟥 Compute a basic DQ summary: null rates on keys, duplicate rate on `(order_id, sku, customer_id, order_date)`, top countries/currencies; publish a one-pager.  
[DBX-DE-Prof][Monitoring-Logs]  



### 14) 🟥 Enforce basic Delta constraints where feasible (NOT NULL on business keys, simple CHECKs); record violations.  
[DBX-DE-Assoc][Delta-Basics]  



### 15) 🟥 Implement an idempotent re-run strategy (deterministic overwrite by date window via `replaceWhere` or `MERGE` on business keys) and verify repeatability.  
[DBX-DE-Assoc][Delta-MERGE][Delta-Basics][Medallion]  