<a href="https://colab.research.google.com/github/nsp8/Machine-Learning-Resources/blob/colab-ml-practice/regression_flood_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Imports

In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patheffects import withStroke
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_regression
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from google.colab import drive


In [3]:
!pip install ipython-autotime

[31mERROR: Operation cancelled by user[0m[31m
[0m

In [4]:
%load_ext autotime

time: 476 µs (started: 2024-05-31 00:28:42 +00:00)


#### Horizontal Chart for values associated with text labels

In [5]:
class Charts:
    """Horizontal bar chart for a mapping of text labels to numeric values.

    Each entry of ``data`` becomes one bar. The label is drawn inside the plot
    area next to the bar and the numeric value is drawn at the bar's end.
    Negative values use a red colour scheme and have their label moved to the
    left of the smallest value.

    Parameters
    ----------
    data : dict
        Mapping ``label -> numeric value``; bars are stacked in insertion order.
    width, height : float
        Figure size in inches, forwarded to ``plt.subplots``.
    font_family : str
        Font family used for tick labels, bar texts and the title.
    """

    def __init__(self, data, width=18, height=9, font_family="Sans Serif"):
        self.fig, self.ax = plt.subplots(figsize=(width, height))
        self.data = data
        # Bars are 0.55 high and placed every 0.9 units, leaving a gap between them.
        self.y_pos = [i * 0.9 for i in range(len(data))]
        self.ax.barh(
            self.y_pos,
            list(self.data.values()),
            height=0.55,
            align="edge",
            color="blue",
        )
        self.font_family = font_family

    @property
    def max_data_value(self):
        """Largest value in ``data`` (raises ``ValueError`` when ``data`` is empty)."""
        return max(self.data.values())

    @property
    def min_data_value(self):
        """Smallest value in ``data`` (raises ``ValueError`` when ``data`` is empty)."""
        return min(self.data.values())

    @property
    def axis_period(self):
        """Spacing between x ticks: a tenth of the integer part of the maximum, never 0."""
        return int(self.max_data_value) // 10 or 1

    @staticmethod
    def _format_value(value):
        """Format a bar value with exactly three decimals for its text label."""
        return f"{value:.3f}"

    def _tick_positions(self):
        """Return x tick positions from 0 up to (at most) the largest value.

        Ticks are multiples of ``axis_period``. No ticks are produced when every
        value is negative, because ticks only cover the non-negative side.
        """
        top = self.max_data_value
        if top < 0:
            return []
        period = self.axis_period  # always >= 1 when top >= 0
        # Stop at the last multiple of `period` that does not exceed the data,
        # so the ticks never stretch the axis beyond the bars.
        return [i * period for i in range(int(top // period) + 1)]

    def set_axes(self):
        """Configure ticks, grid and spines; returns ``self`` for chaining."""
        ticks = self._tick_positions()
        self.ax.xaxis.set_ticks(ticks)
        self.ax.xaxis.set_ticklabels(
            ticks,
            size=16,
            fontfamily=self.font_family,
            fontweight=100,
        )
        # Tick labels go on top of the chart, without tick marks.
        self.ax.xaxis.set_tick_params(labelbottom=False, labeltop=True, length=0)
        # Draw the grid behind the bars.
        self.ax.set_axisbelow(True)
        self.ax.grid(axis="x", color="#8C92AC", lw=1.2)
        for side in ("left", "right", "top", "bottom"):
            self.ax.spines[side].set_visible(False)
        self.ax.spines["left"].set_lw(1.5)
        self.ax.spines["left"].set_capstyle("butt")
        self.ax.yaxis.set_visible(False)
        return self

    def add_bar_text(self, label_object: dict):
        """Draw one text item described by ``label_object``.

        ``label_object`` must provide the keys ``x_pos``, ``y_pos``, ``text``,
        ``color`` and ``path_effects``.
        """
        self.ax.text(
            x=label_object["x_pos"],
            y=label_object["y_pos"],
            s=label_object["text"],
            c=label_object["color"],
            fontfamily=self.font_family,
            fontsize=12,
            va="center",
            path_effects=label_object["path_effects"],
        )

    def add_labels(self, padding: float = 0.3):
        """Write the label and the formatted value next to every bar.

        Parameters
        ----------
        padding : float
            Horizontal distance (in data units) used to offset the label text.

        Returns
        -------
        Charts
            ``self``, for chaining.
        """
        offset = 0.04
        smallest = self.min_data_value  # loop-invariant
        for label, value, y_pos in zip(
            self.data.keys(), self.data.values(), self.y_pos
        ):
            text_y = y_pos + 0.25
            metric_label = dict(
                x_pos=int(smallest) + (padding * 2.2),
                y_pos=text_y,
                text=label,
                color="white",
                path_effects=[withStroke(linewidth=6, foreground="green")],
            )
            value_label = dict(
                x_pos=value,
                y_pos=text_y,
                text=self._format_value(value),
                color="green",
                path_effects=[withStroke(linewidth=6, foreground="white")],
            )
            if value < 0:
                # Negative bars: move the label left of the smallest value and
                # switch both texts to the red scheme.
                metric_label["x_pos"] = smallest - (padding * 0.7) - offset
                metric_label["path_effects"] = [withStroke(linewidth=6, foreground="red")]
                value_label["color"] = "red"
                value_label["x_pos"] -= offset
            self.add_bar_text(metric_label)
            self.add_bar_text(value_label)

        return self

    def add_title(self, title, pos=0.5):
        """Add a bold figure title at horizontal figure coordinate ``pos``."""
        self.fig.text(
            pos, 0.925, title, fontsize=16, fontweight="bold", fontfamily=self.font_family
        )
        return self

    def plot(self, title=None):
        """Finish the chart (axes, labels, optional title) and show it."""
        self.set_axes().add_labels()
        if title:
            self.add_title(title)
        plt.show()


time: 10.8 ms (started: 2024-05-31 00:28:42 +00:00)


In [6]:
drive.mount('/gdrive/')

Drive already mounted at /gdrive/; to attempt to forcibly remount, call drive.mount("/gdrive/", force_remount=True).
time: 879 ms (started: 2024-05-31 00:28:42 +00:00)


### **Data Loading**

In [None]:
# Location of the competition CSVs inside the mounted Google Drive.
base_folder_path = os.path.join(
    "/gdrive",
    "My Drive",
    "Projects",
    "ML Practice",
    "Regression: flood prediction",
    "data",
)

# Fail fast when the data folder is not reachable (e.g. Drive not mounted).
if not os.path.exists(base_folder_path):
    raise FileNotFoundError("Could not locate files")

print("Loading data ...")
train_data, test_data = (
    pd.read_csv(os.path.join(base_folder_path, csv_name))
    for csv_name in ("train.csv", "test.csv")
)
print("Loaded train and test files from Drive")

Loading data ...


## **Data Exploration**

### Basic statistical analyses

In [None]:
train_data.describe()

In [None]:
# Null counts
train_data.isnull().sum()

In [None]:
# Data type information
train_data.info()

In [None]:
# Layout of data: separate the target from the features (rows keyed by `id`)
# and plot one histogram per feature column.
target_column = "FloodProbability"
features_train = train_data.set_index("id")
label_train = features_train.pop(target_column)  # removes the target from the feature frame
features_train.hist(layout=(4, 5), figsize=(20, 45), bins=15)

In [None]:
# Standardizing data

def standardize(data):
    """Scale every column of ``data`` with a freshly fitted ``StandardScaler``.

    The scaler is fitted on ``data`` itself and then applied to it; the
    transformed values are returned (the scaler is discarded).
    """
    scaler = StandardScaler()
    return scaler.fit(data).transform(data)


# `standardize` returns a bare array, so the frame has to be rebuilt. Carry over
# the row index (`id`) as well as the column names: the label series keeps that
# index, and dropping it here would leave features and labels on different indexes.
features_train_standardized = pd.DataFrame(
    standardize(features_train),
    columns=features_train.columns,
    index=features_train.index,
)
features_train_standardized.hist(bins=15, figsize=(20, 45), layout=(4, 5))

In [None]:
def standardize_labels(label_data):
    """Return ``label_data`` centred on its mean and divided by its standard deviation.

    Parameters
    ----------
    label_data : pandas.Series
        Numeric target values.

    Returns
    -------
    pandas.Series
        Standardized values with the same index and name as ``label_data``.

    Notes
    -----
    Uses ``Series.std()``, i.e. pandas' default sample standard deviation.
    A constant series has zero deviation and therefore yields non-finite values.
    """
    mean, std = label_data.mean(), label_data.std()
    # Vectorized arithmetic instead of a per-element Python lambda.
    return (label_data - mean) / std


label_train_standardized = pd.DataFrame(standardize_labels(label_train), columns=[target_column])
label_train_standardized.hist(bins=15, figsize=(5, 10))

#### Train test split

In [None]:
# Fixed seed so the split, and every result derived from it, is reproducible
# across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    features_train_standardized,
    label_train_standardized,
    test_size=0.25,
    random_state=42,
)

In [None]:
X_train.columns

#### Correlation matrix of training features

In [None]:
# Annotated heatmap of the pairwise correlations between training features.
correlation_axes = plt.figure(figsize=(20, 20)).gca()
sns.heatmap(
    X_train.corr(),
    ax=correlation_axes,
    vmax=1,
    square=True,
    annot=True,
    cmap="plasma",
)

### Feature Analyses

In [None]:
class FeatureAnalyzer:
    """PCA and mutual-information diagnostics for a feature matrix.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature matrix; its column names label the PCA loadings and MI scores.
    y : array-like
        Target values, one per row of ``X``.

    Notes
    -----
    The PCA is fitted once, on ``X``, when the analyzer is created. ``X_pca``
    and ``pca_loadings`` reuse that fit instead of re-fitting on every access.
    """

    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.pca = PCA()
        self.pca.fit(self.X)

    @property
    def _component_names(self):
        """Column labels ``PC1 .. PCn`` for the fitted components."""
        return [f"PC{i + 1}" for i in range(self.pca.n_components_)]

    @property
    def X_pca(self):
        """``X`` projected onto the principal components, as a DataFrame."""
        return pd.DataFrame(self.pca.transform(self.X), columns=self._component_names)

    @property
    def pca_loadings(self):
        """Component weights: one row per original feature, one column per component."""
        return pd.DataFrame(
            self.pca.components_.T,
            columns=self._component_names,
            index=self.X.columns,
        )

    def plot_variance(self, width=8, dpi=100):
        """Print the loadings and plot explained / cumulative variance ratios.

        Returns the two matplotlib axes (bar chart, cumulative line).
        """
        fig, axes = plt.subplots(1, 2)
        n = self.pca.n_components_
        print(f"\n{'-' * 35}\nPCA Loadings \n{'-' * 35}\n")
        print(self.pca_loadings)
        grid = np.arange(1, n + 1)
        explained_variance = self.pca.explained_variance_ratio_
        axes[0].bar(grid, explained_variance)
        axes[0].set(xlabel="Component", title="% Explained Variance", ylim=(0.0, 1.0))
        cumulative_variance = np.cumsum(explained_variance)
        # Prepend the origin so the cumulative curve starts at (0, 0).
        axes[1].plot(np.r_[0, grid], np.r_[0, cumulative_variance], marker="h")
        axes[1].set(xlabel="Component", title="% Cumulative Variance", ylim=(0.0, 1.0))
        fig.set(figwidth=width, dpi=dpi)
        return axes

    def make_mi_scores(self, discrete_features: bool = False, random_state=None):
        """Mutual information between each feature and the target, highest first.

        Parameters
        ----------
        discrete_features : bool
            Forwarded to ``mutual_info_regression``.
        random_state : int or None
            Seed forwarded to ``mutual_info_regression``; pass an int for
            repeatable scores.
        """
        # Flatten the target so a single-column frame is accepted as a 1-D vector.
        mi_regression = mutual_info_regression(
            self.X,
            np.ravel(self.y),
            discrete_features=discrete_features,
            random_state=random_state,
        )
        return pd.Series(
            mi_regression, name="Mutual Info Scores", index=self.X.columns
        ).sort_values(ascending=False)

    def check_outliers(self):
        """Draw one boxen plot per principal component to expose outliers."""
        sns.catplot(
            y="value",
            col="variable",
            data=self.X_pca.melt(),
            kind="boxen",
            sharey=False,
            col_wrap=2
        )


def show_feature_analysis_artifacts(X, y):
    """Run the full feature analysis for ``(X, y)`` and return the analyzer.

    Plots the PCA variance charts, computes mutual-information scores, draws
    the outlier plots and prints the scores.
    """
    analyzer = FeatureAnalyzer(X, y)
    analyzer.plot_variance()
    scores = analyzer.make_mi_scores()
    analyzer.check_outliers()

    divider = "-" * 35
    print(f"\n{divider}\nMutual Information Scores \n{divider}\n")
    print(scores)
    return analyzer


feature_analyzer_object = show_feature_analysis_artifacts(X_train, y_train)

In [None]:
feature_analyzer_object.pca_loadings

#### Visualize principal components

In [None]:
def plot_principal_component(feature_analyzer_object, level=1):
    """Plot the feature loadings of one principal component as a bar chart.

    Parameters
    ----------
    feature_analyzer_object : FeatureAnalyzer
        Analyzer whose ``pca_loadings`` table is plotted.
    level : int
        1-based index of the component (1 -> first column of the loadings).

    Raises
    ------
    ValueError
        If ``level`` is smaller than 1.
    IndexError
        If ``level`` exceeds the number of components.
    """
    if level < 1:
        raise ValueError("PC level can't be less than 1")
    loadings = feature_analyzer_object.pca_loadings
    component = loadings.columns[level - 1]
    # Iterate the column directly: this does not depend on what the index is
    # called, unlike a reset_index()/"index" key lookup.
    feature_to_loading = {
        feature: float(weight) for feature, weight in loadings[component].items()
    }
    chart = Charts(data=feature_to_loading, font_family="monospace")
    chart.plot(title=component)

plot_principal_component(feature_analyzer_object, level=1)

#### Adding new binary indicator features [test]

In [None]:
def _with_positive_flags(frame):
    """Return a deep copy of ``frame`` with an ``Is<column>`` flag per selected column.

    Each flag is True where the source column is greater than 0.
    """
    flagged_columns = (
        "CoastalVulnerability",
        "IneffectiveDisasterPreparedness",
        "Deforestation",
    )
    flags = {f"Is{column}": frame[column] > 0 for column in flagged_columns}
    return frame.copy(deep=True).assign(**flags)


# One helper for both splits so train and test always get the same columns.
X_train_new = _with_positive_flags(X_train)
X_test_new = _with_positive_flags(X_test)

## **Training the model: XGBoost Regressor**

In [None]:
from sklearn.model_selection import learning_curve, validation_curve
from sklearn.pipeline import make_pipeline
from xgboost import XGBRegressor


class RegressionModel:
    """XGBoost regressor bundled with learning-curve and validation-curve plots.

    Parameters
    ----------
    train_data : tuple
        ``(X, y)`` used for the cross-validated curves.
    test_data : tuple
        ``(X, y)`` hold-out data; stored on the instance, not used by the plots.
    learning_rate : float
        Learning rate of the underlying ``XGBRegressor``.
    n_estimators : int
        Number of boosting rounds of the underlying ``XGBRegressor``.
    early_stopping_rounds : int
        Stored on the instance only. NOTE(review): not forwarded to the
        regressor because the curve helpers fit without an evaluation set;
        confirm this matches the intended use.
    corss_validation_folds : int
        Number of cross-validation folds (parameter name kept as-is so existing
        keyword callers keep working).
    n_jobs : int
        Parallel jobs for ``learning_curve`` / ``validation_curve``.
    """

    def __init__(
        self,
        train_data: tuple,
        test_data: tuple,
        learning_rate=0.01,
        n_estimators=1000,
        early_stopping_rounds=5,
        corss_validation_folds=10,
        n_jobs=1
    ):
        # Pass the hyper-parameters on to the model instead of silently dropping them.
        self.model = XGBRegressor(learning_rate=learning_rate, n_estimators=n_estimators)
        self.early_stopping_rounds = early_stopping_rounds
        self.train_data = train_data
        self.test_data = test_data
        self.learning_curve_params = {
            "X": train_data[0],
            "y": train_data[1],
            "train_sizes": np.linspace(0.1, 1, 10),
            "cv": corss_validation_folds,
            "n_jobs": n_jobs
        }
        self.validation_curve_params = {
            "X": train_data[0],
            "y": train_data[1],
            "cv": corss_validation_folds,
            # Step name follows make_pipeline's lowercase-class-name convention.
            "param_name": "xgbregressor__learning_rate",
            "param_range": np.linspace(0.0001, 0.1, 100),
            "n_jobs": n_jobs
        }

    @property
    def pipeline(self):
        """A new single-step pipeline wrapping the regressor."""
        return make_pipeline(self.model)

    @staticmethod
    def _curve_name(curve_function):
        """Normalize a curve selector to its name.

        Accepts either the name as a string or the function object itself
        (e.g. ``learning_curve``), whose ``__name__`` is used.
        """
        return getattr(curve_function, "__name__", curve_function)

    @staticmethod
    def _plot_score_band(ax, x_values, scores, **plot_params):
        """Plot the per-row mean of ``scores`` with a +/- one std band on ``ax``."""
        score_mean = np.mean(scores, axis=1)
        score_std = np.std(scores, axis=1)
        ax.plot(x_values, score_mean, **plot_params)
        ax.fill_between(
            x_values,
            score_mean + score_std,
            score_mean - score_std,
            alpha=0.15,
            color=plot_params["color"],
        )

    def plot_learning_curve(self, curve_function="learning_curve"):
        """Plot cross-validated training and validation scores.

        Parameters
        ----------
        curve_function : str or callable
            ``"learning_curve"`` (or the ``learning_curve`` function) plots the
            score against the training-set size. Any other value plots the
            score against the XGBoost learning rate via ``validation_curve``.
        """
        if self._curve_name(curve_function) == "learning_curve":
            x_values, train_scores, validation_scores = learning_curve(
                estimator=self.pipeline,
                **self.learning_curve_params
            )
            x_label = "Number of training examples"
        else:
            train_scores, validation_scores = validation_curve(
                estimator=self.pipeline,
                **self.validation_curve_params
            )
            x_values = self.validation_curve_params["param_range"]
            x_label = "XGBRegressor learning rate"

        # Create the figure only after the (expensive) scoring has succeeded.
        _, ax = plt.subplots()
        self._plot_score_band(
            ax, x_values, train_scores,
            color="blue", marker="o", markersize=5, label="Training score",
        )
        self._plot_score_band(
            ax, x_values, validation_scores,
            color="red", marker="s", linestyle="--", markersize=5, label="Validation score",
        )
        ax.grid()
        ax.set_xlabel(x_label)
        # No `scoring` is passed, so the estimator's own score (R² for a
        # regressor) is what gets plotted, not a classification accuracy.
        ax.set_ylabel("R² score")
        ax.legend(loc="upper right")
        plt.show()

In [None]:
models = list()

In [None]:
# Baseline: the standardized features as they are.
# Curves are requested by name, matching the string default of `plot_learning_curve`.
model = RegressionModel(train_data=(X_train, y_train), test_data=(X_test, y_test))
model.plot_learning_curve("learning_curve")
model.plot_learning_curve("validation_curve")
models.append(model)

Exception ignored on calling ctypes callback function: <bound method DataIter._next_wrapper of <xgboost.data.SingleBatchInternalIter object at 0x78646be37040>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/xgboost/core.py", line 589, in _next_wrapper
    def _next_wrapper(self, this: None) -> int:  # pylint: disable=unused-argument
KeyboardInterrupt: 


In [None]:
# Same model on the feature set extended with the extra indicator columns.
# Curves are requested by name, matching the string default of `plot_learning_curve`.
model = RegressionModel(train_data=(X_train_new, y_train), test_data=(X_test_new, y_test))
model.plot_learning_curve("learning_curve")
model.plot_learning_curve("validation_curve")
models.append(model)

#### Training on PCA-transformed features

In [1]:
# Fit the PCA on the training split only and project the held-out split with
# the same components; re-fitting on X_test would put the two splits in
# different bases.
# NOTE(review): needs `feature_analyzer_object` from the feature-analysis cell,
# so it must run after that cell.
X_train_pca = feature_analyzer_object.pca.fit_transform(X_train)
X_test_pca = feature_analyzer_object.pca.transform(X_test)
model = RegressionModel(train_data=(X_train_pca, y_train), test_data=(X_test_pca, y_test))
# Curves are requested by name, matching the string default of `plot_learning_curve`.
model.plot_learning_curve("learning_curve")
model.plot_learning_curve("validation_curve")
models.append(model)

NameError: name 'feature_analyzer_object' is not defined

In [None]:
# PCA is fitted on the training split; the held-out split is only projected.
X_train_pca = feature_analyzer_object.pca.fit_transform(X_train)
X_test_pca = feature_analyzer_object.pca.transform(X_test)
model = RegressionModel(train_data=(X_train_pca, y_train), test_data=(X_test_pca, y_test))
# Curves are requested by name, matching the string default of `plot_learning_curve`.
model.plot_learning_curve("learning_curve")
model.plot_learning_curve("validation_curve")
models.append(model)

In [None]:
# The halving search estimators are experimental in scikit-learn: this enabling
# import must run before HalvingRandomSearchCV can be imported.
from sklearn.experimental import enable_halving_search_cv  # noqa: F401
from sklearn.model_selection import HalvingRandomSearchCV


# Candidate learning rates for the search.
# NOTE(review): the "xgbr__" prefix presumes a pipeline step named "xgbr";
# confirm it matches the pipeline this grid is used with.
xgbr_param_grid = [
    {
        "xgbr__learning_rate": np.linspace(0.0001, 0.1, 100)
    }
]