# Setup 

The code is based on Aurélien Géron's code on GitHub: https://github.com/ageron/handson-ml3

In [15]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import re
import sklearn
# Compare the version numerically: a plain string comparison is lexicographic,
# so e.g. "0.9" >= "0.20" would wrongly pass.
_sklearn_version = re.match(r"(\d+)\.(\d+)", sklearn.__version__)
assert _sklearn_version is not None, sklearn.__version__
assert tuple(int(part) for part in _sklearn_version.groups()) >= (0, 20)
# NOTE(review): later cells pass feature_names_out= to FunctionTransformer,
# which presumably needs a newer release than 0.20 - confirm and raise the
# minimum if so.

# Common imports
import numpy as np
import os

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
# Label font sizes: 14 for the axis labels, 12 for the tick labels.
for _rc_group, _rc_labelsize in (("axes", 14), ("xtick", 12), ("ytick", 12)):
    mpl.rc(_rc_group, labelsize=_rc_labelsize)

# Figures are saved under <project root>/images/<chapter id>.
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "end_to_end_project"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
# Create the directory up front; exist_ok=True makes re-running this cell harmless.
os.makedirs(IMAGES_PATH, exist_ok=True)

Define the download URL, the local directory the data is saved to, and a function that fetches the data.

In [16]:
import os
import tarfile
import urllib.request

# Remote location of the archive and the local directory it is unpacked into.
DOWNLOAD_ROOT = "https://raw.githubusercontent.com/ageron/handson-ml2/master/"
HOUSING_PATH = os.path.join("datasets", "housing")
HOUSING_URL = DOWNLOAD_ROOT + "datasets/housing/housing.tgz"

def fetch_housing_data(housing_url=HOUSING_URL, housing_path=HOUSING_PATH):
    """Download the housing archive and extract it into ``housing_path``.

    The archive is downloaded on every call and stored as
    ``<housing_path>/housing.tgz`` before being extracted in place.

    Parameters
    ----------
    housing_url : str
        URL of the ``.tgz`` archive to download.
    housing_path : str
        Directory that receives both the archive and its extracted contents.
        It is created (including parents) if it does not exist yet.
    """
    # exist_ok avoids the check-then-create race of a separate isdir() test.
    os.makedirs(housing_path, exist_ok=True)
    tgz_path = os.path.join(housing_path, "housing.tgz")
    urllib.request.urlretrieve(housing_url, tgz_path)
    # The context manager closes the archive even if extraction raises.
    with tarfile.open(tgz_path) as housing_tgz:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available: use the "data" filter, which
            # rejects unsafe members (absolute paths, links leaving the
            # destination) and avoids the warning raised when no filter is given.
            housing_tgz.extractall(path=housing_path, filter="data")
        else:
            # Older Python without extraction filters: same call as before.
            housing_tgz.extractall(path=housing_path)

Call the function 

In [17]:
# Download and extract the archive using the default URL and target directory.
fetch_housing_data()

  housing_tgz.extractall(path=housing_path)


Create a function that loads the data into a pandas DataFrame, and call it.

In [18]:
import pandas as pd

def load_housing_data(housing_path=HOUSING_PATH):
    """Read ``housing.csv`` from ``housing_path`` and return it as a DataFrame."""
    return pd.read_csv(os.path.join(housing_path, "housing.csv"))

# Load the dataset from the default location.
housing = load_housing_data()

Test and train sets

In [19]:
from sklearn.model_selection import train_test_split
# Separate the target column (labels) from the remaining columns (features).
housing_labels = housing["median_house_value"]
housing_features = housing.drop(columns="median_house_value")

# Random split with 20% of the rows held out for testing; random_state is
# fixed so the split is the same on every run.
(
    housing_train_features,
    housing_test_features,
    housing_train_labels,
    housing_test_labels,
) = train_test_split(
    housing_features,
    housing_labels,
    test_size=0.2,
    random_state=42,
)

# Custom features

In [20]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import rbf_kernel

class ClusterSimilarity(BaseEstimator, TransformerMixin):
    """Transformer producing RBF similarities to k-means cluster centers.

    ``fit`` runs KMeans on the input; ``transform`` applies ``rbf_kernel``
    between the samples and the fitted cluster centers.
    """

    def __init__(self, n_clusters=10, gamma=1.0, random_state=None):
        # Hyperparameters are stored unchanged under their own names.
        self.n_clusters = n_clusters
        self.gamma = gamma
        self.random_state = random_state

    def fit(self, X, y=None, sample_weight=None):
        """Fit KMeans on ``X`` (optionally weighted) and return ``self``."""
        self.kmeans_ = KMeans(
            n_clusters=self.n_clusters,
            n_init=10,
            random_state=self.random_state,
        )
        self.kmeans_.fit(X, sample_weight=sample_weight)
        return self

    def transform(self, X):
        """Return the RBF kernel between ``X`` and the fitted cluster centers."""
        centers = self.kmeans_.cluster_centers_
        return rbf_kernel(X, centers, gamma=self.gamma)

    def get_feature_names_out(self, names=None):
        """Return one output feature name per cluster."""
        feature_names = []
        for i in range(self.n_clusters):
            feature_names.append(f"Cluster {i} similarity")
        return feature_names
    

