
# House Price Prediction with External Features (End-to-End)
**Goal:** Predict house prices using a base housing dataset and enrich it with external (scraped) neighborhood features — end-to-end ML workflow.

**Workflow:**
1. Problem Definition
2. Data Collection (sklearn California housing dataset as base + HTML scraping example for extra features)
3. Data Understanding (EDA)
4. Data Preprocessing (missing values, outliers, encoding, scaling)
5. Feature Engineering
6. Splitting Data (70/15/15)
7. Model Training (Linear Regression, Random Forest, XGBoost if available)
8. Model Evaluation (RMSE, MAE, R²) + cross-validation
9. Hyperparameter Tuning (RandomizedSearchCV)
10. Monitoring / Logging demo
11. Save Best Model

> **Note:** The classic Boston dataset is deprecated/removed in `scikit-learn`. We use **California Housing** as the base. External features are scraped from a local HTML table to keep things self-contained.


In [None]:

import warnings, logging, math, os, sys
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split, KFold, cross_val_score, RandomizedSearchCV
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor

# Try XGBoost
try:
    from xgboost import XGBRegressor
    XGB_AVAILABLE = True
except Exception as e:
    XGB_AVAILABLE = False

import joblib

# Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("house_price_project")
logger.info("Logger initialized.")



## 1–2) Problem Definition & Data Collection


In [None]:

# Base dataset: California housing
cal = fetch_california_housing(as_frame=True)
df = cal.frame.rename(columns={
    "MedInc": "median_income",
    "HouseAge": "house_age",
    "AveRooms": "avg_rooms",
    "AveBedrms": "avg_bedrooms",
    "Population": "population",
    "AveOccup": "avg_occupancy",
    "Latitude": "latitude",
    "Longitude": "longitude",
    "MedHouseVal": "median_house_value"
})
logger.info(f"Base dataset shape: {df.shape}")
df.head()


In [None]:

# Assign synthetic cities by lat/long
def assign_city(lat, lon):
    if lat > 36 and lon < -119:
        return "Northwest City"
    elif lat > 36 and lon >= -119:
        return "Northeast City"
    elif lat <= 36 and lon < -119:
        return "Southwest City"
    else:
        return "Southeast City"

df["city"] = [assign_city(lat, lon) for lat, lon in zip(df["latitude"], df["longitude"])]
df["city"].value_counts()



### Create a local HTML file to simulate scraping


In [None]:

# Generate a small HTML file with "scraped" features
html_content = """
<html>
  <body>
    <table class="table_indices">
      <tr><th>City</th><th>CrimeIndex</th><th>SchoolRating</th><th>IncomeScore</th></tr>
      <tr><td>Northwest City</td><td>42.1</td><td>7.8</td><td>68</td></tr>
      <tr><td>Northeast City</td><td>35.5</td><td>8.2</td><td>74</td></tr>
      <tr><td>Southwest City</td><td>58.0</td><td>6.5</td><td>60</td></tr>
      <tr><td>Southeast City</td><td>50.3</td><td>7.0</td><td>63</td></tr>
    </table>
  </body>
</html>
"""
os.makedirs("data_external", exist_ok=True)
with open("data_external/neighborhood_stats.html", "w", encoding="utf-8") as f:
    f.write(html_content)
"data_external/neighborhood_stats.html"


In [None]:

# Scrape the local HTML
from bs4 import BeautifulSoup
with open("data_external/neighborhood_stats.html", "r", encoding="utf-8") as f:
    soup = BeautifulSoup(f.read(), "html.parser")

table = soup.find("table", {"class": "table_indices"})
rows = table.find_all("tr")

scraped_records = []
for row in rows[1:]:
    cols = [c.text.strip() for c in row.find_all(["td", "th"])]
    if len(cols) == 4:
        scraped_records.append({
            "city": cols[0],
            "crime_index": float(cols[1]),
            "school_rating": float(cols[2]),
            "income_score": float(cols[3])
        })

df_scraped = pd.DataFrame(scraped_records)
df_scraped


In [None]:

# Merge scraped features
df_merged = df.merge(df_scraped, on="city", how="left")
logger.info(f"Merged shape: {df_merged.shape}")
df_merged.head()



## 3) Data Understanding (EDA)


In [None]:

df_merged.info()


In [None]:

df_merged.describe().T


In [None]:

df_merged.isna().sum()


In [None]:

plt.figure()
df_merged["median_house_value"].hist()
plt.title("Target Distribution: median_house_value")
plt.xlabel("Value")
plt.ylabel("Frequency")
plt.show()



