In [0]:
# ============================================================
# 01_bronze_ingest.py
#
# PURPOSE
# -------
# Ingest raw Olist CSV files into Unity Catalog "Bronze" tables.
#
# BRONZE DESIGN PRINCIPLES (WHY THIS IS BUILT THIS WAY)
# -----------------------------------------------------
# 1) Bronze is RAW and APPEND-SAFE:
#    - We want to land data exactly as received, with minimal assumptions.
#
# 2) NO schema inference in Bronze:
#    - Spark schema inference can guess wrong types (e.g., an INT column that
#      later contains a timestamp string). That causes failures downstream.
#    - Therefore, we store ALL columns as STRING in Bronze and cast types in Silver.
#
# 3) Unity Catalog managed tables:
#    - We write using saveAsTable() so Databricks/UC manages storage locations.
#    - This avoids LOCATION issues and works well with Serverless compute.
#
# 4) File-based incremental scaffolding:
#    - We maintain an ingest log table that tracks which source files were ingested.
#    - On incremental runs, we only ingest rows from files not seen before.
# ============================================================


# ----------------------------
# 0) Imports
# ----------------------------

from pyspark.sql.functions import current_timestamp  # Spark expression: current timestamp for ingestion audit
from pyspark.sql.functions import lit  # Spark expression: create constant-value columns
from pyspark.sql.functions import col  # Spark expression: reference DataFrame columns (incl. _metadata.file_path)


# ----------------------------
# 1) Configuration (paths + schemas)
# ----------------------------

# Raw CSV landing directory inside a Unity Catalog Volume
# This is where the Azure-to-Volume ingestion notebook downloaded the CSVs.
# No trailing slash: read_csv_all_strings() joins it with "/<csv_filename>".
RAW_VOLUME_DIR = "/Volumes/olist/stage/olist_stage/olist_raw"

# Unity Catalog schema for Bronze tables (raw landing tables)
# Written as "<catalog>.<schema>"; table names are appended to it when writing.
BRONZE_SCHEMA = "olist.bronze"

# Unity Catalog schema for operational/control tables (ingest logs, checkpoints)
OPS_SCHEMA = "olist.ops"

# Ingest log table tracks which files have been ingested per Bronze table (for incremental runs)
# Fully-qualified name derived from OPS_SCHEMA so both always stay in the same schema.
INGEST_LOG_TABLE = f"{OPS_SCHEMA}.bronze_ingest_log"


# ----------------------------
# 2) Run mode (overwrite vs append)
# ----------------------------

# Overwrite is correct for the first full load / rebuild.
# Append is used for future incremental runs (only new files are ingested).
WRITE_MODE = "overwrite"  # Change to "append" after your first successful baseline load

# Fail fast on any other value. The helpers in this notebook treat every mode
# that is not exactly "overwrite" as an incremental run, yet hand the value
# unchanged to DataFrameWriter.mode(). A different save mode (for example
# "ignore") could therefore skip the Bronze write while the source files are
# still recorded in the ingest log as ingested.
if WRITE_MODE not in ("overwrite", "append"):
    raise ValueError(
        f"WRITE_MODE must be 'overwrite' or 'append', got {WRITE_MODE!r}"
    )


# ----------------------------
# 3) Ensure schemas exist (idempotent)
# ----------------------------

# Make sure both target schemas (Bronze and Ops) are present before any table
# is touched. IF NOT EXISTS makes each statement safe to re-run.
for schema_name in (BRONZE_SCHEMA, OPS_SCHEMA):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")


# ----------------------------
# 4) Create ingest log table if missing (idempotent)
# ----------------------------

# Control table holding one row per (bronze_table, source_file) pair that has
# been ingested. Incremental runs consult it so the same file is not
# reprocessed. IF NOT EXISTS keeps the statement safe to re-run.
ingest_log_ddl = f"""
CREATE TABLE IF NOT EXISTS {INGEST_LOG_TABLE} (
  bronze_table STRING,      -- e.g. 'orders_raw'
  source_file  STRING,      -- file path from UC metadata (_metadata.file_path)
  ingested_at  TIMESTAMP    -- timestamp when that file was ingested
)
USING DELTA
"""
spark.sql(ingest_log_ddl)


