# Creating a Churn prediction model

This notebook covers the following steps:
- Creating ML Pipeline using PySpark MlLib
- Tuning the hyperparameters of the model using PySpark
- Tracking the tuning experiments using MlFlow

In [None]:
spark.sparkContext.setLogLevel("WARN")

#The setLogLevel method is used to set the logging level for Spark. 
# Spark will only display warning messages or higher severity messages in the console or logs, 
# which can be helpful in reducing the amount of output and improving performance. 
# 
# Setting the logging level to WARN can be especially useful in a production environment.

In [None]:
pip install mlflow

Python interpreter will be restarted.
Collecting mlflow
  Downloading mlflow-2.3.2-py3-none-any.whl (17.7 MB)
Collecting gunicorn<21
  Downloading gunicorn-20.1.0-py3-none-any.whl (79 kB)
Collecting docker<7,>=4.0.0
  Downloading docker-6.1.2-py3-none-any.whl (148 kB)
Collecting pyyaml<7,>=5.1
  Downloading PyYAML-6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (661 kB)
Collecting gitpython<4,>=2.1.0
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
Collecting cloudpickle<3
  Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Collecting alembic!=1.10.0,<2
  Downloading alembic-1.11.1-py3-none-any.whl (224 kB)
Collecting Flask<3
  Downloading Flask-2.3.2-py3-none-any.whl (96 kB)
Collecting markdown<4,>=3.3
  Downloading Markdown-3.4.3-py3-none-any.whl (93 kB)
Collecting querystring-parser<2
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting importlib-metadata!=4.7.0,<7,>=3.7.0
  Downloading impor

#### `Step 1`: Understanding Churn Modelling

Churn, in the context of business, refers to the phenomenon of customers or users ceasing their engagement or relationship with a company's product or service. It is commonly used to describe situations where customers discontinue using a service, cancel a subscription, or switch to a competitor. Churn can have a significant impact on a business, and modeling churn can help companies understand and mitigate this impact effectively.

Modeling churn is essential for several reasons. **Firstly**, by identifying the factors that contribute to churn, companies can gain insights into customer behavior and preferences, allowing them to take proactive measures to retain customers. Understanding why customers churn can help businesses address their pain points, improve customer satisfaction, and enhance overall service quality.

**Secondly**, predicting churn enables businesses to allocate their resources effectively. By identifying customers who are likely to churn in the future, companies can prioritize retention efforts and allocate resources to targeted marketing campaigns, personalized offers, or tailored customer service interventions. This approach can be more cost-effective than applying retention strategies uniformly to all customers.

**Lastly**, churn modeling helps companies evaluate the success of their retention strategies and make data-driven decisions. By tracking and analyzing churn rates over time, businesses can assess the effectiveness of their initiatives and optimize their retention efforts. This iterative process allows organizations to refine their strategies, test new interventions, and continuously improve customer retention rates.

Let's consider a business example to illustrate the importance of churn modeling. Imagine you're working for a subscription-based streaming platform like Netflix. Churn modeling would be crucial for such a company because subscriber retention is vital for its success.

By building a churn model, you could analyze various customer attributes and behaviors that contribute to churn. For instance, you might find that customers who have not engaged with the platform for a specific period or who consistently rate content poorly are more likely to churn. Armed with this knowledge, you can develop targeted retention strategies.

Using the churn model predictions, you could implement personalized approaches to retain customers. For instance, if the model indicates that a particular subscriber is at high risk of churning, you could offer them a discounted subscription plan, recommend content tailored to their preferences, or send them personalized emails highlighting new releases aligned with their interests. These efforts can significantly improve customer retention rates and ultimately boost the platform's revenue and growth.

