# **05 - MLModelEvaluation**

## Objectives

* Train a machine learning regression model to predict the sale price of inherited houses and other properties in the region.

## Inputs

* outputs/datasets/collection/HousePricesRecords.csv

## Outputs

* docs/plots/regression_performance.png
* outputs/ml_pipeline/predict_price/v1/X_train.csv
* outputs/ml_pipeline/predict_price/v1/y_train.csv
* outputs/ml_pipeline/predict_price/v1/X_test.csv
* outputs/ml_pipeline/predict_price/v1/y_test.csv
* outputs/ml_pipeline/predict_price/v1/regression_pipeline.pkl
* outputs/ml_pipeline/predict_price/v1/features_importance.png
* docs/plots/features_importance.png

## Additional Comments

* This notebook deals with Business Requirement 2: Regression Analysis for Price Prediction


---

# Change working directory

* The notebooks are assumed to be stored in a subfolder, so when running a notebook in the editor you need to change the working directory.

We need to change the working directory from its current folder to its parent folder.
* We access the current directory with os.getcwd()

In [None]:
import os
current_dir = os.getcwd()
current_dir

We want to make the parent of the current directory the new current directory
* os.path.dirname() gets the parent directory
* os.chdir() defines the new current directory

In [None]:
os.chdir(os.path.dirname(current_dir))
print("You set a new current directory")

Confirm the new current directory

In [None]:
current_dir = os.getcwd()
current_dir
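
As a minimal sketch of what os.path.dirname() returns, the snippet below applies it to a made-up path instead of the real working directory, so it changes no state. The path workspace/project/jupyter_notebooks is a hypothetical example, not the project's actual layout.

```python
import os

# Hypothetical notebook folder; in the notebook the value comes from os.getcwd()
example_dir = os.path.join("workspace", "project", "jupyter_notebooks")

# os.path.dirname() strips the last path component, giving the parent folder
project_root = os.path.dirname(example_dir)

print(project_root)
```

Passing the result to os.chdir(), as the cell above does, is what actually moves the working directory to the project root.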

Imports the NumPy and pandas libraries, reads the CSV file HousePricesRecords.csv into the DataFrame df, and displays its shape and first 10 rows.

In [None]:
import numpy as np
import pandas as pd
df = pd.read_csv("outputs/datasets/collection/HousePricesRecords.csv")

print(df.shape)
df.head(10)

Imports libraries for building a machine learning regression pipeline, incorporating feature engineering, preprocessing, and modeling.

In [5]:
from sklearn.pipeline import Pipeline

from feature_engine.imputation import MeanMedianImputer
from feature_engine.selection import DropFeatures
from feature_engine.imputation import CategoricalImputer

from feature_engine import creation
from feature_engine.encoding import OrdinalEncoder
from feature_engine.selection import SmartCorrelatedSelection
from feature_engine import transformation as vt
from feature_engine.outliers import Winsorizer

from sklearn.preprocessing import StandardScaler

from sklearn.feature_selection import SelectFromModel

from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import AdaBoostRegressor
from sklearn.ensemble import ExtraTreesRegressor

Drops irrelevant features using DropFeatures, imputes missing values using MeanMedianImputer, encodes categorical variables with OrdinalEncoder, applies log and power transformations using LogTransformer and PowerTransformer, handles outliers with Winsorizer, removes highly correlated features using SmartCorrelatedSelection, standardizes data with StandardScaler, selects important features using SelectFromModel, and trains the specified machine learning model.

In [6]:
def PipelineOptimization(model):
  pipeline_base = Pipeline([
     ( 'drop',  DropFeatures(features_to_drop=['EnclosedPorch', 'WoodDeckSF', 'GarageFinish', 'BsmtFinType1', 'BsmtExposure', 'GarageYrBlt'])),

     ( 'mean',  MeanMedianImputer(imputation_method='mean',
                                     variables=['LotFrontage', 'BedroomAbvGr']) ),

     ( 'median',  MeanMedianImputer(imputation_method='median',
                                     variables=['2ndFlrSF', 'MasVnrArea']) ),
   
    ("OrdinalCategoricalEncoder",OrdinalEncoder(encoding_method='arbitrary', 
                                                  variables = ['KitchenQual'])),
    
    ('lt', vt.LogTransformer(variables = ['GrLivArea', 'LotArea', 'LotFrontage']) ),

    ('pt', vt.PowerTransformer(variables = ['GarageArea', 'MasVnrArea', 'OpenPorchSF', 'TotalBsmtSF', '1stFlrSF', '2ndFlrSF']) ),
       
      
    ("Winsoriser_iqr",Winsorizer(capping_method='iqr', fold=1.5, tail='both', 
                                                  variables=['1stFlrSF',
                                                             '2ndFlrSF',
                                                             'GarageArea',
                                                             'LotArea',
                                                             'LotFrontage',
                                                             'MasVnrArea',
                                                             'OpenPorchSF',
                                                             'TotalBsmtSF',
                                                      ])),      
       
    ("SmartCorrelatedSelection",SmartCorrelatedSelection(variables= None,
       method="spearman", threshold=0.8,selection_method="variance") ),

    ("feat_scaling", StandardScaler() ),

    ("feat_selection",  SelectFromModel(model) ),

    ("model", model ),
    ])

  return pipeline_base
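
To illustrate the outlier-capping step, here is a rough NumPy sketch of the IQR rule that the Winsorizer is configured with (fold=1.5, both tails). It illustrates the rule on made-up numbers and is not feature_engine's implementation; the helper name iqr_cap is hypothetical.

```python
import numpy as np

def iqr_cap(values, fold=1.5):
    """Cap values outside [Q1 - fold*IQR, Q3 + fold*IQR] at those limits."""
    values = np.asarray(values, dtype=float)
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    lower, upper = q1 - fold * iqr, q3 + fold * iqr
    return np.clip(values, lower, upper)

# Made-up areas with one extreme value
capped = iqr_cap([1, 2, 3, 4, 100])
print(capped)
```

In the pipeline the limits are learned when the Winsorizer is fitted on the training data and then reused on new data, whereas this sketch computes and applies them in one call.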

Defines HyperparameterOptimizationSearch for performing hyperparameter tuning using GridSearchCV. Initializes with models, hyperparameter grids, and a dictionary to store grid search results. Implements fit to iterate over models, apply PipelineOptimization, and run GridSearchCV with cross-validation, parallel processing, and optional scoring. Stores fitted grid searches in self.grid_searches. Defines score_summary to compile results by extracting cross-validation scores, computing statistics (min, max, mean, std), and formatting results into a sorted DataFrame. Returns the summary DataFrame and grid search results.

In [7]:
from sklearn.model_selection import GridSearchCV
class HyperparameterOptimizationSearch:

    def __init__(self, models, params):
        self.models = models
        self.params = params
        self.keys = models.keys()
        self.grid_searches = {}

    def fit(self, X, y, cv, n_jobs, verbose=1, scoring=None, refit=True):
        for key in self.keys:
            print(f"\nRunning GridSearchCV for {key} \n")
            model = PipelineOptimization(self.models[key])

            params = self.params[key]
            # refit=True (the GridSearchCV default) is needed for best_estimator_ to exist
            gs = GridSearchCV(model, params, cv=cv, n_jobs=n_jobs, verbose=verbose,
                              scoring=scoring, refit=refit)
            gs.fit(X, y)
            self.grid_searches[key] = gs

    def score_summary(self, sort_by='mean_score'):
        def row(key, scores, params):
            d = {
                 'estimator': key,
                 'min_score': min(scores),
                 'max_score': max(scores),
                 'mean_score': np.mean(scores),
                 'std_score': np.std(scores),
            }
            return pd.Series({**params,**d})

        rows = []
        for k in self.grid_searches:
            params = self.grid_searches[k].cv_results_['params']
            scores = []
            for i in range(self.grid_searches[k].cv):
                key = "split{}_test_score".format(i)
                r = self.grid_searches[k].cv_results_[key]        
                scores.append(r.reshape(len(params),1))

            all_scores = np.hstack(scores)
            for p, s in zip(params,all_scores):
                rows.append((row(k, s, p)))

        df = pd.concat(rows, axis=1).T.sort_values([sort_by], ascending=False)

        columns = ['estimator', 'min_score', 'mean_score', 'max_score', 'std_score']
        columns = columns + [c for c in df.columns if c not in columns]

        return df[columns], self.grid_searches
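
The aggregation performed by score_summary can be sketched on a hand-made stand-in for cv_results_. The dictionary below is invented for illustration (two folds, two candidates); only the key naming pattern split{i}_test_score follows what the class above reads.

```python
import numpy as np

# Hand-made stand-in for GridSearchCV.cv_results_ with 2 folds and 2 candidates
cv_results = {
    "params": [{"model__max_depth": 3}, {"model__max_depth": 15}],
    "split0_test_score": np.array([0.70, 0.80]),
    "split1_test_score": np.array([0.60, 0.90]),
}
n_splits = 2

# One row per candidate, one column per fold
all_scores = np.column_stack(
    [cv_results[f"split{i}_test_score"] for i in range(n_splits)]
)

# Per-candidate statistics across folds, merged with the candidate's parameters
summary = [
    {**params, "min_score": scores.min(), "max_score": scores.max(),
     "mean_score": scores.mean(), "std_score": scores.std()}
    for params, scores in zip(cv_results["params"], all_scores)
]
summary.sort(key=lambda row: row["mean_score"], reverse=True)
print(summary[0])
```

The class does the same thing per estimator and collects the rows in a DataFrame, so the best candidate ends up in the first row.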

Splits the dataset into training and testing sets, with 20% of the data for testing, using train_test_split. The target variable SalePrice is separated from the features. Prints the column names and shapes of the training and testing sets.

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    df.drop(['SalePrice'], axis=1),
    df['SalePrice'],
    test_size=0.2,
    random_state=42
)

print("Columns in X_train:", X_train.columns)
print("Columns in X_test:", X_test.columns)

print("* Train set:", X_train.shape, y_train.shape, "\n* Test set:",  X_test.shape, y_test.shape)
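
The effect of test_size=0.2 can be checked on a toy DataFrame. The data below is made up; only the SalePrice column name and the split arguments mirror the cell above.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Made-up data with 10 rows
toy = pd.DataFrame({
    "GrLivArea": range(10),
    "SalePrice": range(100, 110),
})

X_tr, X_te, y_tr, y_te = train_test_split(
    toy.drop(columns=["SalePrice"]),
    toy["SalePrice"],
    test_size=0.2,
    random_state=42,
)

# 20% of 10 rows go to the test set, the rest to the train set
print(X_tr.shape, X_te.shape)
```

Features and target keep matching row indices after the split, which is why X_train and y_train can be passed together to the grid search later.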

Displays the first 10 rows of the training set (X_train) and the testing set (X_test) with head(), as a quick check that both sets contain the expected features.

In [None]:
print(X_train.head(10))
print(X_test.head(10))

Defines two dictionaries: models_quick_search, which holds seven regression models (LinearRegression, DecisionTreeRegressor, RandomForestRegressor, ExtraTreesRegressor, AdaBoostRegressor, GradientBoostingRegressor, and XGBRegressor), and params_quick_search, which specifies a hyperparameter grid for each model. The model__ prefix on each key targets the 'model' step of the pipeline. LinearRegression has an empty grid, so it is evaluated with its default settings only.

In [10]:
models_quick_search = {
    "LinearRegression": LinearRegression(),
    "DecisionTreeRegressor": DecisionTreeRegressor(random_state=42),
    "RandomForestRegressor": RandomForestRegressor(random_state=42),
    "ExtraTreesRegressor": ExtraTreesRegressor(random_state=42),
    "AdaBoostRegressor": AdaBoostRegressor(random_state=42),
    "GradientBoostingRegressor": GradientBoostingRegressor(random_state=42),
    "XGBRegressor": XGBRegressor(random_state=42),
}

params_quick_search = {
    "LinearRegression": {},

    "DecisionTreeRegressor": {
        'model__max_depth': [None, 4, 15],
        'model__min_samples_split': [2, 50],
        'model__min_samples_leaf': [1, 50],
        'model__max_leaf_nodes': [None, 50],
    },

    "RandomForestRegressor": {
        'model__n_estimators': [100, 50, 140],
        'model__max_depth': [None, 4, 15],
        'model__min_samples_split': [2, 50],
        'model__min_samples_leaf': [1, 50],
        'model__max_leaf_nodes': [None, 50],
    },

    "ExtraTreesRegressor": {
        'model__n_estimators': [100, 50, 150],
        'model__max_depth': [None, 3, 15],
        'model__min_samples_split': [2, 50],
        'model__min_samples_leaf': [1, 50],
    },

    "AdaBoostRegressor": {
        'model__n_estimators': [50, 25, 80, 150],
        'model__learning_rate': [1, 0.1, 2],
        'model__loss': ['linear', 'square', 'exponential'],
    },

    "GradientBoostingRegressor": {
        'model__n_estimators': [100, 50, 140],
        'model__learning_rate': [0.1, 0.01, 0.001],
        'model__max_depth': [3, 15, None],
        'model__min_samples_split': [2, 50],
        'model__min_samples_leaf': [1, 50],
        'model__max_leaf_nodes': [None, 50],
    },

    "XGBRegressor": {
        'model__n_estimators': [30, 80, 200],
        'model__max_depth': [None, 3, 15],
        'model__learning_rate': [0.01, 0.1, 0.001],
        'model__gamma': [0, 0.1],
    },
}
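
To get a feel for the cost of a grid, the number of candidates can be counted with itertools.product. The sketch below copies the ExtraTreesRegressor grid from the cell above and assumes cv=5, the value used later in this notebook.

```python
from itertools import product

# Copy of the ExtraTreesRegressor grid defined above
extra_trees_grid = {
    "model__n_estimators": [100, 50, 150],
    "model__max_depth": [None, 3, 15],
    "model__min_samples_split": [2, 50],
    "model__min_samples_leaf": [1, 50],
}

# Every combination of one value per parameter is one candidate
candidates = [
    dict(zip(extra_trees_grid, values))
    for values in product(*extra_trees_grid.values())
]

n_candidates = len(candidates)
n_fits = n_candidates * 5  # assuming cv=5
print(n_candidates, n_fits)
```

Each candidate is fitted once per fold, so adding one more value to any list multiplies the total work accordingly.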

Imports the warnings module and suppresses all warnings, and also sets the PYTHONWARNINGS environment variable so that subprocesses started for the search inherit the setting. Initializes HyperparameterOptimizationSearch with the models_quick_search and params_quick_search dictionaries, which contain the models and their hyperparameter grids. Its fit method then runs a grid search with 5-fold cross-validation (cv=5) on the training data (X_train and y_train), using the R² (r2) metric for scoring and all available CPU cores (n_jobs=-1).

In [None]:
import warnings
warnings.filterwarnings("ignore")
os.environ["PYTHONWARNINGS"] = "ignore"

search = HyperparameterOptimizationSearch(models=models_quick_search, params=params_quick_search)
search.fit(X_train, y_train, scoring='r2', n_jobs=-1, cv=5)

Calls the score_summary method of the HyperparameterOptimizationSearch instance, passing sort_by='mean_score' to sort the summary DataFrame by the mean score (average performance across cross-validation folds).

In [None]:
grid_search_summary, grid_search_pipelines = search.score_summary(sort_by='mean_score')
grid_search_summary

Defines two dictionaries for performing a hyperparameter search with the ExtraTreesRegressor model.

In [13]:
models_search = {
    "ExtraTreesRegressor": ExtraTreesRegressor(random_state=42),
}

params_search = {
    "ExtraTreesRegressor":{'model__n_estimators': [50,100,150],
        'model__max_depth': [None, 3, 15],
        'model__min_samples_split': [2, 50],
        'model__min_samples_leaf': [1,50],
        },
}

Performs hyperparameter optimization using the HyperparameterOptimizationSearch class. It searches for the best hyperparameters for the model in models_search over the grid in params_search.

In [None]:
search = HyperparameterOptimizationSearch(models=models_search, params=params_search)
search.fit(X_train, y_train, scoring = 'r2', n_jobs=-1, cv=5)

Calls the score_summary method of the HyperparameterOptimizationSearch instance, passing sort_by='mean_score' to sort the summary DataFrame by the mean score (average performance across cross-validation folds).

In [None]:
grid_search_summary, grid_search_pipelines = search.score_summary(sort_by='mean_score')
grid_search_summary

Selects the best-performing model from the grid_search_summary by accessing the first row and first column.

In [None]:
best_model = grid_search_summary.iloc[0,0]
best_model

Retrieves the best hyperparameters for the model selected through grid search. Accesses the best_params_ attribute of the best model's entry in the grid_search_pipelines dictionary, which holds the parameter combination with the highest mean cross-validation score.

In [None]:
best_parameters = grid_search_pipelines[best_model].best_params_
best_parameters

Retrieves the best estimator from the grid search, i.e. the pipeline refitted on the whole training set with the best hyperparameters.

In [None]:
best_regressor_pipeline = grid_search_pipelines[best_model].best_estimator_
best_regressor_pipeline

Performs feature importance analysis on a regression pipeline. Selects the preprocessing steps from the pipeline, applies them to transform the training data, and extracts the transformed feature names. Identifies selected features if feature selection is applied. Retrieves feature importances if supported by the model, then sorts and displays the most important features. If feature importance is available, it plots a bar chart.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
sns.set_theme(style='whitegrid')

data_cleaning_feat_eng_steps = 9

pipeline_steps = best_regressor_pipeline.steps[:data_cleaning_feat_eng_steps]
temp_pipeline = Pipeline(pipeline_steps)

transformed_data = temp_pipeline.fit_transform(X_train)

def get_feature_names_from_pipeline(pipeline_steps, X_train):
    feature_names = X_train.columns  # Default to original feature names
    
    for name, transformer in pipeline_steps:
        if hasattr(transformer, 'get_feature_names_out'):
            feature_names = transformer.get_feature_names_out()
    
    return feature_names

feature_names = get_feature_names_from_pipeline(best_regressor_pipeline.steps[:data_cleaning_feat_eng_steps], X_train)

transformed_data = pd.DataFrame(transformed_data, columns=feature_names)

if 'feat_selection' in best_regressor_pipeline.named_steps:
    feat_selector = best_regressor_pipeline['feat_selection']
    selected_columns = transformed_data.columns[feat_selector.get_support()]
else:
    selected_columns = transformed_data.columns

if hasattr(best_regressor_pipeline['model'], 'feature_importances_'):
    feature_importances = best_regressor_pipeline['model'].feature_importances_
else:
    print("Model does not have feature importances attribute. Skipping this step.")
    feature_importances = None

if feature_importances is not None:
    df_feature_importance = pd.DataFrame({
        'Feature': selected_columns,
        'Importance': feature_importances
    }).sort_values(by='Importance', ascending=False)

    print(f"* These are the {len(df_feature_importance)} most important features in descending order: \n{df_feature_importance['Feature'].to_list()}")

    df_feature_importance.plot(kind='bar', x='Feature', y='Importance')
    plt.show()
else:
    print("No feature importance available to plot.")
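
The core of the cell above is lining up the columns kept by the feature selector with the importances reported by the model. A minimal sketch with made-up stand-ins (the names columns, support_mask and importances are hypothetical) looks like this:

```python
import numpy as np
import pandas as pd

# Made-up stand-ins: column names after preprocessing, the boolean mask a
# selector's get_support() would return, and the model's importances
columns = pd.Index(["a", "b", "c", "d"])
support_mask = np.array([True, False, True, True])
importances = np.array([0.2, 0.5, 0.3])

# Boolean indexing keeps a, c and d, in their original order
selected = columns[support_mask]

ranking = (
    pd.DataFrame({"Feature": selected, "Importance": importances})
    .sort_values(by="Importance", ascending=False)
    .reset_index(drop=True)
)
print(ranking)
```

The model only sees the selected columns, so the importances array has one entry per True value in the mask. That is why the two can be paired positionally.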

Evaluates performance of a regression model by calculating and visualizing key metrics. regression_performance function evaluates model on both training and test sets, calling regression_evaluation to compute R² score, mean absolute error, mean squared error, and root mean squared error for each dataset. regression_evaluation_plots function generates scatter plots comparing actual values to predicted values for both training and test sets, overlaying a red line representing perfect predictions. Plots are saved as an image and displayed.

In [20]:
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error 
import numpy as np

def regression_performance(X_train, y_train, X_test, y_test, pipeline):
  print("Model Evaluation \n")
  print("* Train Set")
  regression_evaluation(X_train, y_train, pipeline)
  print("* Test Set")
  regression_evaluation(X_test, y_test, pipeline)

def regression_evaluation(X, y, pipeline):
  prediction = pipeline.predict(X)
  print('R2 Score:', round(r2_score(y, prediction), 3))
  print('Mean Absolute Error:', round(mean_absolute_error(y, prediction), 3))
  print('Mean Squared Error:', round(mean_squared_error(y, prediction), 3))
  print('Root Mean Squared Error:', round(np.sqrt(mean_squared_error(y, prediction)), 3))
  print("\n")


def regression_evaluation_plots(X_train, y_train, X_test, y_test,pipeline, alpha_scatter=0.5):
  pred_train = pipeline.predict(X_train)
  pred_test = pipeline.predict(X_test)


  fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12,6))
  sns.scatterplot(x=y_train, y=pred_train, alpha=alpha_scatter, ax=axes[0])
  sns.lineplot(x=y_train , y=y_train, color='red', ax=axes[0])
  axes[0].set_xlabel("Actual")
  axes[0].set_ylabel("Predictions")
  axes[0].set_title("Train Set")

  sns.scatterplot(x=y_test, y=pred_test, alpha=alpha_scatter, ax=axes[1])
  sns.lineplot(x=y_test, y=y_test, color='red', ax=axes[1])
  axes[1].set_xlabel("Actual")
  axes[1].set_ylabel("Predictions")
  axes[1].set_title("Test Set")
  plt.savefig(f'docs/plots/regression_performance.png', bbox_inches='tight')  
  plt.show()
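
For reference, the four metrics printed by regression_evaluation can be computed by hand with NumPy. The values below are made up purely to keep the arithmetic easy to follow.

```python
import numpy as np

y_true = np.array([3.0, 5.0, 7.0, 9.0])   # made-up actual values
y_pred = np.array([2.0, 5.0, 8.0, 9.0])   # made-up predictions

residuals = y_true - y_pred

mae = np.mean(np.abs(residuals))           # mean absolute error
mse = np.mean(residuals ** 2)              # mean squared error
rmse = np.sqrt(mse)                        # root mean squared error

# R2 = 1 - (residual sum of squares / total sum of squares)
r2 = 1 - np.sum(residuals ** 2) / np.sum((y_true - y_true.mean()) ** 2)

print(mae, mse, rmse, r2)
```

MAE and RMSE are in the same unit as the target (here, the sale price), while R² is unitless, which makes it convenient for comparing train and test performance.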

Evaluates performance of the best regressor pipeline on both training and test sets.

In [None]:
regression_performance(X_train, y_train, X_test, y_test,best_regressor_pipeline)
regression_evaluation_plots(X_train, y_train, X_test, y_test, best_regressor_pipeline)

Pipeline of the best regressor model.

In [None]:
best_regressor_pipeline

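Redefines PipelineOptimization as a reduced regression pipeline for the most important features. It imputes missing values in TotalBsmtSF with the mean, applies a log transformation to GrLivArea and a power transformation to TotalBsmtSF, caps extreme values in TotalBsmtSF and GarageArea with the IQR method, scales features with StandardScaler, and fits an ExtraTreesRegressor with fixed hyperparameters (max_depth=15, min_samples_split=50, n_estimators=150). Note that the model argument is not used in this version; the regressor is hard-coded in the last step.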

In [23]:
def PipelineOptimization(model):
    pipeline_base = Pipeline(steps=[  
    ( 'mean',  MeanMedianImputer(imputation_method='mean',
                                     variables=['TotalBsmtSF']) ),
                                     
    ('lt', vt.LogTransformer(variables = ['GrLivArea']) ),

    ('pt', vt.PowerTransformer(variables = ['TotalBsmtSF']) ),
      
    ("Winsoriser_iqr",Winsorizer(capping_method='iqr', fold=1.5, tail='both', 
                                                  variables=['TotalBsmtSF', 'GarageArea']) ),      

    ("feat_scaling", StandardScaler() ),

  ('model', ExtraTreesRegressor(max_depth=15, min_samples_split=50,
                                     n_estimators=150, random_state=42))])        
    return pipeline_base

Prints original column names in the df dataset.

In [None]:
print("Original columns in the dataset:", df.columns)

Checks if columns in best_features are present in both X_train and X_test datasets. Identifies and prints any missing columns. If no columns are missing, filters both datasets to include only best_features and prints shapes and first 5 rows of X_train. Prints a message if any columns are missing.

In [None]:
best_features = ['OverallQual', 'GrLivArea', 'GarageArea', 'YearBuilt', 'TotalBsmtSF']

missing_train_columns = [col for col in best_features if col not in X_train.columns]
missing_test_columns = [col for col in best_features if col not in X_test.columns]

if missing_train_columns:
    print(f"Missing columns in X_train: {missing_train_columns}")

if missing_test_columns:
    print(f"Missing columns in X_test: {missing_test_columns}")

if not missing_train_columns and not missing_test_columns:
    X_train = X_train[best_features]
    X_test = X_test[best_features]

    print("* Train set:", X_train.shape, y_train.shape, "\n* Test set:", X_test.shape, y_test.shape)
    print(X_train.head(5))
else:
    print("Some columns are missing")

Displays the models_search dictionary, which holds the model used in the search.

In [None]:
models_search

Displays best_parameters, the best hyperparameters found by the previous search.

In [None]:
best_parameters

Defines a set of hyperparameters for tuning an ExtraTreesRegressor model.

In [28]:
params_search = {
    "ExtraTreesRegressor": {
        'model__n_estimators': [50, 100, 150],
        'model__max_depth': [3, 5, 10],
        'model__min_samples_split': [5, 10, 20],
        'model__min_samples_leaf': [5, 10, 20],
    },
}

Performs hyperparameter optimization using HyperparameterOptimizationSearch with models (models_search) and hyperparameters (params_search). Fit method trains the model on X_train and y_train with 5-fold cross-validation (cv=5), evaluating performance using R² scoring (scoring='r2'). Utilizes all available CPU cores (n_jobs=-1) for parallel computation, aiming to find the best hyperparameter combination for ExtraTreesRegressor.

In [None]:
search = HyperparameterOptimizationSearch(models=models_search, params=params_search)
search.fit(X_train, y_train, scoring = 'r2', n_jobs=-1, cv=5)

Retrieves a summary of the grid search results using the score_summary method from the search object, sorting the results by the mean score.

In [None]:
grid_search_summary, grid_search_pipelines = search.score_summary(sort_by='mean_score')
grid_search_summary

Selects best model from grid_search_summary DataFrame by accessing the first row and first column, which corresponds to the model with the highest mean score.

In [None]:
best_model = grid_search_summary.iloc[0,0]
best_model

Retrieves the best regressor pipeline from grid_search_pipelines using the best model identifier.

In [None]:
best_pipeline_regressor = grid_search_pipelines[best_model].best_estimator_
best_pipeline_regressor

Attempts to create a directory structure for saving a machine learning pipeline, defining a path (file_path) for version 'v1' under outputs/ml_pipeline/predict_price/. Uses os.makedirs to create the directory and prints any exception messages if an error occurs.

In [None]:
import joblib
import os

version = 'v1'
file_path = f'outputs/ml_pipeline/predict_price/{version}'

try:
  os.makedirs(name=file_path)
except Exception as e:
  print(e)
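
As an alternative to catching the exception, os.makedirs accepts exist_ok=True, which makes the call safe to repeat when the notebook is re-run. The sketch below works inside a temporary directory so it does not touch the project; demo_path is a hypothetical name that only mirrors the layout used above.

```python
import os
import tempfile

# Work inside a throwaway directory so nothing in the project is touched
with tempfile.TemporaryDirectory() as tmp_root:
    demo_path = os.path.join(tmp_root, "outputs", "ml_pipeline", "predict_price", "v1")

    os.makedirs(demo_path, exist_ok=True)   # creates all missing parent folders
    os.makedirs(demo_path, exist_ok=True)   # second call is a no-op, no exception

    created = os.path.isdir(demo_path)

print(created)
```

Unlike a broad except, this only ignores the "already exists" case, so other problems such as permission errors still surface.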

Displays the first 10 rows of the X_train dataset.

In [None]:
X_train.head(10)

Saves the X_train and y_train datasets as CSV files in the specified directory (file_path). The X_train data is saved as X_train.csv and the y_train data as y_train.csv, both without including row indices.

In [35]:
X_train.to_csv(f"{file_path}/X_train.csv", index=False)
y_train.to_csv(f"{file_path}/y_train.csv", index=False)

Displays the first 10 rows of the y_test dataset.

In [None]:
y_test.head(10)

Saves the X_test and y_test datasets as CSV files in the specified directory (file_path). The X_test data is saved as X_test.csv and the y_test data as y_test.csv, both without including row indices.

In [37]:
X_test.to_csv(f"{file_path}/X_test.csv", index=False) 
y_test.to_csv(f"{file_path}/y_test.csv", index=False)
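
One consequence of index=False is worth keeping in mind when these files are read back. The sketch below uses a made-up Series and a temporary folder; only the SalePrice name comes from the notebook.

```python
import os
import tempfile

import pandas as pd

# Made-up target values with a shuffled index, as produced by a random split
y_demo = pd.Series([208500, 181500, 223500], index=[7, 2, 5], name="SalePrice")

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "y_demo.csv")
    y_demo.to_csv(csv_path, index=False)    # the shuffled index is not written
    restored = pd.read_csv(csv_path)

print(type(restored).__name__, restored.index.tolist())
```

The target comes back as a one-column DataFrame with a fresh 0-based index. Row order is preserved, so features and target saved this way still line up positionally.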

Show the steps within the pipeline.

In [None]:
best_pipeline_regressor

Prints the machine learning pipeline preprocessing and modeling steps.

In [None]:
print(best_pipeline_regressor)

Saves best_pipeline_regressor to a file using joblib. It serializes the pipeline object and stores it as regression_pipeline.pkl in a directory defined by file_path.

In [None]:
import joblib
joblib.dump(value=best_pipeline_regressor, filename=f"{file_path}/regression_pipeline.pkl")
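
Loading the file back is the mirror operation, joblib.load. The sketch below round-trips a plain dictionary as a stand-in for the fitted pipeline, in a temporary folder; the artifact content is invented for illustration.

```python
import os
import tempfile

import joblib

# Stand-in object; the notebook saves a fitted scikit-learn pipeline instead
artifact = {"model": "ExtraTreesRegressor", "version": "v1"}

with tempfile.TemporaryDirectory() as tmp_dir:
    pkl_path = os.path.join(tmp_dir, "regression_pipeline.pkl")
    joblib.dump(value=artifact, filename=pkl_path)
    restored_artifact = joblib.load(pkl_path)

print(restored_artifact == artifact)
```

A deployed app would typically call joblib.load on regression_pipeline.pkl in the same way and then use the restored pipeline's predict method, ideally with the same library versions that were used for saving.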

Creates a bar plot to visualize the feature importance of a machine learning model using the data stored in df_feature_importance.

In [None]:
df_feature_importance.plot(kind='bar', x='Feature', y='Importance')
plt.show()

Creates a bar plot for feature importance and saves it as images in two specified locations.

In [None]:
df_feature_importance.plot(kind='bar',x='Feature',y='Importance')
plt.savefig(f'{file_path}/features_importance.png', bbox_inches='tight')
plt.savefig(f'docs/plots/features_importance.png', bbox_inches='tight') 

Prints the machine learning pipeline preprocessing and modeling steps.

In [None]:
print(best_pipeline_regressor)

## Conclusions and Next Steps

The regression pipeline is now complete and saved under outputs/ml_pipeline/predict_price/v1.

The next step is to deploy it.