In [None]:
# Setting up the libraries needed

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from xgboost import XGBRegressor
from sklearn.metrics import mean_absolute_error, make_scorer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, OrdinalEncoder
from sklearn.model_selection import cross_val_score, RandomizedSearchCV, GridSearchCV, train_test_split

## Reading and Exploring the Data

First, we read the data; then we explore its variables and their summary statistics.

In [None]:
# Let wide frames display (effectively) all of their columns.
pd.options.display.max_columns = 999

# Load the training data.
df = pd.read_csv(filepath_or_buffer="/kaggle/input/home-data-for-ml-course/train.csv")

In [None]:
# Report the size of the training frame, then preview its first five rows.
n_rows, n_cols = df.shape
print(f"Our data has {n_rows} rows and {n_cols} columns\nAnd the first 5 rows is looking like this:")
df.head(n=5)

In [None]:
# Number of missing values per column; show the 20 columns with the most gaps.
missing_per_column = df.isna().sum()
missing_per_column.sort_values(ascending=False).head(20)

As we can see, there are many missing values in our dataset.

The important things to notice here are:

- Variables such as *PoolQC*, *MiscFeature*, *Alley*, etc., are **categorical variables**.  
  For these columns, a missing value indicates the *absence* of that particular feature in the house.

- Another important point is that the *Garage*-related variables all have the same number of missing values.  
  This suggests that these houses **do not have a garage**, which explains why those values are missing.

## Identifying Variable Types