# ----------------------------
# 5) Helper: Read a CSV as ALL STRINGS + add metadata
# ----------------------------

def read_csv_all_strings(csv_filename: str):
    """
    Load one CSV from RAW_VOLUME_DIR with schema inference turned OFF and
    attach the standard Bronze metadata columns.

    Args:
        csv_filename: File name of the CSV, relative to RAW_VOLUME_DIR.

    Returns:
        A Spark DataFrame holding the CSV columns plus three added columns:
        ingested_at, source_file and source_system.
    """

    # Reader settings, applied one at a time in this order
    reader_options = (
        ("header", True),          # First row supplies the column names
        ("inferSchema", False),    # Critical: keep raw columns as STRING
        ("mode", "PERMISSIVE"),    # Do not fail on malformed rows
    )
    reader = spark.read
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)

    # Full path of the file inside the Volume
    raw_df = reader.csv(f"{RAW_VOLUME_DIR}/{csv_filename}")

    # Bronze metadata, added in a fixed order:
    # - ingested_at: audit timestamp for the ingestion
    # - source_file: originating file path taken from _metadata.file_path
    # - source_system: constant label for the data's origin
    stamped_df = raw_df.withColumn("ingested_at", current_timestamp())
    traced_df = stamped_df.withColumn("source_file", col("_metadata.file_path"))
    return traced_df.withColumn("source_system", lit("olist_kaggle"))


# ----------------------------
# 6) Helper: Filter only NEW files for incremental runs
# ----------------------------

def filter_new_files_only(df, bronze_table: str):
    """
    Restrict df to rows coming from files that the ingest log has not yet
    recorded for bronze_table.

    The filter only applies to incremental runs. When WRITE_MODE is
    'overwrite' (full rebuild) df is returned unchanged.

    Args:
        df: DataFrame carrying a source_file column.
        bronze_table: Bronze table name used to select the relevant log rows.

    Returns:
        df itself on a full rebuild, otherwise df without the rows whose
        source_file is already logged for bronze_table.
    """

    if WRITE_MODE != "overwrite":
        # Distinct file paths already logged for this Bronze table; the column
        # is renamed so the join condition below is unambiguous
        log_for_table = spark.table(INGEST_LOG_TABLE).filter(
            col("bronze_table") == lit(bronze_table)
        )
        seen_files = log_for_table.select(
            col("source_file").alias("logged_source_file")
        ).distinct()

        # left_anti keeps only the rows of df that have NO match in seen_files
        same_file = df["source_file"] == seen_files["logged_source_file"]
        return df.join(seen_files, on=same_file, how="left_anti")

    # Full rebuild: ingest everything
    return df


# ----------------------------
# 7) Helper: Write Bronze table as Unity Catalog managed Delta table
# ----------------------------

def write_bronze_table_managed(df, bronze_table: str):
    """
    Persist df as a Delta table named <BRONZE_SCHEMA>.<bronze_table> through
    saveAsTable(). No LOCATION is given, so storage is left to Unity Catalog.

    Args:
        df: DataFrame to write.
        bronze_table: Unqualified target table name (e.g. 'orders_raw').
    """

    # Fully-qualified target: olist.bronze.<table>
    target_table = f"{BRONZE_SCHEMA}.{bronze_table}"

    # Delta writer using the notebook-wide save mode
    # (overwrite for baseline, append for incremental)
    writer = df.write.format("delta").mode(WRITE_MODE)

    # Allow schema changes on overwrite runs
    writer = writer.option("overwriteSchema", "true")

    writer.saveAsTable(target_table)


# ----------------------------
# 8) Helper: Log ingested files into control table
# ----------------------------

