# Delta Live Tables - Unit testing

## Why testing?

Deploying tests on your DLT pipelines helps keep your ingestion stable and future-proof.

The tests can be deployed as part of a traditional CI/CD pipeline and run before a new version is deployed, so that regressions are caught before they reach production.

This is critical in the Lakehouse ecosystem, as the data we produce will then be leveraged downstream:

* By Data Analysts for reporting/BI
* By Data Scientists to build ML models for downstream applications

## Unit testing strategy with DLT

Delta Live Tables logic can be unit tested by leveraging expectations.

At a high level, the DLT pipelines can be constructed as follows:

* The ingestion step (first step of the pipeline, on the left) is written in a separate notebook. This corresponds to the left **green** (prod) and **blue** (test) input sources.
   * The Production pipeline is defined with the PROD ingestion notebook: [./ingestion_profile/DLT-ingest_prod]($./ingestion_profile/DLT-ingest_prod) and connects to the live data source (ex: Kafka server, staging blob storage)
   * The Test pipeline (only used to run the unit tests) is defined with the TEST ingestion notebook: [./ingestion_profile/DLT-ingest_test]($./ingestion_profile/DLT-ingest_test) and can consume from local files used for our unit tests (ex: ad hoc CSV file)
* A common DLT pipeline logic is used for both the prod and the test pipeline (the **yellow** in the graph)
* An additional notebook containing all the unit tests is used in the TEST pipeline (the **blue `TEST_xxx` tables** in the image on the right side)


<div><img width="1100" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-advanecd/DLT-advanced-unit-test-0.png"/></div>

## Accessing the DLT pipeline

Your pipeline has been created! You can directly access the <a dbdemos-pipeline-id="dlt-test" href="#joblist/pipelines/fcdb3d8e-104c-422b-a0a1-1c30a57f1650">Delta Live Table Pipeline for unit-test demo</a>.



## Main Pipeline definition

<img style="float: right" width="700px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-advanecd/DLT-advanced-unit-test-2.png"/>

This notebook contains the main pipeline definition, the one we want to test (in yellow in the diagram).

For this example, we centralized our main expectations in a metadata table that we'll use in the table definition.

These expectations are your usual expectations, used to ensure and track data quality during the ingestion process.

We can then build a DBSQL dashboard on top of them and trigger alerts when we see errors in our data (ex: incompatible schema, increasing number of failed expectations).

In [0]:
# In this example, we store our rules in a Delta table for more flexibility & reusability.
# While this isn't directly related to unit testing, it can also help with programmatic analysis/reporting.
catalog = "main"
schema = dbName = db = "dbdemos_dlt_unit_test"

data = [
 # tag/table name      name              constraint
 ("user_bronze_dlt",  "correct_schema", "_rescued_data IS NULL"),
 ("user_silver_dlt",  "valid_id",       "id IS NOT NULL AND id > 0"),
 ("spend_silver_dlt", "valid_id",       "id IS NOT NULL AND id > 0"),
 ("user_gold_dlt",    "valid_age",      "age IS NOT NULL"),
 ("user_gold_dlt",    "valid_income",   "annual_income IS NOT NULL"),
 ("user_gold_dlt",    "valid_score",    "spending_score IS NOT NULL")
]
#Typically only run once, this doesn't have to be part of the DLT pipeline.
spark.createDataFrame(data=data, schema=["tag", "name", "constraint"]).write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.expectations")

In [0]:
# Return the rules matching the tag, in a format ready for the DLT expectation decorators.
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    Loads the data quality rules from the expectations Delta table.
    :param tag: tag to match
    :return: dictionary of rules (name -> constraint) that matched the tag
  """
  rules = {}
  df = spark.table(f"{catalog}.{schema}.expectations").where(f"tag = '{tag}'")
  for row in df.collect():
    rules[row['name']] = row['constraint']
  return rules
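
To make the return value concrete, here is a minimal plain-Python sketch of the same lookup. It assumes the rows are held in an in-memory list rather than in the `expectations` Delta table; the helper name `get_rules_local` and the shortened `sample_rows` list are illustrative and not part of the demo.

```python
# Illustrative subset of the (tag, name, constraint) rows stored in the expectations table.
sample_rows = [
    ("user_bronze_dlt", "correct_schema", "_rescued_data IS NULL"),
    ("user_silver_dlt", "valid_id", "id IS NOT NULL AND id > 0"),
    ("user_gold_dlt", "valid_age", "age IS NOT NULL"),
    ("user_gold_dlt", "valid_income", "annual_income IS NOT NULL"),
]


def get_rules_local(tag, rows=sample_rows):
    """Hypothetical in-memory equivalent of get_rules: rule name -> constraint for one tag."""
    return {name: constraint for row_tag, name, constraint in rows if row_tag == tag}


gold_rules = get_rules_local("user_gold_dlt")
print(gold_rules)
# {'valid_age': 'age IS NOT NULL', 'valid_income': 'annual_income IS NOT NULL'}
```

A dictionary of this shape (expectation name mapped to a SQL constraint) is what the `@dlt.expect_all_or_drop` decorator receives in the cells below.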


### 1/ Loading our data

This is the first step of the pipeline. Note that we consume the data from the `raw_user_data` view.

This view is defined in the ingestion notebooks:
* For PROD: [./ingestion_profile/DLT-ingest_prod]($./ingestion_profile/DLT-ingest_prod), reading from prod system (ex: kafka)
* For TEST: [./ingestion_profile/DLT-ingest_test]($./ingestion_profile/DLT-ingest_test), consuming the test dataset (csv files)

Start by reviewing the notebooks to see how the data is ingested.


*Note: DLT is available in SQL or Python; this example uses Python.*

In [0]:
import dlt

@dlt.table(comment="Raw user data")
@dlt.expect_all_or_drop(get_rules('user_bronze_dlt')) #get the rules from our centralized table.
def user_bronze_dlt():
  return dlt.read_stream("raw_user_data")

### 2/ Customer Silver layer
The silver layer is consuming **incremental** data from the bronze one, and cleaning up some information.

We're also adding an expectation on the ID. As the ID will be used in the next join operation, it must never be null and must be positive.

Note that the expectations have been defined in the metadata expectation table under the `user_silver_dlt` tag.

In [0]:
from pyspark.sql.functions import col, sha1, to_timestamp

@dlt.table(comment="User data cleaned and anonymized for analysis.")
@dlt.expect_all_or_drop(get_rules('user_silver_dlt'))
def user_silver_dlt():
  return (
    dlt.read_stream("user_bronze_dlt").select(
      col("id").cast("int"),
      sha1("email").alias("email"),
      to_timestamp(col("creation_date"),"MM-dd-yyyy HH:mm:ss").alias("creation_date"),
      to_timestamp(col("last_activity_date"),"MM-dd-yyyy HH:mm:ss").alias("last_activity_date"),
      "firstname", 
      "lastname", 
      "address", 
      "city", 
      "last_ip", 
      "postcode"
    )
  )
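
As a rough illustration of what this select does to a single record, the sketch below reproduces three of the transformations in plain Python. It is an approximation, not the Spark implementation: `clean_user` and `cast_int` are hypothetical helpers, the sample record is made up, and returning `None` for a non-numeric id assumes Spark's default (non-ANSI) cast behavior.

```python
import hashlib
from datetime import datetime


def cast_int(value):
    """Approximate a non-ANSI Spark cast to int: values that cannot be parsed become None (NULL)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def clean_user(raw):
    """Plain-Python approximation of the silver select for one record (subset of columns)."""
    return {
        "id": cast_int(raw["id"]),
        # sha1(...) in Spark returns the hex digest of the SHA-1 hash
        "email": hashlib.sha1(raw["email"].encode("utf-8")).hexdigest(),
        # Spark pattern "MM-dd-yyyy HH:mm:ss" corresponds to this strptime format
        "creation_date": datetime.strptime(raw["creation_date"], "%m-%d-%Y %H:%M:%S"),
    }


raw = {"id": "42", "email": "jane@example.com", "creation_date": "03-15-2022 08:30:00"}
cleaned = clean_user(raw)
print(cleaned["id"], len(cleaned["email"]), cleaned["creation_date"].isoformat())
# 42 40 2022-03-15T08:30:00
```

Under the same assumption, a record whose id is sent as `"abc"` ends up with a NULL id, which the `valid_id` expectation then drops.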

### 3/ Ingest spend information

This follows the same logic as the customer data: we consume from the view defined in the TEST/PROD ingestion notebooks.

We're also adding an expectation on the ID column, as we'll join the 2 tables on this field and want to track its data quality.

In [0]:
@dlt.table(comment="Spending score from raw data")
@dlt.expect_all_or_drop(get_rules('spend_silver_dlt'))
def spend_silver_dlt():
    return dlt.read_stream("raw_spend_data")

### 4/ Joining the 2 tables to create the gold layer
We can now join the 2 tables on customer ID to create our final gold table.

As our ML model will be using `age`, `annual_income` and `spending_score`, we're adding expectations to only keep valid entries.

In [0]:
@dlt.table(comment="Final user table with all information for Analysis / ML")
@dlt.expect_all_or_drop(get_rules('user_gold_dlt'))
def user_gold_dlt():
  return dlt.read_stream("user_silver_dlt").join(dlt.read("spend_silver_dlt"), ["id"], "left")
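
The effect of combining a left join with drop-style expectations can be sketched in plain Python. This is only an emulation with made-up records: it assumes `age`, `annual_income` and `spending_score` come from the spend table, and the helper names `left_join` and `drop_invalid` are hypothetical.

```python
REQUIRED = ("age", "annual_income", "spending_score")

users = [
    {"id": 1, "firstname": "Ada"},
    {"id": 2, "firstname": "Bob"},
    {"id": 3, "firstname": "Cy"},  # no matching spend record
]
spend_by_id = {
    1: {"age": 34, "annual_income": 52000, "spending_score": 61},
    2: {"age": None, "annual_income": 48000, "spending_score": 40},  # missing age
}


def left_join(users, spend_by_id):
    """Keep every user; spend columns are None (NULL) when there is no match."""
    empty = dict.fromkeys(REQUIRED)
    return [{**user, **spend_by_id.get(user["id"], empty)} for user in users]


def drop_invalid(rows):
    """Emulate the gold rules: drop any row where a required column is NULL."""
    return [row for row in rows if all(row[col] is not None for col in REQUIRED)]


gold = drop_invalid(left_join(users, spend_by_id))
print([row["id"] for row in gold])
# [1]
```

In this emulation, users without a spend record survive the left join but are removed by the gold expectations, so the gold table only contains rows the ML model can use.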

# Our pipeline is now ready to be tested!

Our pipeline is now entirely defined.

Here are a few examples of what we might want to test:

* Are we safely handling a wrong data type as input (ex: customer ID sent as an incompatible STRING)?
* Are we resilient to NULL values in our primary keys?
* Are we enforcing uniqueness in our primary keys?
* Are we properly applying business logic (ex: proper aggregation, anonymization of PII fields, etc.)?
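
To show the kind of assertion each question translates to, here is a minimal plain-Python sketch that runs three of these checks over a handful of output rows. It is not the demo's test notebook, which expresses its checks as DLT expectations; `find_problems` and the sample rows are hypothetical.

```python
from collections import Counter


def find_problems(rows, key="id", pii_field="email"):
    """Return a list of human-readable problems found in the output rows."""
    problems = []
    ids = [row[key] for row in rows]
    if any(value is None for value in ids):
        problems.append("null primary key")
    duplicates = sorted(v for v, n in Counter(ids).items() if v is not None and n > 1)
    if duplicates:
        problems.append(f"duplicate primary keys: {duplicates}")
    # A raw email address still contains "@"; a SHA-1 hex digest never does.
    if any("@" in (row[pii_field] or "") for row in rows):
        problems.append("email not anonymized")
    return problems


bad_rows = [
    {"id": 1, "email": "a" * 40},
    {"id": 1, "email": "x@y.com"},
    {"id": None, "email": "b" * 40},
]
print(find_problems(bad_rows))
# ['null primary key', 'duplicate primary keys: [1]', 'email not anonymized']
```

A test run passes when the list of problems is empty for the tables produced from the test dataset.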

## Creating the test dataset

The next step is to create a test dataset.

Creating the test dataset is a critical step. As with any unit test, we need to cover all relevant data variations to ensure our logic is properly implemented.

As an example, let's make sure we ingest data having NULL ids or ids sent as strings.

Open the [./test/DLT-Test-Dataset-setup]($./test/DLT-Test-Dataset-setup) notebook to see how this is done.
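
For a sense of what such a dataset can contain, the sketch below builds a small CSV payload with one row per edge case. It is a standalone illustration and not the content of the setup notebook: the column subset, the records and the `to_csv` helper are assumptions.

```python
import csv
import io

FIELDS = ["id", "email", "firstname"]  # illustrative subset of the user columns

test_rows = [
    {"id": "1", "email": "ada@example.com", "firstname": "Ada"},    # valid record
    {"id": "", "email": "bob@example.com", "firstname": "Bob"},     # missing id
    {"id": "abc", "email": "cy@example.com", "firstname": "Cy"},    # id sent as a non-numeric string
    {"id": "1", "email": "ada2@example.com", "firstname": "Ada"},   # duplicate id
]


def to_csv(rows):
    """Serialize the test records to CSV text, header included."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


csv_text = to_csv(test_rows)
print(csv_text)
```

Each row targets one behavior we want the pipeline to handle, which keeps a failing test easy to trace back to its cause.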

## Defining the Unit Tests

We now have the data ready.

The final step is creating the actual test.

Open the [./test/DLT-Tests]($./test/DLT-Tests) notebook to see how this is done!

# That's it! Our pipeline is fully ready & tested.

We can then proceed as usual: build dashboards to track production metrics (ex: data quality & quantity), and use BI reporting and data science for the final business use case leveraging the Lakehouse.

Here is a full example of the test pipeline definition.

Note that we have 3 notebooks in the DLT pipeline:

* **DLT-ingest_test**: ingesting our test datasets
* **DLT-pipeline-to-test**: the actual pipeline we want to test
* **test/DLT-Tests**: the test definition

Remember that you'll have to schedule a FULL REFRESH every time you run the pipeline to get accurate test results (we want to consume the entire input dataset from scratch).

This test pipeline can be scheduled to run within a Workflow, or as part of a CI/CD step (ex: triggered after a git commit).

```json
{
    "clusters": [
        {
            "label": "default",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 5
            }
        }
    ],
    "development": true,
    "continuous": false,
    "channel": "CURRENT",
    "edition": "advanced",
    "libraries": [
        {
            "notebook": {
                "path": "/Repos/xxxx/Delta-Live-Table-Unit-Test/ingestion_profile/DLT-ingest_test"
            }
        },
        {
            "notebook": {
                "path": "/Repos/xxxx/Delta-Live-Table-Unit-Test/DLT-pipeline-to-test"
            }
        },
        {
            "notebook": {
                "path": "/Repos/xxxx/Delta-Live-Table-Unit-Test/test/DLT-Tests"
            }
        }
    ],
    "name": "dbdemos_dlt_unit_test_{{CATALOG}}_{{SCHEMA}}",
    "catalog": "{{CATALOG}}",
    "target": "{{SCHEMA}}"
}
```
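
From a CI/CD step, a full-refresh run of this test pipeline can be started through the Databricks REST API. The sketch below only builds the HTTP request for the pipeline update endpoint. The workspace URL, pipeline ID and token are placeholders, and `build_full_refresh_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_full_refresh_request(host, pipeline_id, token):
    """Build (without sending) the request that starts a full-refresh pipeline update."""
    url = f"{host.rstrip('/')}/api/2.0/pipelines/{pipeline_id}/updates"
    body = json.dumps({"full_refresh": True}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder values: replace with your workspace URL, pipeline ID and access token.
request = build_full_refresh_request(
    "https://example.cloud.databricks.com", "my-pipeline-id", "my-token"
)
print(request.get_method(), request.full_url)
# POST https://example.cloud.databricks.com/api/2.0/pipelines/my-pipeline-id/updates

# To actually start the update from a CI job:
# urllib.request.urlopen(request)
```

Keeping the request construction separate from the network call makes the CI script itself easy to test.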

# Going further with DLT

## Checking your data quality metrics with Delta Live Tables
Delta Live Tables tracks all your data quality metrics. You can query the expectations directly as a SQL table with Databricks SQL to track your expectation metrics and send alerts as required. This lets you build dashboards such as the following:

<img width="1000" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/retail-dlt-data-quality-dashboard.png">
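
The aggregation behind such a dashboard can be sketched in plain Python. The example assumes each pipeline event carries a JSON `details` payload with the expectation counters nested under `flow_progress.data_quality.expectations`; the sample payloads and numbers are made up, and `summarize_expectations` is a hypothetical helper.

```python
import json
from collections import defaultdict

# Made-up `details` payloads, shaped like the assumed event log structure.
sample_details = [
    json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "user_silver_dlt", "passed_records": 98, "failed_records": 2},
    ]}}}),
    json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "user_silver_dlt", "passed_records": 50, "failed_records": 0},
        {"name": "valid_age", "dataset": "user_gold_dlt", "passed_records": 40, "failed_records": 10},
    ]}}}),
    json.dumps({"flow_progress": {"status": "RUNNING"}}),  # event without quality metrics
]


def summarize_expectations(details_rows):
    """Sum passed/failed record counts per (dataset, expectation name)."""
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for raw in details_rows:
        progress = json.loads(raw).get("flow_progress") or {}
        quality = progress.get("data_quality") or {}
        for expectation in quality.get("expectations") or []:
            key = (expectation["dataset"], expectation["name"])
            totals[key]["passed"] += expectation["passed_records"]
            totals[key]["failed"] += expectation["failed_records"]
    return dict(totals)


for key, counts in summarize_expectations(sample_details).items():
    print(key, counts)
```

An alert can then be defined on the failed counts, for example when the share of failed records for an expectation exceeds a chosen threshold.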

## Building our first business dashboard with Databricks SQL

Once the data is ingested, we switch to Databricks SQL to build a new dashboard based on all the data we ingested.

Here is an example:

<img width="1000" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/retail-dashboard.png"/>