In [None]:
class HouseRules(BaseEstimator, TransformerMixin):
    """Domain-specific cleaning rules for the house-price feature table.

    ``fit`` learns the ``LotFrontage`` medians; ``transform`` works on a copy of
    the incoming DataFrame and applies the rules below in order. Every rule is
    skipped silently when the columns it needs are not present.

      1. ``MSSubClass`` is cast to ``str`` so it can be treated as a nominal code
         instead of a number.
      2. Missing values in the "absence" categoricals listed in ``none_cats``
         (PoolQC, Fence, Alley, ...) are replaced by the string ``"None"``.
      3. ``GarageYrBlt``, ``GarageCars`` and ``GarageArea`` are set to 0 on rows
         whose ``GarageQual`` is ``"None"``.
      4. ``MasVnrArea`` is set to 0 on rows whose ``MasVnrType`` is ``"None"``.
      5. The columns in ``bsmt_nums`` are set to 0 on rows where all five
         basement categoricals are ``"None"``.
      6. Missing ``LotFrontage`` values are filled with the median of the row's
         ``Neighborhood`` as learned in ``fit``; values still missing afterwards
         (e.g. a neighborhood not seen in ``fit``) get the overall median
         learned in ``fit``.

    Because the medians come from the data given to ``fit`` only, applying the
    fitted transformer to validation/test data does not leak their statistics.

    Attributes
    ----------
    none_cats : list of str
        Categorical columns in which a missing value is replaced by ``"None"``.
    bsmt_nums : list of str
        Basement numeric columns zeroed when the house has no basement.
    nb_lf_med_ : pandas.Series or None
        Set by ``fit``. Median ``LotFrontage`` per ``Neighborhood``; ``None``
        when the fitted data lacked either column.
    lf_med_ : float or None
        Set by ``fit``. Overall median ``LotFrontage``; ``None`` when the fitted
        data lacked either column.
    """

    def __init__(self):
        # Fixed rule tables only. The learned medians are deliberately NOT created
        # here: attributes ending in "_" mark fitted state and belong to fit().
        self.none_cats = [
            "PoolQC","MiscFeature","Alley","Fence","FireplaceQu",
            "GarageType","GarageFinish","GarageQual","GarageCond",
            "BsmtQual","BsmtCond","BsmtExposure","BsmtFinType1","BsmtFinType2",
            "MasVnrType"
        ]
        self.bsmt_nums = ["BsmtFinSF1","BsmtFinSF2","BsmtUnfSF",
                          "TotalBsmtSF","BsmtFullBath","BsmtHalfBath"]

    def fit(self, X, y=None):
        """Learn the ``LotFrontage`` medians from ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Training features. Only read, never modified.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self
        """
        # Reset first so a refit never reuses medians from a previous fit.
        self.nb_lf_med_ = None
        self.lf_med_ = None
        if {"Neighborhood","LotFrontage"}.issubset(X.columns):
            self.nb_lf_med_ = X.groupby("Neighborhood")["LotFrontage"].median()
            self.lf_med_ = X["LotFrontage"].median()
        return self

    def transform(self, X):
        """Return a cleaned copy of ``X`` (see the class docstring for the rules).

        Parameters
        ----------
        X : pandas.DataFrame
            Features to clean. The input frame itself is left untouched.

        Returns
        -------
        pandas.DataFrame
            Copy of ``X`` with the rules applied.
        """
        X = X.copy()

        # MSSubClass is a dwelling-type code: store it as text so it is nominal.
        if "MSSubClass" in X.columns:
            X["MSSubClass"] = X["MSSubClass"].astype(str)

        # Presence/absence categoricals: a missing value becomes "None".
        for c in self.none_cats:
            if c in X.columns:
                X[c] = X[c].fillna("None")

        # No garage -> garage numerics are 0.
        if "GarageQual" in X.columns:
            no_garage = X["GarageQual"] == "None"
            for c in ["GarageYrBlt","GarageCars","GarageArea"]:
                if c in X.columns:
                    X.loc[no_garage, c] = 0

        # No masonry veneer -> veneer area is 0.
        if {"MasVnrType","MasVnrArea"}.issubset(X.columns):
            X.loc[X["MasVnrType"]=="None", "MasVnrArea"] = 0

        # No basement (all five basement categoricals are "None") -> basement
        # numerics are 0.
        need = {"BsmtQual","BsmtCond","BsmtExposure","BsmtFinType1","BsmtFinType2"}
        if need.issubset(X.columns):
            mask = (X["BsmtQual"]=="None") & (X["BsmtCond"]=="None") & \
                   (X["BsmtExposure"]=="None") & (X["BsmtFinType1"]=="None") & \
                   (X["BsmtFinType2"]=="None")
            for c in self.bsmt_nums:
                if c in X.columns:
                    X.loc[mask, c] = 0

        # LotFrontage: neighborhood median first, overall median as fallback.
        # getattr keeps transform() usable on an unfitted instance (the fill is
        # simply skipped), as it was when the attributes were created in __init__.
        nb_medians = getattr(self, "nb_lf_med_", None)
        if nb_medians is not None and {"Neighborhood","LotFrontage"}.issubset(X.columns):
            X["LotFrontage"] = X["LotFrontage"].fillna(X["Neighborhood"].map(nb_medians))
            X["LotFrontage"] = X["LotFrontage"].fillna(self.lf_med_)

        return X


In [None]:
# Rating scales shared by several columns, always listed from worst to best.
_QUALITY_SCALE = ("Po", "Fa", "TA", "Gd", "Ex")
_BSMT_FINISH_SCALE = ("Unf", "LwQ", "Rec", "BLQ", "ALQ", "GLQ")
_ONE_TO_TEN = range(1, 11)