def log_ingested_files(df_written, bronze_table: str):
    """
    Record in the ingest log every distinct source_file found in df_written,
    tagged with bronze_table. Incremental runs rely on these rows to skip
    files that were already processed.

    Args:
        df_written: DataFrame that was written to Bronze; must carry a
            source_file column.
        bronze_table: Bronze table name the rows were written to.
    """

    # A full rebuild replaces the table, so its previous log entries are
    # removed before the new ones are appended
    is_full_rebuild = WRITE_MODE == "overwrite"
    if is_full_rebuild:
        spark.sql(f"DELETE FROM {INGEST_LOG_TABLE} WHERE bronze_table = '{bronze_table}'")

    # One log row per file: (bronze_table, source_file), then the log timestamp
    table_label = lit(bronze_table).alias("bronze_table")
    per_file = df_written.select(table_label, col("source_file")).distinct()
    log_rows = per_file.withColumn("ingested_at", current_timestamp())

    # The log itself is append-only
    log_writer = log_rows.write.format("delta").mode("append")
    log_writer.saveAsTable(INGEST_LOG_TABLE)


# ----------------------------
# 9) Dataset map (Bronze table name -> CSV filename)
# ----------------------------

# Each entry defines:
# - the target Bronze table name (ending with _raw)
# - the source CSV file name in the Volume raw folder
#
# Keys are appended to BRONZE_SCHEMA to form the target table name; values are
# appended to RAW_VOLUME_DIR to form the file path that is read.
# The ingestion loop and the validation loop both iterate this dict, so adding
# an entry here is enough to include another dataset in both.
datasets = {
    "orders_raw": "olist_orders_dataset.csv",
    "order_items_raw": "olist_order_items_dataset.csv",
    "order_payments_raw": "olist_order_payments_dataset.csv",
    "customers_raw": "olist_customers_dataset.csv",
    "products_raw": "olist_products_dataset.csv",
    "sellers_raw": "olist_sellers_dataset.csv",
    "order_reviews_raw": "olist_order_reviews_dataset.csv",
    "geolocation_raw": "olist_geolocation_dataset.csv",
    # The only file that does not follow the olist_<name>_dataset.csv pattern
    "category_translation_raw": "product_category_name_translation.csv",
}


# ----------------------------
# 10) Main ingestion loop (read -> filter -> write -> log)
# ----------------------------

for bronze_table, csv_file in datasets.items():
    # Load the raw CSV (all strings + metadata columns), then narrow it to
    # unseen files when WRITE_MODE is append
    df = read_csv_all_strings(csv_file)
    df_new = filter_new_files_only(df, bronze_table)

    # Probe a single row to decide whether there is anything to ingest
    has_new_rows = df_new.limit(1).count() != 0

    if has_new_rows:
        # Write the Bronze table, then record the contributing files so
        # future runs can skip them
        write_bronze_table_managed(df_new, bronze_table)
        log_ingested_files(df_new, bronze_table)
        print(f"OK: {BRONZE_SCHEMA}.{bronze_table} written with mode={WRITE_MODE} from {csv_file}")
    else:
        # Nothing to ingest: skip both the write and the log to save compute
        print(f"SKIP: {BRONZE_SCHEMA}.{bronze_table} (no new files)")


# ----------------------------
# 11) Validation (counts + ingest log preview)
# ----------------------------

# Quick validation: display one single-row result per Bronze table holding
# the table's name and its row count
for bronze_table in datasets:
    count_query = (
        f"SELECT '{BRONZE_SCHEMA}.{bronze_table}' AS table_name, COUNT(*) AS row_count "
        f"FROM {BRONZE_SCHEMA}.{bronze_table}"
    )
    display(spark.sql(count_query))

# Preview the 50 most recent ingest-log rows to confirm that incremental
# tracking recorded the files
recent_log_query = f"SELECT * FROM {INGEST_LOG_TABLE} ORDER BY ingested_at DESC LIMIT 50"
display(spark.sql(recent_log_query))


Moving the Bronze and Ops tables from the `workspace` catalog to the `olist` catalog

In [0]:
%sql
-- Show which catalog your session is currently using (important for where schemas were created)
SELECT current_catalog(); -- observed result when this was run: workspace

