
#### Process Address Data
1. Ingest the data into the data lakehouse - stg_address
2. Perform data quality checks and transform the data as required - stg_address_clean
3. Apply changes to the address data (SCD Type 2) - raw_addresses

In [None]:
import dlt
from pyspark.sql.functions import col, current_timestamp, current_date

#### 1. Ingest the data into the data lakehouse - stg_address

In [None]:
@dlt.table(
    name="stg_address",  # a schema-qualified name is also accepted: <schema_name>.stg_address
    comment="The address data ingested from the address's data lakehouse.",
    table_properties={
        "quality": "staging",
        "delta.autoOptimize.optimizeWrite": "true",
    },
)
def stg_address():
    """
    Incrementally ingest raw address CSV files into the ``stg_address`` staging table.

    Files landing under ``/Volumes/circuitbox/landing/operational_data/address/`` are
    read as a stream with Auto Loader (``cloudFiles``). Column types are inferred and
    the inferred schema is persisted under
    ``/Volumes/circuitbox/landing/operational_data/schema/address/``.

    Three audit columns are appended to every row:

    * ``input_file_path`` - path of the source file the row was read from,
    * ``ingest_timestamp`` - ``current_timestamp()`` when the row is processed,
    * ``load_date`` - ``current_date()`` when the row is processed.

    :return: A streaming Spark DataFrame with the source address columns plus the
             audit columns listed above.
    :rtype: pyspark.sql.DataFrame
    """
    landing_path = "/Volumes/circuitbox/landing/operational_data/address/"
    schema_path = "/Volumes/circuitbox/landing/operational_data/schema/address/"

    raw_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Documented Auto Loader switch for typed inference on CSV; without it
        # every column is inferred as STRING. ("cloudFiles.inferSchema", used
        # previously, is not an Auto Loader option and was dropped.)
        # NOTE(review): a numeric-looking postcode may be inferred as an integer,
        # losing leading zeros and failing the LENGTH(postcode) = 5 expectation in
        # stg_address_clean; consider cloudFiles.schemaHints "postcode STRING" —
        # confirm against the source data.
        .option("cloudFiles.inferColumnTypes", "true")
        # Where Auto Loader stores/evolves the inferred schema between runs.
        .option("cloudFiles.schemaLocation", schema_path)
        .load(landing_path)
    )

    return (
        raw_df
        # _metadata is the file-metadata column exposed by file sources.
        .withColumn("input_file_path", col("_metadata.file_path"))
        .withColumn("ingest_timestamp", current_timestamp())
        .withColumn("load_date", current_date())
    )

#### 2. Perform data quality checks and transform the data as required - stg_address_clean

In [None]:

@dlt.table(
    name="stg_address_clean",
    comment="The address data after data quality checks and transformation.",
    table_properties={
        "quality": "staging",
        "delta.autoOptimize.optimizeWrite": "true",
    },
)
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_address", "address_line_1 IS NOT NULL")
@dlt.expect("valid_postcode", "LENGTH(postcode) = 5")
def stg_address_clean():
    """
    Build the cleaned address staging table from the ``LIVE.stg_address`` stream.

    Keeps only the business columns (``customer_id``, ``address_line_1``, ``city``,
    ``state``, ``postcode``), casts ``created_date`` to ``date``, and stamps each
    row with fresh ``ingest_timestamp`` / ``load_date`` values.

    The decorators enforce three data-quality expectations:

    * ``valid_customer_id`` (expect_or_fail): a null ``customer_id`` fails the update,
    * ``valid_address`` (expect_or_drop): rows with a null ``address_line_1`` are dropped,
    * ``valid_postcode`` (expect): violations are recorded but rows are kept.

    :return: A streaming Spark DataFrame with the cleaned address data.
    :rtype: pyspark.sql.dataframe.DataFrame
    """
    kept_columns = [
        "customer_id",
        "address_line_1",
        "city",
        "state",
        "postcode",
        # NOTE(review): truncating to a date means raw_addresses (sequence_by
        # "created_date") cannot order two changes for one customer on the same
        # day — confirm such same-day changes cannot occur.
        col("created_date").cast("date"),
    ]

    source_stream = spark.readStream.table("LIVE.stg_address")
    # The select drops the bronze audit columns, so ingest_timestamp/load_date
    # below reflect when this step processed the row, not the bronze values.
    projected = source_stream.select(*kept_columns)
    with_ingest_ts = projected.withColumn("ingest_timestamp", current_timestamp())
    return with_ingest_ts.withColumn("load_date", current_date())

#### 3. Apply changes to the address data (SCD Type 2) - raw_addresses

In [None]:
# Declare the target streaming table that holds the SCD Type 2 address history;
# it is populated by the dlt.apply_changes flow reading from stg_address_clean.
dlt.create_streaming_table(
    name="raw_addresses",
    comment="SCD Type 2 addresses data",
    table_properties={"quality": "raw"},
)

In [None]:
# Merge the cleaned address stream into raw_addresses as SCD Type 2 history:
# rows are matched on customer_id and ordered by created_date, so a changed
# address produces a new version instead of overwriting the old one.
dlt.apply_changes(
    target="raw_addresses",
    source="stg_address_clean",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)