
# Build a ML model on Titanic dataset & Apply MLFlow for model tracking

**Source:** [scikit-learn.org](https://scikit-learn.org/stable/auto_examples/compose/plot_column_transformer_mixed_types.html)

This example illustrates how to apply different preprocessing and feature
extraction pipelines to different subsets of features, using
:class:`~compose.ColumnTransformer`. This is particularly handy for the
case of datasets that contain heterogeneous data types, since we may want to
scale the numeric features and one-hot encode the categorical ones.

In this example, the numeric data is standard-scaled after mean-imputation. The
categorical data is one-hot encoded via ``OneHotEncoder``, which
creates a new category for missing values. We further reduce the dimensionality
by selecting categories using a chi-squared test.

In addition, we show two different ways to dispatch the columns to the
particular pre-processor: by column names and by column data types.

Finally, the preprocessing pipeline is integrated in a full prediction pipeline
using :class:`~pipeline.Pipeline`, together with a simple classification
model.


In [1]:
# Author: Pedro Morales <part.morales@gmail.com>
#
# License: BSD 3 clause

## Libraries

In [2]:
import numpy as np
import time
import logging

from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_openml
from sklearn.feature_selection import SelectPercentile, chi2
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, StratifiedKFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

import mlflow
import mlflow.sklearn
from mlflow import MlflowClient
from pprint import pprint

np.random.seed(0)

## Dataset

Load data from https://www.openml.org/d/40945



In [3]:
X, y = fetch_openml("titanic", version=1, as_frame=True, return_X_y=True)

# Alternatively X and y can be obtained directly from the frame attribute:
# X = titanic.frame.drop('survived', axis=1)
# y = titanic.frame['survived']

## Preprocessing pipeline with ColumnTransformer

Use ``ColumnTransformer`` by selecting column by names

We will train our classifier with the following features:

Numeric Features:

* ``age``: float;
* ``fare``: float.

Categorical Features:

* ``embarked``: categories encoded as strings ``{'C', 'S', 'Q'}``;
* ``sex``: categories encoded as strings ``{'female', 'male'}``;
* ``pclass``: ordinal integers ``{1, 2, 3}``.

We create the preprocessing pipelines for both numeric and categorical data.
Note that ``pclass`` could either be treated as a categorical or numeric
feature.



In [4]:
numeric_features = ["age", "fare"]
numeric_transformer = Pipeline(
    steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
)

categorical_features = ["embarked", "sex", "pclass"]
categorical_transformer = Pipeline(
    steps=[
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
        ("selector", SelectPercentile(chi2, percentile=50)),
    ]
)
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

## Prediction pipeline (full pipeline)

Append classifier to preprocessing pipeline.
Now we have a full prediction pipeline.



In [5]:
clf = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", LogisticRegression())]
)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

clf.fit(X_train, y_train)
print("model score: %.3f" % clf.score(X_test, y_test))

model score: 0.798


HTML representation of ``Pipeline`` (display diagram)

When the ``Pipeline`` is printed out in a jupyter notebook an HTML
representation of the estimator is displayed:



In [6]:
clf

## Prediction pipeline with auto selection of columns

Use ``ColumnTransformer`` by selecting column by data types

When dealing with a cleaned dataset, the preprocessing can be automatic by
using the data types of the column to decide whether to treat a column as a
numerical or categorical feature.
:func:`sklearn.compose.make_column_selector` gives this possibility.
First, let's only select a subset of columns to simplify our
example.



In [6]:
subset_feature = ["embarked", "sex", "pclass", "age", "fare"]
X_train, X_test = X_train[subset_feature], X_test[subset_feature]

Then, we introspect the information regarding each column data type.



In [7]:
X_train.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1047 entries, 1118 to 684
Data columns (total 5 columns):
 #   Column    Non-Null Count  Dtype   
---  ------    --------------  -----   
 0   embarked  1045 non-null   category
 1   sex       1047 non-null   category
 2   pclass    1047 non-null   int64   
 3   age       841 non-null    float64 
 4   fare      1046 non-null   float64 
dtypes: category(2), float64(2), int64(1)
memory usage: 35.0 KB


We can observe that the `embarked` and `sex` columns were tagged as
`category` columns when loading the data with ``fetch_openml``. Therefore, we
can use this information to dispatch the categorical columns to the
``categorical_transformer`` and the remaining columns to the
``numerical_transformer``.



