# Churn Prediction Inference - Batch or serverless real-time


With AutoML, our best model was automatically saved in our MLflow registry.

All we need to do now is use this model to run inferences. A simple solution is to share the model name with our Data Engineering team so they can call the model within the pipeline they maintain. That's what we did in our Spark Declarative Pipelines pipeline!

Alternatively, this can be scheduled in a separate job. Here is an example showing how MLflow can be used directly to retrieve the model and run inferences.


In [0]:
%pip install mlflow==3.1.0
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-setup $reset_all_data=false

## Deploying the model for batch inferences

Now that our model is available in the Registry, we can load it to compute our inferences and save them in a table to start building dashboards.

We will use the MLflow `spark_udf` function to load the model as a PySpark UDF and distribute inference across the entire cluster. If the data is small, we can also load the model with plain Python and use a pandas DataFrame.

### Scaling inferences using Spark
We'll first see how the model can be loaded as a Spark UDF, registered, and called directly from SQL:

In [0]:
import mlflow
model_name = "dbdemos_customer_churn"
mlflow.set_registry_uri("databricks-uc")
#                                                                                                Alias
#                                                                                  Model name       |
#                                                                                        |          |
predict_churn_udf = mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{catalog}.{db}.{model_name}@prod", env_manager='virtualenv', result_type='long')
# Note: virtualenv recreates an environment from scratch, which can take some time but prevents version conflicts. If you're using the same compute as for training, you can remove it to use the local env instead (just install the libraries from the requirements.txt file as shown below)
# Register the UDF so it can also be called from SQL
spark.udf.register("predict_churn", predict_churn_udf)

In [0]:
columns = predict_churn_udf.metadata.get_input_schema().input_names()
spark.table('churn_features').withColumn("churn_prediction", predict_churn_udf(*columns)).display()
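
Because the UDF was registered as `predict_churn`, the same inference can also be expressed as a SQL query. The sketch below only builds the query string so that it runs without a cluster; `build_churn_query` is a hypothetical helper, and the column names in the usage lines are illustrative rather than the model's actual schema.

```python
def build_churn_query(table, columns, udf_name="predict_churn"):
    """Build a SQL statement that applies a registered UDF to the given feature columns."""
    # Backtick-quote identifiers so that unusual column names stay valid SQL
    args = ", ".join(f"`{c}`" for c in columns)
    return f"SELECT *, {udf_name}({args}) AS churn_prediction FROM {table}"


# Illustrative column names only; in the notebook, pass the `columns` list read from the model schema
query = build_churn_query("churn_features", ["age_group", "order_count"])
print(query)
```

In the notebook, the resulting string could then be run with `spark.sql(query).display()`.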

### Pure pandas inference
If we have a small dataset, we can also compute our predictions on a single node with the pandas API:

In [0]:
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository
import mlflow
# Use the Unity Catalog model registry
mlflow.set_registry_uri("databricks-uc")
# download model requirement from remote registry
requirements_path = ModelsArtifactRepository(f"models:/{catalog}.{db}.dbdemos_customer_churn@prod").download_artifacts(artifact_path="requirements.txt") 

In [0]:
%pip install -r $requirements_path
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-setup $reset_all_data=false

In [0]:
import mlflow
mlflow.set_registry_uri("databricks-uc")
model_name = "dbdemos_customer_churn"
model = mlflow.pyfunc.load_model(f"models:/{catalog}.{db}.{model_name}@prod")
columns = model.metadata.get_input_schema().input_names()
df = spark.table('churn_features').select(*columns).limit(10).toPandas()
df['churn_prediction'] = model.predict(df)
df.head(3)


## Real-time model serving with Databricks serverless serving

<img style="float: right; margin-left: 20px" width="700" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/retail/lakehouse-churn/lakehouse-c360-model-serving.png?raw=true" />

Databricks also provides serverless model serving.

Click on Model Serving, enable real-time serverless, and your endpoint will be created, serving your model over a REST API in one click.

Databricks Serverless offers autoscaling, including scaling down to zero when there is no traffic, to offer best-in-class TCO while keeping model serving latency low.

To deploy your serverless model, open the [Model Serving menu](https://xxxx.cloud.databricks.com/?o=1660015457675682#mlflow/endpoints), and select the model you registered within Unity Catalog.

In [0]:
import time  # needed by the polling loop at the end of this cell
from mlflow.deployments import get_deploy_client
model_endpoint_name = "dbdemos_customer_churn_endpoint"
last_version = get_last_model_version(f"{catalog}.{db}.{model_name}")
client = get_deploy_client("databricks")
try:
    endpoint = client.create_endpoint(
        name=model_endpoint_name,
        config={
            "served_entities": [
                {
                    "name": f"dbdemos_customer_churn_endpoint_{last_version}",
                    "entity_name": f"{catalog}.{db}.{model_name}",
                    "entity_version": last_version,
                    "workload_size": "Small",
                    "scale_to_zero_enabled": True
                }
            ]
        }
    )
except Exception as e:
    if "already exists" in str(e).lower():
        print(f"Endpoint {model_endpoint_name} already exists. Skipping creation.")
    else:
        raise

while client.get_endpoint(model_endpoint_name)['state']['config_update'] == 'IN_PROGRESS':
    time.sleep(10)
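
The loop above waits for as long as the endpoint reports `IN_PROGRESS`. A minimal sketch of the same polling with an upper bound on the wait is shown below. `wait_until_ready` is a hypothetical helper, not part of MLflow, and the fake state sequence (including the `NOT_UPDATING` string) is a stand-in so the example runs without an endpoint.

```python
import time


def wait_until_ready(get_state, timeout_s=1800, poll_s=10, sleep=time.sleep):
    """Poll get_state() until it stops returning 'IN_PROGRESS' or the timeout expires."""
    waited = 0
    while True:
        state = get_state()
        if state != "IN_PROGRESS":
            return state
        if waited >= timeout_s:
            raise TimeoutError(f"Endpoint still updating after {waited}s")
        sleep(poll_s)
        waited += poll_s


# Stand-in for the real endpoint: reports IN_PROGRESS twice, then a final placeholder state
fake_states = iter(["IN_PROGRESS", "IN_PROGRESS", "NOT_UPDATING"])
final_state = wait_until_ready(lambda: next(fake_states), sleep=lambda s: None)
print(final_state)
```

In the notebook, the state callable could be `lambda: client.get_endpoint(model_endpoint_name)['state']['config_update']`, which is the same lookup the loop above performs.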

In [0]:
dataset = spark.table('churn_features').select(*columns).limit(3).toPandas()
#Make it a string to send to the inference endpoint
dataset['last_transaction'] = dataset['last_transaction'].astype(str)
dataset
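
The string conversion is needed because the request body is sent as JSON, and timestamp objects cannot be encoded by the default JSON encoder. Here is a minimal sketch using only the standard library, with a made-up row in the `dataframe_split` layout (the `user_id` value and the timestamp are illustrative, not taken from the demo data):

```python
import json
from datetime import datetime

# A made-up row in the "dataframe_split" layout: column names plus rows of values
payload = {
    "dataframe_split": {
        "columns": ["user_id", "last_transaction"],
        "data": [["u-001", datetime(2024, 1, 15, 10, 30)]],
    }
}

try:
    json.dumps(payload)
    serializable = True
except TypeError:
    # datetime objects are rejected by the default JSON encoder
    serializable = False

# Converting the timestamp to a string makes the payload valid JSON
for row in payload["dataframe_split"]["data"]:
    row[1] = str(row[1])
body = json.dumps(payload)
print(serializable, body)
```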

In [0]:
from mlflow.deployments import get_deploy_client

def score_model(dataset):
  # Send the pandas DataFrame to the serving endpoint using the "dataframe_split" format
  client = get_deploy_client("databricks")
  payload = {"dataframe_split": dataset.to_dict(orient='split')}
  predictions = client.predict(endpoint=model_endpoint_name, inputs=payload)
  print(predictions)

# The endpoint must be deployed and ready before running your inferences live
score_model(dataset)


# Next step: Leverage inferences and automate actions to increase revenue

## Automate action to reduce churn based on predictions

We now have an end-to-end data pipeline analyzing and predicting churn. We can now easily trigger actions to reduce churn based on our business:

- Send targeted email campaigns to the customers most likely to churn
- Run phone campaigns to talk with our customers and understand what's going on
- Understand what's wrong with our product line and fix it

These actions are out of the scope of this demo and simply leverage the Churn prediction field from our ML model.

## Track churn impact over the next month and campaign impact

Of course, this churn prediction can be reused in our dashboard to analyze future churn and measure churn reduction.

The pipeline created with the Lakehouse will offer a strong ROI: it took us a few hours to set up this pipeline end to end, and we have a potential gain of $129,914 / month!

<img width="800px" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-dbsql-prediction-dashboard.png">

<a dbdemos-dashboard-id="churn-prediction" href='/sql/dashboardsv3/01f0f16ad84a15ef89c46929f7810112'>Open the Churn prediction DBSQL dashboard</a>





## Reducing churn leveraging Databricks GenAI and LLM capabilities

GenAI provides unique capabilities to improve your customer relationships, both by providing better service and by better analyzing your churn risk.

Databricks provides built-in GenAI capabilities to help you accelerate the deployment of such GenAI apps.

Discover how with the [Agent Tools]($../05-Generative-AI/05.1-Agent-Functions-Creation) Notebook in the new Generative AI section of this demo!

[Go back to the introduction]($../00-churn-introduction-lakehouse)