In [0]:
import dlt
import pyspark.sql.functions as F
from pyspark.sql.types import *


In [0]:
@dlt.table( name="bronze_addresses", table_properties={'quality':'bronze'},
           comment="This is the raw address data ingested")
def create_bronze_address():
    """Incrementally ingest raw address CSV files into the bronze layer.

    Reads new CSV files from the landing volume with Auto Loader
    (``cloudFiles``) and keeps every source column. It also adds two lineage
    columns taken from the file metadata column:

    * ``input_file_path`` -- path of the file the row came from.
    * ``file_modification_time`` -- modification timestamp of that file.

    Returns:
        A streaming DataFrame that DLT materialises as ``bronze_addresses``.
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Auto Loader's switch for typed schema inference is `inferColumnTypes`.
        # `cloudFiles.inferSchema` is not an Auto Loader option: it fails option
        # validation, and without typed inference CSV columns are all strings.
        .option("cloudFiles.inferColumnTypes", "true")
        # With typed inference a numeric-looking postcode could become an integer
        # and lose leading zeros, which would break the silver-layer
        # `LENGTH(postcode) = 5` expectation, so keep it as text.
        .option("cloudFiles.schemaHints", "postcode STRING")
        .load("/Volumes/circuitbox/landing/operational_data/addresses/")
        .select(
            "*",
            F.col("_metadata.file_path").alias("input_file_path"),
            # Use the file's real modification time from the metadata column.
            # current_timestamp() would record when the row was processed, not
            # when the file was modified.
            F.col("_metadata.file_modification_time").alias("file_modification_time"),
        )
    )

In [0]:
@dlt.table(
    name="silver_addresses_clean",
    table_properties={"quality": "silver"},
    comment="Cleaned address data ingested",
)
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_address_line_1", "address_line_1 IS NOT NULL")
@dlt.expect("valid_postcode", "LENGTH(postcode) = 5")
def create_silver_addresses_clean():
    """Project the bronze address stream down to the cleaned silver columns.

    Data-quality rules attached by the decorators:

    * a NULL ``customer_id`` fails the update (``expect_or_fail``);
    * rows with a NULL ``address_line_1`` are dropped (``expect_or_drop``);
    * a ``postcode`` whose length is not 5 is only recorded as a violation
      (``expect``); the row is kept.

    ``created_date`` is exposed as ``date``, which the downstream
    ``apply_changes`` flow uses as its ``sequence_by`` column.

    Returns:
        A streaming DataFrame that DLT materialises as ``silver_addresses_clean``.
    """
    bronze_stream = spark.readStream.table("LIVE.bronze_addresses")

    wanted_columns = [
        "customer_id",
        "address_line_1",
        "city",
        "state",
        "postcode",
        F.col("created_date").alias("date"),
    ]
    return bronze_stream.select(*wanted_columns)

In [0]:
# Declare the target streaming table for the change-data flow. It is created
# empty here and populated by the `apply_changes` call that targets it.
dlt.create_streaming_table(
    name="silver_addresses",
    table_properties={"quality": "silver"},
    comment="This is the silver address data ingested",
)

In [0]:
# Merge the cleaned address stream into `silver_addresses` as an SCD Type 2
# table. Rows are matched on `customer_id` and ordered by the `date` column
# (the renamed `created_date`), so changes are kept as history rather than
# overwritten in place.
dlt.apply_changes(
    source="silver_addresses_clean",
    target="silver_addresses",
    keys=["customer_id"],
    sequence_by="date",
    stored_as_scd_type=2,
)