In [None]:
from pprint import pp

In [None]:
# Residual plots
# IS_PLOT_RES: when True, MyEval.plot_res is called for every fit in the evaluation loop.
# SAVE_PLOT_RES: forwarded as `save=` to MyEval.plot_res (only relevant when IS_PLOT_RES is True).
IS_PLOT_RES = False
SAVE_PLOT_RES = False

# Combined plots
# SAVE_PLOT: when True, the combined box-plot figure is written to "S05_eval_<dt>.png".
SAVE_PLOT = False

# Data
# SAVE_DATA: when True, the results are pickled to "S05_data_<dt>.pkl" and then re-loaded as a check.
SAVE_DATA = False

### Class definitions
**Note:** in production code, these class definitions should live in a separate `.py` module and be imported from it rather than being defined inside the notebook.
For example:

```python
from classes_v1 import DataHandler, MyUtil, RegSwitcher
```

In [None]:
import pickle
from datetime import datetime
from sklearn.base import BaseEstimator
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import (
    PredictionErrorDisplay,
    mean_absolute_percentage_error,
    mean_squared_error,
    r2_score,
)
import pandas as pd


class MyUtil:
    """Small stateless helpers for pickle persistence and timestamp strings."""

    @staticmethod
    def save_data(filename, data):
        """Serialize ``data`` with pickle into ``filename`` (opened in binary write mode)."""
        with open(filename, "wb") as handle:
            pickle.dump(data, handle)

    @staticmethod
    def load_data(filename):
        """Return the object unpickled from ``filename``.

        NOTE(review): unpickling can execute arbitrary code, so only load
        files that come from a trusted source.
        """
        with open(filename, "rb") as handle:
            return pickle.load(handle)

    @staticmethod
    def get_dt():
        """Return the current local time formatted as ``YYYY-MM-DD_HH-MM``."""
        return f"{datetime.now():%Y-%m-%d_%H-%M}"


class DataHandler:
    """Keeps the raw X/Y data with their scalers and produces scaled train/test partitions."""

    def __init__(self, _X, _Y, scalerX, scalerY):
        """Store the raw data and scaler objects; the scaled partitions start as ``None``."""
        self._X, self._Y = _X, _Y
        self.scalerX, self.scalerY = scalerX, scalerY
        # Filled in by split_and_scale()
        self.X_train = self.X_test = None
        self.Y_train = self.Y_test = None

    def split_and_scale(self, test_size, random_state):
        """Split the raw data with ``train_test_split`` and scale each partition.

        Both scalers are fitted on the training partition only
        (``fit_transform``) and then applied to the test partition
        (``transform``). The results are stored on the instance.
        """
        raw_X_train, raw_X_test, raw_Y_train, raw_Y_test = train_test_split(
            self._X,
            self._Y,
            test_size=test_size,
            random_state=random_state,
        )

        self.X_train = self.scalerX.fit_transform(raw_X_train)
        self.X_test = self.scalerX.transform(raw_X_test)

        self.Y_train = self.scalerY.fit_transform(raw_Y_train)
        self.Y_test = self.scalerY.transform(raw_Y_test)

    def get_train(self):
        """Return the ``(X_train, Y_train)`` pair currently stored on the instance."""
        return (self.X_train, self.Y_train)

    def get_test(self):
        """Return the ``(X_test, Y_test)`` pair currently stored on the instance."""
        return (self.X_test, self.Y_test)


class RegSwitcher(BaseEstimator):
    """Estimator wrapper that forwards ``fit``/``predict`` to the object stored in ``base``."""

    def __init__(self, base=None):
        """Store the wrapped estimator and a creation timestamp (``YYYY-MM-DD_HH-MM``)."""
        self.base = base
        self.dt = f"{datetime.now():%Y-%m-%d_%H-%M}"

    def fit(self, X, Y):
        """Fit the wrapped estimator on ``(X, Y)``, set ``is_fitted_`` and return ``self``."""
        wrapped = self.base
        wrapped.fit(X, Y)
        self.is_fitted_ = True
        return self

    def predict(self, X):
        """Return the wrapped estimator's predictions for ``X``."""
        wrapped = self.base
        return wrapped.predict(X)


