#### Imputing missing values and dropping columns that are not relevant

In [1]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
import pandas as pd
import numpy as np
import joblib

class MissingValueHandling(BaseEstimator, TransformerMixin):
    """Drop unused columns and fill missing values with a pre-fitted imputer.

    The transformer is stateless: ``fit`` learns nothing, and ``transform``
    reads its configuration from two files in the working directory,
    ``variable category.xlsx`` and ``simple_imputers.joblib``.
    """

    def __init__(self):
        # Previously misspelled ``_init__`` and therefore never used as the
        # constructor. There are no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Return ``self`` unchanged.

        ``y`` is accepted (and ignored) so the transformer also works when the
        enclosing pipeline is fitted with a target.
        """
        return self

    def transform(self, X):
        """Return an imputed copy of ``X`` as a new DataFrame.

        Steps:
          1. Drop ``PoolQC``, ``Alley``, ``Id`` and ``MiscFeature``.
          2. Where ``GarageYrBlt`` is missing, copy ``YearBuilt`` from the
             same row.
          3. Apply the preprocessor loaded from ``simple_imputers.joblib``.

        Raises:
            KeyError: if one of the columns named above is absent from ``X``.
            ValueError: if ``LotFrontage`` or ``GarageYrBlt`` is not listed
                with type ``"N"`` in ``variable category.xlsx``.
        """
        X_imputed = X.copy(deep=True)

        X_imputed.drop(["PoolQC", "Alley", "Id", "MiscFeature"], axis=1, inplace=True)

        # A missing garage year is replaced by the year the house was built.
        index_impute = X["GarageYrBlt"].isna()
        impute_values = X.loc[index_impute, "YearBuilt"]
        X_imputed.loc[index_impute, "GarageYrBlt"] = impute_values

        # The spreadsheet maps every variable to a type code: N, C or O.
        variable_type = pd.read_excel("variable category.xlsx")
        names = variable_type["variable"]
        kinds = variable_type["type"]  # renamed: do not shadow builtin ``type``

        numerical = [name for name, kind in zip(names, kinds) if kind == "N"]
        # These two are handled as pass-through columns instead.
        numerical.remove("LotFrontage")
        numerical.remove("GarageYrBlt")
        categorical = [name for name, kind in zip(names, kinds) if kind == "C"]
        ordinal = [name for name, kind in zip(names, kinds) if kind == "O"]
        pass_through = ["LotFrontage", "MiscVal", "GarageYrBlt"]

        preprocessor = joblib.load("simple_imputers.joblib")

        # Apply transformations
        df_imputed = preprocessor.transform(X_imputed)

        # Convert back to DataFrame with column names.
        # NOTE(review): assumes the saved preprocessor emits its columns in
        # the order numerical, categorical, ordinal, pass-through - confirm
        # against the notebook that created the joblib file.
        df_imputed = pd.DataFrame(
            df_imputed,
            columns=numerical + categorical + ordinal + pass_through,
        )

        return df_imputed

#### Column transformer (pre-processor) for imputing LotFrontage missing values

In [2]:
class TransformerLotFrontage(BaseEstimator, TransformerMixin):
    """Encode the columns used as features by the LotFrontage imputation model.

    The actual encoder is loaded from ``lot_frontage_transformer.joblib``;
    this class only selects the input columns and labels the output.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    @staticmethod
    def _select_features(X):
        """Return the feature columns of ``X`` (``LotFrontage`` itself removed)."""
        cols = ["MSSubClass", "MSZoning", "LotFrontage", "LotArea", "Street", "LotShape", "LandContour"]
        return X[cols].drop("LotFrontage", axis=1)

    def fit(self, X, y=None):
        """Load the saved encoder and record the feature column names.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        # Kept as an attribute for backward compatibility; ``transform`` no
        # longer reads it.
        self.X_short = self._select_features(X)
        self.numerical = ["LotArea"]
        self.categorical = self.X_short.columns.to_list()
        self.categorical.remove("LotArea")
        self.preprocessor = joblib.load("lot_frontage_transformer.joblib")
        return self

    def transform(self, X):
        """Encode ``X`` and return the result as a DataFrame.

        Fix: the frame passed to ``transform`` is encoded. The previous
        implementation ignored ``X`` and always re-encoded the frame seen in
        ``fit``, so ``fit(a).transform(b)`` silently returned the encoding of
        ``a``. ``fit_transform`` results are unchanged.
        """
        X_short = self._select_features(X)
        # ``.toarray()`` assumes the saved preprocessor returns a sparse matrix.
        X_processed = self.preprocessor.transform(X_short).toarray()
        categorical_features = self.preprocessor.named_transformers_["onehot"].get_feature_names_out(self.categorical)
        numerical_features = self.numerical
        # NOTE(review): assumes the one-hot columns come first and LotArea
        # last in the preprocessor output - confirm.
        self.df_processed = pd.DataFrame(X_processed, columns=list(categorical_features) + numerical_features)
        return self.df_processed

#### Model for Imputing LotFrontage Missing Values

In [3]:
class ImputerLotFrontage(BaseEstimator, TransformerMixin):
    """Fill missing ``LotFrontage`` values with predictions from a saved model.

    The model is loaded from ``imputation.joblib`` and the names of the
    encoded features it expects from ``sig_var_impute.xlsx``.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Load the imputation model and its feature list.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        # ``SalePrice`` is only present in training data. Only the "column
        # missing" case is tolerated; the old bare ``except`` hid all errors.
        try:
            self.y_true = X["SalePrice"]
        except KeyError:
            pass

        self.lf = joblib.load("imputation.joblib")

        # Kept as an attribute for backward compatibility; ``transform`` now
        # encodes the frame it is given instead of reading this cache.
        enc = TransformerLotFrontage()
        self.X_short = enc.fit_transform(X)

        self.sig_var = pd.read_excel("sig_var_impute.xlsx")["sig_var"].to_list()

        return self

    def transform(self, X):
        """Return a copy of ``X`` with missing ``LotFrontage`` values predicted.

        Fixes over the previous implementation:
          * features are derived from the ``X`` passed in, not from the frame
            cached during ``fit``;
          * rows are selected positionally, so ``X`` may carry any index;
          * if nothing is missing the copy is returned immediately instead of
            calling ``predict`` on an empty frame.
        """
        index_impute = X["LotFrontage"].isna()
        X_complete = X.copy(deep=True)
        if not index_impute.any():
            return X_complete

        # The encoder's ``fit`` only loads a saved preprocessor, so building a
        # fresh one here does not re-learn anything from ``X``.
        encoded = TransformerLotFrontage().fit_transform(X)

        # ``encoded`` has a fresh default index; use a positional mask.
        X_impute = encoded.loc[index_impute.to_numpy(), self.sig_var]
        y_pred = self.lf.predict(X_impute)
        X_complete.loc[index_impute, "LotFrontage"] = y_pred
        return X_complete

