In [0]:
import dlt;
import pyspark.sql.functions as F

@dlt.table(
    name="bronze_customers",
    comment="Raw customers ingestion",
    table_properties={"quality": "bronze"},
)
def create_bronze_customers():
    """Stream raw customer JSON files from the landing volume into bronze.

    Reads the customers landing directory with the ``cloudFiles`` streaming
    source (JSON format, column-type inference enabled). Every source column
    is kept as-is and two columns are appended:

    * ``input_file_path`` -- taken from ``_metadata.file_path``.
    * ``ingest_timstamp`` -- the value of ``current_timestamp()``.

    NOTE(review): the alias ``ingest_timstamp`` is misspelled (missing "e").
    It is left untouched here because renaming it changes the table schema;
    confirm no consumer depends on it before correcting.

    Returns:
        The streaming DataFrame that backs the ``bronze_customers`` table.
    """
    landing_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/circuitbox/landing/operational_data/customers/")
    )

    # Lineage columns appended after the raw payload columns.
    source_file = F.col("_metadata.file_path").alias("input_file_path")
    ingested_at = F.current_timestamp().alias("ingest_timstamp")

    return landing_stream.select("*", source_file, ingested_at)

In [0]:
@dlt.table(
    name="silver_customers_clean",
    comment="Cleaned customers",
    table_properties={"quality": "silver"},
)
# Expectation severities, per the DLT API:
#   expect_or_fail -> a violating row fails the update
#   expect_or_drop -> a violating row is dropped from the output
#   expect         -> a violating row is kept; the violation is only recorded
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_customer_name", "customer_name IS NOT NULL")
@dlt.expect("valid_telephone", "LENGTH(telephone) >= 10")
@dlt.expect("valid_email", "email is NOT NULL")
@dlt.expect("valid_dob", "date_of_birth >= '1920-01-01'")
def create_silver_customers_clean():
    """Project and type the bronze customers stream for the silver layer.

    Streams from ``LIVE.bronze_customers`` and keeps six columns, in this
    order: ``customer_id``, ``customer_name``, ``date_of_birth``,
    ``telephone``, ``email``, ``created_date``. The two date columns are cast
    to ``date``; the remaining columns pass through unchanged. The lineage
    columns added in bronze are not selected.

    Returns:
        The streaming DataFrame that backs ``silver_customers_clean``.
    """
    bronze_stream = spark.readStream.table("LIVE.bronze_customers")

    # Typed versions of the two date-like columns.
    date_of_birth = F.col("date_of_birth").cast("date")
    created_date = F.col("created_date").cast("date")

    return bronze_stream.select(
        "customer_id",
        "customer_name",
        date_of_birth,
        "telephone",
        "email",
        created_date,
    )

In [0]:
# Declare the target streaming table that apply_changes() below writes into.
# The comment reads "SCD" (slowly changing dimension), matching
# stored_as_scd_type below; it previously said "SCP".
dlt.create_streaming_table(
    name="silver_customers",
    comment="SCD Type 1 customers",
    table_properties={"quality": "silver"},
)

# Merge the cleaned customer stream into the target, one row per customer_id.
# stored_as_scd_type=1 means a newer record overwrites the existing row for
# its key rather than being stored as history.
# NOTE(review): ordering is by created_date, which silver_customers_clean
# casts to DATE. Changes to the same customer on the same day would therefore
# tie -- confirm this is the intended sequencing column.
dlt.apply_changes(
    target="silver_customers",
    source="LIVE.silver_customers_clean",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=1,
)