# Ordinal column -> its categories ordered from worst to best.
# "None" (the label HouseRules fills in for an absent feature) is the lowest
# level wherever it appears. Key order matters: it fixes the order of
# `ordinal_cols`, so it is kept exactly as before. Every value is its own list.
ORDINAL_ORDER = {
    "ExterQual": [*_QUALITY_SCALE],
    "ExterCond": [*_QUALITY_SCALE],
    "HeatingQC": [*_QUALITY_SCALE],
    "KitchenQual": [*_QUALITY_SCALE],
    "FireplaceQu": ["None", *_QUALITY_SCALE],
    "GarageQual": ["None", *_QUALITY_SCALE],
    "GarageCond": ["None", *_QUALITY_SCALE],
    "PoolQC": ["None", *_QUALITY_SCALE[1:]],  # this scale has no "Po" level
    "BsmtQual": ["None", *_QUALITY_SCALE],
    "BsmtCond": ["None", *_QUALITY_SCALE],
    "BsmtExposure": ["None", "No", "Mn", "Av", "Gd"],
    "BsmtFinType1": ["None", *_BSMT_FINISH_SCALE],
    "BsmtFinType2": ["None", *_BSMT_FINISH_SCALE],
    "GarageFinish": ["None", "Unf", "RFn", "Fin"],
    "PavedDrive": ["N", "P", "Y"],
    "LotShape": ["IR3", "IR2", "IR1", "Reg"],
    "LandSlope": ["Sev", "Mod", "Gtl"],
    "Functional": ["Sal", "Sev", "Maj2", "Maj1", "Mod", "Min2", "Min1", "Typ"],
    "Utilities": ["ELO", "NoSeWa", "NoSewr", "AllPub"],
    "Fence": ["None", "MnWw", "GdWo", "MnPrv", "GdPrv"],
    "OverallQual": [*_ONE_TO_TEN],   # 1..10
    "OverallCond": [*_ONE_TO_TEN],
}

# Features and target: "Id" is only a row identifier, "SalePrice" is what we predict.
X = df.drop(["Id", "SalePrice"], axis=1)
y = df["SalePrice"]

# Integer-coded columns that are really categories. HouseRules.transform casts
# MSSubClass to str, so it must go through the nominal (one-hot) branch. If it stayed
# among the numeric columns, the median imputer would receive strings and either
# coerce them back to numbers or reject them.
coded_nominal_cols = [c for c in ["MSSubClass"] if c in X.columns]

# Ordinal columns keep the order in which they appear in ORDINAL_ORDER.
ordinal_cols = [c for c in ORDINAL_ORDER if c in X.columns]

# Nominal columns: every text column that is not ordinal, plus the coded ones above.
nominal_cols = [c for c in X.select_dtypes(include="object").columns if c not in ordinal_cols]
nominal_cols += [c for c in coded_nominal_cols if c not in nominal_cols]

# Real numeric columns: numeric dtype and not already claimed by the ordinal group
# (OverallQual, OverallCond) or the nominal group (MSSubClass).
_claimed_cols = set(ordinal_cols) | set(nominal_cols)
real_numeric = [c for c in X.select_dtypes(include=["int64","float64"]).columns
                if c not in _claimed_cols]

In [None]:
# Take a look at the nominal (unordered categorical) columns of the full data.
df.loc[:, nominal_cols]

## Defining Features and Prediction Target for Our Model

In [None]:
# Train/validation split: 80% of the rows for training, 20% held out.
# random_state is fixed so the split is identical on every run. Without it, the
# skewed-column selection and the cross-validation scores below change each run.
X_train, X_valid, y_train, y_valid = train_test_split(X, y, train_size=0.8, random_state=0)

In [None]:
# First five rows of the training features.
X_train.head(n=5)

In [None]:
# The six training columns with the most missing values.
train_missing = X_train.isna().sum()
train_missing.sort_values(ascending=False).head(6)

In [None]:
# The nominal columns as they look in the training split.
X_train.loc[:, nominal_cols]

## Identifying Skewed Variables for Log Transformation

Log transformation is a way to change data that has very large numbers, very small numbers or a skewed shape. It works by taking the logarithm of each number in the data which helps to “compress” the large values and spread out the small ones.

In [None]:
# Work on a copy of the numeric training columns so X_train stays untouched for later use.
X_train_tmp = X_train[real_numeric].copy()

# Fill the gaps of every column with that column's own (training) median, so the
# values inspected below contain no missing entries.
X_train_tmp = X_train_tmp.fillna(X_train_tmp.median())

# Skewness of every numeric column, most right-skewed first.
skewed_features = X_train_tmp.apply(pd.Series.skew).sort_values(ascending=False)
print(f"These are the skewed features and their skew value from our data:\n{skewed_features}\n")