class MyEval:
    """Evaluation helpers: regression metrics tables and prediction-error plots."""

    @staticmethod
    def eval_perf(y_true, y_pred):
        """Return ``(mse, mape, r2)`` computed from ``y_true`` and ``y_pred``."""
        mse = mean_squared_error(y_true=y_true, y_pred=y_pred)
        mape = mean_absolute_percentage_error(y_true=y_true, y_pred=y_pred)
        r2 = r2_score(y_true=y_true, y_pred=y_pred)
        return mse, mape, r2

    @classmethod
    def _build_record(cls, label, y_train, y_test, y_train_pred, y_test_pred, extra):
        """Build one result row: ``extra`` columns, the ``"Y"`` label, then train/test metrics."""
        mse_train, mape_train, r2_train = cls.eval_perf(
            y_true=y_train, y_pred=y_train_pred
        )
        mse_test, mape_test, r2_test = cls.eval_perf(y_true=y_test, y_pred=y_test_pred)
        return {
            **extra,
            "Y": label,
            "MSE Train (No Val)": mse_train,
            "MSE Test": mse_test,
            "MAPE Train (No Val)": mape_train,
            "MAPE Test": mape_test,
            "R2 Train (No Val)": r2_train,
            "R2 Test": r2_test,
        }

    @classmethod
    def eval(cls, Y_train, Y_test, Y_train_pred, Y_test_pred, **kwargs):
        """Compute per-target and overall metrics and return them as a DataFrame.

        Parameters
        ----------
        Y_train, Y_test, Y_train_pred, Y_test_pred :
            2-D arrays indexed as ``[:, i]``; column ``i`` is one target.
        **kwargs :
            Extra key/value pairs copied into every row as leading columns.

        Returns
        -------
        pandas.DataFrame
            One row per target column (labelled ``"Y-1"``, ``"Y-2"``, ...)
            followed by one ``"Y-All"`` row computed on the full arrays.
        """
        records = [
            cls._build_record(
                f"Y-{i + 1}",
                Y_train[:, i],
                Y_test[:, i],
                Y_train_pred[:, i],
                Y_test_pred[:, i],
                kwargs,
            )
            for i in range(Y_train.shape[1])
        ]
        records.append(
            cls._build_record(
                "Y-All", Y_train, Y_test, Y_train_pred, Y_test_pred, kwargs
            )
        )
        return pd.DataFrame(records)

    @classmethod
    def plot_res(
        cls,
        Y_train,
        Y_test,
        Y_train_pred,
        Y_test_pred,
        current_dir=".",
        dt="",
        save=False,
        show=False,
        file_prefix="",
    ):
        """Draw one train/test ``PredictionErrorDisplay`` figure per target column.

        Parameters
        ----------
        Y_train, Y_test, Y_train_pred, Y_test_pred :
            2-D arrays indexed as ``[:, i]``; one figure is produced per column.
        current_dir : str
            Directory prefix used when saving.
        dt : str
            Text inserted into the saved file name.
        save : bool
            When True, each figure is saved as
            ``{current_dir}/{file_prefix}_{dt}_{i}.png`` at 300 dpi.
        show : bool
            When True, ``plt.show()`` is called; otherwise the figure is closed.
        file_prefix : str
            File-name prefix; also used as the figure title when non-empty.

        Raises
        ------
        ValueError
            If ``save`` is True and ``file_prefix`` is empty. The check runs
            before any figure is created, so no figure is left open.
        """
        # Validate up front: failing inside the loop would leak an open figure.
        if save and file_prefix == "":
            raise ValueError("Please specify file prefix")

        for i in range(0, Y_train.shape[1]):
            fig, axes = plt.subplots(
                nrows=1,
                ncols=2,
                figsize=(10, 5),
                constrained_layout=True,
                sharex=True,
                sharey=True,
            )

            display_train = PredictionErrorDisplay(
                y_true=Y_train[:, i], y_pred=Y_train_pred[:, i]
            )
            display_train.plot(ax=axes[0])
            axes[0].set_title("Train")

            display_test = PredictionErrorDisplay(
                y_true=Y_test[:, i], y_pred=Y_test_pred[:, i]
            )
            display_test.plot(ax=axes[1])
            axes[1].set_title("Test")

            if file_prefix != "":
                fig.suptitle(file_prefix)

            if save:
                filename = f"{current_dir}/{file_prefix}_{dt}_{i}.png"
                fig.savefig(filename, dpi=300)

            if show:
                plt.show()
            else:
                plt.close(fig)

