## **Load and prepare data**

In [1]:
# The first step is to connect to a data source or import the data into the lakehouse.
# In this case, the code block below downloads the data from a remote source and saves it to the lakehouse.

import os
import requests

IS_CUSTOM_DATA = False  # if True, the dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            # Fail fast on HTTP errors instead of saving an error page as data
            r.raise_for_status()
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")


Downloaded demo data files into lakehouse.
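The download loop skips any file that already exists, so a file left half-written by an interrupted run would be treated as complete on the next run. A minimal sketch of one way to guard against that is to write to a temporary file and rename it into place. The helper name `save_atomically` is hypothetical, and whether the rename is atomic on the lakehouse mount is an assumption; the demo below uses a throwaway local directory.

```python
import os
import tempfile


def save_atomically(path: str, data: bytes) -> None:
    """Write data to a temporary file next to path, then rename it into place."""
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        # os.replace swaps the file in as a single step on a local filesystem
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the partial file so a later run does not mistake it for data
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Demo in a throwaway directory (not the lakehouse path)
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "churn", "raw", "churn.csv")
save_atomically(demo_path, b"placeholder,bytes\n")
print(os.listdir(os.path.dirname(demo_path)))
```

In the download cell, `save_atomically(f"{download_path}/{fname}", r.content)` could stand in for the `open(...)` / `write(...)` pair.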


In [2]:
'''
After the data is downloaded or imported, we can load it into a Spark DataFrame.
This code reads the CSV file into a Spark DataFrame, infers the schema, and caches
it for faster access during subsequent operations.
'''

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)


## **Prepare the data**

In [3]:
'''
First, we define a function to clean the data. It drops rows in which every column is null,
removes duplicate rows based on 'RowNumber' and 'CustomerId', and drops the columns
'RowNumber', 'CustomerId', and 'Surname', which are identifiers rather than features.

Note that dropna(how="all") keeps rows that are only partially empty. Use how="any"
if rows with any missing value should be removed as well.
'''


# Define a function to clean the data
def clean_data(df):
    # Drop rows in which every column is null
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

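The difference between `how="all"` and `how="any"` is easy to overlook. The following plain-Python sketch mimics the two modes on a list of dictionaries, with `None` standing in for a missing value. The helper `drop_rows` and the sample rows are illustrative only and are not part of the notebook's pipeline.

```python
def drop_rows(rows, how="any"):
    """Mimic the how= semantics of dropna on a list of dict rows (None = missing)."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    is_dropped = any if how == "any" else all
    return [row for row in rows if not is_dropped(v is None for v in row.values())]


# Illustrative rows: complete, partially missing, fully missing
rows = [
    {"CreditScore": 619, "Age": 42},
    {"CreditScore": None, "Age": 41},
    {"CreditScore": None, "Age": None},
]

kept_all = drop_rows(rows, how="all")  # only the fully empty row is removed
kept_any = drop_rows(rows, how="any")  # every row with a gap is removed
print(len(kept_all), len(kept_any))
```

With `how="all"`, the partially missing row survives, which is the behavior of `clean_data` as written.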

In [4]:
'''
Next, we perform feature engineering by creating dummy columns for the 'Geography' and 'Gender'
columns using one-hot encoding.

One-hot encoding converts each categorical column into binary indicator columns, one per category.
It is needed here because the VectorAssembler used later accepts only numeric, boolean, and
vector columns, so the string-valued columns have to be encoded before training.
'''

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

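To make the result of the encoding concrete, here is a small plain-Python sketch of what the `F.when(...).otherwise(0)` columns compute for a single row. The helper `one_hot` is hypothetical. The category lists match the ones hard-coded in the cell above.

```python
def one_hot(value, column, categories):
    """Return one 0/1 indicator per category, named '<column>_<category>'."""
    return {f"{column}_{category}": int(value == category) for category in categories}


row = {"Geography": "Germany", "Gender": "Female"}

encoded = {
    **one_hot(row["Geography"], "Geography", ["France", "Germany", "Spain"]),
    **one_hot(row["Gender"], "Gender", ["Female", "Male"]),
}
print(encoded)

# A value outside the listed categories yields all zeros, as otherwise(0) does
unseen = one_hot("Italy", "Geography", ["France", "Germany", "Spain"])
print(unseen)
```

Because the categories are listed by hand, a new country in the data would silently map to all zeros, so it is worth checking the distinct values of each column first.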

In [5]:
# Finally, we display the cleaned and feature-engineered dataset using the display function.

display(df_clean)


In [6]:
'''
Now, we save the cleaned and feature-engineered dataset to the lakehouse.

Here, we take the cleaned and transformed PySpark DataFrame, df_clean, and save it as a
Delta table named "churn_data_clean" in the lakehouse.
'''

# Write the Spark DataFrame to the lakehouse as a Delta table
df_clean.write.mode("overwrite").format("delta").save("Tables/churn_data_clean")
print("Spark dataframe saved to delta table: churn_data_clean")


Spark dataframe saved to delta table: churn_data_clean


## **Create test and training datasets**

In [7]:
'''
Next, we create the training and test datasets from the cleaned and feature-engineered data.

This code loads the dataset from the lakehouse in Delta format and splits it into training
and test sets with an 80-20 ratio. It then uses the VectorAssembler from PySpark ML to combine
the feature columns into a single "features" column. Transforming both splits yields the
train_data and test_data DataFrames, which contain the target variable "Exited" and the
feature vectors. These datasets are ready for building and evaluating ML models.
'''

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw).select("Exited", "features")
test_data = featurizer.transform(test_raw).select("Exited", "features")

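A seeded random split assigns rows to the splits by sampling, so the 80-20 ratio holds only approximately, while the same seed reproduces the same assignment. The plain-Python sketch below illustrates that idea. It is not Spark's implementation, and `random_split` here is a hypothetical helper working on a list of row ids.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    bounds = [w / total for w in accumulate(weights)]  # e.g. [0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Pick the first split whose upper bound exceeds the draw;
        # the last split catches any floating-point leftovers
        index = next(
            (i for i, upper in enumerate(bounds) if draw < upper),
            len(weights) - 1,
        )
        splits[index].append(row)
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.8, 0.2], seed=41)
train_again, _ = random_split(rows, [0.8, 0.2], seed=41)
print(len(train), len(test))
```

The split sizes land near 800 and 200 rather than exactly on them, which is worth keeping in mind when comparing metrics across runs with different seeds.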

## **Train baseline**

In [8]:
'''
Using the featurized data from the previous step, we train a baseline machine learning model,
configure MLflow for experiment tracking, and compute and log the resulting ROC AUC score.

First, we use the logging library to suppress unnecessary output from the SynapseML library,
keeping the logs cleaner.
'''


import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)


## **Configure MLflow**

In [9]:
''' 
With the code below, we configure MLflow for experiment tracking. 
Additionally, we enable automatic logging, ensuring that model parameters, 
metrics, and artifacts are automatically logged to MLflow.
'''


import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)


2025/04/03 23:28:56 INFO mlflow.tracking.fluent: Experiment with name 'automl_sample' does not exist. Creating a new experiment.
2025/04/03 23:29:08 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


## **Train and evaluate the model**

In [10]:
'''
We train a LightGBMClassifier model on the training data.
The model is configured for binary classification (objective="binary"); no class-imbalance
handling is set for this baseline.
The trained model is then used to make predictions on the test data.
We extract the predicted probabilities for the positive class and the true labels,
and calculate the ROC AUC score using sklearn's roc_auc_score function.
'''

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Collect the true labels and positive-class probabilities from the same
    # DataFrame so the two lists stay aligned row by row
    rows = predictions.select("Exited", "probability").collect()
    y_true = [row["Exited"] for row in rows]
    y_pred = [float(row["probability"][1]) for row in rows]

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)


2025/04/03 23:29:39 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.


ROC AUC Score: 0.844431602279924
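For intuition about what this score means: ROC AUC equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as half. The sketch below computes it by brute force over all positive-negative pairs. It is an illustration of the definition, not how `roc_auc_score` is implemented, and the labels and scores are made-up toy values.

```python
def roc_auc_pairwise(y_true, y_score):
    """ROC AUC as the fraction of positive-negative pairs ranked correctly."""
    positives = [s for y, s in zip(y_true, y_score) if y == 1]
    negatives = [s for y, s in zip(y_true, y_score) if y == 0]
    if not positives or not negatives:
        raise ValueError("ROC AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(positives) * len(negatives))


# Toy example: three of the four positive-negative pairs are ordered correctly
auc = roc_auc_pairwise([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(auc)  # 0.75
```

The pairwise loop is quadratic in the number of rows, so it is only suitable for small illustrations like this one.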


## **Create an AutoML trial with FLAML**

In the next steps, we create an AutoML trial using the FLAML package, configure the trial settings, convert the Spark dataset to a pandas-on-Spark dataset, run the AutoML trial, and view the resulting metrics.

In [11]:
'''
Import the necessary classes and functions from the FLAML package and create an AutoML instance,
which will be used to automate the ML pipeline.
'''

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()


2025/04/03 23:30:41 INFO mlflow.tracking.fluent: Autologging successfully enabled for xgboost.
2025/04/03 23:30:44 INFO mlflow.tracking.fluent: Autologging successfully enabled for lightgbm.
2025/04/03 23:31:13 INFO mlflow.tracking.fluent: Autologging successfully enabled for transformers.
2025/04/03 23:31:15 INFO mlflow.tracking.fluent: Autologging successfully enabled for pytorch_lightning.


In [12]:
# Define the configuration settings for the AutoML trial.
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}


In [13]:
# Convert the Spark training dataset to a pandas-on-Spark dataset.
# FLAML's Spark estimators work with this format, which keeps the data distributed.

df_automl = to_pandas_on_spark(train_data)


In [14]:
'''
Now, we execute the AutoML trial inside an MLflow run so that the trial is tracked in MLflow
(nested=True attaches it to a parent run when one is active).

While this code runs, FLAML iterates over different learners and hyperparameter
configurations and logs each iteration. At the end, it reports the best model found,
based on the ROC AUC metric.
'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)


[flaml.automl.logger: 04-03 23:31:46] {1791} INFO - task = classification
[flaml.automl.logger: 04-03 23:31:48] {1901} INFO - Minimizing error metric: 1-roc_auc
[flaml.automl.logger: 04-03 23:31:50] {2019} INFO - List of ML learners in AutoML Run: ['lgbm_spark', 'rf_spark']
[flaml.automl.logger: 04-03 23:31:50] {2330} INFO - iteration 0, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:32:01] {2465} INFO - Estimated sufficient time budget=106968s. Estimated necessary time budget=107s.
[flaml.automl.logger: 04-03 23:32:13] {2514} INFO -  at 16.8s,	estimator lgbm_spark's best error=0.1942,	best estimator lgbm_spark's best error=0.1942
[flaml.automl.logger: 04-03 23:32:13] {2330} INFO - iteration 1, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:32:34] {2514} INFO -  at 36.7s,	estimator lgbm_spark's best error=0.1596,	best estimator lgbm_spark's best error=0.1596
[flaml.automl.logger: 04-03 23:32:34] {2330} INFO - iteration 2, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:32:53] {2514} INFO -  at 56.3s,	estimator lgbm_spark's best error=0.1596,	best estimator lgbm_spark's best error=0.1596
[flaml.automl.logger: 04-03 23:32:53] {2330} INFO - iteration 3, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:33:13] {2514} INFO -  at 75.7s,	estimator lgbm_spark's best error=0.1501,	best estimator lgbm_spark's best error=0.1501
[flaml.automl.logger: 04-03 23:33:13] {2330} INFO - iteration 4, current learner rf_spark
[flaml.automl.logger: 04-03 23:33:41] {2514} INFO -  at 98.5s,	estimator rf_spark's best error=0.2390,	best estimator lgbm_spark's best error=0.1501
[flaml.automl.logger: 04-03 23:33:41] {2330} INFO - iteration 5, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:34:00] {2514} INFO -  at 123.3s,	estimator lgbm_spark's best error=0.1501,	best estimator lgbm_spark's best error=0.1501
[flaml.automl.logger: 04-03 23:34:00] {2330} INFO - iteration 6, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:34:19] {2514} INFO -  at 142.0s,	estimator lgbm_spark's best error=0.1482,	best estimator lgbm_spark's best error=0.1482
[flaml.automl.logger: 04-03 23:34:19] {2330} INFO - iteration 7, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:34:38] {2514} INFO -  at 161.2s,	estimator lgbm_spark's best error=0.1394,	best estimator lgbm_spark's best error=0.1394
[flaml.automl.logger: 04-03 23:34:38] {2330} INFO - iteration 8, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:34:57] {2514} INFO -  at 179.9s,	estimator lgbm_spark's best error=0.1394,	best estimator lgbm_spark's best error=0.1394
[flaml.automl.logger: 04-03 23:34:57] {2330} INFO - iteration 9, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:35:16] {2514} INFO -  at 199.3s,	estimator lgbm_spark's best error=0.1394,	best estimator lgbm_spark's best error=0.1394
[flaml.automl.logger: 04-03 23:35:16] {2330} INFO - iteration 10, current learner lgbm_spark
[flaml.automl.logger: 04-03 23:35:34] {2514} INFO -  at 218.1s,	estimator lgbm_spark's best error=0.1394,	best estimator lgbm_spark's best error=0.1394
[flaml.automl.logger: 04-03 23:35:34] {2330} INFO - iteration 11, current learner rf_spark
[flaml.automl.logger: 04-03 23:36:02] {2514} INFO -  at 238.8s,	estimator rf_spark's best error=0.1481,	best estimator lgbm_spark's best error=0.1394
[flaml.automl.logger: 04-03 23:36:02] {567} INFO - logging best model lgbm_spark
[flaml.automl.logger: 04-03 23:36:06] {2757} INFO - retrain lgbm_spark for 0.7s
[flaml.automl.logger: 04-03 23:36:06] {2760} INFO - retrained model: LightGBMClassifier_cd77b677fd44
[flaml.automl.logger: 04-03 23:36:06] {2761} INFO - Auto Feature Engineering pipeline: None
[flaml.automl.logger: 04-03 23:36:06] {2763} INFO - Best MLflow run name: busy_giraffe_pg1fw095_child_7
[flaml.automl.logger: 04-03 23:36:06] {2764} INFO - Best MLflow run id: 9e7d4fce-a7c6-4325-b9d1-2739fa682502


2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for xgboost.
2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.
2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for transformers.
2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for lightgbm.
2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for pytorch_lightning.
2025/04/03 23:36:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.
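The FLAML log reports an error that it minimizes, `1-roc_auc`, so each "best error" value converts to a ROC AUC by subtracting it from 1. The sketch below parses a few progress lines copied from the output above and performs that conversion. The regular expression and the helper `parse_progress` are assumptions fitted to the log format shown here, and may not match other FLAML versions.

```python
import re

# Three progress lines copied from the FLAML output above
LOG_LINES = [
    "[flaml.automl.logger: 04-03 23:32:13] {2514} INFO -  at 16.8s,\testimator lgbm_spark's best error=0.1942,\tbest estimator lgbm_spark's best error=0.1942",
    "[flaml.automl.logger: 04-03 23:33:41] {2514} INFO -  at 98.5s,\testimator rf_spark's best error=0.2390,\tbest estimator lgbm_spark's best error=0.1501",
    "[flaml.automl.logger: 04-03 23:36:02] {2514} INFO -  at 238.8s,\testimator rf_spark's best error=0.1481,\tbest estimator lgbm_spark's best error=0.1394",
]

PATTERN = re.compile(
    r"at (?P<elapsed>[\d.]+)s,\s+estimator (?P<learner>\w+)'s best error=(?P<error>[\d.]+),"
    r"\s+best estimator (?P<best_learner>\w+)'s best error=(?P<best_error>[\d.]+)"
)


def parse_progress(lines):
    """Extract elapsed time, learners, and the best ROC AUC so far from each line."""
    records = []
    for line in lines:
        match = PATTERN.search(line)
        if match:
            records.append({
                "elapsed_s": float(match["elapsed"]),
                "learner": match["learner"],
                "best_learner": match["best_learner"],
                # The logged error is 1 - roc_auc, so invert it
                "best_roc_auc": round(1 - float(match["best_error"]), 4),
            })
    return records


progress = parse_progress(LOG_LINES)
for record in progress:
    print(record)
```

The final best error of 0.1394 corresponds to a validation ROC AUC of 0.8606.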


In [15]:
# Retrieve and display the best hyperparameter configuration and metrics

print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))


Best hyperparameter config: {'numIterations': 9, 'numLeaves': 6, 'minDataInLeaf': 22, 'learningRate': 0.24006330308062684, 'log_max_bin': 8, 'featureFraction': 0.7837401734553133, 'lambdaL1': 0.001068160626146036, 'lambdaL2': 1.5328844264449084}
Best ROC AUC on validation data: 0.8606
Training duration of the best run: 0.67 s


## **Parallelize your AutoML trial with Apache Spark**

When your dataset fits on a single node, you can use Spark to run multiple AutoML trials in parallel. Follow these steps:


In [16]:
# To enable parallelization, first convert the data into a pandas DataFrame.
pandas_df = train_raw.toPandas()

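Because `toPandas()` collects the whole DataFrame onto the driver, it helps to estimate the size first. The sketch below is a rough back-of-the-envelope check that assumes every value takes 8 bytes and applies a safety factor for pandas overhead. The helper names, the row and column counts, and the memory limit are all hypothetical illustration values, not measurements from this dataset.

```python
def estimated_size_mb(n_rows: int, n_cols: int, bytes_per_value: int = 8) -> float:
    """Rough size of a numeric table, assuming a fixed width per value."""
    return n_rows * n_cols * bytes_per_value / 1_000_000


def fits_on_driver(n_rows: int, n_cols: int, driver_memory_mb: float,
                   safety_factor: float = 5.0) -> bool:
    """Leave headroom for copies made during conversion and training."""
    return estimated_size_mb(n_rows, n_cols) * safety_factor <= driver_memory_mb


# Hypothetical table shape and driver memory, for illustration only
size = estimated_size_mb(10_000, 13)
print(f"{size:.2f} MB", fits_on_driver(10_000, 13, driver_memory_mb=4096))
```

In the notebook, `train_raw.count()` and `len(train_raw.columns)` would supply the actual row and column counts.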

In [17]:
# Configure parallelization settings
# Set use_spark to True to enable Spark-based parallelism. By default, FLAML launches
# one trial per executor. You can customize the number of concurrent trials with
# n_concurrent_trials.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name
}


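The sequential and parallel trials share most of their settings, and the second `settings` dictionary overwrites the first. One optional way to keep both around and make the differences explicit is to derive them from a common base, as sketched below. The variable names are hypothetical; the keys and values are the ones used in this notebook.

```python
# Settings shared by both trials
base_settings = {
    "time_budget": 250,
    "metric": "roc_auc",
    "task": "classification",
    "seed": 41,
    "force_cancel": True,
    "mlflow_exp_name": "automl_sample",
}

# Sequential trial: adds the FLAML log file
sequential_settings = {**base_settings, "log_file_name": "flaml_experiment.log"}

# Parallel trial: adds Spark-based parallelism
parallel_settings = {**base_settings, "use_spark": True, "n_concurrent_trials": 3}

# Show only what differs between the two configurations
print(sorted(set(sequential_settings) ^ set(parallel_settings)))
```

Either dictionary can then be unpacked into `automl.fit(...)` with `**`, in the same way as `settings`.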

In [18]:
# Run the AutoML trial in parallel with the specified settings.
# We use a nested MLflow run to track the experiment within the existing MLflow run context.

with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)


[I 2025-04-03 23:38:03,001] A new study created in memory with name: optuna

2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for xgboost.
2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.
2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for transformers.
2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for lightgbm.
2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for pytorch_lightning.
2025/04/03 23:46:55 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


In [19]:
'''
After running the parallel AutoML trial, retrieve and display the results:
the best hyperparameter configuration, the ROC AUC on the validation data,
and the training duration of the best run.
'''


print('Best hyperparameter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

Best hyperparameter config: {'early_stopping_rounds': 14, 'learning_rate': 0.07562124206042799, 'n_estimators': 8192}
Best roc_auc on validation data: 0.8738
Training duration of best run: 0.8615 s