<div class="alert alert-info"><h4>Note</h4><p>In practice, you will have to handle yourself the column data type.
   If you want some columns to be considered as `category`, you will have to
   convert them into categorical columns. If you are using pandas, you can
   refer to their documentation regarding [Categorical data](https://pandas.pydata.org/pandas-docs/stable/user_guide/categorical.html).</p></div>



In [7]:
from sklearn.compose import make_column_selector as selector

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, selector(dtype_exclude="category")),
        ("cat", categorical_transformer, selector(dtype_include="category")),
    ]
)
clf = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", LogisticRegression())]
)


clf.fit(X_train, y_train)
print("model score: %.3f" % clf.score(X_test, y_test))
clf

model score: 0.798


The resulting score is not exactly the same as the one from the previous
pipeline because the dtype-based selector treats the ``pclass`` column as
a numeric feature instead of a categorical feature as previously:



In [8]:
selector(dtype_exclude="category")(X_train)

['pclass', 'age', 'fare']

In [9]:
selector(dtype_include="category")(X_train)

['embarked', 'sex']

## Cross Validation & Hyperparameters with GridSearchCV

Using the prediction pipeline in a grid search

Grid search can also be performed on the different preprocessing steps
defined in the ``ColumnTransformer`` object, together with the classifier's
hyperparameters as part of the ``Pipeline``.
We will search for both the imputer strategy of the numeric preprocessing
and the regularization parameter of the logistic regression using
:class:`~sklearn.model_selection.RandomizedSearchCV`. This
hyperparameter search randomly selects a fixed number of parameter
settings configured by `n_iter`. Alternatively, one can use
:class:`~sklearn.model_selection.GridSearchCV` but the cartesian product of
the parameter space will be evaluated.



In [10]:
param_grid = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "preprocessor__cat__selector__percentile": [10, 30, 50, 70],
    "classifier__C": [0.1, 1.0, 10, 100],
}

search_cv = GridSearchCV(clf, param_grid, return_train_score=True)
search_cv

## Track best model within the CV process

Please run the mlflow server first. In the terminal : `mlflow ui`.

Calling 'fit' triggers the cross-validated search for the best
hyper-parameters combination:




In [13]:
# Initialize MLflow tracking
mlflow.set_tracking_uri("http://127.0.0.1:5000")

# Define experiment name
mlflow.set_experiment("titanic_grid_search_experiment_best_model")

# Start MLflow run
with mlflow.start_run():
    # Perform grid search cross-validation
    search_cv = GridSearchCV(clf, param_grid)
    search_cv.fit(X_train, y_train)

    # Log parameters, metrics, and best model to MLflow
    mlflow.log_params(search_cv.best_params_)
    mlflow.log_metric("mean_test_score", search_cv.best_score_)

    # Evaluate on test set
    test_score = search_cv.score(X_test, y_test)
    mlflow.log_metric("test_score", test_score)

    # Log the model
    mlflow.sklearn.log_model(search_cv.best_estimator_, "LogisticRegression")



## Track all results within the CV process

In [None]:
# Initialize MLflow tracking
mlflow.set_tracking_uri("http://127.0.0.1:5000")

# Define experiment name
mlflow.set_experiment("titanic_grid_search_experiment_all_folds")

# Parameters
param_grid = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "preprocessor__cat__selector__percentile": [10],#, 30, 50, 70],
    "classifier__C": [0.1]#, 1.0, 10, 100],
}