### Load data


In [None]:
# Search the working directory for pickle files
from os import listdir
from os.path import isfile, join

# Match the real ".pkl" extension (a bare "pkl" suffix would also accept names
# such as "datapkl") and sort so the listing is stable between runs.
onlyfiles = sorted(
    f for f in listdir(".") if isfile(join(".", f)) and f.endswith(".pkl")
)
pp(onlyfiles)

In [None]:
# Input pickle to evaluate (presumably written by the preceding "S04" step — confirm)
filenameInput = "S04_data_2025-12-11_11-50.pkl"
data_load = MyUtil.load_data(filenameInput)

# Show which entries the loaded payload contains
pp(list(data_load.keys()))


In [None]:
# Run timestamp (YYYY-MM-DD_HH-MM); used below in the names of saved plots and data files
dt = MyUtil.get_dt()

In [None]:
# Pull the two entries this notebook needs out of the loaded payload
data_handler, df_cv = data_load["data_handler"], data_load["df_cv"]

In [None]:
# Inspect the loaded cross-validation table
df_cv

### Calculate test results


In [None]:
# Sort so that the best-ranked candidate (lowest "rank_test_score") comes first.
df_cv = df_cv.sort_values(by="rank_test_score")

# Keep the single best-ranked ROW for every (id_split, estimator) pair.
# `groupby(...).first()` is deliberately avoided: it returns the first non-null
# value of each column, so a row with NaN cells would be silently completed with
# values from lower-ranked rows. `drop_duplicates(keep="first")` keeps whole rows.
group_keys = ["id_split", "estimator"]
best_rows = df_cv.dropna(subset=group_keys).drop_duplicates(
    subset=group_keys, keep="first"
)

# Same layout as the groupby result: key columns first, rows ordered by key,
# and a fresh 0..n-1 index.
other_cols = [col for col in best_rows.columns if col not in group_keys]
df_fit_select = (
    best_rows[group_keys + other_cols]
    .sort_values(by=group_keys)
    .reset_index(drop=True)
)

display(df_fit_select)

In [None]:
# One reusable wrapper; the concrete model and its settings are injected per row via set_params
reg = RegSwitcher(base=None)

df_arr = []
for idx, fit in df_fit_select.iterrows():
    id_split, estimator = fit["id_split"], fit["estimator"]

    # Rebuild the train/test partitions from the split parameters stored with this row
    data_handler.split_and_scale(**fit["param_split"])
    X_train, Y_train = data_handler.get_train()
    X_test, Y_test = data_handler.get_test()

    # Apply the selected parameters and fit on the training partition
    reg.set_params(**fit["params"])
    reg.fit(X_train, Y_train)

    Y_train_pred = reg.predict(X_train)
    Y_test_pred = reg.predict(X_test)

    # Targets and predictions are shared by the metrics table and the optional plots
    targets_and_preds = dict(
        Y_train=Y_train,
        Y_train_pred=Y_train_pred,
        Y_test=Y_test,
        Y_test_pred=Y_test_pred,
    )

    df_arr.append(
        MyEval.eval(
            **targets_and_preds,
            id_split=id_split,
            id_gs=fit["id_gs"],
            estimator=estimator,
        )
    )

    if IS_PLOT_RES:
        MyEval.plot_res(
            **targets_and_preds,
            dt=dt,
            save=SAVE_PLOT_RES,
            file_prefix=f"S05-{estimator}-{id_split}",
        )

df_eval = pd.concat(df_arr).reset_index(drop=True)

In [None]:
# Inspect the metrics table: one row per target ("Y-1", ...) plus a "Y-All" row for each fit
df_eval

### Further data processing

In [None]:
# Merge the cross-validation scores from `df_cv` into `df_eval`.
#
# After this cell runs, `df_eval` has a `validation_scores` column and all of its
# original rows are preserved (left join).

# Label every `df_cv` row as "Y-All" (the column is created if missing) so the
# join only matches the aggregated "Y-All" rows of `df_eval`; the per-target
# rows ("Y-1", "Y-2", ...) receive NaN in `validation_scores`.
df_cv["Y"] = "Y-All"

# Columns taken from `df_cv`: the three join keys plus the scores to attach.
colsToMerge = ["id_split", "id_gs", "Y", "validation_scores"]

