In [0]:
import dlt
import pyspark.sql.functions as f

In [0]:
%python

@dlt.table(
    name="bronze_addresses",
    comment= "Raw data from the source system",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "quality": "bronze",
    },
)
def bronze_addresses():
    """Incrementally ingest address CSV files from the landing volume.

    Uses Auto Loader (``cloudFiles``) in CSV mode with column-type inference
    and ``addNewColumns`` schema evolution. Every source column is kept, and
    two lineage columns are appended:

    - ``file_path``: the path of the file the row was read from.
    - ``ingested_timestamp``: the time the row was processed.
    """
    landing_path = "/Volumes/dlt_pipeline_catalog/landing_schema/project_dlt_volume/addresses/"
    autoloader_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "cloudFiles.schemaEvolutionMode": "addNewColumns",
    }

    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**autoloader_options)
        .load(landing_path)
    )

    # `_metadata` is the hidden file-metadata column exposed by file sources.
    lineage_file = f.col("_metadata.file_path").alias("file_path")
    lineage_time = f.current_timestamp().alias("ingested_timestamp")
    return raw_stream.select("*", lineage_file, lineage_time)


In [0]:
@dlt.table(
    name="silver_addresses_clean",
    comment="Cleaned data from the bronze table",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "quality": "silver",
    },
)
# customer_id is the key of the downstream SCD2 flow, so a NULL here fails the update.
# NOTE: the expectation was previously named "valid_cusomter_id" (typo).
# Update any event-log / data-quality queries that filter on the old name.
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
# Rows without a first address line are dropped rather than propagated.
@dlt.expect_or_drop("valid_addresses", "address_line_1 IS NOT NULL")
# Warn-only: violating rows are kept, and the violation is recorded in metrics.
@dlt.expect("valid_postal_code", "LENGTH(postcode) = 5")
def silver_addresses_clean():
    """Stream bronze_addresses into a typed, validated silver table.

    Reads ``bronze_addresses`` as a stream and projects the address columns.
    ``created_date`` is cast to DATE. The lineage columns added in bronze
    (``file_path``, ``ingested_timestamp``) are not carried forward.
    """
    return (
        spark.readStream.table("Live.bronze_addresses")
        .select(
            "customer_id",
            "address_line_1",
            "city",
            "state",
            "postcode",
            # Explicit alias: the SCD2 apply_changes flow sequences on "created_date".
            f.col("created_date").cast("date").alias("created_date"),
        )
    )

In [0]:
# Target table for the SCD Type 2 apply_changes flow fed from silver_addresses_clean.
dlt.create_streaming_table(
    name="silver_addresses",
    comment="SCD Type 2 Address data",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "quality": "silver",
    },
)


In [0]:
# Merge silver_addresses_clean into silver_addresses as SCD Type 2 history.
# One logical record per customer_id; changes are ordered by created_date.
# NOTE(review): created_date is a DATE, so two changes for the same customer on
# the same day tie on the sequence column — confirm that cannot happen upstream.
dlt.apply_changes(
    source="silver_addresses_clean",
    target="silver_addresses",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)