# Address data processing

In [0]:

import dlt
import pyspark.sql.functions as F

In [0]:
@dlt.table(
    name="bronze_addresses",
    table_properties={"quality": "bronze"},
    comment="RAW ADDRESS DATA INGESTED FROM THE SOURCE",
)
def create_bronze_addresses():
    """Define the bronze streaming table holding raw address records.

    Reads CSV files incrementally from the addresses landing volume using
    Auto Loader (the ``cloudFiles`` source) with column-type inference
    enabled. Every source column is kept, and two audit columns are added:

    * ``file_path`` -- taken from ``_metadata.file_path``.
    * ``ingest_timestamp`` -- ``current_timestamp()`` evaluated at
      processing time.

    Returns:
        The streaming DataFrame that DLT materializes as
        ``bronze_addresses``.
    """
    # NOTE(review): no CSV "header" option is set here, while the silver step
    # selects columns by name -- confirm the landing files resolve to the
    # expected column names.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/circuitbox/landing/operational_data/addresses")
        .select(
            "*",
            F.col("_metadata.file_path").alias("file_path"),
            F.current_timestamp().alias("ingest_timestamp"),
        )
    )

## Implementing DLT expectations - data quality checks

In [0]:
@dlt.table(
    name="silver_addresses_clean",
    table_properties={"quality": "silver"},
    comment="Cleansed address data with data quality expectations applied",
)
# SQL equivalent of the first expectation:
#   CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
@dlt.expect_or_fail("valid_customer_id", "customer_id is not null")
@dlt.expect_or_drop("valid_address", "address_line_1 is not null")
@dlt.expect("valid_postcode", "LENGTH(postcode)=5")
def create_silver_addresses_clean():
    """Define the cleansed silver address stream read from ``bronze_addresses``.

    Streams from the bronze table and keeps only the business columns,
    casting ``created_date`` to a ``date``. Three expectations are attached:

    * ``valid_customer_id`` (``expect_or_fail``) -- a null ``customer_id``
      fails the update.
    * ``valid_address`` (``expect_or_drop``) -- rows with a null
      ``address_line_1`` are dropped.
    * ``valid_postcode`` (``expect``) -- rows whose postcode is not five
      characters long are kept; the violation is only recorded.

    Returns:
        The streaming DataFrame that DLT materializes as
        ``silver_addresses_clean``.
    """
    # NOTE(review): the bronze reader enables type inference, so ``postcode``
    # may not arrive as a string (leading zeros would be lost) -- confirm the
    # LENGTH check behaves as intended.
    return (
        spark.readStream.table("LIVE.bronze_addresses")
        .select(
            "customer_id",
            "address_line_1",
            "city",
            "state",
            "postcode",
            F.col("created_date").cast("date").alias("created_date"),
        )
    )

# Apply changes: SCD Type 2 - silver_addresses

In [0]:
# Declare the streaming table ``silver_addresses``; it is the target that the
# apply_changes flow in this file writes into.
dlt.create_streaming_table(
    name="silver_addresses",
    comment="scd type 2",
    table_properties={"quality": "silver"},
)

In [0]:
# Apply changes from ``silver_addresses_clean`` to ``silver_addresses``:
# records are matched on ``customer_id``, ordered by ``created_date``, and
# stored as SCD type 2.
dlt.apply_changes(
    source="silver_addresses_clean",
    target="silver_addresses",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)