In [0]:
import dlt
from pyspark.sql.functions import col,current_timestamp

In [0]:
@dlt.table(
    name="bronze_addresses",
    comment="this is addresses bronze table",
    table_properties={"quality": "bronze"},
)
def create_bronze_addresses():
    """Stream address CSV files from the landing volume into the bronze table.

    Files are picked up with the ``cloudFiles`` (Auto Loader) source with
    column-type inference switched on. Two audit columns are appended to
    every row:

    * ``input_file_name``     - taken from ``_metadata.file_path``.
    * ``ingestion_timestamp`` - ``current_timestamp()`` evaluated by Spark.

    Returns:
        The streaming DataFrame backing the ``bronze_addresses`` table.
    """
    # Configure the Auto Loader reader before pointing it at the landing path.
    csv_reader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
    )
    landed_addresses = csv_reader.load(
        "/Volumes/circuitbox/landing/operationaldata/addresses/"
    )

    # Keep lineage information alongside the raw data.
    with_source_file = landed_addresses.withColumn(
        "input_file_name", col("_metadata.file_path")
    )
    return with_source_file.withColumn("ingestion_timestamp", current_timestamp())

In [0]:
@dlt.table(
    name = "sliver_addresses_clean",
    table_properties = {"quality" : "silver"},
    comment = "this is clean addresses silver table"
)
@dlt.expect_or_fail("valid_customer_id","customer_id is not null")
@dlt.expect_or_drop("valid_addresses","address_line_1 is not null")
@dlt.expect("valid_postcode","length(postcode) >= 5")
def sliver_addresses_clean():
    """Produce the cleaned, quality-checked address stream from bronze.

    Reads ``bronze_addresses`` as a stream, casts ``created_date`` to a
    ``date`` and removes the Auto Loader rescued-data column so it does not
    propagate into the silver layer.

    Data-quality expectations attached to this table:

    * ``valid_customer_id`` (expect_or_fail): ``customer_id`` must be present.
    * ``valid_addresses``   (expect_or_drop): rows without ``address_line_1``
      are dropped.
    * ``valid_postcode``    (expect): ``length(postcode) >= 5`` is recorded
      as a metric only.

    Returns:
        The streaming DataFrame backing the ``sliver_addresses_clean`` table.
    """
    return (
        spark.readStream.table("bronze_addresses")
        .withColumn("created_date", col("created_date").cast("date"))
        # The bronze reader does not set `rescuedDataColumn`, so Auto Loader
        # uses its default name `_rescued_data`. The previous
        # `.drop(col("rescue"))` matched no column and was a silent no-op.
        .drop("_rescued_data")
    )

In [0]:
# Declare the streaming table that the apply_changes flow targets.
dlt.create_streaming_table(
    name="sliver_addresses",
    comment="this is addresses silver table",
    table_properties={"quality": "silver"},
)

In [0]:
# Merge the cleaned address stream into the silver target, keeping full
# history (SCD type 2) per customer_id and ordering changes by created_date.
# NOTE(review): created_date is cast to a date upstream, so several changes
# for the same customer on the same day share one sequence value - confirm
# this granularity is sufficient for ordering.
dlt.apply_changes(
    target="sliver_addresses",
    source="sliver_addresses_clean",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)