Learn more [here](https://inmoment.com/blog/ready-to-tackle-customer-churn-heres-how/).

![churn](https://inmoment.com/wp-content/uploads/2021/05/Customer-Churn-Rate.png)

#### `Step 2`: Load the dataset

In [None]:
orders_df = spark.read.format("parquet").option("header","true").load("/FileStore/bda_data/ord_sample/")

orders_df.limit(5).display()

order_id,account_id,reference_date,merchant_id,order_shift,order_origin,delivery_fee,order_total,subsidy,device_app_version,city,merchant_category,distance_merchant_customer,nps_score,review_score,review_created_at,review_is_commented,review_is_fraud
8c37df7a47f7acb958ba64a21d0b8c4615b520b725cd7c25d173aef79e9c8203,152569bd4e43dc5a773a60af1e871c669ea3df11773c57a3c9f15e6a612314e5,2021-06-05,bebf7f3a9f0d78e5f4e54a0236434ea3bd03d322b1796e2a93378dc2914d7057,weekend dinner,RESTAURANT,279.65000000000003,5459.650000000001,0.0,9.100.0,GONDOR,SMALL_BUSINESS,3447.1948,,,,,
32611db6cd360ca4860bf334c4fb21f7f62009fb3bf3df07f1d46298e539e98e,6deb32d01419c424d4c95dfc95cebfb4fb0402384127817a4788a5d74545822b,2021-03-13,16d86434ef63db1a7a091648e87f57e701b94cef62a5b5b9b2060eefa0427bb7,weekend snack,RESTAURANT,367.15,3692.15,0.0,9.90.0,RIVENDELL,GOLD_PARTNER,8300.169,,,,,
467fb17648c52ad77bfe2498b1c56729b89a0075b9315ee9d31581b741f5a00f,818d68c1fca75470a22de84a2824b889ced6d0defacc358b1a811711c0ca3574,2021-06-25,1d75761dcab70c44fcebf53f87c4b054d8d3f903648dd16cb9860b8baf601520,weekend dinner,RESTAURANT,0.0,7376.25,0.0,9.108.0,ROHAN,SMALL_BUSINESS,1432.896,,,,,
80978f17425463a185f10630c65facec07a450bfa14f9b529a7ed127242cf5ab,22b830f42c585f4816bf7514ccf7d808b6be08985594557141150a99ae83bd8e,2021-01-28,9a1d8d7169dc1b2147154a335e8f3ca92b6e8ef70791e2ecec59f506676c4ba7,weekday dinner,RESTAURANT,280.0,1186.5,324.0,9.85.1,GONDOR,GOLD_PARTNER,7362.216,10.0,5.0,2021-01-30T02:43:44.330+0000,False,False
73c78c97246c5d3289934642ae6e673d07d64b79b6f1768095aaf93ebbe219ce,980c66024ef4d90a55ebd71348c824ddb5b998a463ac10d8da93b6dd1bc281d2,2021-03-03,17f056e61a4d0e43b36ff720deb3570aa6470660bf05ee018602bf89677cdb8a,weekday dinner,RESTAURANT,279.65000000000003,3170.65,0.0,9.90.0,GONDOR,GOLD_PARTNER,4926.0703,,,,,


#### `Step 3`: Create the features and labels

The code below generate features from an input `DataFrame`, which contains information about orders made by different accounts. The features generated are based on the RFM (Recency, Frequency, Monetary) framework, with an additional categorical feature called `subsidy_bucket``.

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.dataframe import DataFrame

def generate_features(input_df: DataFrame, start_date: str, end_date:str, recency_reference_date: str) -> DataFrame:
    """
    Generate RFM-based features and an additional categorical feature from the input DataFrame.
    
    :param input_df: Input DataFrame containing order information.
    :param start_date: Start date for filtering the input DataFrame.
    :param end_date: End date for filtering the input DataFrame.
    :param recency_reference_date: Reference date for calculating recency.
    :return: DataFrame with generated features.
    """
    features_df = (
        input_df
        .filter(f.col('reference_date')>= start_date) #Filters the input DataFrame based on the start_date and end_date values.
        .filter(f.col('reference_date')< end_date)
        .groupBy('account_id') #Groups the filtered DataFrame by the 'account_id' column.
        .agg(
            f.countDistinct('order_id').alias('freq_3mo'),
            f.sum('order_total').alias('total_paid_3mo'),
            f.max('reference_date').alias('most_recent_order_date'),
            (f.sum('subsidy')/f.sum('order_total')).alias('subsidy_pct')
        ) #Aggregates the grouped DataFrame by performing various calculations
        .withColumn('recency', f.datediff(f.lit(recency_reference_date), f.col('most_recent_order_date')))
        # We will create this "extra" feature which is not in the RFM framework, just to work with categorical variables in our pipeline
        .withColumn(
            'subsidy_bucket', 
            f.when(f.col('subsidy_pct') < 0.10, "bucket_1")
             .when(f.col('subsidy_pct') < 0.20, "bucket_2")
             .when(f.col('subsidy_pct') < 0.50, "bucket_3")
             .otherwise("bucket_4") #Creates an additional categorical feature called 'subsidy_bucket' based on the 'subsidy_pct' value
        )
        .dropna(how='all')
        .drop(f.col('most_recent_order_date'), f.col('subsidy_pct')) #Drops any rows that contain all null values and drops the 'most_recent_order_date' and 'subsidy_pct' columns.
    )
    
    return features_df

The code below generate labels regarding the `churn` behavior of customers, from the input `DataFrame`.  
For that, we create a new column called `y_status` based on the activity of the accounts in comparing the period of two months. We filter only active users in the previous month and then:
   - If the account has no orders in the most recent month, assign a value of 1.
   - Otherwise, assign a value of 0.  
   
 The value of 1 represents `churned` customers;
 The value of 0 represents all the rest.  
 
 We encode our `y` as an integer because we need it in this format for the PySpark Ml Pipeline. Otherwise, we'd need to preprocess it, applying an `StringIndexer` to the categories.

In [None]:
from datetime import datetime, timedelta

def generate_labels(input_df: DataFrame, y_month_string: str) -> DataFrame:
    """
    Generate labels for the input DataFrame based on the activity of accounts in specific months.
    input_df should comprise at least 3 months of data, from the y_reference_month backwards
    
    :param input_df: Input DataFrame containing order information.
    :param y_reference_month: reference month for filtering the input DataFrame.
    
    :return: DataFrame with generated labels.
    """
    y_month_date = datetime.strptime(y_month_string, '%Y-%m-%d')
    y_month_m1 = (y_month_date - timedelta(days=28)).replace(day=1) # truncate the date to month

    # Format the modified date back to a string
    y_month_m1_string = y_month_m1.strftime('%Y-%m-%d')

    labels_df = (
        orders_df
        .filter(f.col('reference_date')>= y_month_m1_string)
        .filter(f.col('reference_date')<= y_month_string)
        .withColumn('reference_month', f.date_trunc('month', f.col('reference_date')).cast('date'))
        .groupBy('account_id')
        .pivot('reference_month')
        .agg(
            f.countDistinct('order_id')
        )
        .filter(f.col(y_month_m1_string).isNotNull()) # only users active in last month
        .withColumn(
            'y_status',
            f.when(f.col(y_month_string).isNull(), 1).otherwise(0)
        )
        .select('account_id', 'y_status', f.col(y_month_m1_string).alias('orders_m-1'))
        .fillna(0, subset=['orders_m-1'])

    )
    return labels_df

Now we encapsulate the feature and labels generation in one single function:

In [None]:
def generate_features_and_labels(input_df: DataFrame) -> DataFrame:
    features_df = input_df.transform(lambda df: generate_features(df, '2021-03-01', '2021-06-01', '2021-06-01'))
    labels_df = input_df.transform(lambda df: generate_labels(df, '2021-06-01'))
    output_df = labels_df.join(features_df, ['account_id'], 'inner')
    
    return output_df

And then we apply it to our input data, also splitting the training and testing sets:

In [None]:
train_data, test_data = orders_df.transform(generate_features_and_labels).randomSplit([0.7, 0.3], seed=42)

#### `Step 4`: Train the pipeline

The code below creates a ML `Pipeline` for a classification problem. The pipeline consists of several preprocessing steps and a RandomForestClassifier mode. Here's a step-by-step explanation of the code:

In [None]:
# 1. Import the necessary classes and functions from PySpark's ML library.
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel

# 2. Create an Imputer to fill missing values in the 'freq_3mo', 'total_paid_3mo', and 'recency' columns.
impute = Imputer(inputCols=['freq_3mo', 'total_paid_3mo', 'recency'], outputCols=['freq_3mo', 'total_paid_3mo', 'recency']) 

# 3. Create a VectorAssembler to combine the continuous features into a single vector column called 'continuous_features'.
assemble = VectorAssembler(inputCols=['freq_3mo', 'total_paid_3mo', 'recency', 'orders_m-1'], outputCol='continuous_features')

# 4. Create a StandardScaler to scale the continuous features to have zero mean and unit variance.
scale = StandardScaler(inputCol='continuous_features', outputCol='scaled_continuous_features', withMean=True, withStd=True)

# 5. Create a StringIndexer to convert the 'subsidy_bucket' column into a numerical index column called 'subsidy_bucket_idx'.
index = StringIndexer(inputCols=['subsidy_bucket'], outputCols=['subsidy_bucket_idx'])

# 6. Create a OneHotEncoder to convert the 'subsidy_bucket_idx' column into a one-hot encoded vector column called 'subsidy_bucket_vector'.
one_hot = OneHotEncoder(inputCol='subsidy_bucket_idx', outputCol='subsidy_bucket_vector')

# 7. Create another VectorAssembler to combine the scaled continuous features and the one-hot encoded categorical feature into a single vector column called 'input_features'.
final_assemble = VectorAssembler(inputCols=['scaled_continuous_features', 'subsidy_bucket_vector'], outputCol='input_features')

# 8. Create a RandomForestClassifier with the specified input and output columns.
rf = RandomForestClassifier(featuresCol="input_features", labelCol="y_status", predictionCol="prediction")

In [None]:
# 9. Create a Pipeline and set its stages to include all the preprocessing steps and the classifier.
pipe = Pipeline()
pipe.setStages(
    [
        impute,
        assemble,
        scale,
        index,
        one_hot,
        final_assemble,
        rf
    ]
)

Out[7]: Pipeline_184e4ffb1c2e

In [None]:
# 10. Fit the pipeline to the training data. (CMD may take 5 or more minutes)
pipe_model = pipe.fit(train_data)

In [None]:
# 11. Save the fitted pipeline model to a specified path.
chkpt_path = 'dbfs:/FileStore/models_checkpoints/churn_classifier_1/'
pipe_model.save(chkpt_path)

In [None]:
# 12. Load the saved pipeline model.
pipe_model_loaded = PipelineModel.load(chkpt_path)


#13. Transform the training and test data using the loaded pipeline model.
fitted_train_data = pipe_model_loaded.transform(train_data)
fitted_test_data = pipe_model_loaded.transform(test_data)

#### `Step 5`: Evaluate classifier

We can create a `confusion matrix`...
|                   | Predicted Negative | Predicted Positive |
|-------------------|--------------------|--------------------|
| **Actual Negative** |        True Neg          |        False Pos          |
| **Actual Positive** |        False Neg          |        True Pos          |

...with a simple aggregation usin `.pivot()`:

In [None]:
fitted_train_data.groupBy('y_status').pivot('prediction').count().display()

y_status,0.0,1.0
1,790,17939
0,1379,4517


We can also use the `MulticlassClassificationEvaluator` from PySpark's ML library to evaluate the performance of a classification model on both the training and test datasets. The evaluation metric used in this case is the F1-Score.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create an evaluator instance with the specified label column, prediction column, and metric name (accuracy)
evaluator = MulticlassClassificationEvaluator(
    labelCol="y_status", predictionCol="prediction", metricName="f1")

# accepted metrics: f1|accuracy|weightedPrecision|weightedRecall|weightedTruePositiveRate| weightedFalsePositiveRate|weightedFMeasure|truePositiveRateByLabel| falsePositiveRateByLabel|precisionByLabel|recallByLabel|fMeasureByLabel| logLoss|hammingLoss

In [None]:
# Evaluate the f1-score of the model on the fitted training data
train_metric = evaluator.evaluate(fitted_train_data)
print("Training F1-Score = %g" % train_metric)

# Evaluate the f1-score of the model on the fitted test data
test_metric = evaluator.evaluate(fitted_test_data)
print("Test F1-Score = %g" % test_metric)

Training F1-Score = 0.744442
Test F1-Score = 0.734919


#### `Step 6`: Tune the model hyperparameters

Now we perform hyperparameter tuning for a `RandomForestClassifier` model using Ml Lib `TrainValidationSplit`. The goal is to find the best combination of hyperparameters that results in the highest model performance.

In [None]:
#CMD may take up to 20 min

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# Define parameter grid to search over
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10])
    .addGrid(rf.maxDepth, [2, 4])
    .addGrid(rf.maxBins, [4, 8])
    .build()
)

# Define TrainValidationSplit with 75% of the data for training and 25% for validation
tvs = TrainValidationSplit(estimator=pipe, estimatorParamMaps=paramGrid, evaluator=evaluator, trainRatio=0.75)

# Fit TrainValidationSplit to training data
tvsModel = tvs.fit(train_data)

In [None]:
# Retrieve the validation metrics for each combination of hyperparameters.
tvsModel.validationMetrics

Out[15]: [0.6439457338153165,
 0.6439457338153165,
 0.7301348374305254,
 0.7187422068901542]

In [None]:
# Get the best model from the TrainValidationSplit instance.
best_model = tvsModel.bestModel

In [None]:
# Extract the RandomForestClassifier model from the best model's pipeline stages.
# stages is an attribute of best_model that represents the pipeline stages of the model.
# [-1] is an index that retrieves the last stage of the pipeline.
rf_model = best_model.stages[-1] 

#  A pipeline is a sequence of stages that are executed in a specific order to process and transform data. Each stage in the pipeline represents a data transformation or a machine learning algorithm. The last stage of the pipeline [-1] refers to the final stage in the sequence of stages defined in the pipeline. It is the stage that is executed last when processing data. In the given code, best_model.stages[-1] retrieves the last stage of the pipeline.

# Get the model's parameters and create a dictionary mapping parameter names to values.
params = rf_model.extractParamMap()
param_names = [ k.name for k in params.keys() ]
param_values = [ v for v in params.values() ]
params_dict = dict(zip(param_names, param_values))

params_dict

Out[17]: {'bootstrap': True,
 'cacheNodeIds': False,
 'checkpointInterval': 10,
 'featureSubsetStrategy': 'auto',
 'featuresCol': 'input_features',
 'impurity': 'gini',
 'labelCol': 'y_status',
 'leafCol': '',
 'maxBins': 4,
 'maxDepth': 4,
 'maxMemoryInMB': 256,
 'minInfoGain': 0.0,
 'minInstancesPerNode': 1,
 'minWeightFractionPerNode': 0.0,
 'numTrees': 10,
 'predictionCol': 'prediction',
 'probabilityCol': 'probability',
 'rawPredictionCol': 'rawPrediction',
 'seed': -5387697053847413545,
 'subsamplingRate': 1.0}

In [None]:
# Evaluate the best model on the test data using the evaluator.
evaluator.evaluate(best_model.transform(test_data))

Out[18]: 0.7340688060387152

#### `Step 7`: Track experiments with ML Flow

The code below helps us find the best combination of hyperparameters for the RandomForestClassifier by performing a manual grid search and cross-validation. The results are logged using MLflow, which allows us to analyze and compare the performance of different models easily.

MLflow is an open source platform for managing the end-to-end machine learning lifecycle. MLflow supports tracking for machine learning model tuning in Python, R, and Scala.

In [None]:
!python3 -m pip freeze --disable-pip-version-check --exclude-editable --no-cache-dir > requirements.txt

# python3 -m pip: Invokes the Python package installer (pip) associated with Python 3.
# freeze: The freeze command displays a list of installed packages and their versions.
# --disable-pip-version-check: Disables the check for the latest version of pip.
# --exclude-editable: Excludes packages installed in editable mode (editable installations are usually used for development purposes).
# --no-cache-dir: Skips caching when retrieving package metadata.
# > requirements.txt: Redirects the output of the pip freeze command to a file named requirements.txt.
# In summary, this code generates a file named requirements.txt containing a list of installed Python packages and their versions, which is commonly used for documenting project dependencies and facilitating reproducibility.

In order to have more control over mlflow logging, we will create a manual cross validation training loop.  
Let's start by defining the parameters grid and the cv folds:

In [None]:
from functools import reduce
import numpy as np
import pandas as pd

# Define the parameter grid for maxDepth and numBins
max_depths = [2, 4, 8]
num_bins = [2, 4, 8]

# Create a pandas DataFrame with the parameter combinations
param_grid = pd.DataFrame([(depth, bins) for depth in max_depths for bins in num_bins], columns=['max_depth', 'num_bins'])

# Perform manual 3-fold cross-validation
num_folds = 3
probs = [0.33, 0.33, 0.33] 
train_data_to_split = train_data.cache()
splits = train_data_to_split.randomSplit(probs, seed=42)



Now we set the `mlflow` experiment.  
Experiments let you visualize, search for, and compare runs (each time the model is trained inside the tuning loop), as well as download run artifacts and metadata for analysis in other tools. An MLflow run corresponds to a single execution of model code. It's highly recommended to organize training runs with MLflow experiments.

In [None]:
import mlflow

experiment_name = "/Shared/manual_cv_rf_pipeline_2"
mlflow.set_experiment(experiment_name)

Out[21]: <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/2402731393051134', creation_time=1684771975240, experiment_id='2402731393051134', last_update_time=1684805566017, lifecycle_stage='active', name='/Shared/manual_cv_rf_pipeline_2', tags={'mlflow.experiment.sourceName': '/Shared/manual_cv_rf_pipeline_2',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': '50123@novasbe.pt',
 'mlflow.ownerId': '3991991021770629'}>

Now we go through the tuning loop:

In [None]:
with mlflow.start_run():
    active_run = mlflow.active_run()
    expId = active_run.info.experiment_id
    print(f"Exp ID: {expId}")
    
    for _, row in param_grid.iterrows():
        depth = row['max_depth']
        bins = row['num_bins']
        
        # Set the parameters for the RandomForestClassifier
        pipe.getStages()[-1].setParams(maxDepth=depth, maxBins=bins)

        # Initialize the metrics for x-val
        metrics = []

        # Log the parameters and evaluation metrics using MLflow
        with mlflow.start_run(nested=True): 
            
            # Perform k-fold cross-validation
            for i in range(num_folds):
                # Split the data into training and test sets
                validation_df = splits[i]
                train_splits = splits.copy()
                train_splits.pop(i)
                train_df = reduce(lambda x,y: x.union(y), train_splits)

                # Train the model
                model = pipe.fit(train_df)

                # Make predictions on the test set
                predictions = model.transform(validation_df)

                # Evaluate the model
                metric = evaluator.evaluate(predictions)
                metrics.append(metric)
            
            # Calculate the average accuracy across the folds
            avg_metric = np.mean(metrics)
        
            mlflow.log_param("max_depth", depth)
            mlflow.log_param("num_bins", bins)
            mlflow.log_metric("avg_accuracy", avg_metric)

            print(f"MaxDepth: {depth}, NumBins: {bins}, avgMetric: {avg_metric}")
            mlflow.spark.log_model(model, "model", pip_requirements="requirements.txt") # passing the requirements speed up the log_model process

train_data_to_split.unpersist()

**An important disclaimer:**
It's not recommended to run a manual parameter search like this. We performed it so you can understand how tracking experiments with `mlflow` works. There are very performatic framework that are integrated with mlflow logging, like `hyperopt` or `ray-tune`. If you want to tune your models like a pro, you should go for these frameworks.

To view the MLflow experiment associated with the notebook, click the **Experiment** icon in the notebook context bar on the upper right. All notebook runs appear in the sidebar. To more easily compare their results, click the icon at the far right of Experiment Runs (it shows "View Experiment UI" when you hover over it). The Experiment page appears.

But you can also access experiments data programatically:

In [None]:
#Read data from an MLflow experiment using Apache Spark. The spark object represents the SparkSession, and expId is the ID of the MLflow experiment we want to load. The data is loaded into a DataFrame called metrics_df.
metrics_df = spark.read.format("mlflow-experiment").load(expId)

metrics_df = (
    metrics_df
    #The select operation is used to select specific columns from the DataFrame. We are extracting the following columns:
    #- tags['mlflow.runName'] is accessed using the col function (f.col()) and then aliased as 'run_name'.
    #- run_id column is renamed to 'run_id'.
    #- params.max_depth column is renamed to 'max_depth'.
    #- params.num_bins column is renamed to 'num_bins'.
    #- metrics.avg_accuracy column is renamed to 'f1'.
      .select(f.col('tags')['mlflow.runName'].alias('run_name'), f.col('run_id').alias('run_id'),f.col('params.max_depth').alias('max_depth'), f.col('params.num_bins').alias('num_bins'), f.col('metrics.avg_accuracy').alias('f1'))
    .filter(f.col('f1').isNotNull()) # filter out rows where the 'f1' column is null. This removes any rows where the 'f1' column does not have a value.
)

display(metrics_df)

In [None]:
# Inference after loading the logged model

model_uri = "runs:/{}/model".format(run_id) 
loaded_model = mlflow.spark.load_model(model_uri)

In [None]:
loaded_model

#### `EXTRA`: Add custom transformers to the pipeline stages

To add a custom transform to the pipeline stages in PySpark MLlib, you can define a Python function that performs the data manipulation you want to encapsulate. Then, you can wrap this function in a PySpark Transformer object and add it to the pipeline stages.  

In the example below, custom_transform is a Python function that performs some data manipulation on a PySpark DataFrame, and CustomTransformer is a PySpark Transformer object that wraps this function. The Pipeline object is then created with the custom transformer and a VectorAssembler stage to create a feature vector. Finally, the pipeline is fit to some input data, and then applied to new data using the transform method.

In [None]:
from pyspark.ml import Pipeline, Transformer

# Define a PySpark Transformer object that wraps the custom_transform function
class GenerateFeaturesAndLabels(Transformer):
    def __init__(self):
        super(GenerateFeaturesAndLabels, self).__init__()

    def _transform(self, df):
        df = df.transform(generate_features_and_labels)
        return df
    
gen_feat_label = GenerateFeaturesAndLabels()

In [None]:
pipe = Pipeline()
pipe.setStages(
    [
        gen_feat_label,
        impute,
        assemble,
        scale,
        index,
        one_hot,
        final_assemble,
        rf
    ]
)

In [None]:
# Fit the pipeline to your input data
pipe_model = pipe.fit(orders_df)

# Apply the pipeline to new data
fitted_train_data = pipe_model.transform(orders_df)

fitted_train_data.display()