# Columns with skew above 1 get the log transformation; the remaining numeric
# columns are used as they are.
high_skewed = [name for name, skew_value in skewed_features.items() if skew_value > 1]
num_cols_for_log = high_skewed
num_cols_no_log = [c for c in real_numeric if c not in num_cols_for_log]
print(f"{num_cols_for_log}\n\nThese columns skews are higher than 1\nWhich means these are highly right skewed, we are going to apply log transformation to them")

In [None]:
# Distribution of the highly skewed features in the (median-imputed) training data.

import warnings

# At most 15 features, because the figure is a 3 x 5 grid.
top_skewed = skewed_features[skewed_features > 1].head(15)

with warnings.catch_warnings():
    # Silence FutureWarnings raised while plotting.
    warnings.simplefilter("ignore", category=FutureWarning)

    plt.figure(figsize=(12, 6))
    for i, (col, skew_value) in enumerate(top_skewed.items()):
        plt.subplot(3, 5, i+1)
        sns.histplot(X_train_tmp[col], kde=True)
        # Title the panel with the skew of the data actually drawn: the training
        # value that selected the column, not the skew of the full raw `df`.
        plt.title(f"{col} (skew={skew_value:.2f})")
    plt.tight_layout()
    plt.show()

## Constructing Our Pipeline for the Model

In [None]:
# Numeric columns selected for the log transformation:
# fill missing values with the median, then apply log1p (log(1 + x)).
numeric_log_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
    ("log", FunctionTransformer(func=np.log1p, feature_names_out="one-to-one")),
])

# Remaining numeric columns: median imputation only, no log.
numeric_plain_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="median")),
])

# Ordinal columns are encoded by the position of each category in ORDINAL_ORDER.
# A category that is not listed for its column is encoded as -1.
ordinal_encoder = OrdinalEncoder(
    categories=[ORDINAL_ORDER[c] for c in ordinal_cols],
    handle_unknown = "use_encoded_value", unknown_value = -1
)

# Ordinal columns: fill missing values with the most frequent value, then encode
# every category as the integer rank defined above.
ordinal_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encode", ordinal_encoder),
])

# Nominal columns: fill missing values with the most frequent value, then one-hot
# encode (one 0/1 column per category). Categories not seen during fit are ignored,
# i.e. all of that feature's indicator columns are 0 for such a row.
nominal_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

# Route every column group through its own pipeline; columns that belong to no
# group are passed through unchanged.
preprocessor = ColumnTransformer(
    transformers=[
        ("num_log",  numeric_log_pipeline, num_cols_for_log),
        ("num_plain", numeric_plain_pipeline, num_cols_no_log),
        ("ordinal", ordinal_pipeline, ordinal_cols),
        ("nominal", nominal_pipeline, nominal_cols),
    ],
    remainder="passthrough"
)

# The model. It is defined once and reused in the pipeline below; previously a
# separate, differently configured XGBRegressor was created there and this one
# was never used.
model = XGBRegressor(random_state=0, n_jobs=-1)

# Full pipeline: domain rules (HouseRules) -> column-wise preprocessing -> model.
full_pipeline = Pipeline(steps=[
    ("rules", HouseRules()),
    ("preprocess", preprocessor),
    ("model", model),
])

### Let's fit our training data and prediction target to our pipeline

By doing this all of these preprocessing steps will be applied:
*  Log transformation applied to skewed features
* Numeric NA values filled with the median value
* Ordinal variables encoded with OrdinalEncoder
* Nominal variables encoded with OneHotEncoder

## Hyperparameter Tuning Using GridSearchCV


Let's use GridSearchCV to find best hyperparameters for our XGBoost model.

In [None]:
def mae_on_original(y_true_log, y_pred_log):
    """Mean absolute error measured on the original (non-log) scale.

    Both arguments are expected on the log1p scale; they are mapped back with
    ``np.expm1`` before the error is computed.

    Parameters
    ----------
    y_true_log : array-like
        True targets on the log1p scale.
    y_pred_log : array-like
        Predicted targets on the log1p scale.

    Returns
    -------
    The value returned by ``mean_absolute_error`` for the back-transformed inputs.
    """
    return mean_absolute_error(np.expm1(y_true_log), np.expm1(y_pred_log))

