# Process Addresses Data
- 1. Ingest the data into the data lakehouse -- bronze_addresses
- 2. Perform data quality checks and transform the data as required -- silver_address_cleaned
- 3. Apply changes to the addresses data (SCD Type 2) -- silver_addresses

In [0]:
import dlt
import pyspark.sql.functions as f

In [0]:
@dlt.table(
    name='bronze_addresses',
    comment='raw address data ingested from the source system',
    table_properties={'quality': 'bronze'},
)
def create_bronze_addresses():
    """Stream raw address CSV files from the landing volume into the bronze table.

    The files are read with the ``cloudFiles`` streaming source, with column
    type inference enabled. Every source column is kept, and two audit
    columns are appended:

    * ``input_file_path`` -- taken from ``_metadata.file_path``.
    * ``ingested_time_stamp`` -- ``current_timestamp()`` at processing time.
    """
    landing_dir = '/Volumes/circuitbox/landing/operational_data/addresses/'

    raw_addresses = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.inferColumnTypes', 'true')
        .load(landing_dir)
    )

    # Audit columns recording where each row came from and when it was read.
    source_file = f.col("_metadata.file_path").alias("input_file_path")
    ingested_at = f.current_timestamp().alias("ingested_time_stamp")

    return raw_addresses.select("*", source_file, ingested_at)



In [0]:
@dlt.table(
    name="silver_address_cleaned",
    comment="cleaned address data",
    table_properties={"quality": "silver"},
)
# Expectations, from strictest to most lenient:
#   - a missing customer_id is a hard failure (expect_or_fail),
#   - a missing address_line_1 drops the row (expect_or_drop),
#   - a postcode that is not 5 characters long is only recorded (expect).
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_address", "address_line_1 IS NOT NULL")
@dlt.expect("valid_postcode", "length(postcode) = 5")
def create_addresses_clean():
    """Project the bronze address stream down to the cleaned silver columns.

    Reads ``bronze_addresses`` as a stream, keeps only the address
    attributes, and casts ``created_date`` to a ``date``. The audit columns
    added at the bronze layer are not carried forward.
    """
    bronze_addresses = dlt.read_stream("bronze_addresses")

    # Same column name, but typed as a date.
    created_date = f.col("created_date").cast("date").alias("created_date")

    return bronze_addresses.select(
        "customer_id",
        "address_line_1",
        "city",
        "state",
        "postcode",
        created_date,
    )

In [0]:
# Declare the empty streaming table that a separate flow writes into;
# this call only registers the target, it defines no query of its own.
dlt.create_streaming_table(
    name="silver_addresses",
    table_properties={"quality": "silver"},
    comment="address data cleaned",
)

In [0]:
# Apply changes from the cleaned address stream into silver_addresses,
# keeping history as SCD type 2: rows are matched on customer_id and
# ordered by created_date.
dlt.create_auto_cdc_flow(
    source="LIVE.silver_address_cleaned",
    target="silver_addresses",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)