# Simplify ETL with Delta Live Tables

DLT makes Data Engineering accessible for all. Just declare your transformations in SQL or Python, and DLT will handle the Data Engineering complexity for you.

<img style="float:right" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-1.png" width="700"/>

**Accelerate ETL development** <br/>
Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance

**Remove operational complexity** <br/>
By automating complex administrative tasks and gaining broader visibility into pipeline operations

**Trust your data** <br/>
With built-in quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML

**Simplify batch and streaming** <br/>
With self-optimization and auto-scaling data pipelines for batch or streaming processing

## Our Delta Live Tables pipeline

As input, we'll use a raw dataset containing information on our customers' loans and historical transactions.

Our goal is to ingest this data in near real time and build tables for our analyst team while ensuring data quality.

**Your DLT Pipeline is ready!** Your pipeline was started using the SQL notebook and is <a dbdemos-pipeline-id="dlt-loans" href="/#joblist/pipelines/8bd6a281-9662-4a17-92c6-8bbafd2e4632">available here</a>.



Our datasets come from three different systems and are saved under a cloud storage folder (S3/ADLS/GCS):

* `loans/raw_transactions` (loans uploaded here every few minutes)
* `loans/ref_accounting_treatment` (reference table, mostly static)
* `loans/historical_loan` (loans from the legacy system, new data added every week)

Let's ingest this data incrementally, and then compute a couple of aggregates that we'll need for our final dashboard to report our KPIs.

In [None]:
# Uncomment to explore the raw data
# %fs ls /Volumes/pds/dbdemos_sharing_airlinedata/raw_data/raw_transactions


## Bronze layer: incrementally ingest data leveraging Databricks Auto Loader

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-2.png" width="600"/>

Our raw data is being sent to blob storage.

Auto Loader simplifies this ingestion, handling schema inference and schema evolution while scaling to millions of incoming files.

Auto Loader is available in Python through the `cloudFiles` format and can be used with a variety of file formats (json, csv, avro...).
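
As a hedged illustration of how these options fit together, the sketch below builds the option dictionary that the bronze tables in this notebook pass to the `cloudFiles` reader. The helper name `autoloader_options` and the `SUPPORTED_FORMATS` set are assumptions made for this example and are not part of any Databricks API; only the `cloudFiles.*` option keys are real Auto Loader options.

```python
from typing import Dict, Optional

# File formats accepted by this sketch (an illustrative subset, not an exhaustive list).
SUPPORTED_FORMATS = {"json", "csv", "avro", "parquet", "orc", "text"}


def autoloader_options(
    file_format: str,
    infer_column_types: bool = True,
    schema_hints: Optional[str] = None,
) -> Dict[str, str]:
    """Build the option dict for a `cloudFiles` stream (hypothetical helper)."""
    if file_format not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported format for this sketch: {file_format!r}")
    options = {
        "cloudFiles.format": file_format,
        # Spark options are strings, so the boolean becomes "true" or "false".
        "cloudFiles.inferColumnTypes": str(infer_column_types).lower(),
    }
    if schema_hints:
        # Pins the type of selected columns during schema inference.
        options["cloudFiles.schemaHints"] = schema_hints
    return options


json_options = autoloader_options("json")
csv_options = autoloader_options("csv", schema_hints="recoveries INT")
print(json_options)
print(csv_options)
```

Inside a pipeline, such a dictionary could be passed with `spark.readStream.format("cloudFiles").options(**json_options).load(path)`, where `path` is your own landing folder.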


#### STREAMING LIVE TABLE
Defining tables as `STREAMING` guarantees that you only consume new incoming data. Without `STREAMING`, you scan and ingest all the available data at once. In Python, a table is streaming when its function returns a streaming DataFrame (for example from `spark.readStream` or `dlt.read_stream`). See the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-incremental-data.html) for more details.
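
To make the difference concrete, here is a plain-Python model of the two behaviours. It is only a conceptual sketch: `IncrementalFileReader` and `read_all` are hypothetical names, and the in-memory set stands in for the checkpoint state that a real streaming source maintains for you.

```python
from typing import Iterable, List, Set


class IncrementalFileReader:
    """Remembers which files were already ingested (a stand-in for a checkpoint)."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def read_new(self, landing_zone: Iterable[str]) -> List[str]:
        """Return only the files that were not returned by a previous call."""
        new_files = sorted(set(landing_zone) - self._seen)
        self._seen.update(new_files)
        return new_files


def read_all(landing_zone: Iterable[str]) -> List[str]:
    """Non-streaming behaviour: every run rescans everything that is available."""
    return sorted(set(landing_zone))


reader = IncrementalFileReader()
first_run = reader.read_new(["tx_001.json", "tx_002.json"])
# A third file lands between the two runs.
second_run = reader.read_new(["tx_001.json", "tx_002.json", "tx_003.json"])
full_scan = read_all(["tx_001.json", "tx_002.json", "tx_003.json"])

print(first_run)
print(second_run)
print(full_scan)
```

The second incremental run only sees the newly arrived file, while the full scan returns all three files again.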

In [None]:
import dlt
from pyspark.sql import functions as F


@dlt.table(
    comment="New raw loan data incrementally ingested from cloud object storage landing zone"
)
def raw_txs():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/pds/dbdemos_sharing_airlinedata/raw_data/raw_transactions")
    )


In [None]:

@dlt.table(comment="Lookup mapping for accounting codes")
def ref_accounting_treatment():
    return spark.read.format("delta").load(
        "/Volumes/pds/dbdemos_sharing_airlinedata/raw_data/ref_accounting_treatment"
    )


In [None]:

@dlt.table(comment="Raw historical transactions")
def raw_historical_loans():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/pds/dbdemos_sharing_airlinedata/raw_data/historical_loans")
    )



## Silver layer: joining tables while ensuring data quality

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-3.png" width="600"/>

Once the bronze layer is defined, we'll create the silver layer by joining data. Note that in SQL, bronze tables are referenced using the `LIVE` namespace; in Python, they are read with `dlt.read` and `dlt.read_stream`.

To consume only the increment from a bronze table like `raw_txs`, we'll use the `read_stream` function: `dlt.read_stream("raw_txs")`

Note that we don't have to worry about compaction: DLT handles that for us.

#### Expectations
By defining expectations (`@dlt.expect`), you can enforce and track your data quality. See the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-expectations.html) for more details.
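
The three decorators used in this notebook differ in what happens to a violating record: `@dlt.expect` keeps it and records the violation, `@dlt.expect_or_drop` removes it, and `@dlt.expect_or_fail` stops the update. The following plain-Python model sketches those semantics on a list of dictionaries. It is not the DLT implementation; `apply_expectation`, the `on_violation` values, and the sample rows are assumptions made for illustration.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def apply_expectation(
    rows: List[Row],
    name: str,
    predicate: Callable[[Row], bool],
    on_violation: str = "warn",
) -> Tuple[List[Row], int]:
    """Return (rows kept, number of failed rows) for a single expectation."""
    if on_violation not in {"warn", "drop", "fail"}:
        raise ValueError(f"Unknown on_violation mode: {on_violation!r}")
    failed = [row for row in rows if not predicate(row)]
    if failed and on_violation == "fail":
        # Models @dlt.expect_or_fail: one bad record aborts the run.
        raise ValueError(f"Expectation '{name}' violated by {len(failed)} row(s)")
    if on_violation == "drop":
        # Models @dlt.expect_or_drop: bad records are filtered out.
        kept = [row for row in rows if predicate(row)]
    else:
        # Models @dlt.expect: bad records are kept, only counted.
        kept = list(rows)
    return kept, len(failed)


def balance_is_positive(row: Row) -> bool:
    return row["balance"] > 0 and row["arrears_balance"] > 0


# Made-up sample rows.
txs = [
    {"balance": 100, "arrears_balance": 5, "cost_center_code": "CC1"},
    {"balance": -20, "arrears_balance": 5, "cost_center_code": "CC2"},
]

warned, warn_failures = apply_expectation(
    txs, "Balance should be positive", balance_is_positive, "warn"
)
dropped, drop_failures = apply_expectation(
    txs, "Balance should be positive", balance_is_positive, "drop"
)
print(len(warned), warn_failures)
print(len(dropped), drop_failures)
```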

In [None]:

@dlt.view(comment="Livestream of new transactions")
def new_txs():
    txs = dlt.read_stream("raw_txs").alias("txs")
    ref = dlt.read("ref_accounting_treatment").alias("ref")
    return txs.join(
        ref, F.col("txs.accounting_treatment_id") == F.col("ref.id"), "inner"
    ).selectExpr("txs.*", "ref.accounting_treatment as accounting_treatment")


In [None]:

@dlt.table(comment="Livestream of new transactions, cleaned and compliant")
@dlt.expect("Payments should be this year", "(next_payment_date > date('2020-12-31'))")
@dlt.expect_or_drop(
    "Balance should be positive", "(balance > 0 AND arrears_balance > 0)"
)
@dlt.expect_or_fail("Cost center must be specified", "(cost_center_code IS NOT NULL)")
def cleaned_new_txs():
    return dlt.read_stream("new_txs")


In [None]:

# This is the inverse condition of the above statement to quarantine incorrect data for further analysis.
@dlt.table(comment="Incorrect transactions requiring human analysis")
@dlt.expect("Payments should be this year", "(next_payment_date <= date('2020-12-31'))")
@dlt.expect_or_drop(
    "Balance should be positive", "(balance <= 0 OR arrears_balance <= 0)"
)
def quarantine_bad_txs():
    return dlt.read_stream("new_txs")
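
The inverse condition can also be derived rather than written by hand. The sketch below assumes the drop rules are kept in a dictionary and negates their conjunction to obtain a single quarantine predicate. `negate_rules` and `drop_rules` are hypothetical names introduced for this example.

```python
from typing import Dict

# The rule that causes records to be dropped from the cleaned table.
drop_rules = {
    "Balance should be positive": "balance > 0 AND arrears_balance > 0",
}


def negate_rules(rules: Dict[str, str]) -> str:
    """Build one SQL predicate matching records that break at least one rule."""
    if not rules:
        raise ValueError("At least one rule is required")
    # Parenthesize each constraint so operator precedence cannot change its meaning.
    conjunction = " AND ".join(f"({constraint})" for constraint in rules.values())
    return f"NOT({conjunction})"


quarantine_rule = negate_rules(drop_rules)
print(quarantine_rule)
```

The resulting string could then serve as the constraint of an `@dlt.expect_or_drop` decorator on the quarantine table, so that the two tables stay in sync when a rule changes.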


In [None]:

@dlt.table(comment="Historical loan transactions")
@dlt.expect("Grade should be valid", "(grade in ('A', 'B', 'C', 'D', 'E', 'F', 'G'))")
@dlt.expect_or_drop("Recoveries should be int", "(CAST(recoveries as INT) IS NOT NULL)")
def historical_txs():
    history = dlt.read_stream("raw_historical_loans").alias("l")
    ref = dlt.read("ref_accounting_treatment").alias("ref")
    return history.join(
        ref, F.col("l.accounting_treatment_id") == F.col("ref.id"), "inner"
    ).selectExpr("l.*", "ref.accounting_treatment as accounting_treatment")



## Gold layer

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-4.png" width="600"/>

Our last step is to materialize the Gold Layer.

Because these tables will be queried at scale through a SQL endpoint, we'll add Liquid Clustering at the table level to organize data for faster queries, and DLT will handle the rest.

In [None]:

@dlt.table(
    comment="Combines historical and new loan data for unified rollup of loan balances",
    cluster_by=["location_code"],
)
def total_loan_balances():
    return (
        dlt.read("historical_txs")
        .groupBy("addr_state")
        .agg(F.sum("revol_bal").alias("bal"))
        .withColumnRenamed("addr_state", "location_code")
        .union(
            dlt.read("cleaned_new_txs")
            .groupBy("country_code")
            .agg(F.sum("balance").alias("bal"))
            .withColumnRenamed("country_code", "location_code")
        )
    )


In [None]:

@dlt.table(
    comment="Live table of new loan balances for consumption by different cost centers"
)
def new_loan_balances_by_cost_center():
    return (
        dlt.read("cleaned_new_txs")
        .groupBy("cost_center_code")
        .agg(F.sum("balance").alias("sum_balance"))
    )


In [None]:

@dlt.table(comment="Live table of new loan balances per country")
def new_loan_balances_by_country():
    return (
        dlt.read("cleaned_new_txs")
        .groupBy("country_code")
        .agg(F.sum("count").alias("sum_count"))
    )


## Next steps

Your DLT pipeline is ready to be started. <a dbdemos-pipeline-id="dlt-loans" href="/#joblist/pipelines/8bd6a281-9662-4a17-92c6-8bbafd2e4632">Click here to access the pipeline</a> created for you using the SQL notebook.

To create a new one using this notebook, open the DLT menu, create a pipeline, and select this notebook to run it. To generate sample data, run the [companion notebook]($./_resources/00-Loan-Data-Generator) (make sure the paths where you read and write the data are the same!).

Data analysts can start using DBSQL to analyze the data and track our loan metrics. Data scientists can also access the data to start building models that predict payment default, or for other more advanced use cases.

## Tracking data quality

Expectation stats are automatically available as a system table.

This information lets you monitor your data ingestion quality.

You can leverage DBSQL to query these tables and build custom alerts based on the metrics your business is tracking.
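
As a rough sketch of what such a metric could look like, the snippet below aggregates expectation counters from event payloads. It assumes each event's `details` JSON carries the counters under `flow_progress.data_quality.expectations`, with `name`, `dataset`, `passed_records`, and `failed_records` fields; verify this layout against your own event log. The sample payloads and the `summarize_expectations` helper are invented for illustration.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def summarize_expectations(
    details_payloads: Iterable[str],
) -> Dict[Tuple[str, str], Dict[str, float]]:
    """Total passed/failed records per (dataset, expectation name)."""
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for payload in details_payloads:
        details = json.loads(payload)
        expectations = (
            details.get("flow_progress", {})
            .get("data_quality", {})
            .get("expectations", [])
        )
        for expectation in expectations:
            key = (expectation["dataset"], expectation["name"])
            totals[key]["passed"] += expectation["passed_records"]
            totals[key]["failed"] += expectation["failed_records"]

    summary = {}
    for key, counts in totals.items():
        total = counts["passed"] + counts["failed"]
        failure_rate = counts["failed"] / total if total else 0.0
        summary[key] = {**counts, "failure_rate": failure_rate}
    return summary


def make_event(passed: int, failed: int) -> str:
    """Build a made-up event payload in the assumed layout."""
    expectation = {
        "name": "Balance should be positive",
        "dataset": "cleaned_new_txs",
        "passed_records": passed,
        "failed_records": failed,
    }
    return json.dumps(
        {"flow_progress": {"data_quality": {"expectations": [expectation]}}}
    )


# Two updates with quality counters, plus one event that carries none.
sample_events = [make_event(900, 100), make_event(700, 300), "{}"]
summary = summarize_expectations(sample_events)
print(summary)
```

A failure rate computed this way is the kind of value a DBSQL alert could compare against a threshold chosen by the business.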


See [how to access your DLT metrics]($./03-Log-Analysis)

<img width="500" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/retail-dlt-data-quality-dashboard.png">

<a dbdemos-dashboard-id="dlt-expectations" href='/sql/dashboardsv3/01eff48352781e3fa01816771dcc313d' target="_blank">Data Quality Dashboard example</a>

## Google Colab content

In [None]:
# prompt: i am planning to cover dlt in databricks
# would you help me with some sample code or notebooks

import dlt
from pyspark.sql import functions as F

# Define the path to your data.  Replace with your actual paths.
# These paths assume you've uploaded the data to your cloud storage
# and mounted the storage to Databricks.
raw_transactions_path = "/mnt/your_data_path/raw_transactions"
ref_accounting_treatment_path = "/mnt/your_data_path/ref_accounting_treatment"
historical_loans_path = "/mnt/your_data_path/historical_loans"


@dlt.table(
    comment="New raw loan data incrementally ingested from cloud object storage"
)
def raw_txs():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(raw_transactions_path)
    )


@dlt.table(comment="Lookup mapping for accounting codes")
def ref_accounting_treatment():
    return spark.read.format("delta").load(ref_accounting_treatment_path)


@dlt.table(comment="Raw historical transactions")
def raw_historical_loans():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(historical_loans_path)
    )


@dlt.view(comment="Livestream of new transactions")
def new_txs():
    # Join the streaming transactions with the static accounting-treatment lookup
    txs = dlt.read_stream("raw_txs").alias("txs")
    ref = dlt.read("ref_accounting_treatment").alias("ref")
    return txs.join(
        ref, F.col("txs.accounting_treatment_id") == F.col("ref.id"), "inner"
    ).selectExpr("txs.*", "ref.accounting_treatment as accounting_treatment")


# ... (rest of your DLT pipeline code)


In [None]:
from google.colab import drive
drive.mount('/content/drive')