class StandardScalerClone(BaseEstimator, TransformerMixin):
    """Minimal standard scaler: subtract the column mean, divide by the column std.

    Parameters
    ----------
    with_mean : bool, default=True
        If True, ``transform`` centers the data before scaling.

    Attributes set by ``fit``
    -------------------------
    mean_ : per-column mean of the training data.
    scale_ : per-column standard deviation, with zeros replaced by 1.
    n_features_in_ : number of columns seen during ``fit``.
    """

    def __init__(self, with_mean=True):  # no *args or **kwargs!
        self.with_mean = with_mean

    def fit(self, X, y=None):  # y is required even though we don't use it
        """Learn the per-column mean and scale of ``X`` and return ``self``."""
        X = check_array(X)  # validates X and converts it to an array
        self.mean_ = X.mean(axis=0)
        scale = X.std(axis=0)
        # A constant column has std 0; dividing by it in transform() would
        # produce NaN/inf. Use 1 for such columns so they are left unscaled.
        scale[scale == 0.0] = 1.0
        self.scale_ = scale
        self.n_features_in_ = X.shape[1]  # every estimator stores this in fit()
        return self  # always return self!

    def transform(self, X):
        """Scale ``X`` with the statistics learned in ``fit``.

        Raises an AssertionError if ``X`` does not have the number of columns
        seen during ``fit``.
        """
        check_is_fitted(self)  # looks for learned attributes (with trailing _)
        X = check_array(X)
        assert self.n_features_in_ == X.shape[1]
        if self.with_mean:
            X = X - self.mean_
        return X / self.scale_

Pipelines for data transformation

In [21]:
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, FunctionTransformer, OneHotEncoder
from sklearn.compose import make_column_selector, ColumnTransformer