## 4–5) Preprocessing & Feature Engineering
- Clip outliers (1st–99th quantiles)
- One-hot city
- Scale numerics
- Engineered features:
  - rooms_per_occupant = avg_rooms / (avg_occupancy + 1e-3)
  - crime_x_income = crime_index * median_income
  - school_x_income = school_rating * median_income
  - pop_per_room = population / (avg_rooms + 1e-3)
  - coastal_proximity = max(0, -118.5 - longitude)


In [None]:

from sklearn.base import BaseEstimator, TransformerMixin

class ClipOutliers(BaseEstimator, TransformerMixin):
    def __init__(self, qlow=0.01, qhigh=0.99):
        self.qlow = qlow
        self.qhigh = qhigh
        self.lower_ = None
        self.upper_ = None
        self.columns_ = None

    def fit(self, X, y=None):
        X = pd.DataFrame(X).copy()
        self.columns_ = X.columns
        self.lower_ = X.quantile(self.qlow)
        self.upper_ = X.quantile(self.qhigh)
        return self

    def transform(self, X):
        X = pd.DataFrame(X).copy()
        X = X.clip(lower=self.lower_, upper=self.upper_, axis=1)
        X.columns = self.columns_
        return X

class FeatureEngineer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X["rooms_per_occupant"] = X["avg_rooms"] / (X["avg_occupancy"] + 1e-3)
        X["crime_x_income"] = X["crime_index"] * X["median_income"]
        X["school_x_income"] = X["school_rating"] * X["median_income"]
        X["pop_per_room"] = X["population"] / (X["avg_rooms"] + 1e-3)
        X["coastal_proximity"] = np.maximum(0.0, -118.5 - X["longitude"])
        return X


In [None]:

target_col = "median_house_value"
categorical_cols = ["city"]
numeric_cols = [c for c in df_merged.columns if c not in categorical_cols + [target_col]]

X = df_merged.drop(columns=[target_col])
y = df_merged[target_col]

X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.30, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42)

numeric_processor = Pipeline(steps=[
    ("clip", ClipOutliers(0.01, 0.99)),
    ("fe", FeatureEngineer()),
    ("scale", StandardScaler())
])

categorical_processor = OneHotEncoder(handle_unknown="ignore")

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_processor, numeric_cols),
        ("cat", categorical_processor, categorical_cols)
    ]
)

def evaluate(model, X_tr, y_tr, X_v, y_v, name="Model"):
    pred_tr = model.predict(X_tr)
    pred_v = model.predict(X_v)
    rmse_tr = np.sqrt(mean_squared_error(y_tr, pred_tr))
    rmse_v = np.sqrt(mean_squared_error(y_v, pred_v))
    mae_v = mean_absolute_error(y_v, pred_v)
    r2_v = r2_score(y_v, pred_v)
    print(f"{name}:")
    print(f"  RMSE (train): {rmse_tr:.3f}")
    print(f"  RMSE (val)  : {rmse_v:.3f}")
    print(f"  MAE  (val)  : {mae_v:.3f}")
    print(f"  R^2  (val)  : {r2_v:.4f}")
    return {"rmse_tr": rmse_tr, "rmse_val": rmse_v, "mae_val": mae_v, "r2_val": r2_v}



## 6–8) Modeling & Evaluation


In [None]:

# Linear Regression
linreg = Pipeline(steps=[("prep", preprocessor), ("model", LinearRegression())])
linreg.fit(X_train, y_train)
metrics_lin = evaluate(linreg, X_train, y_train, X_val, y_val, "LinearRegression")

# Random Forest
rf = Pipeline(steps=[("prep", preprocessor), ("model", RandomForestRegressor(n_estimators=200, random_state=42, n_jobs=-1))])
rf.fit(X_train, y_train)
metrics_rf = evaluate(rf, X_train, y_train, X_val, y_val, "RandomForestRegressor")

# XGBoost (if available)
if XGB_AVAILABLE:
    xgb = Pipeline(steps=[("prep", preprocessor), ("model", XGBRegressor(n_estimators=400, max_depth=6, subsample=0.8,
                                                                        colsample_bytree=0.8, learning_rate=0.05,
                                                                        reg_lambda=1.0, random_state=42, n_jobs=-1,
                                                                        tree_method="hist"))])
    xgb.fit(X_train, y_train)
    metrics_xgb = evaluate(xgb, X_train, y_train, X_val, y_val, "XGBRegressor")
else:
    print("XGBoost not installed; skipping.")


In [None]:

# Cross-validation with RF on training fold
kf = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(rf, X_train, y_train, cv=kf, scoring="neg_root_mean_squared_error", n_jobs=-1)
print("RandomForest 5-fold CV RMSE:", -cv_scores)
print("Mean CV RMSE:", np.mean(-cv_scores))