-- Show which schema your session is currently using
SELECT current_schema(); -- observed result when this was run: default

-- Optional checks, left disabled:
-- -- List all catalogs you have access to (to confirm 'olist' doesn't already exist)
-- SHOW CATALOGS;

-- -- List schemas in the current catalog (to see where 'olist_bronze' and 'olist_ops' exist)
-- SHOW SCHEMAS;


In [0]:
%sql
-- Create the new catalog that the Bronze and Ops schemas will live in.
-- IF NOT EXISTS makes this safe to re-run.
CREATE CATALOG IF NOT EXISTS olist;

In [0]:
%sql
-- Create schemas inside the new catalog:
--   olist.bronze -> raw landing tables
--   olist.ops    -> control tables such as the ingest log
-- IF NOT EXISTS makes both statements safe to re-run.
CREATE SCHEMA IF NOT EXISTS olist.bronze;
CREATE SCHEMA IF NOT EXISTS olist.ops;


In [0]:
%sql
-- Confirm that the 'olist' catalog is now listed
SHOW CATALOGS;
-- Confirm that the schemas created above exist inside it
SHOW SCHEMAS IN olist;




In [0]:
%sql
-- Copy every Bronze table from the old location (workspace.olist_bronze.brz_*)
-- into the new catalog under the *_raw naming used by the ingestion notebook.
-- DEEP CLONE copies the table data as well as its metadata, so the new tables
-- do not depend on the source tables afterwards.
-- IF NOT EXISTS keeps the cell re-runnable, like the CREATE CATALOG / CREATE
-- SCHEMA cells: a table that already exists is left untouched instead of
-- failing the whole cell. Use CREATE OR REPLACE TABLE instead if a fresh
-- re-clone over an existing table is wanted.
CREATE TABLE IF NOT EXISTS olist.bronze.orders_raw
DEEP CLONE workspace.olist_bronze.brz_orders;

CREATE TABLE IF NOT EXISTS olist.bronze.order_items_raw
DEEP CLONE workspace.olist_bronze.brz_order_items;

CREATE TABLE IF NOT EXISTS olist.bronze.order_payments_raw
DEEP CLONE workspace.olist_bronze.brz_order_payments;

CREATE TABLE IF NOT EXISTS olist.bronze.customers_raw
DEEP CLONE workspace.olist_bronze.brz_customers;

CREATE TABLE IF NOT EXISTS olist.bronze.products_raw
DEEP CLONE workspace.olist_bronze.brz_products;

CREATE TABLE IF NOT EXISTS olist.bronze.sellers_raw
DEEP CLONE workspace.olist_bronze.brz_sellers;

CREATE TABLE IF NOT EXISTS olist.bronze.order_reviews_raw
DEEP CLONE workspace.olist_bronze.brz_order_reviews;

CREATE TABLE IF NOT EXISTS olist.bronze.geolocation_raw
DEEP CLONE workspace.olist_bronze.brz_geolocation;

CREATE TABLE IF NOT EXISTS olist.bronze.category_translation_raw
DEEP CLONE workspace.olist_bronze.brz_category_translation;


In [0]:
%sql
-- Copy the ingest log from the old location into the new Ops schema.
-- DEEP CLONE copies the table data as well as its metadata.
-- IF NOT EXISTS keeps the cell re-runnable: an existing log table is left
-- untouched instead of failing the cell.
CREATE TABLE IF NOT EXISTS olist.ops.bronze_ingest_log
DEEP CLONE workspace.olist_ops.bronze_ingest_log;


In [0]:
%sql
-- Verify that the cloned *_raw tables are present in the new Bronze schema
SHOW TABLES IN olist.bronze;

In [0]:
# List the raw landing directory in the Volume to confirm the CSV files are there.
# First call: render the listing through display().
display(dbutils.fs.ls("/Volumes/olist/stage/olist_stage/olist_raw/"))
# Second call: same directory (path differs only by the trailing slash); as the
# last expression of the cell, its return value becomes the cell's result.
dbutils.fs.ls("/Volumes/olist/stage/olist_stage/olist_raw")

