In [None]:
# Base path read from the Spark configuration key "bundle.sourcePath"; falls back to
# the current directory (".") when the key is not set.
# NOTE(review): presumably the root of the deployed bundle — confirm against the bundle config.
file_path = str(spark.conf.get("bundle.sourcePath", "."))

# Install every wheel found under <file_path>/dist into the notebook environment.
%pip install {file_path}/dist/*.whl

# Data ingestion with Delta Live Tables


## Load libraries and functions


In [None]:
import dlt
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

## Set parameters


In [None]:
# Both values are read from the Spark configuration without a default, so each key
# has to be set before this cell runs.
# `table_name` is the suffix of the raw_/bronze_/silver_ table names defined below.
table_name = spark.conf.get("table_name")
# `autoloader_path` is the location handed to the Auto Loader `load()` call in `ingest`.
autoloader_path = spark.conf.get("autoloader_path")

## Ingest data with the Auto Loader


In [None]:
@dlt.table(
    name=f"raw_{table_name}",
    # temporary=True,
    comment="Data loaded from Landing Zone",
    table_properties={"quality": "raw"},
)
def ingest() -> DataFrame:
    """Stream parquet files from the landing zone into the raw table.

    Returns:
        A streaming DataFrame holding every source column, a ``load_date``
        column set to the current timestamp, and the ``_metadata`` column.
    """
    # Auto Loader ("cloudFiles") source reading parquet files from `autoloader_path`.
    landing_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(autoloader_path)
    )
    stamped_stream = landing_stream.withColumn("load_date", F.current_timestamp())
    # `_metadata` is requested by name in addition to "*" so it is part of the output.
    return stamped_stream.select("*", "_metadata")

## Set [expectations](https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/expectations)


In [None]:
# Expectations handed to `dlt.expect_all_or_drop` on the bronze table: a mapping of
# expectation name -> SQL constraint, e.g. {"valid_id": "id IS NOT NULL"}.
# Left empty here, so no constraint is declared.
expectations = {}

In [None]:
@dlt.table(
    name=f"bronze_{table_name}",
    # temporary=True,
    comment="Data transformed from raw",
    table_properties={"quality": "bronze"},
)
@dlt.expect_all_or_drop(expectations)
def set_expectations() -> DataFrame:
    """Build the bronze table by streaming from the raw table.

    The function body only reads the raw stream; the ``expectations`` are
    attached by the ``dlt.expect_all_or_drop`` decorator.

    Returns:
        A streaming DataFrame read from ``raw_<table_name>``.
    """
    raw_table = f"raw_{table_name}"
    return dlt.read_stream(raw_table)

## Calculate SCD2


In [None]:
# Columns passed to `dlt.apply_changes` as `keys` (list of strings).
key_columns = ["id"]

# Name of the column (a single string) passed to `dlt.apply_changes` as `sequence_by`.
date_column = "registration_dttm"

# Columns passed as `track_history_column_list` (list of strings).
track_columns = [
    "first_name",
    "last_name",
    "email",
    "gender",
    "ip_address",
    "cc",
    "country",
    "birthdate",
    "salary",
    "title",
]

# Columns passed as `except_column_list` (list of strings).
except_columns = ["comments", "_metadata"]

In [None]:
# Declare the silver streaming table that `apply_changes` below targets.
dlt.create_streaming_table(
    name=f"silver_{table_name}",
    table_properties={"quality": "silver"},
    comment="SCD2 implemented",
)

# Apply the changes read from the bronze table to the silver table as SCD type 2.
dlt.apply_changes(
    # Where the changes come from and where they are written.
    source=f"bronze_{table_name}",
    target=f"silver_{table_name}",
    stored_as_scd_type="2",
    # Row identity and ordering of the change events.
    keys=key_columns,
    sequence_by=F.col(date_column),
    # Column handling: which columns are history-tracked and which are excluded.
    track_history_column_list=track_columns,
    except_column_list=except_columns,
    ignore_null_updates=False,
)