# Drop a `validation_scores` column left by a previous execution of this cell.
# Without this, re-running the cell merges into a frame that already has the
# column and pandas emits `validation_scores_x` / `validation_scores_y`,
# which breaks the cells that read `validation_scores`.
# NOTE(review): rows of `df_eval` are duplicated if (id_split, id_gs, Y) is not
# unique in `df_cv` — confirm that this key identifies a single CV row.
df_eval = df_eval.drop(columns=["validation_scores"], errors="ignore").merge(
    df_cv[colsToMerge], on=["id_split", "id_gs", "Y"], how="left"
)

In [None]:
# Inspect the metrics table after the merge (now includes `validation_scores`)
df_eval

In [None]:
import numpy as np


def expandCV(_df):
    """Flatten the per-row score arrays of one group into a long, single-column frame.

    ``_df`` is expected to be one group from a groupby operation. Its
    ``"validation_scores"`` column holds one array or list per row; these are
    concatenated in row order into a single NumPy array.

    Returns a new DataFrame with one column, ``"cv_results"``, containing one
    row per individual score.
    """
    per_row_scores = _df["validation_scores"].values
    flat_scores = np.concatenate(per_row_scores)
    return pd.DataFrame({"cv_results": flat_scores})


# Boolean mask that is True where the "Y" column equals "Y-All"; only those
# rows are used below.
filt = df_eval["Y"] == "Y-All"

# Step by step:
#   1. Filter rows: `df_eval.loc[filt]` keeps only the "Y-All" rows.
#   2. Group by estimator: `.groupby(by=["estimator"])`.
#   3. Select the score column: `[["validation_scores"]]` hands `expandCV` only
#      the column it reads. Selecting the column explicitly excludes the
#      grouping column from each group on every pandas version, avoiding the
#      deprecation warning that a bare `.apply(expandCV)` raises on pandas >= 2.2
#      without requiring the `include_groups` keyword (absent on older pandas).
#   4. Apply `expandCV`: expands each group's score arrays into individual rows.
#   5. Reset index: `.reset_index(drop=False)` turns the group label into a column.
#   6. Drop extra column: `"level_1"` is the per-group row counter left over
#      from the groupby-apply step.
cv_data = (
    df_eval.loc[filt]
    .groupby(by=["estimator"])[["validation_scores"]]
    .apply(expandCV)
    .reset_index(drop=False)
    .drop(columns=["level_1"])
)

display(cv_data)

### Plotting

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Three side-by-side panels: CV scores, test R2, train R2
fig, axes = plt.subplots(1, 3, figsize=(20, 4))
ax_cv, ax_test, ax_train = axes

# Panel 1: distribution of cross-validation scores per estimator (y-axis fixed to [0, 1])
sns.boxplot(data=cv_data, x="estimator", y="cv_results", ax=ax_cv)
ax_cv.set_ylim(0, 1)
ax_cv.set_title("CV Results")

# Panel 2: test-set R2 per estimator, split by target
ax = sns.boxplot(data=df_eval, x="estimator", y="R2 Test", hue="Y", ax=ax_test)
ax_test.set_title("Test Result")

# Panel 3: training-set R2 (no cross-validation) per estimator, split by target
sns.boxplot(data=df_eval, x="estimator", y="R2 Train (No Val)", hue="Y", ax=ax_train)
ax_train.set_title("Train Result (No Validation)")

if SAVE_PLOT:
    filename = f"S05_eval_{dt}.png"
    fig.savefig(filename, dpi=300, bbox_inches="tight")

### Save data

In [None]:
if SAVE_DATA:
    filename = f"S05_data_{dt}.pkl"

    data_save = {
        "desc": "This is the saved data",
        # Provenance: the input pickle this notebook loaded (previously this
        # stored the OUTPUT file name by mistake).
        "filenameInput": filenameInput,
        "df_eval": df_eval,
        "cv_data": cv_data,
    }

    # Persist the evaluation results
    MyUtil.save_data(filename=filename, data=data_save)

### Test loading data


In [None]:
if SAVE_DATA:
    # Round-trip check: read back the file written by the save cell and list its keys.
    # Note: this rebinds `data_load`, replacing the input payload loaded earlier.
    data_load = MyUtil.load_data(filename)

    pp(list(data_load))