# Process Addresses Data
1. Ingest the data into the data lakehouse - bronze_addresses
2. Perform data quality checks and transform the data as required - silver_addresses_clean
3. Apply changes to the Addresses data (SCD Type 2) - silver_addresses
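DLT determines the run order from the dataset references (for example `LIVE.bronze_addresses`), not from the order of the cells. As a rough illustration only, the three steps above form a small dependency graph. The sketch below uses Python's standard `graphlib` to derive an order from it. This is a model of the dependency idea, not how DLT is implemented, and the `DEPENDENCIES` dict and `run_order` helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Each dataset maps to the datasets it reads from (mirrors the three steps above).
DEPENDENCIES = {
    "bronze_addresses": set(),                         # reads raw CSV files
    "silver_addresses_clean": {"bronze_addresses"},    # reads LIVE.bronze_addresses
    "silver_addresses": {"silver_addresses_clean"},    # apply_changes source
}

def run_order(deps):
    """Return the datasets in an order where every upstream dataset comes first."""
    return list(TopologicalSorter(deps).static_order())

if __name__ == "__main__":
    print(run_order(DEPENDENCIES))
```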

Note: A DLT pipeline can combine notebooks written in Python with notebooks written in SQL, but each individual notebook must use a single language: either all Python or all SQL. Magic commands such as `%sql` or `%python` therefore can't be used to switch languages inside a notebook.

1. Ingest the data into the data lakehouse - bronze_addresses

In [0]:
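# # Retrieve the storage account key from Databricks secrets instead of hard-coding it
# # ("my-secret-scope" and "storage-account-key" are placeholder names; substitute your own)
# storage_account_key = dbutils.secrets.get(scope="my-secret-scope", key="storage-account-key")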

# # Set the Spark configuration for the Azure storage account
# spark.conf.set(
#     "fs.azure.account.key.udemyttstorage.dfs.core.windows.net",
#     storage_account_key
# )

In [0]:
# # Alternative: authenticate with a SAS token, also read from Databricks secrets
# # (placeholder scope/key names; substitute your own)
# SAS_TOKEN = dbutils.secrets.get(scope="my-secret-scope", key="storage-sas-token")
# STORAGE_ACCOUNT_NAME = 'udemyttstorage'

# spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "SAS")
# spark.conf.set(f"fs.azure.sas.token.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
# spark.conf.set(f"fs.azure.sas.fixed.token.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", SAS_TOKEN)

In [0]:
import dlt
import pyspark.sql.functions as F

@dlt.table(
    name="bronze_addresses",
    table_properties={"quality": "bronze"},
    comment="Raw addresses data ingested from the source system",
)
def create_bronze_addresses():
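    # Parentheses instead of backslashes: a comment line after a trailing "\" ends the
    # statement and the next ".option(...)" line then raises an IndentationError.
    data = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Not needed in DLT, which manages Auto Loader's schema location itself:
        # .option("cloudFiles.schemaLocation", "/mnt/circuitbox/landing/operational_data/addresses/_schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/mnt/circuitbox/landing/operational_data/addresses")
    )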

    return data.select(
        "*",
        F.col("_metadata.file_path").alias("input_file_path"),
        F.current_timestamp().alias("ingestion_date"),
    )
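Outside Databricks, the shape of this bronze step can be mimicked in plain Python: read every CSV file in a landing folder as-is and tag each row with the file it came from and when it was loaded, which is the job of the `input_file_path` and `ingestion_date` columns above. This is a minimal sketch assuming a local folder of CSV files; `ingest_bronze` is a hypothetical helper. Unlike Auto Loader, it infers no types and does not remember which files were already ingested.

```python
import csv
import tempfile
from datetime import datetime, timezone
from pathlib import Path

def ingest_bronze(landing_dir):
    """Read every CSV in landing_dir unchanged and add lineage columns.

    Plain-Python stand-in for the Auto Loader read: values stay strings,
    and there is no tracking of already-ingested files.
    """
    ingestion_date = datetime.now(timezone.utc)  # one timestamp per run
    rows = []
    for path in sorted(Path(landing_dir).glob("*.csv")):
        with path.open(newline="") as f:
            for record in csv.DictReader(f):
                record["input_file_path"] = str(path)   # like _metadata.file_path
                record["ingestion_date"] = ingestion_date
                rows.append(record)
    return rows

if __name__ == "__main__":
    # Made-up sample file, only to exercise the function.
    with tempfile.TemporaryDirectory() as landing:
        Path(landing, "addresses_1.csv").write_text(
            "customer_id,address_line_1,city,state,postcode,created_date\n"
            "1,1 Main St,Springfield,IL,62701,2024-01-01\n"
        )
        for row in ingest_bronze(landing):
            print(row)
```

Keeping every source column untouched and only appending lineage columns is what lets later layers trace a bad row back to the exact file it arrived in.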

2. Perform data quality checks and transform the data as required - silver_addresses_clean

In [0]:
@dlt.table(
    name="silver_addresses_clean",
    comment="Cleaned addresses data",
    table_properties={"quality": "silver"},  # must be a dict, not a string
)
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_address", "address_line_1 IS NOT NULL")
@dlt.expect_or_drop("valid_postcode", "LENGTH(postcode) = 5")
def create_silver_addresses_clean():
    return spark.readStream.table("LIVE.bronze_addresses") \
        .select(
            "customer_id",
            "address_line_1",
            "city",
            "state",
            "postcode",
            F.col("created_date").cast("date")
        )
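The three decorators above react differently when a row breaks a rule: `expect_or_drop` removes only that row, while `expect_or_fail` stops the update. A minimal pure-Python model of that behaviour follows. The rule names match the decorators, but `apply_expectations`, the `ExpectationFailed` class, and the choice to treat a missing postcode as a violation are assumptions of this sketch; the real rules are SQL expressions with SQL NULL semantics.

```python
class ExpectationFailed(Exception):
    """Hypothetical stand-in for the error raised when expect_or_fail is violated."""

# Rules from the decorators above, rewritten as Python predicates.
FAIL_RULES = {
    "valid_customer_id": lambda r: r.get("customer_id") is not None,
}
DROP_RULES = {
    "valid_address": lambda r: r.get("address_line_1") is not None,
    # Sketch assumption: a missing postcode counts as a violation.
    "valid_postcode": lambda r: r.get("postcode") is not None and len(str(r["postcode"])) == 5,
}

def apply_expectations(rows):
    """Return (kept_rows, drop_counts); raise ExpectationFailed on any fail-rule violation."""
    drop_counts = {name: 0 for name in DROP_RULES}
    kept = []
    for row in rows:
        for name, rule in FAIL_RULES.items():
            if not rule(row):
                raise ExpectationFailed(f"{name} violated by {row!r}")
        failed = [name for name, rule in DROP_RULES.items() if not rule(row)]
        for name in failed:
            drop_counts[name] += 1  # count per rule, as expectation metrics do
        if not failed:
            kept.append(row)
    return kept, drop_counts
```

One practical consequence: if `cloudFiles.inferColumnTypes` infers `postcode` as an integer, a value such as `02134` becomes `2134`, and the `LENGTH(postcode) = 5` rule would likely drop it. Supplying a schema hint (the `cloudFiles.schemaHints` option, e.g. `"postcode STRING"`) would keep it as text.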

3. Apply changes to the Addresses data (SCD Type 2) - silver_addresses

In [0]:
dlt.create_streaming_table(
    name="silver_addresses",
    comment="SCD Type 2 addresses data",
    table_properties={"quality": "silver"},  # must be a dict, not a string
)

dlt.apply_changes(
    target = "silver_addresses",
    source = "silver_addresses_clean",
    keys = ["customer_id"],
    sequence_by = "created_date",
    stored_as_scd_type = 2,
)
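With `stored_as_scd_type = 2`, each change for a `customer_id` closes the previous version and opens a new one, ordered by `created_date`, and DLT records each version's validity range in `__START_AT` and `__END_AT` columns. The in-memory sketch below mimics that idea for a single batch. It is a simplification, not DLT's algorithm: `scd2_merge` is hypothetical, it assumes all changes arrive together, ignores deletes, and skips a change whose values equal the current version. Because `created_date` is cast to a date, two changes for one customer on the same day share a sequence value; the sketch keeps their input order, which may not match DLT's behaviour.

```python
from operator import itemgetter

def scd2_merge(changes, key="customer_id", sequence_by="created_date"):
    """Build SCD Type 2 history from a batch of change rows.

    Returns one row per version. The sequence column is folded into
    __START_AT/__END_AT; __END_AT is None for each key's current version.
    Sequence values must sort chronologically (e.g. ISO date strings).
    """
    history = []
    current = {}  # key value -> index of its open version in history
    # sorted() is stable, so ties on the sequence column keep their input order.
    for change in sorted(changes, key=itemgetter(sequence_by)):
        k = change[key]
        attrs = {c: v for c, v in change.items() if c != sequence_by}
        if k in current:
            open_row = history[current[k]]
            open_attrs = {c: v for c, v in open_row.items() if not c.startswith("__")}
            if open_attrs == attrs:
                continue  # nothing changed: keep the open version
            open_row["__END_AT"] = change[sequence_by]  # close the old version
        history.append({**attrs, "__START_AT": change[sequence_by], "__END_AT": None})
        current[k] = len(history) - 1
    return history
```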