#### Pre-processor for SalePrice model

In [4]:
class TransfromerSalePrice(BaseEstimator, TransformerMixin):
    """Apply the saved feature preprocessor used by the SalePrice models.

    The (misspelled) class name is kept because callers refer to it.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Load the saved preprocessor and build the output column names.

        Column groups are read from ``variable category.xlsx`` (type codes
        ``N``, ``C``, ``O`` and ``PT``). ``X`` and ``y`` are not used; ``y``
        is accepted for scikit-learn API compatibility.
        """
        variable_type = pd.read_excel("variable category.xlsx")
        names = variable_type["variable"]
        kinds = variable_type["type"]  # renamed: do not shadow builtin ``type``

        numerical = [name for name, kind in zip(names, kinds) if kind == "N"]
        categorical = [name for name, kind in zip(names, kinds) if kind == "C"]
        ordinal = [name for name, kind in zip(names, kinds) if kind == "O"]
        pass_through = [name for name, kind in zip(names, kinds) if kind == "PT"]

        self.preprocessor = joblib.load("sale_price_transformer.joblib")

        named = self.preprocessor.named_transformers_
        categorical_features = named["onehot"].get_feature_names_out(categorical)
        ordinal_features = named["oe"].get_feature_names_out(ordinal)
        # Numerical and pass-through columns keep their original names.
        # NOTE(review): assumes the preprocessor emits one-hot, ordinal,
        # numerical, pass-through columns in exactly this order - confirm.
        self.all_features = (
            list(categorical_features)
            + list(ordinal_features)
            + numerical
            + pass_through
        )

        return self

    def transform(self, X):
        """Return the preprocessed ``X`` as a DataFrame with named columns."""
        # ``.toarray()` assumes the saved preprocessor returns a sparse matrix.
        arr = self.preprocessor.transform(X).toarray()
        X_transformed = pd.DataFrame(data=arr, columns=self.all_features)
        return X_transformed