# MAE is an error (lower is better), so the scorer is built with
# greater_is_better=False; the scores it reports are therefore negated MAE values.
mae_orig_scorer = make_scorer(mae_on_original, greater_is_better=False)

# Candidate values for the XGBoost step ("model") of the pipeline.
param_grid = dict(
    model__n_estimators=[100, 500, 1000],
    model__learning_rate=[0.05, 0.1, 0.5],
    model__max_depth=[4, 6, 8],
    model__subsample=[0.7, 0.9],
    model__colsample_bytree=[0.7, 0.9],
)

# Exhaustive search over the grid with 5-fold cross-validation. refit=True refits
# the best configuration on all the data passed to fit().
grid = GridSearchCV(
    full_pipeline,
    param_grid,
    scoring=mae_orig_scorer,
    cv=5,
    n_jobs=-1,
    refit=True,
    verbose=0,
)

# The model is trained on log1p(SalePrice); mae_on_original undoes this when scoring.
y_train_log = np.log1p(y_train)
grid.fit(X_train, y_train_log)

In [None]:
# One row per hyperparameter combination together with its mean cross-validated
# score (the negated MAE on the original price scale).
results = pd.DataFrame(grid.cv_results_)
shown_columns = [
    "param_model__colsample_bytree",
    "param_model__learning_rate",
    "param_model__max_depth",
    "param_model__n_estimators",
    "param_model__subsample",
    "mean_test_score",
]
results[shown_columns]

In [None]:
# best_score_ is the negated MAE, so flip the sign to read it as an error.
best_cv_mae = -grid.best_score_
print(grid.best_params_)
print("\n", best_cv_mae)

In [None]:
# Because the search was created with refit=True, GridSearchCV refits the pipeline
# with the best hyperparameters and exposes it as best_estimator_.
# Keep a handle to that fitted pipeline.

best_pipeline = grid.best_estimator_

In [None]:
# Let's see what the training data looks like when it reaches the model.
# best_pipeline[:-1] is the fitted pipeline without its final model step, so the rows
# pass through HouseRules AND the ColumnTransformer. Feeding raw X_train straight
# into the "preprocess" step would skip HouseRules and show a matrix that differs
# from the one the model was trained on.
feature_names = best_pipeline.named_steps["preprocess"].get_feature_names_out()
X_train_after = pd.DataFrame(
    best_pipeline[:-1].transform(X_train),
    columns=feature_names,
    index=X_train.index
)

In [None]:
# Display the preprocessed training features built in the previous cell.
X_train_after

## Predicting Test data

In [None]:
# Load the test data (the path points at the gzip-compressed copy of the file).
test_df = pd.read_csv(filepath_or_buffer="/kaggle/input/home-data-for-ml-course/test.csv.gz")

In [None]:
# Preview the first five rows of the test data.
test_df.head(n=5)

In [None]:
# Report the size of the test frame.
test_rows, test_cols = test_df.shape
print(f"Our test data has {test_rows} rows and {test_cols} columns")

In [None]:
# The 20 test columns with the most missing values.
test_missing = test_df.isna().sum()
test_missing.sort_values(ascending=False).head(20)

In [None]:
# Predict the test data (predictions are in log scale).
# Only the feature columns are passed: "Id" was removed from X before training,
# so it is removed here as well instead of relying on the pipeline to ignore it.
y_pred_log = grid.predict(test_df.drop(columns=["Id"]))

# Apply the inverse of the log1p transformation to get predictions in the original scale.
y_pred = np.expm1(y_pred_log)

In [None]:
# Two-column submission: the test Ids next to the back-transformed predictions.
submission = test_df[["Id"]].assign(SalePrice=y_pred)

# Write it without the index column.
submission.to_csv("submission.csv", index=False)