In [31]:
import pandas as pd
import numpy as np
from xgboost import XGBRegressor
import optuna

from sklearn.model_selection import train_test_split, KFold, cross_val_score
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, FunctionTransformer, StandardScaler
from sklearn.metrics import mean_squared_log_error, mean_absolute_error, r2_score, make_scorer

class Config:
    """A class that stores all configurations and constants."""
    # Main toggler to decide whether we want to use optuna tuning or use default parameters
    TUNE_HYPERPARAMETERS = False

    # Number of trials in Optuna
    N_TRIALS_OPTUNA = 50
    
    # Optimal number of clusters in K-Means algorithm
    OPTIMAL_K_CLUSTERS = 12
    
    # Best xgboost parameters found previously by optuna
    BEST_PARAMS = {
        'n_estimators': 2353,
        'learning_rate': 0.015,
        'max_depth': 3, 
        'subsample': 0.719,
        'colsample_bytree': 0.678,
        'reg_alpha': 0.016,
        'reg_lambda': 3.652,
        'min_child_weight': 5.172
    }
    
    # Mapping ordinal features
    ORDINAL_FEATURE_MAP = {
        'ExterQual': ['Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'ExterCond': ['Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'BsmtQual': ['None', 'Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'BsmtCond': ['None', 'Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'BsmtExposure': ['None', 'No', 'Mn', 'Av', 'Gd'],
        'BsmtFinType1': ['None', 'Unf', 'LwQ', 'Rec', 'BLQ', 'ALQ', 'GLQ'],
        'BsmtFinType2': ['None', 'Unf', 'LwQ', 'Rec', 'BLQ', 'ALQ', 'GLQ'],
        'HeatingQC': ['Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'KitchenQual': ['Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'GarageFinish': ['None', 'Unf', 'RFn', 'Fin'],
        'GarageQual': ['None', 'Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'GarageCond': ['None', 'Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'PavedDrive': ['N', 'P', 'Y'],
        'Fence': ['None', 'MnWw', 'GdWo', 'MnPrv', 'GdPrv'],
        'OverallQual': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        'OverallCond': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        'Functional': ['Sal', 'Sev', 'Maj2', 'Maj1', 'Mod', 'Min2', 'Min1', 'Typ'],
        'FireplaceQu': ['None', 'Po', 'Fa', 'TA', 'Gd', 'Ex'],
        'PoolQC': ['None', 'Fa', 'TA', 'Gd', 'Ex'],
        'LotShape': ['IR3', 'IR2', 'IR1', 'Reg'],
        'LandContour': ['Low', 'HLS', 'Bnk', 'Lvl'],
        'LandSlope': ['Sev', 'Mod', 'Gtl']
    }

    # Lists of features grouped by different types
    
    NOMINAL_FEATURES = [
        'MSSubClass', 'MSZoning', 'Alley', 'LotConfig', 'Neighborhood',
        'Condition1', 'Condition2', 'BldgType', 'HouseStyle', 'RoofStyle',
        'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType', 'Foundation',
        'Heating', 'Electrical', 'GarageType', 'MiscFeature', 'SaleType',
        'SaleCondition', 'House_Cluster'
    ]

    SKEWED_FEATURES = [
        'LotFrontage', 'LotArea', 'YearBuilt', 'YearRemodAdd', 'MasVnrArea',
        'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', '1stFlrSF',
        '2ndFlrSF', 'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath',
        'HalfBath', 'KitchenAbvGr', 'TotRmsAbvGrd', 'Fireplaces', 'GarageYrBlt',
        'WoodDeckSF', 'OpenPorchSF', 'EnclosedPorch', '3SsnPorch', 'ScreenPorch',
        'PoolArea', 'MiscVal', 'GarageArea', 'AgeHouse', 'AgeSinceRemod',
        'TotalSF', 'TotalPorchSF', 'GarageAge'
    ]
    
    NUMERIC_FEATURES = [
        'FullBath', 'BedroomAbvGr', 'GarageCars', 'Month_sin', 'Month_cos', 
        'YrSold', 'CentralAir', 'IsRemodeled', 'HasPorch', 'HasGarage', 
        'Has2ndFlr', 'HasBsmt', 'HasFireplace', 'HasPool', 'IsNewHouse',
        'TotalBaths', 'GarageAreaPerCar', 'OverallScore', 'OverallQual_sq', 
        'BsmtFinRatio', 'BedBathRatio'
    ]

class HousingPricePredictor:
    """Class used for training a model and predicting house prices."""
    def __init__(self, config):
        self.config = config
        self.preprocessor = None
        self.model = None

    @staticmethod
    def _print_evaluation_metrics(y_true, y_pred, dataset_name):
        """Print evaluation metrics for given dataset."""
        print(f"\n=== {dataset_name} metrics ===")
        print(f"R²: {r2_score(np.expm1(y_true), np.expm1(y_pred)):.4f}")
        print(f"RMSLE: {mean_squared_log_error(np.expm1(y_true), np.expm1(y_pred), squared = False):.4f}")
        print(f"MAE: {mean_absolute_error(np.expm1(y_true), np.expm1(y_pred)):.2f}")
        
    def _clean_data(self, data):
        """Handles missing values correctly based on their meaning."""
        df = data.copy()

        # Define strategies of handling with missing data
        cols_na_as_none = [
            'Alley', 'BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1',
            'BsmtFinType2', 'FireplaceQu', 'GarageType', 'GarageFinish',
            'GarageQual', 'GarageCond', 'PoolQC', 'Fence', 'MiscFeature', 'MasVnrType'
        ]
        cols_na_as_zero = [
            'BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'BsmtFullBath',
            'BsmtHalfBath', 'GarageArea', 'GarageCars', 'MasVnrArea'
        ]

        # Columns to fill with the most frequent value per category
        cols_na_as_mode = [
            'MSZoning', 'Functional', 'Electrical',
            'KitchenQual', 'Exterior1st', 'Exterior2nd', 'SaleType'
        ]

        # Fill missed values in specified columns
        for col in cols_na_as_none:
            df[col] = df[col].fillna('None')

        for col in cols_na_as_zero:
            df[col] = df[col].fillna(0)

        for col in cols_na_as_mode:
            df[col] = df[col].fillna(df[col].mode()[0])

        # Fill with median from neighborhood, then with global median if there are empty values left
        df['LotFrontage'] = df.groupby('Neighborhood')['LotFrontage'].transform(
            lambda x: x.fillna(x.median()))
        
        if df['LotFrontage'].isnull().any():
            df['LotFrontage'] = df['LotFrontage'].fillna(df['LotFrontage'].median())

        # If no info about GarageYrBlt then fill with house year built
        df['GarageYrBlt'] = df['GarageYrBlt'].fillna(df['YearBuilt'])

        # Switch formats of some variables for convinience
        df['MSSubClass'] = df['MSSubClass'].astype(str)
        df['CentralAir'] = df['CentralAir'].map({'Y': 1, 'N': 0})

        return df
        
    def _apply_feature_engineering(self, data):
        """Add custom engineering features to improve predictive power of the model."""
        df = data.copy()
        # Features based on time
        df['AgeHouse'] = (df['YrSold'] - df['YearBuilt']).clip(lower = 0)
        df['AgeSinceRemod'] = (df['YrSold'] - df['YearRemodAdd']).clip(lower = 0)
        df['IsRemodeled'] = (df['YearRemodAdd'] > df['YearBuilt']).astype(int)
        df['IsNewHouse'] = (df['YearBuilt'] == df['YrSold']).astype(int)
        df['GarageAge'] = (df['YrSold'] - df['GarageYrBlt']).clip(lower = 0)
    
        # Features based on total surface
        df['TotalSF'] = df[['TotalBsmtSF','1stFlrSF','2ndFlrSF']].sum(axis = 1).clip(lower = 0)
        df['TotalPorchSF'] = df[['OpenPorchSF','EnclosedPorch','3SsnPorch','ScreenPorch','WoodDeckSF']].sum(axis = 1)

        # Binary features
        df['HasPorch'] = (df['TotalPorchSF'] > 0).astype(int)
        df['Has2ndFlr'] = (df['2ndFlrSF'] > 0).astype(int)
        df['HasBsmt'] = (df['TotalBsmtSF'] > 0).astype(int)
        df['HasFireplace'] = (df['Fireplaces'] > 0).astype(int)
        df['HasPool'] = (df['PoolArea'] > 0).astype(int)
        df['HasGarage'] = (df['GarageArea'] > 0).astype(int)

        # Features based on number of rooms
        df['TotalBaths'] = df['FullBath'] + 0.5 * df['HalfBath'] + df['BsmtFullBath'] + 0.5 * df['BsmtHalfBath']

        # Proportion indicators
        df['GarageAreaPerCar'] = df['GarageArea'] / df['GarageCars'].replace(0,1)
        df['BsmtFinRatio'] = df['BsmtFinSF1'] / df['TotalBsmtSF'].replace(0, 1)
        df['BedBathRatio'] = df['BedroomAbvGr'] / df['TotalBaths'].replace(0,1)

        # Interaction and quality features
        df['OverallScore'] = df['OverallQual'] * df['OverallCond']
        df['QualPerSF'] = df['OverallQual'] / df['GrLivArea'].replace(0,1)
        df["OverallQual_sq"] = df["OverallQual"] ** 2
        df["GrLivArea_sq"] = np.log1p(df["GrLivArea"])

        # Seasonal features
        df['Month_sin'] = np.sin(2 * np.pi * df['MoSold'] / 12)
        df['Month_cos'] = np.cos(2 * np.pi * df['MoSold'] / 12)

        # Drop unnecessary and noise data
        df = df.drop(['MoSold', 'Utilities', 'Street'], axis = 1, errors = 'ignore')

        return df

    def _filter_outliers(self, X, y):
        """Detect and remove outliers from training set."""
        # Select only skewed features with high feature importance for selection
        outlier_detection_features = ['GrLivArea', 'OverallQual', 'TotalSF', 
                                      'YearBuilt', 'TotalBaths', 'GarageArea']
        
        clf = IsolationForest(n_estimators = 100, contamination = 0.005, random_state = 1)
        outlier_preds = clf.fit_predict(X[outlier_detection_features].fillna(0).values) # Return 1 for normal and -1 for outliers
        
        non_outlier_indices = np.where(outlier_preds == 1)[0]
        
        return X.iloc[non_outlier_indices], y.iloc[non_outlier_indices]

    def _add_cluster_features(self, X, X_test):
        """Adds cluster features for K-Means algorithm."""
        cluster_features = ['OverallQual', 'GrLivArea', 'TotalSF', 'AgeHouse', 'TotalBaths', 'OverallScore']
        
        # Scaling is required for K-Means algorithm
        scaler = StandardScaler()
        kmeans = KMeans(n_clusters = self.config.OPTIMAL_K_CLUSTERS, random_state = 1, n_init = 'auto')

        # Adjust scaler only to training data
        scaled_train_features = scaler.fit_transform(X[cluster_features].fillna(0))
        kmeans.fit(scaled_train_features)
        
        # Use objects for transformation of both datasets
        X['House_Cluster'] = kmeans.predict(scaled_train_features)
        scaled_test_features = scaler.transform(X_test[cluster_features].fillna(0))
        X_test['House_Cluster'] = kmeans.predict(scaled_test_features)
        
        return X, X_test
    
    def _build_preprocessor(self):
        """Build pipeline for data preprocessing."""
        # Transform ordinal map into keys and values
        ordinal_features = list(self.config.ORDINAL_FEATURE_MAP.keys())
        ordinal_categories = list(self.config.ORDINAL_FEATURE_MAP.values())
        
        # Pipeline for numerical features, fill missing values with median
        numeric_transformer = Pipeline(steps = [
            ('imputer', SimpleImputer(strategy = 'median'))
        ])
    
        # Pipeline for skewed numerical features
        skewed_transformer = Pipeline(steps = [
            ('imputer', SimpleImputer(strategy = 'median')),
            ('log', FunctionTransformer(np.log1p, validate = False)),
            ('scaler', StandardScaler())
        ])

        # Pipeline for ordinal features
        ordinal_transformer = Pipeline(steps = [
            ('imputer', SimpleImputer(strategy = 'most_frequent')),
            ('encoder', OrdinalEncoder(
                categories = ordinal_categories,
                handle_unknown = 'use_encoded_value',
                unknown_value = -1
            ))
        ])
    
        # Pipeline for nominal features
        nominal_transformer = Pipeline(steps = [
            ('imputer', SimpleImputer(strategy = 'most_frequent')),
            ('onehot', OneHotEncoder(handle_unknown = 'ignore', sparse_output = False))
        ])
    
        # Combine all pipelines in single ColumnTransformer
        preprocessor = ColumnTransformer(transformers = [
            ('num', numeric_transformer, self.config.NUMERIC_FEATURES),
            ('skewed', skewed_transformer, self.config.SKEWED_FEATURES),
            ('ord', ordinal_transformer, ordinal_features),
            ('nom', nominal_transformer, self.config.NOMINAL_FEATURES)
        ], remainder = 'passthrough')
        return preprocessor

    def tune_hyperparameters(self, X, y):
        """Runs Optuna to find optimal hyperparameters."""
        def objective(trial):
            # Search through different hyperparameters
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 1000, 3000),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05),
                'max_depth': trial.suggest_int('max_depth', 3, 7),
                'subsample': trial.suggest_float('subsample', 0.6, 0.9),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 0.9),
                'reg_alpha': trial.suggest_float('reg_alpha', 0.01, 1.0),
                'reg_lambda': trial.suggest_float('reg_lambda', 0.01, 10.0),
                'min_child_weight': trial.suggest_float('min_child_weight', 1, 10)
            }
            model = XGBRegressor(**params, n_jobs = -1, random_state = 1)
            pipeline = Pipeline(steps = [
                ('preprocessor', self.preprocessor),
                ('regressor', model)
            ])
            kfold = KFold(n_splits = 5, shuffle = True, random_state = 1)
            # Return positive RMSLE, because Optuna has to minimize it
            rmsle_scorer = make_scorer(lambda y_true_log, y_pred_log: 
                                       mean_squared_log_error(
                                            np.expm1(y_true_log), 
                                            np.expm1(y_pred_log),
                                            squared = False  
                                        ), greater_is_better = False)
            score = -cross_val_score(pipeline, X, y, cv = kfold, scoring = rmsle_scorer).mean()

            return score

        # Optimization loop
        study = optuna.create_study(direction = 'minimize')
        study.optimize(objective, n_trials = self.config.N_TRIALS_OPTUNA, show_progress_bar = True)
        return study.best_params

    def train(self, X, y):
        """Train final model with best hyperparameters."""
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size = 0.2, random_state = 1)

        # Build preprocessor based on final set of features
        self.preprocessor = self._build_preprocessor()

        params = self.config.BEST_PARAMS
        if self.config.TUNE_HYPERPARAMETERS:
            params = self.tune_hyperparameters(X_train, y_train)

        self.model = XGBRegressor(**params, random_state = 1, n_jobs = -1)

        # Build final pipeline
        pipeline = Pipeline(steps = [
            ('preprocessor', self.preprocessor),
            ('regressor', self.model)
        ])
        pipeline.fit(X_train, y_train)

        # Evaluation on train and validation sets
        y_pred_train = pipeline.predict(X_train)
        y_pred_val = pipeline.predict(X_val)  

        # Show performance metrics
        print('Best parameters: \n', params)
        self._print_evaluation_metrics(y_train, y_pred_train, "Training")
        self._print_evaluation_metrics(y_val, y_pred_val, "Validation")

        # Train final model
        pipeline.fit(X, y)
        self.model = pipeline

    def predict(self, X_test):
        """Generate predictions on test data."""
        return self.model.predict(X_test)

    def run(self):
        """Runs whole pipeline."""
        # Load both datasets
        train_df = pd.read_csv('/kaggle/input/house-prices-advanced-regression-techniques/train.csv')
        test_df = pd.read_csv('/kaggle/input/house-prices-advanced-regression-techniques/test.csv')
        test_ids = test_df['Id']
        
        # Separate features and target
        X = train_df.drop(['SalePrice', 'Id'], axis = 1, errors = 'ignore')
        y = np.log1p(train_df['SalePrice'])
        X_test = test_df.drop(['Id'], axis = 1, errors = 'ignore')
        
        # Clean the data
        X = self._clean_data(X)
        X_test = self._clean_data(X_test)
    
        # Add new features to the model
        X = self._apply_feature_engineering(X)
        X_test = self._apply_feature_engineering(X_test)

        # Apply new feature based on grouping houses on different types
        X, X_test = self._add_cluster_features(X, X_test)

        # Remove houses with extremaly skewed features
        X, y = self._filter_outliers(X, y)

        # Reset indices after removing unwanted features
        X = X.reset_index(drop = True)
        y = y.reset_index(drop = True)
        X_test = X_test.reset_index(drop = True)
    
        # Train model
        self.train(X, y)
    
        # Predictions
        final_predictions = self.predict(X_test)
    
        # Generate submission file
        submission = pd.DataFrame({'Id': test_ids, 'SalePrice': np.expm1(final_predictions)})
        submission.to_csv('submission.csv', index = False)

# Main function
if __name__ == '__main__':
    config = Config()
    predictor = HousingPricePredictor(config)
    predictor.run()

Best parameters: 
 {'n_estimators': 2353, 'learning_rate': 0.015, 'max_depth': 3, 'subsample': 0.719, 'colsample_bytree': 0.678, 'reg_alpha': 0.016, 'reg_lambda': 3.652, 'min_child_weight': 5.172}

=== Training metrics ===
R²: 0.9861
RMSLE: 0.0483
MAE: 6167.04

=== Validation metrics ===
R²: 0.9231
RMSLE: 0.1209
MAE: 13139.42
