In [1]:
import numpy as np
import pandas as pd

from scipy.stats import norm
from sklearn.base import TransformerMixin
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


class BaseDataPreprocessor(TransformerMixin):
    def __init__(self, num_features="auto"):
        """
        :param needed_columns: if not None select these columns from the dataframe
        """
        self.scaler = StandardScaler()
        self.needed_columns_ = num_features

    def fit(self, data, *args):
        """
        Prepares the class for future transformations
        :param data: pd.DataFrame with all available columns
        :return: self
        """
        if self.needed_columns_ == "auto":
            self.needed_columns_ = [
                key for key in data.keys() if data[key].dtype in ("int64", "float64")
            ]
        if self.needed_columns_ is not None:
            data = data[self.needed_columns_]
        self.scaler.fit(data)
        return self

    def transform(self, data: pd.DataFrame) -> np.array:
        """
        Transforms features so that they can be fed into the regressors
        :param data: pd.DataFrame with all available columns
        :return: np.array with preprocessed features
        """
        if not self.needed_columns_ is None:
            data = data[self.needed_columns_]
        return self.scaler.transform(data)


class OneHotPreprocessor(BaseDataPreprocessor):
    def __init__(self, cat_features: list[str] = None, **kwargs):
        super(OneHotPreprocessor, self).__init__(**kwargs)
        self.cat_features_ = [] if cat_features is None else cat_features
        self.cat_encoder_ = OneHotEncoder(handle_unknown="ignore", sparse_output=False)

    def fit(self, data, *args):
        self.cat_encoder_.fit(data[self.cat_features_])
        super(OneHotPreprocessor, self).fit(data, *args)
        return self

    def transform(self, data):
        cat_data = self.cat_encoder_.transform(data[self.cat_features_])
        num_data = super(OneHotPreprocessor, self).transform(data)
        print(cat_data.shape, num_data.shape)
        return np.hstack((cat_data, num_data))


def make_ultimate_pipeline():
    cat_columns = ["Overall_Qual", "Garage_Qual", "Sale_Condition", "MS_Zoning"]

    pipe = Pipeline(
        steps=[
            (
                "Feature preprocessing",
                OneHotPreprocessor(
                    cat_features=cat_columns,
                    num_features="auto",
                ),
            ),
            ("Estimator", Ridge()),
        ]
    )
    return pipe

In [2]:
import openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_log_error

seed = 42

dataset = openml.datasets.get_dataset(41211)
data, y, _, _ = dataset.get_data(dataset_format="dataframe")

target_column = "Sale_Price"
np.random.seed(seed)

test_size = 0.2
data_train, data_test, Y_train, Y_test = train_test_split(
    data[data.columns.drop("Sale_Price")],
    np.array(data["Sale_Price"]),
    test_size=test_size,
    random_state=seed,
)

pipe = make_ultimate_pipeline()
pipe.fit(data_train, Y_train)
root_mean_squared_log_error(Y_test, pipe.predict(data_test))

(2344, 29) (2344, 23)
(586, 29) (586, 23)


0.14507118037476052