####  Elastic Net Model for SalePrice prediction

In [5]:
class ElasticNetModel(BaseEstimator, TransformerMixin):
    """Expose the saved elastic-net model as a transformer emitting predictions."""

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Load the model from ``model_elastic_net.joblib``.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        self.model = joblib.load("model_elastic_net.joblib")

        # Kept as attributes for backward compatibility; ``transform`` no
        # longer reads them.
        self.X_transformed = X.copy(deep=True)
        self.misc_val = self.X_transformed["MiscVal"].reset_index(drop=True)
        self.X_transformed = self.X_transformed.drop("MiscVal", axis=1)

        # NOTE(review): presumably the mean and standard deviation of
        # log(SalePrice) used to standardise the training target - confirm.
        self.mean = 12.013
        self.std = 0.386

        return self

    def transform(self, X):
        """Return predictions for ``X`` as an ``(n_rows, 1)`` array.

        Fix: predictions are computed from the ``X`` passed in. The previous
        implementation ignored ``X`` and always predicted on the frame cached
        in ``fit``. ``fit_transform`` results are unchanged.
        """
        misc_val = X["MiscVal"].to_numpy()
        features = X.drop("MiscVal", axis=1)

        y_pred = self.model.predict(features)
        # Undo the standardisation, then the log, then add MiscVal back.
        y_pred = (y_pred * self.std) + self.mean
        y_pred = np.exp(y_pred) + misc_val
        return y_pred.reshape(-1, 1)

####  Random Forest Model for SalePrice prediction

