%pip install mlflow==3.1.0
# If you have issues, make sure this version matches your AutoML dependency version. For production use, set env_manager='conda'.
%pip install azure-core azure-storage-file-datalake #for the display() in Azure only
dbutils.library.restartPython()

# Data engineering with Databricks - Building our C360 database

Building a C360 database requires ingesting multiple data sources.

It's a complex process requiring batch loads and streaming ingestion to support real-time insights, used for personalization and marketing targeting among others.

Ingesting, transforming and cleaning data to create clean SQL tables for our downstream users (Data Analysts and Data Scientists) is complex.

<link href="https://fonts.googleapis.com/css?family=DM Sans" rel="stylesheet"/>
<div style="width: 300px; height: 300px; text-align: center; float: right; margin: 30px 60px 10px 10px; font-family: 'DM Sans'; border-radius: 50%; border: 25px solid #fcba33ff; box-sizing: border-box; overflow: hidden;">
  <div style="display: flex; flex-direction: column; align-items: center; justify-content: center; height: 100%; width: 100%;">
    <div style="font-size: 70px; color: #70c4ab; font-weight: bold;">
      73%
    </div>
    <div style="color: #1b5162; padding: 0 30px; text-align: center;">
      of enterprise data goes unused for analytics and decision making
    </div>
  </div>
  <div style="color: #bfbfbf; padding-top: 5px;">
    Source: Forrester
  </div>
</div>

<br>


## <img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/refs/heads/main/images/john.png" style="float:left; margin: -35px 0px 0px 0px" width="80px"> John, as a Data Engineer, spends immense time...

* Hand-coding data ingestion & transformations and dealing with technical challenges:<br>
  *Supporting streaming and batch, handling concurrent operations, small files issues, GDPR requirements, complex DAG dependencies...*<br><br>
* Building custom frameworks to enforce quality and tests<br><br>
* Building and maintaining scalable infrastructure, with observability and monitoring<br><br>
* Managing incompatible governance models from different systems
<br style="clear: both">

This results in **operational complexity** and overhead, requires expert profiles, and ultimately **puts data projects at risk**.


# Simplify Ingestion and Transformation with Lakeflow Connect & SDP

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/cross_demo_assets/Lakehouse_Demo_Team_architecture_1.png?raw=true" style="float: right" width="500px">

In this notebook, we'll work as a Data Engineer to build our C360 database. <br>
We'll consume and clean our raw data sources to prepare the tables required for our BI & ML workloads.

We want to ingest the datasets below from Salesforce Sales Cloud and blob storage (`/demos/retail/churn/`) incrementally into our Data Warehousing tables:

- Customer profile data *(name, age, address etc)*
- Orders history *(what our customer bought over time)*
- Streaming Events from our application *(when was the last time customers used the application, typically a stream from a Kafka queue)*


<a href="https://www.databricks.com/resources/demos/tours/platform/discover-databricks-lakeflow-connect-demo" target="_blank"><img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/lakeflow-connect-anim.gif?raw=true" style="float: right; margin-right: 20px" width="250px"></a>

## 1/ Ingest data with Lakeflow Connect


Lakeflow Connect offers built-in data ingestion connectors for popular SaaS applications, databases and file sources, such as Salesforce, Workday, and SQL Server, so you can build incremental data pipelines at scale, fully integrated with Databricks.


## 2/ Prepare and transform your data with SDP

<div>
  <div style="width: 45%; float: left; margin-bottom: 10px; padding-right: 45px">
    <p style="min-height: 65px;">
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/diganparikh-dp/Images/refs/heads/main/Icons/LakeFlow%20Connect.jpg"/>
      <strong>Efficient end-to-end ingestion</strong> <br/>
      Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/diganparikh-dp/Images/refs/heads/main/Icons/LakeFlow%20Pipelines.jpg"/>
      <strong>Flexible and easy setup</strong> <br/>
      By automating complex administrative tasks and gaining broader visibility into pipeline operations
    </p>
  </div>
  <div style="width: 48%; float: left">
    <p style="min-height: 65px;">
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-trust.png"/>
      <strong>Trust your data</strong> <br/>
      With built-in orchestration, quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML
    </p>
    <p>
      <img style="width: 50px; float: left; margin: 0px 5px 30px 0px;" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/logo-stream.png"/>
      <strong>Simplify batch and streaming</strong> <br/>
      With self-optimization and auto-scaling data pipelines for batch or streaming processing
    </p>
</div>
</div>

<br style="clear:both">

## Building an SDP pipeline to analyze and reduce churn

In this example, we'll implement an end-to-end SDP pipeline consuming our customer information. We'll use the medallion architecture, but we could build a star schema, a data vault or any other data model.

We'll incrementally load new data with Auto Loader, enrich this information and then load a model from MLflow to perform our customer churn prediction.

This information will then be used to build our DBSQL dashboard to track customer behavior and churn.

Let's implement the following flow:

<div><img width="1100px" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-retail-churn-de.png?raw=true"/></div>

*Note that we're including the ML model our [Data Scientist built]($../04-Data-Science-ML/04.1-automl-churn-prediction) using Databricks AutoML to predict the churn. We'll cover that in the next section.*

Your SDP Pipeline has been installed and started for you! Open the <a dbdemos-pipeline-id="sdp-churn" href="#joblist/pipelines/d9ee0288-0870-4a13-bb77-34bc53d73dd0" target="_blank">Churn SDP pipeline</a> to see it in action.<br/>
*(Note: The pipeline will start automatically once the initialization job is completed. This might take a few minutes. Check the installation logs for more details.)*


## 1/ Data Exploration

All data projects start with some exploration. Open the [/explorations/sample_exploration]($./explorations/sample_exploration) notebook to get started and discover the data made available to you.


## 2/ Ingest data: Bronze layer

<div><img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-small-1.png" width="700px" style="float: right"/></div>

Ingesting data from stream sources can be challenging. In this example we'll incrementally load the files from our cloud storage, only getting the new ones (in near real-time or triggered every X hours).

Note that while our streaming data is added to our cloud storage, we could just as easily ingest it from Kafka directly.

Auto Loader provides:

- Schema inference and evolution
- Scalability handling millions of files
- Simplicity: just define your ingestion folder, Databricks takes care of the rest!

For more details on Auto Loader, run `dbdemos.install('data-ingestion')`

Let's use it in our pipeline and ingest the raw JSON & CSV data being delivered in our blob storage.

Open the [transformations/01-bronze.py]($./transformations/01-bronze.py) notebook to review the Python code ingesting the raw data and creating our bronze layer.
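
The linked file is not reproduced here. As a rough sketch of what an Auto Loader bronze layer can look like in a Python pipeline, the snippet below declares one streaming table per source. The sub-folder names, file formats and table names are assumptions made for illustration, and the `dlt` module is only importable inside a pipeline run, which is why it is imported lazily.

```python
# Hypothetical source layout under the demo folder: sub-folder names,
# formats and table names are assumptions, not taken from the demo files.
RAW_ROOT = "/demos/retail/churn"
SOURCES = {
    "churn_users_bronze": ("users", "json"),
    "churn_orders_bronze": ("orders", "json"),
    "churn_app_events_bronze": ("events", "csv"),
}


def autoloader_options(file_format):
    """Build the Auto Loader reader options for one file format."""
    options = {
        "cloudFiles.format": file_format,
        # Infer column types instead of reading every column as a string.
        "cloudFiles.inferColumnTypes": "true",
    }
    if file_format == "csv":
        options["header"] = "true"
    return options


def register_bronze_tables(spark):
    """Declare one streaming bronze table per source (pipeline runtime only)."""
    import dlt  # only available while the pipeline is running

    def declare(table_name, folder, file_format):
        # A helper function binds the loop variables for each table.
        @dlt.table(name=table_name, comment=f"Raw {folder} data loaded incrementally")
        def bronze():
            return (
                spark.readStream.format("cloudFiles")
                .options(**autoloader_options(file_format))
                .load(f"{RAW_ROOT}/{folder}")
            )

    for table_name, (folder, file_format) in SOURCES.items():
        declare(table_name, folder, file_format)
```

In a pipeline source file, `register_bronze_tables(spark)` would be called at module level so that the tables are declared when the pipeline graph is built.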


## 3/ Silver layer: Clean and prepare data

<div><img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-small-2.png" width="700px" style="float: right"/></div>

The next layer, often called silver, consumes **incremental** data from the bronze layer and cleans up some information.

We're also adding [expectations](https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-expectations.html) on different fields to enforce and track our data quality. This keeps our dashboards relevant and makes it easy to spot potential errors caused by data anomalies.

For more advanced SDP capabilities run `dbdemos.install('pipeline-bike')` or `dbdemos.install('declarative-pipeline-cdc')` for CDC/SCD Type 2 examples.

These tables are clean and ready to be used by the BI team!

Open the [transformations/02-silver.py]($./transformations/02-silver.py) notebook to review the Python code creating our clean silver tables.
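
As an illustration only, a silver table with expectations might be declared along these lines. The rule names, column names and table names are hypothetical. The sketch separates rules that are merely tracked (`expect_all`) from rules that drop offending rows (`expect_all_or_drop`).

```python
# Hypothetical quality rules: rule names and column names are assumptions.
TRACKED_RULES = {
    # Violations are counted in the pipeline metrics, rows are kept.
    "valid_email": "email IS NOT NULL",
}
ENFORCED_RULES = {
    # Rows violating these constraints are dropped from the table.
    "valid_id": "user_id IS NOT NULL",
}


def register_silver_tables():
    """Declare a cleaned users table (pipeline runtime only)."""
    import dlt  # only available while the pipeline is running
    from pyspark.sql import functions as F

    @dlt.table(name="churn_users", comment="Cleaned user profiles")
    @dlt.expect_all(TRACKED_RULES)
    @dlt.expect_all_or_drop(ENFORCED_RULES)
    def churn_users():
        # Read the bronze table incrementally and normalize the email field.
        return dlt.read_stream("churn_users_bronze").withColumn(
            "email", F.lower(F.trim(F.col("email")))
        )
```

Keeping the rules in plain dictionaries makes them easy to review and to reuse across several tables.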


## 4/ Gold layer: Aggregate features and apply ML model

<div><img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-small-3.png" width="700px" style="float: right"/></div>

We're now ready to create the features required for our Churn prediction.

We need to enrich our user dataset with extra information that our model will use to help predict churn, such as:

* last order date
* number of items bought
* number of actions on our website
* device used (iOS/iPhone)
* ...

Our Data Science team has built a churn prediction model using AutoML and saved it in the Databricks Model Registry.

One of the key values of the Lakehouse is that we can easily load this model and predict churn right in our pipeline.

Note that we don't have to worry about the model framework (sklearn or other): MLflow abstracts that for us.

Open the [transformations/03-gold.py]($./transformations/03-gold.py) notebook to review the Python code creating our features and predictions (ML model is loaded in this file).
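
To give an idea of the shape of this step, here is a hedged sketch that aggregates a few features and applies a registered model as a Spark UDF. The model name, alias, table names and column names are all hypothetical, and the sketch assumes a model registered in Unity Catalog and referenced by alias.

```python
# Hypothetical model coordinates: not taken from the demo files.
MODEL_NAME = "main.retail_c360.churn_model"
MODEL_ALIAS = "prod"


def model_uri(name, alias):
    """Build an MLflow registry URI that points at a model alias."""
    return f"models:/{name}@{alias}"


def register_gold_tables(spark):
    """Declare feature and prediction tables (pipeline runtime only)."""
    import dlt  # only available while the pipeline is running
    import mlflow
    from pyspark.sql import functions as F

    # MLflow wraps the model as a Spark UDF, whatever framework trained it.
    predict_churn = mlflow.pyfunc.spark_udf(
        spark, model_uri(MODEL_NAME, MODEL_ALIAS), result_type="int"
    )

    @dlt.table(name="churn_features", comment="One row of features per user")
    def churn_features():
        # Column names below (item_count, creation_date, date) are assumptions.
        orders = dlt.read("churn_orders").groupBy("user_id").agg(
            F.count("*").alias("order_count"),
            F.sum("item_count").alias("total_items"),
            F.max("creation_date").alias("last_order_date"),
        )
        events = dlt.read("churn_app_events").groupBy("user_id").agg(
            F.count("*").alias("event_count"),
            F.max("date").alias("last_event_date"),
        )
        return (
            dlt.read("churn_users")
            .join(orders, "user_id", "left")
            .join(events, "user_id", "left")
        )

    @dlt.table(name="churn_prediction", comment="Features with churn prediction")
    def churn_prediction():
        features = dlt.read("churn_features")
        # Pass every feature column to the model.
        return features.withColumn(
            "churn_prediction", predict_churn(*features.columns)
        )
```

In practice the columns passed to the UDF should match the model's input signature, so a real pipeline would select them explicitly rather than passing every column.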


## Conclusion
Our <a dbdemos-pipeline-id="sdp-churn" href="#joblist/pipelines/d9ee0288-0870-4a13-bb77-34bc53d73dd0" target="_blank">Spark Declarative Pipeline</a> is now ready using Python. We have an end-to-end cycle, and our ML model has been integrated seamlessly by our Data Engineering team.


For more details on model training, open the [model training notebook]($../04-Data-Science-ML/04.1-automl-churn-prediction)

Our final dataset includes the ML predictions for our churn use case.

We are now ready to build our dashboards to track customer behavior and churn.

<img style="float: left; margin-right: 50px;" width="500px" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-retail-c360-dashboard-churn-prediction.png?raw=true" />

<img width="500px" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-retail-c360-dashboard-churn.png?raw=true"/>

<a dbdemos-dashboard-id="churn-universal" href='/sql/dashboardsv3/01f0db2542d6178c93dffacb9261c16b'  target="_blank">Open the DBSQL Dashboard</a>

# Next: secure and share data with Unity Catalog

Now that these tables are available in our Lakehouse, let's review how we can share them with the Data Scientists and Data Analysts teams.

Jump to the [Governance with Unity Catalog notebook]($../02-Data-governance/02.1-UC-data-governance-security-churn) or [Go back to the introduction]($../00-churn-introduction-lakehouse)

## Optional: Checking your data quality metrics with SDP
SDP tracks all your data quality metrics. You can query the expectation results directly as a SQL table with Databricks SQL to track your expectation metrics and send alerts as required. This lets you build the following dashboards:

<img width="1000" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-retail-c360-dashboard-dlt-stat.png?raw=true">

<a dbdemos-dashboard-id="sdp-quality-stat" href='/sql/dashboardsv3/01f0db2542d6178c93dffacb9261c16b' target="_blank">Data Quality Dashboard</a>
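
The expectation metrics behind such a dashboard come from the pipeline event log, where each flow progress event carries a JSON `details` field. As a small plain-Python sketch, assuming the `flow_progress.data_quality.expectations` layout and using made-up sample events, the pass and fail counts can be totalled per dataset and rule like this:

```python
import json
from collections import defaultdict


def summarize_expectations(event_details):
    """Total passed/failed records per (dataset, expectation name)."""
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for raw in event_details:
        details = json.loads(raw)
        quality = (details.get("flow_progress") or {}).get("data_quality") or {}
        for exp in quality.get("expectations", []):
            key = (exp["dataset"], exp["name"])
            totals[key]["passed"] += exp.get("passed_records", 0)
            totals[key]["failed"] += exp.get("failed_records", 0)
    return dict(totals)


# Made-up sample events for illustration only.
SAMPLE_EVENTS = [
    json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "churn_users",
         "passed_records": 100, "failed_records": 2},
        {"name": "valid_email", "dataset": "churn_users",
         "passed_records": 98, "failed_records": 4},
    ]}}}),
    json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_id", "dataset": "churn_users",
         "passed_records": 80, "failed_records": 3},
    ]}}}),
    # Events without data quality information are ignored.
    json.dumps({"user_action": {"action": "START"}}),
]

summary = summarize_expectations(SAMPLE_EVENTS)
```

The same aggregation can be written in SQL over the event log table. The Python version is only meant to show which fields the dashboard relies on.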