## Get the Data 

### Setup Workspace

In [None]:
# Data manipulation and analysis
import numpy as np
import pandas as pd

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("darkgrid")

# Statistical analysis
from scipy.stats import randint, uniform, loguniform
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

# Data preprocessing and pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from category_encoders import BinaryEncoder

# Model selection and evaluation
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Machine learning models
from sklearn.ensemble import (
    RandomForestRegressor,
    GradientBoostingRegressor,
    StackingRegressor
)
from sklearn.linear_model import Ridge
import xgboost as xgb

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_file_paths = (
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_file_paths:
    print(path)

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Load the training and test datasets, using the 'Id' column as the row index
train_original, test_original = (
    pd.read_csv(csv_path, index_col='Id')
    for csv_path in (
        "/kaggle/input/home-data-for-ml-course/train.csv",
        "/kaggle/input/home-data-for-ml-course/test.csv",
    )
)

## Custom Transformers and Pipeline Implementation

### Custom Transformers

In [None]:
class DataTypeCleaner(BaseEstimator, TransformerMixin):
    """Cast numerically coded categorical columns to strings.

    The columns named in ``categorical_masqueraders`` are converted with
    ``astype(str)``; names that are absent from the input are skipped.
    """

    def __init__(self):
        self.categorical_masqueraders = ['MSSubClass', 'OverallCond', 'OverallQual', 'MoSold']

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, ``self`` is returned."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the masquerading columns cast to ``str``."""
        string_casts = {
            name: X[name].astype(str)
            for name in self.categorical_masqueraders
            if name in X.columns
        }
        # assign() replaces existing columns in place, so column order is kept.
        return X.copy().assign(**string_casts)

In [None]:
class CompositeFeatureDropper(BaseEstimator, TransformerMixin):
    """Remove the columns listed in ``variables_to_drop``.

    The notebook describes these as composite features that cause
    multicollinearity.
    """

    def __init__(self):
        self.variables_to_drop = ['GrLivArea', 'TotalBsmtSF']

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, ``self`` is returned."""
        return self

    def transform(self, X):
        """Return ``X`` without the configured columns; absent ones are ignored."""
        return X.drop(columns=self.variables_to_drop, errors='ignore')

In [None]:
class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Append three derived columns: TotalBathrooms, BasementValue, GarageScore."""

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, ``self`` is returned."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the engineered columns added at the end.

        Requires the bathroom, basement and garage source columns read below
        to be present in ``X``.
        """
        out = X.copy()

        # Total bathrooms: half baths count as 0.5, above and below ground.
        above_ground_baths = out["FullBath"] + out["HalfBath"] * 0.5
        out["TotalBathrooms"] = (
            above_ground_baths + out["BsmtFullBath"] + out["BsmtHalfBath"] * 0.5
        )

        # Basement value index: weighted finished areas plus a per-bath bonus.
        finished_area_value = out['BsmtFinSF1'] * 0.8 + out['BsmtFinSF2'] * 0.6
        out['BasementValue'] = finished_area_value + out['BsmtFullBath'] * 300

        # Garage score: capacity * area, scaled by build year relative to 2000.
        # A year of 0 is replaced by 2000 so its scaling factor becomes 1.0.
        year_factor = out['GarageYrBlt'].replace(0, 2000) / 2000
        out['GarageScore'] = out['GarageCars'] * out['GarageArea'] * year_factor

        return out

In [None]:
class MissingValueHandler(BaseEstimator, TransformerMixin):
    """Handle missing values with domain-specific logic.

    ``fit`` learns the per-neighborhood and overall median of ``LotFrontage``.
    ``transform`` drops a fixed set of columns, fills selected categorical
    columns with the string ``'None'`` and selected numeric columns with ``0``,
    and imputes ``LotFrontage`` from the learned medians.
    """

    def __init__(self):
        self.neighborhood_medians = None
        self.overall_median = None

    def fit(self, X, y=None):
        """Learn the LotFrontage medians used for imputation.

        Nothing is learned unless both 'LotFrontage' and 'Neighborhood' are
        columns of ``X``. Returns ``self``.
        """
        # Reset first so a refit never keeps statistics from an earlier fit.
        self.neighborhood_medians = None
        self.overall_median = None

        if 'LotFrontage' in X.columns and 'Neighborhood' in X.columns:
            self.neighborhood_medians = X.groupby('Neighborhood')['LotFrontage'].median()
            self.overall_median = X['LotFrontage'].median()
        return self

    @staticmethod
    def _fill_present(frame, columns, value):
        """Fill NaN with ``value`` in each of ``columns`` that exists in ``frame``."""
        for col in columns:
            if col in frame.columns:
                frame[col] = frame[col].fillna(value)

    def transform(self, X):
        """Return a cleaned copy of ``X``; the input frame is not modified."""
        # 1. Drop features that the notebook treats as too sparse to keep.
        features_to_drop = ['PoolQC', 'MiscFeature', 'Alley', 'Fence', 'FireplaceQu']
        X_copy = X.drop(columns=features_to_drop, errors='ignore').copy()

        # 2. Straightforward fills (guarded like every other column group).
        self._fill_present(X_copy, ['MasVnrArea'], 0)
        self._fill_present(X_copy, ['MasVnrType', 'Electrical'], 'None')

        # 3. LotFrontage: neighborhood median first, overall median as fallback.
        if 'LotFrontage' in X_copy.columns and self.neighborhood_medians is not None:
            lot_frontage = X_copy['LotFrontage']
            if 'Neighborhood' in X_copy.columns:
                # Vectorized lookup; unseen neighborhoods map to NaN and are
                # picked up by the overall-median fallback below.
                by_neighborhood = X_copy['Neighborhood'].map(self.neighborhood_medians)
                lot_frontage = lot_frontage.fillna(by_neighborhood)
            # Also covers neighborhoods whose training median was itself NaN.
            X_copy['LotFrontage'] = lot_frontage.fillna(self.overall_median)

        # 4. Garage-related features.
        self._fill_present(
            X_copy, ['GarageType', 'GarageFinish', 'GarageQual', 'GarageCond'], 'None'
        )
        self._fill_present(X_copy, ['GarageCars', 'GarageArea', 'GarageYrBlt'], 0)

        # 5. Basement-related features.
        self._fill_present(
            X_copy,
            ['BsmtQual', 'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2'],
            'None',
        )
        self._fill_present(
            X_copy,
            ['BsmtFinSF1', 'BsmtFinSF2', 'BsmtUnfSF', 'BsmtFullBath', 'BsmtHalfBath'],
            0,
        )

        return X_copy

In [None]:
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Encode categorical columns with a per-group strategy.

    * High-cardinality nominal columns -> ``BinaryEncoder``.
    * Remaining nominal columns -> one-hot columns, with the column set (and
      therefore the dropped baseline level) fixed at fit time.
    * Ordinal columns -> integer codes from ``ordinal_mappings``; columns
      without a predefined mapping fall back to a fitted ``LabelEncoder``.
    """

    def __init__(self):
        self.binary_encoder = None
        self.ordinal_mappings = {
            'OverallQual': {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5,
                            '6': 6, '7': 7, '8': 8, '9': 9, '10': 10},
            'OverallCond': {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5,
                            '6': 6, '7': 7, '8': 8, '9': 9, '10': 10},
            'ExterQual': {'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'ExterCond': {'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'BsmtQual': {'None': 0, 'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'BsmtCond': {'None': 0, 'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'BsmtExposure': {'None': 0, 'No': 1, 'Mn': 2, 'Av': 3, 'Gd': 4},
            'BsmtFinType1': {'None': 0, 'Unf': 1, 'LwQ': 2, 'Rec': 3, 'BLQ': 4, 'ALQ': 5, 'GLQ': 6},
            'BsmtFinType2': {'None': 0, 'Unf': 1, 'LwQ': 2, 'Rec': 3, 'BLQ': 4, 'ALQ': 5, 'GLQ': 6},
            'HeatingQC': {'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'KitchenQual': {'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'Functional': {'Sal': 1, 'Sev': 2, 'Maj2': 3, 'Maj1': 4, 'Mod': 5, 'Min2': 6, 'Min1': 7, 'Typ': 8},
            'GarageFinish': {'None': 0, 'Unf': 1, 'RFn': 2, 'Fin': 3},
            'GarageQual': {'None': 0, 'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'GarageCond': {'None': 0, 'Po': 1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5},
            'PavedDrive': {'N': 0, 'P': 1, 'Y': 2},
            'Utilities': {'ELO': 1, 'NoSeWa': 2, 'NoSewr': 3, 'AllPub': 4},
            'LandSlope': {'Sev': 1, 'Mod': 2, 'Gtl': 3},
            'Electrical': {'None': 0, 'FuseP': 1, 'FuseF': 2, 'FuseA': 3, 'Mix': 4, 'SBrkr': 5}
        }
        self.label_encoders = {}
        # Encoded value of the most frequent training class, per label-encoded feature.
        self.label_fallbacks = {}
        self.onehot_columns = None
        # Default codes used when an ordinal value is missing or not in its mapping.
        self.ordinal_defaults = {
            'KitchenQual': 3,  # TA (typical/average)
            'Functional': 8,   # Typ (typical)
            'Utilities': 4,    # AllPub (most common)
            'HeatingQC': 3,    # TA (typical/average)
            'ExterQual': 3,    # TA (typical/average)
            'ExterCond': 3,    # TA (typical/average)
        }

    def fit(self, X, y=None):
        """Decide which columns get which encoding and fit the encoders.

        Only object-dtype columns of ``X`` are considered. Returns ``self``.
        """
        # Reset fitted state so a refit never reuses encoders from an earlier fit.
        self.binary_encoder = None
        self.onehot_columns = None
        self.label_encoders = {}
        self.label_fallbacks = {}

        cat_cols = X.select_dtypes(include=['object']).columns.tolist()

        high_cardinality_candidates = [
            'Neighborhood', 'Exterior1st', 'Exterior2nd', 'MSSubClass', 'MoSold'
        ]
        self.high_cardinality_nominal = [
            col for col in high_cardinality_candidates if col in cat_cols
        ]

        nominal_features = [
            "MSZoning", "Street", "LotShape", "LandContour", "LotConfig", "Neighborhood",
            "Condition1", "Condition2", "BldgType", "HouseStyle", "RoofStyle", "RoofMatl",
            "Exterior1st", "Exterior2nd", "MasVnrType", "Foundation", "Heating", "CentralAir",
            "GarageType", "SaleType", "SaleCondition", "MSSubClass", "MoSold"
        ]
        self.low_cardinality_nominal = [
            col for col in nominal_features
            if col in cat_cols and col not in self.high_cardinality_nominal
        ]

        ordinal_features = [
            "Utilities", "Electrical", "LandSlope", "OverallQual", "OverallCond", "ExterQual",
            "ExterCond", "BsmtQual", "BsmtCond", "BsmtExposure", "BsmtFinType1", "BsmtFinType2",
            "HeatingQC", "KitchenQual", "Functional", "GarageFinish", "GarageQual", "GarageCond", "PavedDrive"
        ]
        self.ordinal_features = [col for col in ordinal_features if col in cat_cols]

        # Binary encoding for high-cardinality nominal features.
        if self.high_cardinality_nominal:
            self.binary_encoder = BinaryEncoder(cols=self.high_cardinality_nominal, return_df=True)
            self.binary_encoder.fit(X)

        # One-hot encoding: the training data alone decides which dummy columns
        # exist and which level per feature is the dropped baseline.
        if self.low_cardinality_nominal:
            dummies = pd.get_dummies(X[self.low_cardinality_nominal],
                                     prefix=self.low_cardinality_nominal,
                                     drop_first=True)
            self.onehot_columns = dummies.columns.tolist()

        # Label encoders for ordinal features without a predefined mapping.
        # Every name in the list above currently has a mapping, so this only
        # takes effect if the feature list is extended.
        for feature in self.ordinal_features:
            if feature in self.ordinal_mappings:
                continue
            values = X[feature].astype(str)
            le = LabelEncoder()
            le.fit(values)
            self.label_encoders[feature] = le
            # classes_ is sorted, not frequency-ordered, so the most frequent
            # class has to be recorded explicitly.
            most_frequent = values.mode()
            if not most_frequent.empty:
                self.label_fallbacks[feature] = int(le.transform([most_frequent.iloc[0]])[0])

        return self

    def transform(self, X):
        """Return an encoded copy of ``X`` using the encoders learned in ``fit``."""
        X_copy = X.copy()

        # 1. Binary encoding for high cardinality nominal features
        if self.binary_encoder is not None:
            X_copy = self.binary_encoder.transform(X_copy)

        # 2. One-hot encoding for low cardinality nominal features
        if self.low_cardinality_nominal:
            # Build ALL dummy columns for this batch (no drop_first here): which
            # level sorts first depends on the batch, so dropping it here could
            # remove a real category instead of the training baseline. The
            # reindex below removes the baseline and unseen levels, and adds
            # training columns absent from this batch as zeros.
            dummies = pd.get_dummies(X_copy[self.low_cardinality_nominal],
                                     prefix=self.low_cardinality_nominal)
            dummies = dummies.reindex(columns=self.onehot_columns, fill_value=0)

            X_copy = pd.concat(
                [X_copy.drop(columns=self.low_cardinality_nominal), dummies],
                axis=1,
            )

        # 3. Ordinal encoding with handling of missing/unknown values
        for feature in self.ordinal_features:
            mapping = self.ordinal_mappings.get(feature)
            if mapping is not None:
                encoded = X_copy[feature].map(mapping)

                # Values missing or absent from the mapping become NaN; replace
                # them with the configured default, or the middle code if none.
                if encoded.isnull().any():
                    codes = list(mapping.values())
                    default_value = self.ordinal_defaults.get(feature, codes[len(codes) // 2])
                    encoded = encoded.fillna(default_value)
                X_copy[feature] = encoded
            elif feature in self.label_encoders:
                le = self.label_encoders[feature]
                as_text = X_copy[feature].astype(str)
                mask = as_text.isin(le.classes_)

                # Start from the fallback code (most frequent training class),
                # then overwrite the rows whose category was seen during fit.
                unknown_value = self.label_fallbacks.get(feature, 0)
                encoded = pd.Series(unknown_value, index=X_copy.index, dtype='int64')
                if mask.any():
                    encoded.loc[mask] = le.transform(as_text[mask])

                if not mask.all():
                    print(f"Warning: Found {(~mask).sum()} unknown values in {feature}, filling with {unknown_value}")
                X_copy[feature] = encoded

        return X_copy

### Complete Preprocessing Pipeline

In [None]:
def create_preprocessing_pipeline():
    """Build the preprocessing ``Pipeline`` from the custom transformers.

    Steps run in the order listed: dtype cleaning, missing-value handling,
    feature engineering, composite-feature removal, categorical encoding.
    """
    steps = [
        ('dtype_cleaner', DataTypeCleaner()),
        ('missing_handler', MissingValueHandler()),
        ('feature_engineer', FeatureEngineer()),
        ('composite_dropper', CompositeFeatureDropper()),
        ('categorical_encoder', CategoricalEncoder()),
    ]
    return Pipeline(steps)

### Apply Pipeline to Training Data

In [None]:
# Create and fit the preprocessing pipeline
preprocessing_pipeline = create_preprocessing_pipeline()

# Load original training data
X_train_original = train_original.drop(['SalePrice'], axis=1)
y_train = train_original['SalePrice']

# Fit and transform training data
X_train_processed = preprocessing_pipeline.fit_transform(X_train_original)

print(f"Original training shape: {X_train_original.shape}")
print(f"Processed training shape: {X_train_processed.shape}")

if hasattr(X_train_processed, 'isnull'):
    print(f"Missing values in processed training data: {X_train_processed.isnull().sum().sum()}")
else:
    print("Processed training data has no null-check support (likely a NumPy array)")

### Apply Pipeline to Test Data

In [None]:
# Load and process test data
X_test_processed = preprocessing_pipeline.transform(test_original)
print(f"Processed test shape: {X_test_processed.shape}")

if hasattr(X_test_processed, 'isnull'):
    print(f"Missing values in processed test data: {X_test_processed.isnull().sum().sum()}")
else:
    print("Processed test data has no null-check support (likely a NumPy array)")

### Align Train and Test Features

In [None]:
# Ensure both datasets have the same columns
train_cols = set(X_train_processed.columns)
test_cols = set(X_test_processed.columns)

# Columns in train but not in test
missing_in_test = train_cols - test_cols
if missing_in_test:
    print(f"Adding missing columns to test set: {missing_in_test}")
    for col in missing_in_test:
        X_test_processed[col] = 0  # Add with default value

# Columns in test but not in train
extra_in_test = test_cols - train_cols
if extra_in_test:
    print(f"Removing extra columns from test set: {extra_in_test}")
    X_test_processed = X_test_processed.drop(columns=list(extra_in_test))

# Reorder test columns to match train
X_test_processed = X_test_processed[X_train_processed.columns]

print(f"Final aligned shapes - Train: {X_train_processed.shape}, Test: {X_test_processed.shape}")

### Train Final Model and Make Predictions

### Define Best Models from Analysis

In [None]:
# Best GradientBoosting model from hyperparameter tuning
best_gb = GradientBoostingRegressor(
    learning_rate=0.04210216968916263,
    max_depth=7,
    max_features='log2',
    min_samples_leaf=2,
    min_samples_split=10,
    n_estimators=295,
    subsample=0.8771561434767757,
    random_state=42
)

# Best XGBoost model from hyperparameter tuning
best_xgb = xgb.XGBRegressor(
    colsample_bytree=0.7553073535959489,
    gamma=6.841604512504889e-07,
    learning_rate=0.004557854806921613,
    max_depth=7,
    min_child_weight=9,
    n_estimators=839,
    reg_alpha=7.5144413702907125,
    reg_lambda=0.019206818224206303,
    subsample=0.8144385465376481,
    random_state=42,
    n_jobs=-1
)

### Create Best Ensemble Model (Stacking with Ridge)

In [None]:
# Create the best performing ensemble: Stacking with Ridge
final_ensemble = StackingRegressor(
    estimators=[
        ('gb', best_gb),
        ('xgb', best_xgb)
    ],
    final_estimator=Ridge(alpha=1.0),
    cv=5,
    n_jobs=-1
)

print("Created Stacking Ensemble with Ridge meta-learner")
print("Base models: GradientBoosting + XGBoost")

### Cross-Validation of Final Model

In [None]:
# Perform cross-validation to confirm performance
cv_folds = KFold(n_splits=5, shuffle=True, random_state=42)

print("Performing cross-validation on final ensemble...")
cv_scores = cross_val_score(
    final_ensemble, X_train_processed, y_train,
    cv=cv_folds, scoring='neg_mean_squared_error', n_jobs=-1
)

rmse_scores = np.sqrt(-cv_scores)
print(f"\nFinal Ensemble Cross-Validation Results:")
print(f"RMSE: {rmse_scores.mean():.0f} (+/- {rmse_scores.std() * 2:.0f})")
print(f"Individual folds: {[f'{score:.0f}' for score in rmse_scores]}")


### Train Final Model on Full Dataset

In [None]:
# Fit the stacked ensemble on every processed training row
print("\nTraining final ensemble on complete training dataset...")
final_ensemble.fit(X=X_train_processed, y=y_train)

# Predict sale prices for the aligned test features
print("Making predictions on test set...")
test_predictions = final_ensemble.predict(X=X_test_processed)

### Create Submission File

In [None]:
# Create submission dataframe
submission = pd.DataFrame({
    'Id': test_original.index,
    'SalePrice': test_predictions
})

# Save submission file
submission.to_csv('housing_price_predictions.csv', index=False)