In [6]:
class ModelRandomForest(BaseEstimator, TransformerMixin):
    """Expose the saved random-forest model as a transformer emitting predictions.

    Only the features with a non-zero coefficient in the saved elastic-net
    model are passed to the forest.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Determine the feature subset from the elastic-net coefficients.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        selector_model = joblib.load("model_elastic_net.joblib")

        # Kept as attributes for backward compatibility; ``transform`` no
        # longer reads or mutates them.
        self.X_transformed = X.copy(deep=True)
        self.misc_val = self.X_transformed["MiscVal"].reset_index(drop=True)
        self.X_transformed = self.X_transformed.drop("MiscVal", axis=1)

        # Pairs columns with coefficients by position, so the column order of
        # ``X`` (minus MiscVal) must match what the elastic net was trained on.
        coef = list(selector_model.coef_)
        all_features = self.X_transformed.columns.to_list()
        self.sig_var = [name for name, c in zip(all_features, coef) if c != 0]

        # NOTE(review): presumably the mean and standard deviation of
        # log(SalePrice) used to standardise the training target - confirm.
        self.mean = 12.013
        self.std = 0.386

        return self

    def transform(self, X):
        """Return predictions for ``X`` as an ``(n_rows, 1)`` array.

        Fixes over the previous implementation:
          * predictions are computed from the ``X`` passed in rather than the
            frame cached in ``fit``, and no cached state is mutated;
          * the de-standardised prediction is exponentiated before ``MiscVal``
            is added, matching ``ElasticNetModel``, which uses the same
            ``mean``/``std`` constants.
        """
        features = X[self.sig_var]
        model_rf = joblib.load("model_random_forest.joblib")
        y_pred = model_rf.predict(features)
        y_pred = (y_pred * self.std) + self.mean
        # NOTE(review): the original added MiscVal directly to the log-scale
        # value. ``np.exp`` assumes the forest was trained on the same
        # standardised log target as the elastic net - confirm.
        y_pred = np.exp(y_pred) + X["MiscVal"].to_numpy()
        return y_pred.reshape(-1, 1)

#### Gradient Boosting model for SalePrice prediction

In [7]:
class ModelGB(BaseEstimator, TransformerMixin):
    """Expose the saved gradient-boosting model as a transformer emitting predictions.

    Only the features with a non-zero coefficient in the saved elastic-net
    model are passed to the booster.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Determine the feature subset from the elastic-net coefficients.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        selector_model = joblib.load("model_elastic_net.joblib")

        # Kept as attributes for backward compatibility; ``transform`` no
        # longer reads or mutates them.
        self.X_transformed = X.copy(deep=True)
        self.misc_val = self.X_transformed["MiscVal"].reset_index(drop=True)
        self.X_transformed = self.X_transformed.drop("MiscVal", axis=1)

        # Pairs columns with coefficients by position, so the column order of
        # ``X`` (minus MiscVal) must match what the elastic net was trained on.
        coef = list(selector_model.coef_)
        all_features = self.X_transformed.columns.to_list()
        self.sig_var = [name for name, c in zip(all_features, coef) if c != 0]

        # NOTE(review): presumably the mean and standard deviation of
        # log(SalePrice) used to standardise the training target - confirm.
        self.mean = 12.013
        self.std = 0.386

        return self

    def transform(self, X):
        """Return predictions for ``X`` as an ``(n_rows, 1)`` array.

        Fixes over the previous implementation:
          * predictions are computed from the ``X`` passed in rather than the
            frame cached in ``fit``, and no cached state is mutated;
          * the de-standardised prediction is exponentiated before ``MiscVal``
            is added, matching ``ElasticNetModel``, which uses the same
            ``mean``/``std`` constants.
        """
        features = X[self.sig_var]
        model_gb = joblib.load("model_gb.joblib")
        y_pred = model_gb.predict(features)
        y_pred = (y_pred * self.std) + self.mean
        # NOTE(review): the original added MiscVal directly to the log-scale
        # value. ``np.exp`` assumes the booster was trained on the same
        # standardised log target as the elastic net - confirm.
        y_pred = np.exp(y_pred) + X["MiscVal"].to_numpy()
        return y_pred.reshape(-1, 1)

In [8]:
class WeightedAverageEnsemble(BaseEstimator, TransformerMixin):
    """Blend per-model prediction columns into one weighted average per row."""

    def __init__(self, weights):
        # One weight per input column, in column order.
        self.weights = weights

    def fit(self, X, y=None):
        """Nothing is learned; return the instance unchanged."""
        return self

    def transform(self, X):
        """Return the weighted mean across columns (axis 1) of ``X``."""
        blended = np.average(X, weights=self.weights, axis=1)
        return blended

#### Building pipeline for prediction

In [9]:
from sklearn.pipeline import Pipeline

# Raw test set, exactly as provided.
df = pd.read_csv("test.csv")

# Stage 1: clean the raw frame and turn it into model-ready features.
preprocessing_steps = [
    ("missing_value_handler", MissingValueHandling()),
    ("lot_frontage_imputer", ImputerLotFrontage()),
    ("sale_price_transform", TransfromerSalePrice()),
]
data_processing_pipeline = Pipeline(steps=preprocessing_steps)

df_processed = data_processing_pipeline.fit_transform(df)

# Stage 2: every model receives all processed columns and contributes one
# prediction column; the final step blends those columns.
feature_columns = list(df_processed.columns)
model_predictions = ColumnTransformer([
    ("elastic_net", ElasticNetModel(), feature_columns),
    ("random_forest", ModelRandomForest(), feature_columns),
    ("xgboost", ModelGB(), feature_columns),
])

# Create an ensemble pipeline. Weights follow the column order above:
# elastic net, random forest, gradient boosting.
ensemble_pipeline = Pipeline(steps=[
    ("model_predictions", model_predictions),
    ("weighted_avg", WeightedAverageEnsemble(weights=[0.5, 0.0, 0.5])),
])

