# ML model scoring with PREDICT

## Introduction

In this notebook series, you'll walk through an end-to-end Microsoft Fabric data science workflow. The scenario is to build a model that predicts whether bank customers will churn. The churn rate, also known as the rate of attrition, is the rate at which bank customers stop doing business with the bank.

The main steps in this notebook series are:


- Notebook 1: Data Ingestion 
    1. Install custom libraries
    2. Load the data 

- Notebook 2:<br>
    3. Understand and process the data through exploratory data analysis, and demonstrate the use of the Fabric Data Wrangler feature.

- Notebook 3: <br>
    4. Train machine learning models using `Scikit-Learn` and `LightGBM`, and track experiments using MLflow and the Fabric Autologging feature.
    5. Evaluate and save the final machine learning model

- Notebook 4:<br>
    6. Load the best model to run predictions.


## Generate batch inference from the ML model's item page

From the ML model's item page, you can choose either of the following options to start generating batch predictions for a specific model version with PREDICT. 

With **Apply this model in wizard**, you can use the UI to generate customized PREDICT code as a notebook or code snippet. Alternatively, **Copy code to apply** generates a code template that you can copy into a notebook and customize yourself. For more detail, see the official [Microsoft Fabric documentation](https://learn.microsoft.com/en-us/fabric/data-science/model-scoring-predict#generate-predict-code-from-an-ml-models-item-page).

![image-alt-text](https://learn.microsoft.com/en-us/fabric/data-science/media/model-scoring-predict/apply-model.png#lightbox)




### Use a guided UI experience

The guided UI experience walks you through steps to:

1. Select source data for scoring
2. Map the data correctly to your ML model's inputs
3. Specify the destination for your model's outputs
4. Create a notebook that uses `transform` to generate and store prediction results

In our case, this generates the following code:

```python
import mlflow
from synapse.ml.predict import MLFlowTransformer
    
df = spark.read.format("delta").load(
    "abfss://fd38eb65-3cc4-4868-82bf-bc7b79c7b550@onelake.dfs.fabric.microsoft.com/2702e363-4951-4ca0-bb07-7517c5337666/Tables/df_test"
)
    
model = MLFlowTransformer(
    inputCols=["CreditScore","Age","Tenure","Balance","NumOfProducts","HasCrCard","IsActiveMember","EstimatedSalary","NewTenure","Geography_France","Geography_Germany","Geography_Spain", "Gender_Female","Gender_Male"],
    outputCol="predictions",
    modelName="lgbm_sm",
    modelVersion=1
)
df = model.transform(df)
    
df.write.format('delta').mode("overwrite").save(
    "abfss://fd38eb65-3cc4-4868-82bf-bc7b79c7b550@onelake.dfs.fabric.microsoft.com/2702e363-4951-4ca0-bb07-7517c5337666/Tables/customer_churn_test_predictions"
)

```
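
The generated code addresses tables through full OneLake `abfss://` paths. Judging from the paths above, they appear to follow the pattern `abfss://<workspace ID>@onelake.dfs.fabric.microsoft.com/<lakehouse ID>/Tables/<table name>`. The following is a minimal sketch under that assumption; `onelake_table_path` is a hypothetical helper, not part of any Fabric library, that builds such a path so the IDs aren't repeated by hand:

```python
ONELAKE_HOST = "onelake.dfs.fabric.microsoft.com"


def onelake_table_path(workspace_id: str, lakehouse_id: str, table_name: str) -> str:
    """Build an abfss path to a lakehouse table (hypothetical helper)."""
    return f"abfss://{workspace_id}@{ONELAKE_HOST}/{lakehouse_id}/Tables/{table_name}"


# IDs copied from the generated code above
workspace_id = "fd38eb65-3cc4-4868-82bf-bc7b79c7b550"
lakehouse_id = "2702e363-4951-4ca0-bb07-7517c5337666"

input_path = onelake_table_path(workspace_id, lakehouse_id, "df_test")
output_path = onelake_table_path(workspace_id, lakehouse_id, "customer_churn_test_predictions")
print(input_path)
print(output_path)
```

The resulting strings could then be passed to `spark.read.format("delta").load(...)` and `df.write...save(...)` in place of the hard-coded paths.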

### Use a customizable code template
To use a code template for generating batch predictions:

1. Go to the item page for a given ML model version.
2. Select **Copy code to apply** from the **Apply this version** dropdown. The selection allows you to copy a customizable code template.

In our case, this generates the following template:

```python
import mlflow
from synapse.ml.predict import MLFlowTransformer
    
df = spark.read.format("delta").load(
    <INPUT_TABLE> # Your input table filepath here
)
    
model = MLFlowTransformer(
    inputCols=["CreditScore","Age","Tenure","Balance","NumOfProducts","HasCrCard","IsActiveMember","EstimatedSalary","NewTenure","Geography_France","Geography_Germany","Geography_Spain","Gender_Female","Gender_Male"], # Your input columns here
    outputCol="predictions", # Your new column name here
    modelName="lgbm_sm", # Your model name here
    modelVersion=1 # Your model version here
)
df = model.transform(df)
    
df.write.format('delta').mode("overwrite").save(
    <OUTPUT_TABLE> # Your output table filepath here
)
```

### Imports and Parameters

In [None]:
import mlflow
from synapse.ml.predict import MLFlowTransformer
from pyspark.ml.feature import SQLTransformer 
from pyspark.sql.functions import col, pandas_udf, udf, lit

Define these parameters so that you can reuse this notebook with different datasets or [assign parameter values from a pipeline](https://learn.microsoft.com/en-us/fabric/data-engineering/author-execute-notebook#assign-parameters-values-from-a-pipeline).

In [None]:
INPUT_TABLE_NAME = "gold/churn_test"
OUTPUT_TABLE_NAME = "churn_prediction"
MODEL_NAME = "lgbm_sm"
MODEL_VERSION = "latest"
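
Later cells in this notebook read from `Tables/{INPUT_TABLE_NAME}` and write to `Tables/gold/{OUTPUT_TABLE_NAME}`, so the input parameter already carries the `gold/` prefix while the output parameter does not. A minimal sketch of how the two parameters map to relative lakehouse paths; `table_path` is a hypothetical helper introduced only for illustration:

```python
INPUT_TABLE_NAME = "gold/churn_test"
OUTPUT_TABLE_NAME = "churn_prediction"


def table_path(table_name: str, root: str = "Tables") -> str:
    """Join a table name onto the lakehouse Tables folder (hypothetical helper)."""
    return f"{root}/{table_name.strip('/')}"


input_path = table_path(INPUT_TABLE_NAME)
# The "gold/" prefix is added explicitly for the output table
output_path = table_path(f"gold/{OUTPUT_TABLE_NAME}")

print(input_path)   # Tables/gold/churn_test
print(output_path)  # Tables/gold/churn_prediction
```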

## Step 6: Run batch inference

### Load the test data

In [None]:
df_test = spark.read.format("delta").load(f"Tables/{INPUT_TABLE_NAME}")
display(df_test.limit(5))

### Load ML model for inference

To create an `MLFlowTransformer` object for generating batch predictions, specify the following parameters:

- `inputCols`: the columns to use as model inputs,
- `outputCol`: a name for the new output column, and
- `modelName` and `modelVersion`: the name and version of the model that generates the predictions.
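
Because `inputCols` must name columns that exist in the DataFrame being scored, it can help to check for missing columns before calling `transform`. The following is a minimal sketch in plain Python; `find_missing_columns` is a hypothetical helper and the column lists are shortened examples. In the notebook, you would pass the model's input column names and `df_test.columns` instead:

```python
def find_missing_columns(required, available):
    """Return the required column names that are absent from `available`, in order."""
    available_set = set(available)
    return [name for name in required if name not in available_set]


# Shortened, illustrative column lists
model_inputs = ["CreditScore", "Age", "Tenure", "Balance"]
table_columns = ["CreditScore", "Age", "Balance", "HasCrCard"]

missing = find_missing_columns(model_inputs, table_columns)
print(missing)  # ['Tenure']
```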

<mark>NOTE:
You can get more information about a Python object by hovering over it or by passing it to the `help()` function.</mark>

```python
>>> help(MLFlowTransformer)

Help on class MLFlowTransformer in module synapse.ml.predict.MLFlowTransformer:

class MLFlowTransformer(pyspark.ml.base.Transformer, pyspark.ml.param.shared.HasInputCols, pyspark.ml.param.shared.HasOutputCol, synapse.ml.logging.LoggerFactory.SynapseMLLogging)
 |  MLFlowTransformer(inputCols: List[str] = None, outputCol: str = None, modelName: str = None, modelVersion: str = 'latest', trackingUri: str = None, registerModel: bool = True, flattenOutput: bool = True) -> pyspark.ml.base.Transformer
 |  
 |  Args:
 |      inputCols (str):  Columns to feed to the model
 |      outputCol (str): The column to add output predictions to
 |      modelName (str):  The name of the model in the model registry
 |      modelVersion (str):  The version of the model in the model registry
 |      trackingUri (str):  The location of the MLFlow tracking server
 |      registerModel (bool): Whether to register the model with the PREDICT SQL command
 |      flattenOutput (bool): Whether to Flatten Predict Output
 |  
 |  Method resolution order:
 |      MLFlowTransformer
 |      pyspark.ml.base.Transformer
 |      pyspark.ml.param.shared.HasInputCols
 |      pyspark.ml.param.shared.HasOutputCol
 |      pyspark.ml.param.Params
 |      pyspark.ml.util.Identifiable
 |      synapse.ml.logging.LoggerFactory.SynapseMLLogging
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, inputCols: List[str] = None, outputCol: str = None, modelName: str = None, modelVersion: str = 'latest', trackingUri: str = None, registerModel: bool = True, flattenOutput: bool = True) -> pyspark.ml.base.Transformer
 |      Parameters
 |      ----------
 ...

```

In [None]:
# Define the model URI
model_uri = f"models:/{MODEL_NAME}/{MODEL_VERSION}"
# Load the model
model = mlflow.pyfunc.load_model(model_uri)
# Get the input schema from the model's signature
signature = model.metadata.get_input_schema()

print('Model input columns are:', signature.input_names())
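
In this notebook, the model URI has the form `models:/<name>/<version>`, where the version is either a number or `latest`. When the parameters come from a pipeline, it may be worth validating them before loading the model. A minimal sketch, assuming only those two kinds of version are wanted; `build_model_uri` is a hypothetical helper, not an MLflow function:

```python
def build_model_uri(model_name: str, model_version) -> str:
    """Build a registry URI of the form models:/<name>/<version> (hypothetical helper)."""
    if not model_name:
        raise ValueError("model_name must not be empty")
    version = str(model_version).strip()
    # This sketch only accepts "latest" or a positive integer version
    is_positive_int = version.isdecimal() and int(version) > 0
    if version != "latest" and not is_positive_int:
        raise ValueError(f"Unsupported model version: {model_version!r}")
    return f"models:/{model_name}/{version}"


print(build_model_uri("lgbm_sm", "latest"))  # models:/lgbm_sm/latest
print(build_model_uri("lgbm_sm", 1))         # models:/lgbm_sm/1
```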

In [None]:
model = MLFlowTransformer(
    inputCols=signature.input_names(),
    outputCol='predictions',
    modelName=MODEL_NAME,
    modelVersion=MODEL_VERSION # If you want to apply a specific model version, you should specify it here.
)

### PREDICT with the Transformer API

The following code calls the PREDICT function using the Transformer API.

In [None]:
predictions = model.transform(df_test)
display(predictions)

### PREDICT with the Spark SQL API
The following code calls the PREDICT function using the Spark SQL API.

In [None]:
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = MODEL_NAME
model_version = MODEL_VERSION
features = signature.input_names()

sqlt = SQLTransformer().setStatement(
    f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")

# Substitute "df_test" below with your own test dataset
display(sqlt.transform(df_test))
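
To make the statement passed to `SQLTransformer` easier to inspect, the string can be built separately and printed before use. The following is a minimal sketch; `build_predict_statement` is a hypothetical helper and the column list is shortened for readability. `__THIS__` is the placeholder that `SQLTransformer` replaces with the input DataFrame:

```python
def build_predict_statement(model_name, model_version, features, output_col="predictions"):
    """Build a SELECT PREDICT(...) statement for SQLTransformer (hypothetical helper)."""
    if not features:
        raise ValueError("features must contain at least one column name")
    columns = ",".join(features)
    return (
        f"SELECT PREDICT('{model_name}/{model_version}', {columns}) "
        f"AS {output_col} FROM __THIS__"
    )


statement = build_predict_statement("lgbm_sm", "latest", ["CreditScore", "Age", "Tenure"])
print(statement)
# SELECT PREDICT('lgbm_sm/latest', CreditScore,Age,Tenure) AS predictions FROM __THIS__
```

The resulting string could then be passed to `SQLTransformer().setStatement(...)` as in the cell above.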

### PREDICT with a user-defined function (UDF)

The following code calls the PREDICT function using a PySpark UDF.



In [None]:
# Substitute "model" and "features" below with your own MLFlowTransformer object and feature columns
model_udf = model.to_udf()
features = signature.input_names()

display(df_test.withColumn("predictions", model_udf(*[col(f) for f in features])).limit(5))

## Write model prediction results to the lakehouse
Once you have generated batch predictions, you can write the model results back to the lakehouse.

In [None]:
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = f"gold/{OUTPUT_TABLE_NAME}"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")