## 9) Hyperparameter Tuning (RandomizedSearchCV on RF)


In [None]:

param_dist = {
    "model__n_estimators": [200, 300, 400, 600, 800],
    "model__max_depth": [None, 10, 15, 20, 30],
    "model__min_samples_split": [2, 5, 10],
    "model__min_samples_leaf": [1, 2, 4],
    "model__max_features": ["sqrt", "log2", None]
}

rf_tune = Pipeline(steps=[("prep", preprocessor), ("model", RandomForestRegressor(random_state=42, n_jobs=-1))])

rs = RandomizedSearchCV(
    rf_tune,
    param_distributions=param_dist,
    n_iter=15,
    scoring="neg_root_mean_squared_error",
    cv=3,
    verbose=1,
    random_state=42,
    n_jobs=-1
)
rs.fit(X_train, y_train)
print("Best params:", rs.best_params_)
print("Best CV score (neg RMSE):", rs.best_score_)

best_rf = rs.best_estimator_
_ = evaluate(best_rf, X_train, y_train, X_val, y_val, "Best RandomForest (Tuned)")



## 8) Final Test Evaluation


In [None]:

val_rmses = [("LinearRegression", metrics_lin["rmse_val"]), ("RandomForest", metrics_rf["rmse_val"])]
models = {"LinearRegression": linreg, "RandomForest": rf}

if 'best_rf' in globals():
    pred_v_best = best_rf.predict(X_val)
    rmse_v_best = np.sqrt(mean_squared_error(y_val, pred_v_best))
    val_rmses.append(("BestRandomForest", rmse_v_best))
    models["BestRandomForest"] = best_rf

if XGB_AVAILABLE:
    val_rmses.append(("XGBRegressor", metrics_xgb["rmse_val"]))
    models["XGBRegressor"] = xgb

best_name, _ = sorted(val_rmses, key=lambda t: t[1])[0]
best_model = models[best_name]
print("Selected best model:", best_name)

pred_test = best_model.predict(X_test)
rmse_test = np.sqrt(mean_squared_error(y_test, pred_test))
mae_test = mean_absolute_error(y_test, pred_test)
r2_test = r2_score(y_test, pred_test)

print(f"Test RMSE: {rmse_test:.3f}")
print(f"Test MAE : {mae_test:.3f}")
print(f"Test R^2 : {r2_test:.4f}")



## 10) Monitoring / Logging Demo


In [None]:

def log_prediction(example_features: dict, model, label="prediction"):
    x_df = pd.DataFrame([example_features])
    y_pred = model.predict(x_df)[0]
    logger.info(f"{label} -> input: {example_features} | predicted_price: {y_pred:.3f}")
    return y_pred

mean_row = X_train.mean(numeric_only=True).to_dict()
mean_row.update({"city": "Northwest City"})
_ = log_prediction(mean_row, best_model, label="demo_prediction")



## 11) Save Best Model


In [None]:

os.makedirs("artifacts", exist_ok=True)
model_path = "artifacts/best_model.joblib"
joblib.dump(best_model, model_path)
model_path



## Inference Helper


In [None]:

def load_model(path="artifacts/best_model.joblib"):
    return joblib.load(path)

def predict_price(model, features: dict):
    df_in = pd.DataFrame([features])
    return float(model.predict(df_in)[0])

mdl = load_model()
example = {
    "median_income": 3.0,
    "house_age": 25.0,
    "avg_rooms": 5.2,
    "avg_bedrooms": 1.0,
    "population": 750.0,
    "avg_occupancy": 3.0,
    "latitude": 35.0,
    "longitude": -120.0,
    "city": "Southwest City",
    "crime_index": 58.0,
    "school_rating": 6.5,
    "income_score": 60.0
}
print("Predicted price:", predict_price(mdl, example))



## Appendix: Real Web Scraping Template

```python
import requests
from bs4 import BeautifulSoup

url = "https://www.example.com/city-stats"  # replace with a real source
resp = requests.get(url, timeout=20)
soup = BeautifulSoup(resp.text, "html.parser")

table = soup.find("table", {"class": "table_indices"})
rows = table.find_all("tr")

scraped_records = []
for row in rows[1:]:
    cols = [c.text.strip() for c in row.find_all(["td", "th"])]
    scraped_records.append({
        "city": cols[0],
        "crime_index": float(cols[1]),
        "school_rating": float(cols[2]),
        "income_score": float(cols[3])
    })

df_scraped = pd.DataFrame(scraped_records)
```
