Process Addresses Data
- 1. Ingest the data into the data lakehouse - bronze.addresses

In [0]:
import dlt
import pyspark.sql.functions as F

In [0]:
@dlt.table(
    name="circuitbox.bronze.addresses",
    comment='raw addresses data ingested from the source system',
    table_properties={"quality": "bronze"},
)
def bronze_addresses():
    """Stream raw address CSV files from the landing volume into bronze.

    Files are read incrementally through the ``cloudFiles`` source with
    column-type inference switched on. Every source column is kept as-is and
    two audit columns are appended: the originating file path and the
    timestamp at which the row was ingested.
    """
    landing_path = "/Volumes/circuitbox/landing/operational_data/addresses/"
    reader_options = {
        'cloudFiles.format': "csv",
        'cloudFiles.inferColumnTypes': "true",
    }

    raw_stream = (
        spark.readStream.format('cloudFiles')
        .options(**reader_options)
        .load(landing_path)
    )

    # Audit columns: where the row came from and when it was loaded.
    source_file = F.col("_metadata.file_path").alias("file_path")
    loaded_at = F.current_timestamp().alias("ingest_time")

    return raw_stream.select("*", source_file, loaded_at)

### 2. Perform data quality checks and transform the data as required - silver_addresses_clean

In [0]:
@dlt.table(
    name = "circuitbox.silver.addresses_clean",
    comment = "this table cleaned addresses data",
    table_properties = {"quality":"silver"}
)
# Severity escalates with how essential the field is:
#   - a missing customer_id fails the whole update,
#   - a missing address_line_1 drops the offending row,
#   - a malformed postcode is only recorded in the quality metrics.
@dlt.expect_or_fail("valid_customer_id","customer_id is not null")
@dlt.expect_or_drop("valid_address","address_line_1 is not null")
# A postcode is treated as valid when it is exactly five characters long
# (assumes 5-character postcodes -- TODO confirm against the source data).
# The previous "<5" check reported every five-character postcode as a violation.
@dlt.expect("valid_postcode","length(postcode) = 5")
def silver_addresses_clean():
    """Return the quality-checked, trimmed stream of address records.

    Reads the bronze addresses table as a stream, keeps only the business
    columns needed downstream, and casts ``created_date`` to a DATE.

    Returns:
        A streaming DataFrame with the columns ``customer_id``,
        ``address_line_1``, ``city``, ``state``, ``postcode`` and
        ``created_date``.
    """
    return(
        spark.readStream.table("circuitbox.bronze.addresses")
        .select(
            "customer_id",
            "address_line_1",
            "city",
            "state",
            "postcode",
            # Alias explicitly so the column name used by downstream
            # consumers does not depend on Spark's implicit cast naming.
            F.col("created_date").cast("date").alias("created_date")
        )
    )

### 3. Apply changes to the addresses data (SCD Type 2) - silver_addresses

In [0]:
# Declare the streaming table that will hold the SCD Type 2 addresses data.
dlt.create_streaming_table(
    name="circuitbox.silver.addresses",
    table_properties={"quality": "silver"},
    comment="This table contains SCD type 2 addresses data",
)

In [0]:
# Auto CDC flow: apply changes from the cleaned addresses stream to the
# silver addresses table as SCD Type 2. Records are matched on customer_id
# and ordered by created_date.
dlt.create_auto_cdc_flow(
    source="circuitbox.silver.addresses_clean",
    target="circuitbox.silver.addresses",
    keys=["customer_id"],
    stored_as_scd_type=2,
    sequence_by=F.col("created_date"),
)