# Ingesting and transforming churn data with Delta Lake and the Spark API

<img style="float: right" width="300px" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-2.png" />

In this notebook, we'll show you an alternative to Delta Live Tables: building an ingestion pipeline with the Spark API.

As you'll see, this implementation is lower level than the Delta Live Tables pipeline, and you'll have control over all the implementation details (handling checkpoints, data quality, etc.).

Lower level also means more power. Using the Spark API, you have full flexibility to ingest data in batch or streaming mode.

If you're unsure what to use, start with Delta Live Tables!

*Remember that Databricks Workflows can be used to orchestrate a mix of Delta Live Tables pipelines and standard Spark pipelines.*

As a reminder, we have multiple data sources coming from different systems:

- Customer profile data *(name, age, address, etc.)*
- Order history *(what our customers bought over time)*
- Events from our application *(when was the last time customers used the application, typically this could be a stream from a Kafka queue)*


Leveraging Spark and Delta Lake makes such an implementation easy.


In [0]:
%pip install mlflow==2.22.0

In [0]:
%run ../../_resources/00-setup $reset_all_data=false

## Building a Spark Data pipeline with Delta Lake

In this example, we'll implement an end-to-end pipeline consuming our customer information. We'll use the medallion architecture, but we could build a star schema, a data vault, or any other data model.



This can be challenging with traditional systems due to the following:
 * Data quality issues
 * Running concurrent operations
 * Running DELETE/UPDATE/MERGE over files
 * Governance & schema evolution
 * Performance when ingesting millions of small files on cloud buckets
 * Processing & analysing unstructured data (image, video...)
 * Switching between batch and streaming depending on your requirements...

## Solving these challenges with Delta Lake

<div style="float:left">

**What's Delta Lake? It's an open source standard that brings SQL transactional database capabilities on top of Parquet files!**

Used as a new Spark format, built on top of Spark API / SQL

* **ACID transactions** (Multiple writers can simultaneously modify a data set)
* **Full DML support** (UPDATE/DELETE/MERGE)
* **BATCH and STREAMING** support
* **Data quality** (expectations, schema enforcement, inference and evolution)
* **TIME TRAVEL** (look back at how data looked in the past)
* **Performance boost** with Z-Order, data skipping and caching; solves the small files issue
</div>


<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo.png" style="height: 200px"/>

<br style="clear: both">

We'll incrementally load new data with Auto Loader, enrich this information and then load a model from MLflow to perform our customer churn prediction.

This information will then be used to build our DBSQL dashboard to track customer behavior and churn.

Let's implement the following flow:
 
<div><img width="1100px" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-delta.png"/></div>

*Note that we're including the ML model our [Data Scientist built](TODO) using Databricks AutoML to predict the churn.*

## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 1/ Explore the dataset

Let's review the files being received

In [0]:
%sql 
LIST '/Volumes/data_pioneers/c360/c360/users'

In [0]:
%sql
SELECT * FROM json.`/Volumes/data_pioneers/c360/c360/users`

### 1/ Loading our data using Databricks Autoloader (cloud_files)
<div style="float:right">
  <img width="700px" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-delta-1.png"/>
</div>
  
Auto Loader allows us to efficiently ingest millions of files from cloud storage, and supports schema inference and evolution at scale.

For more details on Auto Loader, run `dbdemos.install('auto-loader')`

Let's use it to [create our pipeline](https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#joblist/pipelines/95f28631-1884-425e-af69-05c3f397dd90) and ingest the raw JSON & CSV data being delivered in our blob storage `/demos/retail/churn/...`. 

In [0]:
%sql
-- Note: tables are automatically created during  .writeStream.table("user_bronze") operation, but we can also use plain SQL to create them:
CREATE TABLE IF NOT EXISTS spark_churn_users_bronze (
     id                 STRING,
     email              STRING,
     creation_date      STRING,
     last_activity_date STRING,
     firstname          STRING,
     lastname           STRING,
     address            STRING,
     age_group          DOUBLE,
     canal              STRING,
     churn              BOOLEAN,
     country            STRING,
     gender             DOUBLE,
     _rescued_data      STRING
  ) 
  USING DELTA 
  CLUSTER BY (firstname, lastname) -- accelerate queries filtering on firstname/lastname with Liquid Clustering
  TBLPROPERTIES (
     delta.autooptimize.optimizewrite = TRUE,
     delta.autooptimize.autocompact   = TRUE ); 
-- With these 2 last options, Databricks engine will solve small files & optimize write out of the box!

In [0]:
volume_folder = f"/Volumes/{catalog}/{db}/c360"

def ingest_folder(folder, data_format, table):
  """Incrementally ingest the files in `folder` into the Delta table `table` with Auto Loader."""
  bronze_stream = (spark.readStream
                        .format("cloudFiles")
                        .option("cloudFiles.format", data_format)
                        .option("cloudFiles.inferColumnTypes", "true")
                        # Auto Loader stores the inferred schema here and tracks its evolution
                        .option("cloudFiles.schemaLocation", f"{volume_folder}/schema_spark/{table}")
                        .load(folder))

  return (bronze_stream.writeStream
                       # The checkpoint gives exactly-once delivery on Delta tables across restart/kill
                       .option("checkpointLocation", f"{volume_folder}/checkpoint_spark/{table}")
                       .option("mergeSchema", "true")  # Merge any new column dynamically
                       .trigger(availableNow=True)     # Remove for real-time streaming
                       .table(table))                  # Table will be created if we haven't specified the schema first

# Start the three ingestion streams, then wait for all of them so the next cells read complete bronze tables
queries = [
  ingest_folder(f'{volume_folder}/orders', 'json', 'spark_churn_orders_bronze'),
  ingest_folder(f'{volume_folder}/events', 'csv', 'spark_churn_app_events'),
  ingest_folder(f'{volume_folder}/users', 'json', 'spark_churn_users_bronze'),
]
for query in queries:
  query.awaitTermination()

In [0]:
%sql 
-- Note the "_rescued_data" column. If we receive wrong data not matching existing schema, it'll be stored here
select * from spark_churn_users_bronze;


## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 2/ Silver data: anonymized table, dates cleaned

<img width="700px" style="float:right" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-delta-2.png"/>

We can chain these incremental transformations between tables, consuming only new data.

This can be triggered in near real time, or in batch fashion, for example as a job running every night to consume daily data.

In [0]:
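# Column functions used below (harmless to re-import if the setup notebook already provides them)
from pyspark.sql.functions import col, initcap, sha1, to_timestamp

(spark.readStream 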
        .table("spark_churn_users_bronze")
        .withColumnRenamed("id", "user_id")
        .withColumn("email", sha1(col("email")))
        .withColumn("creation_date", to_timestamp(col("creation_date"), "MM-dd-yyyy H:mm:ss"))
        .withColumn("last_activity_date", to_timestamp(col("last_activity_date"), "MM-dd-yyyy HH:mm:ss"))
        .withColumn("firstname", initcap(col("firstname")))
        .withColumn("lastname", initcap(col("lastname")))
        .withColumn("age_group", col("age_group").cast('int'))
        .withColumn("gender", col("gender").cast('int'))
        .withColumn("churn", col("churn").cast('int'))
        .drop(col("_rescued_data"))
     .writeStream
        .option("checkpointLocation", f"{volume_folder}/checkpoint_spark/churn_users")
        .option("mergeSchema", "true")
        .trigger(availableNow = True)
        .table("spark_churn_users").awaitTermination())
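
To make the cleanup rules concrete, here is a minimal plain-Python sketch that applies the same kind of transformations to a single made-up record, without Spark. The sample values and the helper name `clean_user` are hypothetical. `hashlib.sha1(...).hexdigest()` mirrors what Spark's `sha1` returns for a string (a hex digest), `%m-%d-%Y %H:%M:%S` corresponds to the `MM-dd-yyyy HH:mm:ss` pattern, and the word-by-word `capitalize` is only an approximation of `initcap`.

```python
import hashlib
from datetime import datetime

def clean_user(raw: dict) -> dict:
    """Apply the silver-layer cleanup rules to one bronze record (illustration only)."""
    return {
        # Rename id -> user_id
        "user_id": raw["id"],
        # Anonymize the email by replacing it with its SHA-1 hex digest
        "email": hashlib.sha1(raw["email"].encode("utf-8")).hexdigest(),
        # Parse the string date into a real timestamp
        "creation_date": datetime.strptime(raw["creation_date"], "%m-%d-%Y %H:%M:%S"),
        # Normalize the case of the name, word by word
        "firstname": " ".join(w.capitalize() for w in raw["firstname"].split(" ")),
        # Cast inferred DOUBLE / BOOLEAN values to integers
        "age_group": int(raw["age_group"]),
        "churn": int(raw["churn"]),
    }

# Hypothetical bronze record
sample = {
    "id": "u-1",
    "email": "jane.doe@example.com",
    "creation_date": "08-23-2021 07:05:12",
    "firstname": "jANE",
    "age_group": 3.0,
    "churn": True,
}
cleaned = clean_user(sample)
print(cleaned)
```

The hashed email can still be used as a join key, but the original address is no longer stored in the silver table.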

In [0]:
%sql select * from spark_churn_users;

In [0]:
(spark.readStream 
        .table("spark_churn_orders_bronze")
        .withColumnRenamed("id", "order_id")
        .withColumn("amount", col("amount").cast('int'))
        .withColumn("item_count", col("item_count").cast('int'))
        .withColumn("creation_date", to_timestamp(col("transaction_date"), "MM-dd-yyyy H:mm:ss"))
        .drop(col("_rescued_data"))
     .writeStream
        .option("checkpointLocation", f"{volume_folder}/checkpoint_spark/churn_orders")
        .option("mergeSchema", "true")
        .trigger(availableNow = True)
        .table("spark_churn_orders").awaitTermination())

### 3/ Aggregate and join data to create our ML features

<img width="700px" style="float:right" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-delta-3.png"/>


We're now ready to create the features required for our churn prediction.

We need to enrich our user dataset with extra information that our model will use to help predict churn, such as:

* last order date
* number of items bought
* number of actions on our website
* device used (ios/iphone)
* ...

In [0]:
spark.sql("""
    CREATE OR REPLACE TABLE spark_churn_features AS
      WITH 
          spark_churn_orders_stats AS (SELECT user_id, count(*) as order_count, sum(amount) as total_amount, sum(item_count) as total_item, max(creation_date) as last_transaction
            FROM spark_churn_orders GROUP BY user_id),  
          spark_churn_app_events_stats as (
            SELECT first(platform) as platform, user_id, count(*) as event_count, count(distinct session_id) as session_count, max(to_timestamp(date, "MM-dd-yyyy HH:mm:ss")) as last_event
              FROM spark_churn_app_events GROUP BY user_id)
        SELECT *, 
           datediff(now(), creation_date) as days_since_creation,
           datediff(now(), last_activity_date) as days_since_last_activity,
           datediff(now(), last_event) as days_last_event
           FROM spark_churn_users
             INNER JOIN spark_churn_orders_stats using (user_id)
             INNER JOIN spark_churn_app_events_stats using (user_id)""")
     
display(spark.table("spark_churn_features"))
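
As an illustration of what the order aggregation computes, the sketch below reproduces the `GROUP BY user_id` statistics and a `datediff`-style day count in plain Python on a few made-up rows. The rows, the helper name `order_stats`, the fixed reference date `as_of` and the `days_since_last_order` metric are assumptions chosen for this example; they are not part of the pipeline.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical silver-layer order rows: (user_id, amount, item_count, creation_date)
orders = [
    ("u-1", 30, 2, datetime(2023, 1, 5)),
    ("u-1", 45, 1, datetime(2023, 3, 9)),
    ("u-2", 10, 4, datetime(2023, 2, 1)),
]

def order_stats(rows):
    """Per-user count(*), sum(amount), sum(item_count) and max(creation_date)."""
    stats = defaultdict(lambda: {"order_count": 0, "total_amount": 0,
                                 "total_item": 0, "last_transaction": None})
    for user_id, amount, item_count, created in rows:
        s = stats[user_id]
        s["order_count"] += 1
        s["total_amount"] += amount
        s["total_item"] += item_count
        if s["last_transaction"] is None or created > s["last_transaction"]:
            s["last_transaction"] = created
    return dict(stats)

stats = order_stats(orders)

# datediff-style feature: whole days between a reference date and the last order.
# A fixed reference date keeps the example reproducible (the SQL above uses now()).
as_of = datetime(2023, 4, 1)
days_since_last_order = {
    user_id: (as_of.date() - s["last_transaction"].date()).days
    for user_id, s in stats.items()
}
print(stats)
print(days_since_last_order)
```

Because the feature query uses `INNER JOIN`, a user with no orders or no app events does not appear in `spark_churn_features`.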

## 5/ Enriching the gold data with an ML model

<img width="700px" style="float:right" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-de-delta-5.png"/>

Our data science team has built a churn prediction model using AutoML and saved it to the Databricks Model Registry.

One of the key values of the Lakehouse is that we can easily load this model and predict churn right in our pipeline.

Note that we don't have to worry about the model framework (sklearn or other): MLflow abstracts that for us.

In [0]:
import mlflow
# Setup registry to use Databricks Unity Catalog
mlflow.set_registry_uri('databricks-uc')

#                                                                                            Alias/version
#                                                                 Model name (UC)                   |   
#                                                                     |                             |   
predict_churn_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{catalog}.{db}.dbdemos_customer_churn@prod", result_type="long", env_manager='virtualenv')

In [0]:
columns = predict_churn_udf.metadata.get_input_schema().input_names()
predictions = spark.table('spark_churn_features').limit(10).withColumn('churn_prediction', predict_churn_udf(*columns))
display(predictions)

## Simplify your operations with transactional DELETE/UPDATE/MERGE operations

Traditional data lakes struggle to run these simple DML operations. With Databricks and Delta Lake, your data is stored on your blob storage with transactional capabilities. You can issue DML operations on petabytes of data without having to worry about concurrent operations.

In [0]:
%sql DELETE FROM spark_churn_users where creation_date < '2016-01-01T03:38:55.000+0000';

In [0]:
%sql describe history spark_churn_users;

In [0]:
%sql 
-- Also works with TIMESTAMP AS OF "yyyy-MM-dd HH:mm:ss"
select * from spark_churn_users version as of 1 ;

-- Made the DELETE by mistake? You can easily restore the table to a given version / date:
-- RESTORE TABLE spark_churn_users TO VERSION AS OF 1

-- Or clone it (SHALLOW provides a zero-copy clone):
-- CREATE TABLE spark_user_gold_clone SHALLOW|DEEP CLONE user_gold VERSION AS OF 1

-- Turn on Change Data Feed to capture insert/update/delete operations:
-- ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
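
If versions and time travel are new to you, the toy model below may help build intuition. It is a deliberately simplified, in-memory sketch: the class `ToyVersionedTable` and its methods are hypothetical and do not reflect how Delta Lake is implemented. It only illustrates the idea that each write commits a new version, that older versions stay readable, and that a restore is itself a new commit.

```python
class ToyVersionedTable:
    """Toy in-memory model of a versioned table (NOT Delta Lake's implementation)."""

    def __init__(self, rows):
        self._snapshots = [list(rows)]  # version 0

    @property
    def version(self):
        """Latest committed version number."""
        return len(self._snapshots) - 1

    def read(self, version=None):
        """Read the latest snapshot, or an older one (similar in spirit to VERSION AS OF n)."""
        index = self.version if version is None else version
        return list(self._snapshots[index])

    def delete(self, predicate):
        """Commit a new version without the rows matching the predicate."""
        self._snapshots.append([row for row in self.read() if not predicate(row)])

    def restore(self, version):
        """Commit a new version whose content equals an older snapshot."""
        self._snapshots.append(self.read(version))


# Hypothetical rows
users = ToyVersionedTable([
    {"user_id": "u-1", "creation_year": 2015},
    {"user_id": "u-2", "creation_year": 2020},
])
users.delete(lambda row: row["creation_year"] < 2016)  # commits version 1
before = users.read(version=0)                         # time travel: both rows
after = users.read()                                   # latest: one row left
users.restore(0)                                       # commits version 2
print(users.version, users.read())
```

In the real table, `describe history` lists these commits, which is how you find the version number to read or restore.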

In [0]:
%sql
ALTER TABLE spark_churn_users    SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );
ALTER TABLE spark_churn_orders   SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );
ALTER TABLE spark_churn_features SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );

## Our final tables are now ready to be used to build SQL Dashboards and ML models for customer classification!
<img style="float: right" width="400" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-retail-c360-dashboard-churn-prediction.png?raw=true"/>


Switch to Databricks SQL to see how this data can easily be queried using the <a  dbdemos-dashboard-id="churn-universal" href='/sql/dashboardsv3/01f06ca956721d19bb53d59fbf04a342' target="_blank">Churn prediction DBSQL dashboard</a> or an external BI tool.

Creating a single flow was simple. However, handling many data pipelines at scale can become a real challenge:
* Hard to build and maintain table dependencies 
* Difficult to monitor & enforce advanced data quality
* Impossible to trace data lineage
* Difficult pipeline operations (observability, error recovery)


#### To solve these challenges, Databricks introduced **Delta Live Tables**
A simple way to build and manage data pipelines for fresh, high quality data!

# Next: secure and share data with Unity Catalog

Now that these tables are available in our Lakehouse, let's review how we can share them with the Data Scientist and Data Analyst teams.

Jump to the [Governance with Unity Catalog notebook]($../02-Data-governance/02-UC-data-governance-security-churn) or [Go back to the introduction]($../00-churn-introduction-lakehouse)