In [0]:
import dlt
from pyspark.sql.functions import current_timestamp, col, to_date, trim, upper
from pyspark.sql.types import IntegerType, DoubleType, DateType

@dlt.table(
    table_properties={
        "delta.columnMapping.mode": "name"
    }
)
def material_bronze():
    """Stream the raw material CSV drops from the DBC10 volume via Auto Loader.

    All columns read from the files are kept (``"*"``) and three lineage
    columns are appended:

    * ``load_dt`` - ``current_timestamp()`` at processing time.
    * ``source_file_path`` - ``_metadata.file_path`` of the originating file.
    * ``source_file_name`` - ``_metadata.file_name`` of the originating file.
    """
    source_dir = "/Volumes/workspace/damg7370/datastore/DBC10/"

    reader_options = {
        "cloudFiles.format": "csv",
        # The source files are pipe-delimited rather than comma-delimited.
        "delimiter": "|",
        "header": "true",
        # NOTE(review): Auto Loader documents ``cloudFiles.inferColumnTypes``
        # for typed CSV inference (otherwise CSV columns are read as strings);
        # confirm whether ``inferSchema`` has any effect here before relying on it.
        "inferSchema": "true",
    }

    lineage_columns = [
        current_timestamp().alias("load_dt"),
        col("_metadata.file_path").alias("source_file_path"),
        col("_metadata.file_name").alias("source_file_name"),
    ]

    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load(source_dir)
    )
    return raw_stream.select("*", *lineage_columns)

In [0]:
import dlt
from pyspark.sql.functions import current_timestamp, col, to_date, trim, upper, regexp_replace
from pyspark.sql.types import IntegerType, DoubleType, DateType

# Silver-layer data-quality expectations, keyed by rule name. Each value is a
# SQL boolean predicate; the mapping is passed to dlt.expect_all_or_drop on
# the material_silver table.
silver_rules = dict(
    valid_material_id="material_id IS NOT NULL",
    valid_date="last_updated IS NOT NULL",
)

@dlt.table(
    table_properties={
        "delta.columnMapping.mode": "name",
        "quality": "silver"
    }
)
@dlt.expect_all_or_drop(silver_rules)
def material_silver():
    """Cleanse and type the streaming ``material_bronze`` records.

    Per-row transformations:

    * ``material_id`` - surrounding whitespace is trimmed, a single leading
      ``M`` is removed, and the remainder is cast to ``int``.
    * ``material_name``, ``sub_category``, ``supplier_name``, ``country`` and
      ``plant`` are trimmed; ``category``, ``uom`` and ``status`` are
      upper-cased and trimmed.
    * ``unit_cost`` is cast to ``double``; ``lead_time_days``,
      ``safety_stock`` and ``reorder_level`` are cast to ``int``.
    * ``last_updated`` is parsed with the ``yyyy-MM-dd`` pattern.
    * Lineage columns ``load_dt``, ``source_file_path`` and
      ``source_file_name`` are passed through unchanged.

    Rows violating any predicate in ``silver_rules`` are dropped by
    ``dlt.expect_all_or_drop``. NOTE(review): unparseable ids/dates presumably
    become NULL (and are then dropped) only with ANSI mode off; under ANSI the
    cast/parse raises instead - confirm the pipeline's setting.
    """
    # Strip only a *leading* "M" so that an "M" elsewhere in the id is not
    # silently deleted (e.g. "M10M" would otherwise collapse to 10 and collide
    # with "M10"); such ids now fail the integer cast rather than being
    # rewritten. Trimming first keeps padded ids such as " M001" parsing as 1.
    material_id = (
        regexp_replace(trim(col("material_id")), "^M", "")
        .cast(IntegerType())
    )

    return (
        dlt.read_stream("material_bronze")
            .select(
                material_id.alias("material_id"),
                trim(col("material_name")).alias("material_name"),
                trim(upper(col("category"))).alias("category"),
                trim(col("sub_category")).alias("sub_category"),
                trim(upper(col("uom"))).alias("uom"),
                col("unit_cost").cast(DoubleType()).alias("unit_cost"),
                trim(col("supplier_name")).alias("supplier_name"),
                trim(col("country")).alias("country"),
                trim(col("plant")).alias("plant"),
                trim(upper(col("status"))).alias("status"),
                to_date(col("last_updated"), "yyyy-MM-dd").alias("last_updated"),
                col("lead_time_days").cast(IntegerType()).alias("lead_time_days"),
                col("safety_stock").cast(IntegerType()).alias("safety_stock"),
                col("reorder_level").cast(IntegerType()).alias("reorder_level"),
                col("load_dt"),
                col("source_file_path"),
                col("source_file_name")
            )
    )