prediction = ensemble_pipeline.fit_transform(df_processed)



#### Debugging code

In [10]:
# Step-by-step version of the prediction pipeline: every intermediate frame
# is kept in its own variable so it can be inspected.
df = pd.read_csv("test.csv")
# df = df[df["SalePrice"]<480000] ### outlier removal

# --- preprocessing ---------------------------------------------------------
missing_value_handler = MissingValueHandling()
df2 = missing_value_handler.fit_transform(df)

# Encoded LotFrontage features; computed for inspection only.
trans_lf = TransformerLotFrontage()
df3 = trans_lf.fit_transform(df2)

imputer_lf = ImputerLotFrontage()
df4 = imputer_lf.fit_transform(df2)

trans_sp = TransfromerSalePrice()
df5 = trans_sp.fit_transform(df4)

# --- individual model predictions -----------------------------------------
elastic_net_model = ElasticNetModel()
yhat_elastic_net = elastic_net_model.fit_transform(df5)

rf_model = ModelRandomForest()
yhat_rf = rf_model.fit_transform(df5)

gb_model = ModelGB()
yhat_gb = gb_model.fit_transform(df5)

# --- blend (same weights as the ensemble pipeline) -------------------------
w_elastic_net, w_rf, w_gb = 0.5, 0.0, 0.5
prediction = w_elastic_net*yhat_elastic_net + w_rf*yhat_rf + w_gb*yhat_gb

# --- write the submission file ---------------------------------------------
Id = df["Id"]
df_pred = pd.DataFrame({"Id": Id})

df_pred["SalePrice"] = prediction

df_pred.to_csv("submission.csv", index=False)



#### XGB model (not used in the main pipeline)

In [11]:
class ModelXGB(BaseEstimator, TransformerMixin):
    """Expose a saved boosted-tree model as a transformer emitting predictions.

    Only the features with a non-zero coefficient in the saved elastic-net
    model are passed to the model.
    """

    def __init__(self):
        # Previously misspelled ``_init__``; no hyper-parameters to store.
        pass

    def fit(self, X, y=None):
        """Determine the feature subset from the elastic-net coefficients.

        ``y`` is accepted and ignored for scikit-learn API compatibility.
        """
        selector_model = joblib.load("model_elastic_net.joblib")

        # Kept as attributes for backward compatibility; ``transform`` no
        # longer reads or mutates them.
        self.X_transformed = X.copy(deep=True)
        self.misc_val = self.X_transformed["MiscVal"].reset_index(drop=True)
        self.X_transformed = self.X_transformed.drop("MiscVal", axis=1)

        # Pairs columns with coefficients by position, so the column order of
        # ``X`` (minus MiscVal) must match what the elastic net was trained on.
        coef = list(selector_model.coef_)
        all_features = self.X_transformed.columns.to_list()
        self.sig_var = [name for name, c in zip(all_features, coef) if c != 0]

        # NOTE(review): presumably the mean and standard deviation of
        # log(SalePrice) used to standardise the training target - confirm.
        self.mean = 12.013
        self.std = 0.386

        return self

    def transform(self, X):
        """Return predictions for ``X`` as an ``(n_rows, 1)`` array.

        Fixes over the previous implementation:
          * predictions are computed from the ``X`` passed in rather than the
            frame cached in ``fit``, and no cached state is mutated;
          * the de-standardised prediction is exponentiated before ``MiscVal``
            is added, matching ``ElasticNetModel``, which uses the same
            ``mean``/``std`` constants.
        """
        features = X[self.sig_var]
        # NOTE(review): this is the same file ``ModelGB`` loads; confirm
        # whether a dedicated XGBoost artifact was intended here.
        model_xgb = joblib.load("model_gb.joblib")
        y_pred = model_xgb.predict(features)
        y_pred = (y_pred * self.std) + self.mean
        # NOTE(review): the original added MiscVal directly to the log-scale
        # value. ``np.exp`` assumes the model was trained on the same
        # standardised log target as the elastic net - confirm.
        y_pred = np.exp(y_pred) + X["MiscVal"].to_numpy()
        return y_pred.reshape(-1, 1)