# Start an MLflow run to log the overall grid search
with mlflow.start_run(run_name="Grid Search with All Folds"):
    # Perform grid search cross-validation
    search_cv = GridSearchCV(clf, param_grid, return_train_score=True)
    search_cv.fit(X_train, y_train)
    cv_results = search_cv.cv_results_

    for i in range(len(cv_results['params'])):
        params = cv_results['params'][i]
        mean_test_score = cv_results['mean_test_score'][i]
        std_test_score = cv_results['std_test_score'][i]
        mean_train_score = cv_results['mean_train_score'][i]
        std_train_score = cv_results['std_train_score'][i]

        for fold in range(search_cv.n_splits_):
            fold_test_score = cv_results[f'split{fold}_test_score'][i]
            fold_train_score = cv_results[f'split{fold}_train_score'][i]
            
            # Log each fold result in a nested run
            with mlflow.start_run(nested=True, run_name=f"Fold {fold + 1}"):
                mlflow.log_params(params)
                mlflow.log_metric("fold_test_score", fold_test_score)
                mlflow.log_metric("fold_train_score", fold_train_score)

        # Log mean and std metrics for the current parameter combination
        mlflow.log_params(params)
        mlflow.log_metric("mean_test_score", mean_test_score)
        mlflow.log_metric("std_test_score", std_test_score)
        mlflow.log_metric("mean_train_score", mean_train_score)
        mlflow.log_metric("std_train_score", std_train_score)

    # Log the best model
    mlflow.sklearn.log_model(search_cv.best_estimator_, "best_model")
    mlflow.log_params(search_cv.best_params_)
    mlflow.log_metric("best_mean_test_score", search_cv.best_score_)

    # Evaluate the best model on the test set
    test_score = search_cv.score(X_test, y_test)
    mlflow.log_metric("test_score", test_score)



## Track all results within the CV process - Server error handling

With MLFlow server error handling, due to (sometime ...) internal server errors.

In [14]:
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize MLflow tracking
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("titanic_grid_search_experiment_all_folds")


param_grid = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    "preprocessor__cat__selector__percentile": [10],#, 30, 50, 70],
    "classifier__C": [0.1]#, 1.0, 10, 100],
}

# Perform grid search cross-validation
search_cv = GridSearchCV(clf, param_grid, return_train_score=True)
search_cv.fit(X_train, y_train)

# Start an MLflow run to log the overall grid search
with mlflow.start_run(run_name="Grid Search with All Folds"):

    # Log the grid search results
    cv_results = search_cv.cv_results_

    for i in range(len(cv_results['params'])):
        params = cv_results['params'][i]
        mean_test_score = cv_results['mean_test_score'][i]
        std_test_score = cv_results['std_test_score'][i]
        mean_train_score = cv_results['mean_train_score'][i]
        std_train_score = cv_results['std_train_score'][i]

        try:
            # Log each fold's test and train scores        
            for fold in range(search_cv.n_splits_):
                fold_test_score = cv_results[f'split{fold}_test_score'][i]
                fold_train_score = cv_results[f'split{fold}_train_score'][i]
                
                # Log each fold result in a nested run
                with mlflow.start_run(nested=True, run_name=f"Fold {fold + 1}"):
                    #mlflow.log_params(params)
                    mlflow.log_metric("fold_test_score", fold_test_score)
                    mlflow.log_metric("fold_train_score", fold_train_score)
                    
                    # Add a sleep time to reduce load on the MLflow server
                    time.sleep(0.5)  # Sleep for 0.5 second
        
            # Log mean and std metrics for the current parameter combination
            mlflow.log_params(params)
            mlflow.log_metric("mean_test_score", mean_test_score)
            mlflow.log_metric("std_test_score", std_test_score)
            mlflow.log_metric("mean_train_score", mean_train_score)
            mlflow.log_metric("std_train_score", std_train_score)
            
            # Add a sleep time to reduce load on the MLflow server
            time.sleep(0.5)  # Sleep for 0.5 second
    
        except Exception as e:
            logger.error(f"Error logging parameters and metrics for params {params}: {str(e)}")
            continue    
    
    # Log the best model
    try:
        mlflow.sklearn.log_model(search_cv.best_estimator_, "best_model")
        mlflow.log_params(search_cv.best_params_)
        mlflow.log_metric("best_mean_test_score", search_cv.best_score_)

        # Evaluate the best model on the test set
        test_score = search_cv.score(X_test, y_test)
        mlflow.log_metric("test_score", test_score)
        
    except Exception as e:
        logger.error(f"Error logging the best model and its metrics: {str(e)}")


ERROR:__main__:Error logging parameters and metrics for params {'classifier__C': 0.1, 'preprocessor__cat__selector__percentile': 10, 'preprocessor__num__imputer__strategy': 'median'}: API request to http://127.0.0.1:5000/api/2.0/mlflow/runs/log-batch failed with exception HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /api/2.0/mlflow/runs/log-batch (Caused by ResponseError('too many 500 error responses'))
ERROR:__main__:Error logging the best model and its metrics: API request to http://127.0.0.1:5000/api/2.0/mlflow/runs/log-batch failed with exception HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /api/2.0/mlflow/runs/log-batch (Caused by ResponseError('too many 500 error responses'))
