# Simplify Ingestion and Transformation with Delta Live Tables

In this notebook, we'll work as a Data Engineer to build our Credit Decisioning database. <br>
We'll consume and clean our raw data sources to prepare the tables required for our BI & ML workload.

We have four data sources sending new files to our blob storage, and we want to incrementally load this data into our data warehousing tables:

- **Internal banking** data *(KYC, accounts, collections, applications, relationship)* comes from the bank's internal relational databases and is ingested *once a day* through a CDC pipeline,
- **Credit Bureau** data (usually in XML or CSV format and *accessed monthly* through an API) comes from government agencies (such as central banks) and contains a lot of valuable information for every customer. We also use this data to recalculate whether a user has defaulted in the past 60 days,
- **Partner** data is used to augment the internal banking data and is ingested *once a week*. In this case we use telco data to further evaluate the character and creditworthiness of banking customers,
- **Fund transfer** data covers the banking transactions (such as credit card transactions) and is *available in real time* through Kafka streams.


## Delta Live Tables: A simple way to build and manage data pipelines for fresh, high-quality data!


Databricks simplifies this task with Delta Live Tables (DLT) by making data engineering accessible to all.

DLT allows Data Analysts to create advanced pipelines with plain SQL.

<div>
  <div style="width: 45%; float: left; margin-bottom: 10px; padding-right: 45px">
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-accelerate.png"/> 
      <strong>Accelerate ETL development</strong> <br/>
      Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance 
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-complexity.png"/> 
      <strong>Remove operational complexity</strong> <br/>
      By automating complex administrative tasks and gaining broader visibility into pipeline operations
    </p>
  </div>
  <div style="width: 48%; float: left">
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-trust.png"/> 
      <strong>Trust your data</strong> <br/>
      With built-in quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML 
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-stream.png"/> 
      <strong>Simplify batch and streaming</strong> <br/>
      With self-optimization and auto-scaling data pipelines for batch or streaming processing 
    </p>
</div>
</div>

<br style="clear:both">

<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo.png" style="float: right;" width="200px">

## Delta Lake

All the tables we'll create in the Lakehouse will be stored as Delta Lake tables. Delta Lake is an open storage framework for reliability and performance.<br>
It provides many capabilities (ACID transactions, DELETE/UPDATE/MERGE, zero-copy clone, Change Data Capture...)<br>
For more details on Delta Lake, run `dbdemos.install('delta-lake')`
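
To make these capabilities concrete, here is a minimal sketch on regular Delta tables outside the pipeline. The table names `customer_scratch`, `customer_updates` and `customer_scratch_clone` and their columns are hypothetical and not part of this demo. Tables managed by a DLT pipeline should be updated by the pipeline itself rather than with direct DML.

```sql
-- Hypothetical scratch tables, for illustration only.
CREATE TABLE IF NOT EXISTS customer_scratch (id INT, email STRING)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);
CREATE TABLE IF NOT EXISTS customer_updates (id INT, email STRING);

-- Upsert in a single ACID transaction: update matching ids, insert the rest.
MERGE INTO customer_scratch AS t
USING customer_updates AS s
  ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.email = s.email
WHEN NOT MATCHED THEN INSERT (id, email) VALUES (s.id, s.email);

-- Zero-copy (shallow) clone: the clone references the source data files.
CREATE OR REPLACE TABLE customer_scratch_clone SHALLOW CLONE customer_scratch;

-- Change Data Feed: row-level changes recorded since table version 0.
SELECT * FROM table_changes('customer_scratch', 0);

-- Transaction log: one row per committed operation.
DESCRIBE HISTORY customer_scratch;
```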


## Building a Delta Live Tables pipeline to analyze consumer credit

In this example, we'll implement an end-to-end DLT pipeline consuming the aforementioned information. We'll use the medallion architecture, but we could build a star schema, a data vault, or any other data model.

We'll incrementally load new data with Auto Loader, enrich this information, and then load a model from MLflow to perform our credit decisioning prediction.

This information will then be used to build our DBSQL dashboard to track credit scores, decisioning, and risk.

Let's implement the following flow: 
 
<div><img width="1000px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/credit_decisioning/fsi_credit_decisioning_dlt_0.png" /></div>

Your DLT pipeline has been installed and started for you! Open the <a dbdemos-pipeline-id="dlt-fsi-credit-decisioning" href="#joblist/pipelines" target="_blank">Delta Live Tables pipeline</a> to see it in action.<br/>
*(Note: The pipeline will automatically start once the initialization job is completed. This might take a few minutes. Check the installation logs for more details.)*

### 1/ Loading our data using Databricks Auto Loader (cloud_files)

<img width="650px" style="float:right" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/credit_decisioning/fsi_credit_decisioning_dlt_1.png"/>
  
Auto Loader allows us to efficiently ingest millions of files from cloud storage, and supports efficient schema inference and evolution at scale.

For more details on Auto Loader, run `dbdemos.install('auto-loader')`
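
As a sketch of how schema inference and evolution can be tuned, the hypothetical table below combines a schema hint with an explicit evolution mode. The table name `example_bronze`, the path placeholders, and the hinted column `event_date` are assumptions, not part of this demo. With `cloudFiles.schemaEvolutionMode` set to `rescue`, the inferred schema is not evolved and unexpected fields are routed to the `_rescued_data` column instead.

```sql
-- Hypothetical table and path, for illustration only.
CREATE OR REFRESH STREAMING TABLE example_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/<catalog>/<schema>/<volume>/example', 'json',
                 map('cloudFiles.inferColumnTypes', 'true',
                     -- Force the type of one column instead of relying on inference.
                     'cloudFiles.schemaHints', 'event_date date',
                     -- Keep the schema fixed; new fields land in _rescued_data.
                     'cloudFiles.schemaEvolutionMode', 'rescue'))
```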

#### 1. Credit Bureau

Credit bureau data refers to information about an individual's credit history, financial behavior, and creditworthiness that is collected and maintained by credit bureaus. Credit bureaus are companies that collect and compile information about consumers' credit activities, such as credit card usage, loan payments, and other financial transactions.

In [0]:
CREATE OR REFRESH STREAMING TABLE credit_bureau_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/credit_bureau', 'json',
                 map('header', 'true',
                     'inferSchema', 'true',
                     'cloudFiles.inferColumnTypes', 'true'))


#### 2. Customer table

The customer table comes from the internal KYC processes and contains customer-related data.

In [0]:
CREATE OR REFRESH STREAMING TABLE customer_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/internalbanking/customer', 'csv',
                 map('header', 'true',
                     'inferSchema', 'true',
                     'cloudFiles.inferColumnTypes', 'true',
                     'cloudFiles.schemaHints', 'passport_expiry date, visa_expiry date, join_date date, dob date'))


#### 3. Relationship table

The relationship table represents the relationship between the bank and the customer. It also comes from the raw databases.

In [0]:
CREATE OR REFRESH STREAMING TABLE relationship_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/internalbanking/relationship', 'csv',
                 map('header', 'true',
                     'inferSchema', 'true',
                     'cloudFiles.inferColumnTypes', 'true'))


#### 4. Account table

In [0]:
CREATE OR REFRESH STREAMING TABLE account_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/internalbanking/account', 'csv',
                 map('header', 'true',
                     'inferSchema', 'true',
                     'cloudFiles.inferColumnTypes', 'true'))


#### 5. Fund Transfer Table

Fund transfer is a real-time data stream that contains payment transactions performed by the client.

In [0]:
CREATE OR REFRESH STREAMING TABLE fund_trans_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/fund_trans', 'json',
                map('inferSchema', 'true',
                    'cloudFiles.inferColumnTypes', 'true'))


#### 6. Telco

This is where we augment the internal banking data through external and alternative data sources - in this case, telecom partner data, containing payment features for the common customers (between the bank and the telco provider).

In [0]:
CREATE OR REFRESH STREAMING TABLE telco_bronze AS
  SELECT * FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/telco', 'json',
                 map('inferSchema', 'true',
                     'cloudFiles.inferColumnTypes', 'true'))
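
When debugging a bronze table it often helps to know which file each row came from. The sketch below is one possible way to record that, assuming the `_metadata` column exposed by file-based sources is available through `cloud_files`. The table name `telco_bronze_audited` and the two added column names are hypothetical.

```sql
-- Hypothetical variant of telco_bronze that keeps ingestion lineage per row.
CREATE OR REFRESH STREAMING TABLE telco_bronze_audited AS
  SELECT
    *,
    _metadata.file_path AS source_file,  -- file the row was read from
    current_timestamp() AS ingested_at   -- time the row was processed
  FROM
    cloud_files('/Volumes/jy_demo_catalog/jy_fsi_credit_schema/credit_raw_data/telco', 'json',
                 map('cloudFiles.inferColumnTypes', 'true'))
```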

### 2/ Enforce quality and materialize our tables for Data Analysts

<img width="650px" style="float:right" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/credit_decisioning/fsi_credit_decisioning_dlt_2.png"/>

The next layer, often called silver, consumes **incremental** data from the bronze layer and cleans up some information.

We're also adding [expectations](https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-expectations.html) on different fields to enforce and track our data quality. This ensures that our dashboards are relevant and makes it easy to spot potential errors caused by data anomalies.

For more advanced DLT capabilities, run `dbdemos.install('dlt-loans')`, or `dbdemos.install('dlt-cdc')` for a CDC/SCD Type 2 example.

These tables are clean and ready to be used by the BI team!
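
As an illustration of the three expectation behaviors, here is a hedged sketch on the fund transfer data. The table name `fund_trans_silver_checked` and the constraint names are hypothetical, and the rules themselves are assumptions about what valid data looks like rather than checks taken from this demo.

```sql
-- Hypothetical table showing the three ON VIOLATION behaviors.
CREATE OR REFRESH MATERIALIZED VIEW fund_trans_silver_checked (
  -- No action: violations are counted in the quality metrics, rows are kept.
  CONSTRAINT valid_datetime EXPECT (datetime IS NOT NULL),
  -- DROP ROW: violating rows are excluded from the table and counted.
  CONSTRAINT payer_account_not_null EXPECT (payer_acc_id IS NOT NULL) ON VIOLATION DROP ROW,
  -- FAIL UPDATE: a single violating row stops the pipeline update.
  CONSTRAINT positive_amount EXPECT (txn_amt > 0) ON VIOLATION FAIL UPDATE
) AS
  SELECT * FROM live.fund_trans_bronze
```

Dropping rows suits dashboards that must not show bad records, while failing the update is the stricter choice when downstream consumers cannot tolerate any invalid row.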


#### 1. Fund transfer table

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW fund_trans_silver AS
  SELECT
    payer_account.cust_id payer_cust_id,
    payee_account.cust_id payee_cust_id,
    fund.*
  FROM
    live.fund_trans_bronze fund
  LEFT OUTER JOIN live.account_bronze payer_account ON fund.payer_acc_id = payer_account.id
  LEFT OUTER JOIN live.account_bronze payee_account ON fund.payee_acc_id = payee_account.id


#### 2. Customer table

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW customer_silver AS
  SELECT
    * EXCEPT (dob, customer._rescued_data, relationship._rescued_data, relationship.id, relationship.operation),
    year(dob) AS birth_year
  FROM
    live.customer_bronze customer
  LEFT OUTER JOIN live.relationship_bronze relationship ON customer.id = relationship.cust_id


#### 3. Account table

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW account_silver AS
  WITH cust_acc AS (
      SELECT cust_id, count(1) num_accs, avg(balance) avg_balance 
        FROM live.account_bronze
        GROUP BY cust_id
    )
  SELECT
    acc_usd.cust_id,
    num_accs,
    avg_balance,
    balance balance_usd,
    available_balance available_balance_usd,
    operation
  FROM
    cust_acc
  LEFT OUTER JOIN live.account_bronze acc_usd ON cust_acc.cust_id = acc_usd.cust_id AND acc_usd.currency = 'USD'


### 3/ Aggregation layer for analytics & ML

<img width="650px" style="float:right" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/fsi/credit_decisioning/fsi_credit_decisioning_dlt_3.png"/>

We curate all the tables in Delta Lake using Delta Live Tables so we can apply all the joins, masking, and data constraints in real time. Data scientists can now use these datasets to build high-quality models, particularly to predict creditworthiness. Because we mask sensitive data as part of Unity Catalog capabilities, we can confidently expose the data to many downstream users, from data scientists to data analysts and business users.


#### 1. Credit bureau cleanup

We begin by ingesting credit bureau data, sourced from a Delta Lake table here. Typically, this data would be curated via API ingestion and dumped into cloud object stores. 

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW credit_bureau_gold
  (CONSTRAINT CustomerID_not_null EXPECT (CUST_ID IS NOT NULL) ON VIOLATION DROP ROW)
AS
  SELECT * FROM live.credit_bureau_bronze
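
The rows dropped by an expectation such as `CustomerID_not_null` are counted in the pipeline event log. The query below is a sketch of how those counts might be read. It assumes a Unity Catalog pipeline, that the `event_log` table-valued function is available on your compute, and that `<catalog>.<schema>` is replaced with the location where the pipeline publishes its tables. It is meant to run from a notebook or the SQL editor, not from inside the pipeline.

```sql
-- <catalog>.<schema> is a placeholder; adjust to your pipeline's target.
WITH expectations AS (
  SELECT
    explode(
      from_json(
        details:flow_progress.data_quality.expectations,
        'array<struct<name: string, dataset: string, passed_records: bigint, failed_records: bigint>>'
      )
    ) AS e
  FROM event_log(TABLE(<catalog>.<schema>.credit_bureau_gold))
  WHERE event_type = 'flow_progress'
)
SELECT
  e.dataset,
  e.name AS expectation,
  SUM(e.passed_records) AS passed_records,
  SUM(e.failed_records) AS failed_records
FROM expectations
GROUP BY e.dataset, e.name
```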


#### 2. Fund transfer table

The fund transfer table represents peer-to-peer payments made between the customer and another person. This helps us to understand the frequency and monetary attributes for payments for each customer as a credit risk source.

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW fund_trans_gold AS (
  WITH 
    max_date AS (SELECT max(datetime) AS max_date FROM live.fund_trans_silver),
    12m_payer AS (SELECT
                      payer_cust_id,
                      COUNT(DISTINCT payer_cust_id) dist_payer_cnt_12m,
                      COUNT(1) sent_txn_cnt_12m,
                      SUM(txn_amt) sent_txn_amt_12m,
                      AVG(txn_amt) sent_amt_avg_12m
                    FROM live.fund_trans_silver WHERE cast(datetime AS date) >= date_add(MONTH, -12, (SELECT CAST(max_date AS date) FROM max_date))
                    GROUP BY payer_cust_id),
      12m_payee AS (SELECT
                        payee_cust_id,
                        COUNT(DISTINCT payee_cust_id) dist_payee_cnt_12m,
                        COUNT(1) rcvd_txn_cnt_12m,
                        SUM(txn_amt) rcvd_txn_amt_12m,
                        AVG(txn_amt) rcvd_amt_avg_12m
                      FROM live.fund_trans_silver WHERE CAST(datetime AS date) >= date_add(MONTH, -12, (SELECT CAST(max_date AS date) FROM max_date))
                      GROUP BY payee_cust_id),
      6m_payer AS (SELECT
                    payer_cust_id,
                    COUNT(DISTINCT payer_cust_id) dist_payer_cnt_6m,
                    COUNT(1) sent_txn_cnt_6m,
                    SUM(txn_amt) sent_txn_amt_6m,
                    AVG(txn_amt) sent_amt_avg_6m
                  FROM live.fund_trans_silver WHERE CAST(datetime AS date) >= date_add(MONTH, -6, (SELECT CAST(max_date AS date) FROM max_date))
                  GROUP BY payer_cust_id),
      6m_payee AS (SELECT
                    payee_cust_id,
                    COUNT(DISTINCT payee_cust_id) dist_payee_cnt_6m,
                    COUNT(1) rcvd_txn_cnt_6m,
                    SUM(txn_amt) rcvd_txn_amt_6m,
                    AVG(txn_amt) rcvd_amt_avg_6m
                  FROM live.fund_trans_silver WHERE CAST(datetime AS date) >= date_add(MONTH, -6, (SELECT CAST(max_date AS date) FROM max_date))
                  GROUP BY payee_cust_id),
      3m_payer AS (SELECT
                    payer_cust_id,
                    COUNT(DISTINCT payer_cust_id) dist_payer_cnt_3m,
                    COUNT(1) sent_txn_cnt_3m,
                    SUM(txn_amt) sent_txn_amt_3m,
                    AVG(txn_amt) sent_amt_avg_3m
                  FROM live.fund_trans_silver WHERE CAST(datetime AS date) >= date_add(MONTH, -3, (SELECT CAST(max_date AS date) FROM max_date))
                  GROUP BY payer_cust_id),
      3m_payee AS (SELECT
                    payee_cust_id,
                    COUNT(DISTINCT payee_cust_id) dist_payee_cnt_3m,
                    COUNT(1) rcvd_txn_cnt_3m,
                    SUM(txn_amt) rcvd_txn_amt_3m,
                    AVG(txn_amt) rcvd_amt_avg_3m
                  FROM live.fund_trans_silver WHERE CAST(datetime AS date) >= date_add(MONTH, -3, (SELECT CAST(max_date AS date) FROM max_date))
                  GROUP BY payee_cust_id)        
  SELECT c.cust_id, 
    12m_payer.* EXCEPT (payer_cust_id),
    12m_payee.* EXCEPT (payee_cust_id), 
    6m_payer.* EXCEPT (payer_cust_id), 
    6m_payee.* EXCEPT (payee_cust_id), 
    3m_payer.* EXCEPT (payer_cust_id), 
    3m_payee.* EXCEPT (payee_cust_id) 
  FROM live.customer_silver c 
    LEFT JOIN 12m_payer ON 12m_payer.payer_cust_id = c.cust_id
    LEFT JOIN 12m_payee ON 12m_payee.payee_cust_id = c.cust_id
    LEFT JOIN 6m_payer ON 6m_payer.payer_cust_id = c.cust_id
    LEFT JOIN 6m_payee ON 6m_payee.payee_cust_id = c.cust_id
    LEFT JOIN 3m_payer ON 3m_payer.payer_cust_id = c.cust_id
    LEFT JOIN 3m_payee ON 3m_payee.payee_cust_id = c.cust_id)
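
The six CTEs above repeat the same aggregation over different time windows. As an alternative sketch, not the approach used in this demo, the sent-side counts and amounts for all three windows can be computed in one scan with the aggregate `FILTER` clause. The alias `bounds` and its column `max_dt` are hypothetical names.

```sql
-- Sketch only: sent-side features for the 12m, 6m and 3m windows in one pass.
WITH bounds AS (
  SELECT CAST(max(datetime) AS date) AS max_dt FROM live.fund_trans_silver
)
SELECT
  t.payer_cust_id,
  COUNT(1)       AS sent_txn_cnt_12m,
  SUM(t.txn_amt) AS sent_txn_amt_12m,
  COUNT(1)       FILTER (WHERE CAST(t.datetime AS date) >= date_add(MONTH, -6, b.max_dt)) AS sent_txn_cnt_6m,
  SUM(t.txn_amt) FILTER (WHERE CAST(t.datetime AS date) >= date_add(MONTH, -6, b.max_dt)) AS sent_txn_amt_6m,
  COUNT(1)       FILTER (WHERE CAST(t.datetime AS date) >= date_add(MONTH, -3, b.max_dt)) AS sent_txn_cnt_3m,
  SUM(t.txn_amt) FILTER (WHERE CAST(t.datetime AS date) >= date_add(MONTH, -3, b.max_dt)) AS sent_txn_amt_3m
FROM live.fund_trans_silver t
CROSS JOIN bounds b
-- Keep only the widest window; the narrower ones are carved out with FILTER.
WHERE CAST(t.datetime AS date) >= date_add(MONTH, -12, b.max_dt)
GROUP BY t.payer_cust_id
```

One behavioral difference to keep in mind: for a payer with activity in the 12-month window but none in a shorter one, the filtered `COUNT` returns 0, whereas the CTE-and-join version yields NULL.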


#### 3. Telco table

The telco table represents all the payment data for a given prospect or customer, used to understand creditworthiness based on a non-bank credit source.

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW telco_gold AS
SELECT
  customer.id cust_id,
  telco.*
FROM
  live.telco_bronze telco
  LEFT OUTER JOIN live.customer_bronze customer ON telco.user_phone = customer.mobile_phone


#### 4. Customer table

The customer data represents the system of record for PII and customer attributes that will be joined to other fact tables. 

In [0]:
CREATE OR REFRESH MATERIALIZED VIEW customer_gold AS
SELECT
  customer.*,
  account.avg_balance,
  account.num_accs,
  account.balance_usd,
  account.available_balance_usd
FROM
  live.customer_silver customer
  LEFT OUTER JOIN live.account_silver account ON customer.cust_id = account.cust_id 


#### 5. Adding a view masking first_name for our data scientist users

The best practice for masking data in Delta Lake tables on Databricks is to use dynamic views and functions like `is_member` to encrypt or mask data based on the user's group. In this case, we want to mask PII data based on user group, and we use the built-in encryption function `aes_encrypt` to do this. The key itself should be stored in a Databricks secret for security reasons.

In [0]:
CREATE OR REPLACE LIVE VIEW customer_gold_secured AS
SELECT
  c.* EXCEPT (first_name),
  CASE
    WHEN is_member('data-science-users')
    THEN base64(aes_encrypt(c.first_name, 'YOUR_SECRET_FROM_MANAGER')) -- save the key in a Databricks secret and load it in SQL with secret('<YOUR_SCOPE>', '<YOUR_SECRET_NAME>')
    ELSE c.first_name
  END AS first_name
FROM
  live.customer_gold AS c
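
The masking expression can be tried on its own before wiring it into the pipeline. The statements below are a sketch meant for the SQL editor. The literal key `'0123456789abcdef'` is a throwaway 16-byte example (`aes_encrypt` expects a 16, 24, or 32 byte key), and `<YOUR_SCOPE>` and `<YOUR_SECRET_NAME>` are placeholders for a secret you have created yourself.

```sql
-- Round trip with a throwaway key: encrypt, encode, decode, decrypt.
SELECT CAST(
  aes_decrypt(
    unbase64(base64(aes_encrypt('Alice', '0123456789abcdef'))),
    '0123456789abcdef'
  ) AS STRING
) AS first_name_roundtrip;

-- Same encryption with the key read from a Databricks secret instead of a literal.
SELECT base64(
  aes_encrypt('Alice', secret('<YOUR_SCOPE>', '<YOUR_SECRET_NAME>'))
) AS first_name_masked;
```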

# Next: secure and share data with Unity Catalog

Now that we have ingested all these various sources of data, we can move on to:

* the [Governance with Unity Catalog notebook]($../02-Data-Governance/02-Data-Governance-credit-decisioning) to see how to grant fine-grained access to every user and persona and explore the **data lineage graph**,
* the [Feature Engineering notebook]($../03-Data-Science-ML/03.1-Feature-Engineering-credit-decisioning) to start creating features for our machine learning models,
* the [Introduction]($../00-Credit-Decisioning) to go back to the overview.