In [0]:
CATALOG = "workspace"
SCHEMA  = "material_master"
VOLUME  = "datastore"   # <-- includes the space exactly

base = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/"
display(dbutils.fs.ls(base))  # you should see material_master_1k.csv


In [0]:
# DLT notebook: dlt_material_master
# Catalog/Schema/Volume based on your setup:
#   catalog  = workspace
#   schema   = material_master
#   volume   = "datastore"   
#   file     = material_master_1k.csv

import dlt
import pyspark.sql.functions as F

CATALOG = "workspace"
SCHEMA  = "material_master"
VOLUME  = "datastore"
FILE    = "material_master_1k.csv"

SOURCE_CSV = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/{FILE}"


# =========================
# BRONZE — RAW AS-IS (CSV|)
# =========================
@dlt.table(
    name="material_master_bronze",
    comment="Bronze: raw pipe-delimited Material Master from Volume (as-is)."
)
def material_master_bronze():
    return (
        spark.read
            .option("header", True)
            .option("delimiter", "|")
            .csv(SOURCE_CSV)
    )


# ============================================
# SILVER — CLEAN, STANDARDIZE, FIX DATATYPES
# - trims strings
# - normalizes column names to lower_snake
# - casts numeric/date fields
# - drops bad rows (incl. expectation)
# ============================================
@dlt.table(
    name="material_master_silver",
    comment="Silver: cleaned & standardized Material Master with datatype fixes and row-level validation."
)
@dlt.expect_or_drop("not_null_material_id", "material_id IS NOT NULL")   # required expectation
def material_master_silver():
    df = dlt.read("material_master_bronze")

    # 1) Normalize column names to lower-case once
    df = df.select([F.col(c).alias(c.lower()) for c in df.columns])

    # 2) Trim all string columns
    for c, t in df.dtypes:
        if t == "string":
            df = df.withColumn(c, F.trim(F.col(c)))

    # 3) Datatype fixes (apply only if the columns exist in your file)
    cols = set(df.columns)

    # ---- Numeric examples ----
    # price -> DECIMAL(18,2)
    if "price" in cols:
        df = (df
              .withColumn("price_raw", F.col("price"))
              .withColumn("price", F.col("price").cast("decimal(18,2)"))
             )
        # Drop rows where a non-null raw price failed to cast (invalid type)
        df = df.filter(~(F.col("price_raw").isNotNull() & F.col("price").isNull()))
        # Optional: keep only non-negative prices
        df = df.filter((F.col("price").isNull()) | (F.col("price") >= F.lit(0)))

    # weight -> DOUBLE
    if "weight" in cols:
        df = (df
              .withColumn("weight_raw", F.col("weight"))
              .withColumn("weight", F.col("weight").cast("double"))
             )
        df = df.filter(~(F.col("weight_raw").isNotNull() & F.col("weight").isNull()))

    # ---- Date examples ----
    # Try common date columns in order; cast the first one that exists
    for dcol in ["created_date", "create_date", "effective_date", "date"]:
        if dcol in cols:
            # Adjust the format if yours differs (e.g., "MM/dd/yyyy")
            df = df.withColumn(dcol, F.to_date(F.col(dcol), "yyyy-MM-dd"))
            # If a non-null raw value failed to parse, drop that row
            df = df.filter(~(F.col(dcol).isNotNull() & F.to_date(F.col(dcol), "yyyy-MM-dd").isNull()))
            break

    # 4) (Already handled by expectation) material_id must be NOT NULL.
    #    You can add more rules as expectations if you want, e.g.:
    # @dlt.expect("valid_price_nonnegative", "price >= 0")  # optional

    return df