# Pipeline for categorical columns: fill missing values with the most frequent
# value, then one-hot encode (handle_unknown="ignore" is set on the encoder).
cat_pipeline = Pipeline(steps=[
    ("impute", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore")),
])
def column_ratio(X):
    """Divide column 0 of ``X`` by column 1, keeping the result 2-D (one column)."""
    numerator = X[:, [0]]
    denominator = X[:, [1]]
    return numerator / denominator

def ratio_name(function_transformer, feature_names_in):
    """Return the output feature names for the ratio transformer.

    Both arguments are ignored; the result is always the single name "ratio".
    """
    feature_names_out = ["ratio"]
    return feature_names_out

def ratio_pipeline():
    """Build a new pipeline: median imputation, column ratio, standard scaling."""
    steps = (
        SimpleImputer(strategy="median"),
        FunctionTransformer(column_ratio, feature_names_out=ratio_name),
        StandardScaler(),
    )
    return make_pipeline(*steps)

# Median imputation, natural log, then standard scaling.
log_pipeline = make_pipeline(
    SimpleImputer(strategy="median"),
    FunctionTransformer(np.log, feature_names_out="one-to-one"),
    StandardScaler())
# Similarity of each sample to 10 k-means cluster centers (seeded).
cluster_simil = ClusterSimilarity(n_clusters=10, gamma=1., random_state=42)
# Default numeric treatment: median imputation followed by standard scaling.
default_num_pipeline = make_pipeline(SimpleImputer(strategy="median"),
                                     StandardScaler())

# cat_pipeline is defined once at the top of this cell; the identical second
# definition that used to follow here was removed.


# Column-wise preprocessing. Each entry is (name, transformer, columns).
preprocessing_pipeline = ColumnTransformer(
    transformers=[
        ("bedrooms", ratio_pipeline(), ["total_bedrooms", "total_rooms"]),
        ("rooms_per_house", ratio_pipeline(), ["total_rooms", "households"]),
        ("people_per_house", ratio_pipeline(), ["population", "households"]),
        (
            "log",
            log_pipeline,
            ["total_bedrooms", "total_rooms", "population", "households", "median_income"],
        ),
        ("geo", cluster_simil, ["latitude", "longitude"]),
        ("cat", cat_pipeline, make_column_selector(dtype_include=object)),
    ],
    # One column remaining: housing_median_age.
    remainder=default_num_pipeline,
)

Pipelines for training the models:

In [22]:
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
# Every pipeline below uses the same preprocessing_pipeline object as its
# first step, followed by a single regressor.

# Linear regression pipeline with preprocessing
lin_reg_pipeline = Pipeline([
    ("preprocessing", preprocessing_pipeline),
    ("linear_regression", LinearRegression())
])

# Decision tree pipeline with preprocessing. The tree is seeded, like the
# random forest below, so repeated runs give the same results.
decision_tree_reg_pipeline = Pipeline([
    ("preprocessing", preprocessing_pipeline),
    ("decision_tree", DecisionTreeRegressor(random_state=42))
])

# Random forest pipeline with preprocessing
random_forest_reg_pipeline = Pipeline([
    ("preprocessing", preprocessing_pipeline),
    ("random_forest_regression", RandomForestRegressor(random_state=42, n_jobs=-1))
])

# SVM (regression) pipeline with preprocessing
svr_pipeline = Pipeline([
    ("preprocessing", preprocessing_pipeline),
    ("SVR", SVR())
])

Define models to evaluate

In [23]:
# Display name -> pipeline. The names are also the keys of the parameter grids.
models = dict([
    ("Linear Regression", lin_reg_pipeline),
    ("Decision Tree", decision_tree_reg_pipeline),
    ("Random Forest", random_forest_reg_pipeline),
    ("SVR", svr_pipeline),
])

Define a parameter grid for each model

In [24]:
# Grid-search candidates per model. Inner keys follow the
# "<pipeline step name>__<parameter>" form used by the pipelines.
param_grid_search = {
    "Linear Regression": dict(
        linear_regression__fit_intercept=[True, False],
        linear_regression__positive=[True, False],
    ),
    "Decision Tree": dict(
        decision_tree__max_depth=[5, 10, 15, None],
        decision_tree__min_samples_split=[2, 10, 20],
    ),
    "Random Forest": dict(
        random_forest_regression__n_estimators=[50, 100, 200],
        random_forest_regression__max_depth=[10, 20, 30, None],
        random_forest_regression__min_samples_split=[2, 5, 10],
    ),
    "SVR": dict(
        SVR__C=[0.1, 1, 10],
        SVR__epsilon=[0.01, 0.1, 0.2],
        SVR__kernel=["linear", "rbf"],
    ),
}

Run a grid search (GridSearchCV) for each of the models defined above and store the results in a dictionary

In [None]:
import time
from sklearn.model_selection import GridSearchCV
import pandas as pd

# Reports of the grid search, keyed by model name.
models_report_grid_search = {}

for name, model in models.items():
    # Time the complete search for this model.
    start_time = time.time()
    grid_search = GridSearchCV(
        model,
        param_grid_search[name],
        scoring="neg_root_mean_squared_error",
        cv=5,
        return_train_score=True,
    )
    grid_search.fit(housing_train_features, housing_train_labels)
    elapsed_time = time.time() - start_time

    # Cross-validation results sorted by rank, with the rank as first column.
    testing_params = pd.DataFrame(grid_search.cv_results_).sort_values(by='rank_test_score')
    cols = ['rank_test_score']
    cols.extend(col for col in testing_params.columns if col != 'rank_test_score')
    testing_params = testing_params[cols]

    # Summary for this model; the scorer is negated, so flip the sign to
    # report a positive RMSE.
    report = {
        "Model": name,
        "Best Hyperparameters": grid_search.best_params_,
        "Best Score (RMSE)": -grid_search.best_score_,
        "Time Taken (s)": elapsed_time,
        "Number of samples in training set": housing_train_features.shape[0],
        "Testing Parameters": testing_params,
    }
    models_report_grid_search[name] = report



Create an Excel file with the results and store it in the project folder.

In [12]:
# Overview table: one row per model, without the per-candidate results table.
summary_rows = [
    {key: value for key, value in report.items() if key != 'Testing Parameters'}
    for report in models_report_grid_search.values()
]
df_grid_search_report = pd.DataFrame(summary_rows)

# One workbook: an overview sheet plus one sheet of detailed results per model.
with pd.ExcelWriter('model_search_results.xlsx') as writer:
    df_grid_search_report.to_excel(writer, sheet_name='Grid Search Results', index=False)

    for model_name, report in models_report_grid_search.items():
        if "Testing Parameters" not in report:
            continue
        detail_sheet = f'{model_name}_Parameters'
        report["Testing Parameters"].to_excel(writer, sheet_name=detail_sheet, index=False)


RandomizedSearchCV

In [26]:
from scipy.stats import uniform, randint

# Search space per model for the randomized search.
# randint(low, high) draws integers from low up to high - 1;
# uniform(loc, scale) draws floats from loc up to loc + scale.
param_grid_random = {
    "Linear Regression": dict(
        linear_regression__fit_intercept=[True, False],
        linear_regression__positive=[True, False],
    ),
    "Decision Tree": dict(
        decision_tree__max_depth=randint(low=5, high=16),  # integers 5..15
        decision_tree__min_samples_split=randint(low=2, high=21),  # integers 2..20
    ),
    "Random Forest": dict(
        random_forest_regression__n_estimators=randint(low=50, high=201),  # integers 50..200
        random_forest_regression__max_depth=randint(low=10, high=31),  # integers 10..30
        random_forest_regression__min_samples_split=randint(low=2, high=11),  # integers 2..10
    ),
    "SVR": dict(
        SVR__C=uniform(loc=0.1, scale=9.9),  # floats between 0.1 and 10.0
        SVR__epsilon=uniform(loc=0.01, scale=0.19),  # floats between 0.01 and 0.20
        SVR__kernel=['linear', 'rbf'],  # categorical, so a plain list of choices
    ),
}

Random search

In [27]:
import time
from sklearn.model_selection import RandomizedSearchCV
import pandas as pd

# Initialize an empty dictionary to store reports for each model
models_report_random_search = {}

# Timing and applying RandomizedSearchCV
for name, model in models.items():
    start_time = time.time()  # Start the timer

    random_search = RandomizedSearchCV(
        estimator=model,
        param_distributions=param_grid_random[name],
        cv=5,
        n_iter=10,
        scoring="neg_root_mean_squared_error",
        return_train_score=True,
        # Seed the parameter sampling so the sampled candidates (and therefore
        # the reported best parameters) are the same on every run.
        random_state=42,
    )

    random_search.fit(housing_train_features, housing_train_labels)

    end_time = time.time()  # End the timer
    elapsed_time = end_time - start_time  # Calculate elapsed time

    # Build a report for the current model
    report = {
        "Model": name,
        "Best Hyperparameters": random_search.best_params_,
        "Best Score (RMSE)": -random_search.best_score_,  # negate to get positive RMSE
        "Time Taken (s)": elapsed_time,  # Time taken for the random search
        "Number of samples in training set": housing_train_features.shape[0]
    }

    # Cross-validation results sorted by rank, with the rank as first column
    testing_params = pd.DataFrame(random_search.cv_results_)
    testing_params = testing_params.sort_values(by='rank_test_score')
    cols = ['rank_test_score'] + [col for col in testing_params.columns if col != 'rank_test_score']
    testing_params = testing_params[cols]

    # Add the sorted testing parameters to the report
    report['Testing Parameters'] = testing_params

    # Store the report under the model's name
    models_report_random_search[name] = report



Store the results in an Excel file

In [33]:

# Overview table: one row per model, without the per-candidate results table.
summary_rows = [
    {key: value for key, value in report.items() if key != 'Testing Parameters'}
    for report in models_report_random_search.values()
]
df_random_search_report = pd.DataFrame(summary_rows)

# One workbook: an overview sheet plus one sheet of detailed results per model.
with pd.ExcelWriter('random_search_results.xlsx') as writer:
    df_random_search_report.to_excel(writer, sheet_name='Random Search Results', index=False)

    for model_name, report in models_report_random_search.items():
        if "Testing Parameters" not in report:
            continue
        detail_sheet = f'{model_name}_Parameters'
        report["Testing Parameters"].to_excel(writer, sheet_name=detail_sheet, index=False)


Test the best model with tuned parameters on the test data.

In [28]:
from sklearn.metrics import mean_squared_error

# Random forest pipeline with fixed hyperparameters, fitted on the training set.
random_forest_reg_pipeline_best = Pipeline(steps=[
    ("preprocessing", preprocessing_pipeline),
    (
        "random_forest_regression",
        RandomForestRegressor(
            n_estimators=200,
            max_depth=30,
            min_samples_split=2,
            random_state=42,
            n_jobs=-1,
        ),
    ),
])
random_forest_reg_pipeline_best.fit(housing_train_features, housing_train_labels)

# Predict on the test features and compute the root mean squared error
# against the test labels.
predictions = random_forest_reg_pipeline_best.predict(housing_test_features)
test_mse = mean_squared_error(housing_test_labels, predictions)
rmse = np.sqrt(test_mse)


In [29]:
# Show the RMSE computed from the test-set predictions.
print(rmse)

47680.806229532645
