# Comprehensive k-NN Assignment — Real Estate Price Prediction
**Student:** Mike Maurrasse  
**Filename:** `maurrasse_mike_knn_assignment.ipynb`  
**Date:** 2025-08-31

> Full pipeline with custom k-NN, EDA, preprocessing, manual calculations, tuning, and analysis. All plots use matplotlib.

## Part 0 — Setup

In [None]:

# Basic imports
import os, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.neighbors import KNeighborsRegressor
from sklearn.datasets import fetch_california_housing

pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 120)

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

print("Environment initialized.")


## Part 1 — Data Loading and Exploration

In [None]:

# 1.1 Data Loading and Initial Inspection
# First, try to fetch California Housing. If that fails, fall back to local CSV.
import pandas as pd

try:
    from sklearn.datasets import fetch_california_housing
    cali = fetch_california_housing(as_frame=True)
    df = cali.frame.copy()
    df.columns = ['MedInc','HouseAge','AveRooms','AveBedrms','Population','AveOccup','Latitude','Longitude','MedHouseVal']
    target_col = 'MedHouseVal'
    print("Loaded California Housing dataset from sklearn.")
except Exception as e:
    print("Fetching California Housing failed. Using synthetic_california_housing.csv fallback. Error:", e)
    df = pd.read_csv("synthetic_california_housing.csv")
    target_col = "MedHouseVal"

print("Shape:", df.shape)
display(df.head())
display(df.tail())
display(df.info())
display(df.describe().T)
print("Target variable:", target_col)
numeric_features = [c for c in df.columns if c != target_col]
print("Numeric features:", numeric_features)


### 1.2 Comprehensive EDA

#### A. Target Variable Analysis

In [None]:

# Target distribution
fig = plt.figure()
plt.hist(df[target_col], bins=40)
plt.title("Target Distribution: " + target_col)
plt.xlabel(target_col); plt.ylabel("Count")
plt.show()

# Boxplot
fig = plt.figure()
plt.boxplot(df[target_col], vert=True)
plt.title("Target Boxplot: " + target_col)
plt.ylabel(target_col)
plt.show()

# Summary + simple IQR outlier estimate
print("Summary stats for target:")
display(df[target_col].describe())
q1, q3 = df[target_col].quantile([0.25, 0.75])
iqr = q3 - q1
lower, upper = q1 - 1.5*iqr, q3 + 1.5*iqr
print("IQR bounds:", lower, upper)
print("Potential outliers (count):", ((df[target_col] < lower) | (df[target_col] > upper)).sum())


#### B. Feature Analysis

In [None]:

# Histograms for features
for c in numeric_features:
    fig = plt.figure()
    plt.hist(df[c], bins=40)
    plt.title(f"Feature Distribution: {c}")
    plt.xlabel(c); plt.ylabel("Count")
    plt.show()

# Correlation heatmap
corr = df[numeric_features + [target_col]].corr()
fig = plt.figure()
plt.imshow(corr, interpolation='nearest')
plt.xticks(range(len(corr.columns)), corr.columns, rotation=90)
plt.yticks(range(len(corr.columns)), corr.columns)
plt.colorbar()
plt.title("Correlation Matrix")
plt.tight_layout()
plt.show()

# Scatter vs target
for c in numeric_features:
    fig = plt.figure()
    plt.scatter(df[c], df[target_col], s=6)
    plt.xlabel(c); plt.ylabel(target_col)
    plt.title(f"{c} vs {target_col}")
    plt.show()


#### C. Geographic Analysis

In [None]:

fig = plt.figure()
plt.scatter(df['Longitude'], df['Latitude'], c=df[target_col], s=6)
plt.xlabel("Longitude"); plt.ylabel("Latitude")
plt.title("Geography: Price colored by target")
plt.show()


#### D. Feature Relationships

In [None]:

corr_target = df[numeric_features + [target_col]].corr()[target_col].drop(target_col).abs().sort_values(ascending=False)
top3 = list(corr_target.head(3).index)
print("Top 3 features by |corr| with target:", top3)

for c in top3:
    fig = plt.figure()
    x = df[c].values; y = df[target_col].values
    plt.scatter(x, y, s=6)
    m, b = np.polyfit(x, y, 1)
    xs = np.linspace(x.min(), x.max(), 120)
    ys = m*xs + b
    plt.plot(xs, ys)
    plt.xlabel(c); plt.ylabel(target_col)
    plt.title(f"{c} vs {target_col} with trend line")
    plt.show()

# Multicollinearity (top correlated pairs among features)
corr_no_target = df[numeric_features].corr().abs()
upper = corr_no_target.where(np.triu(np.ones(corr_no_target.shape), k=1).astype(bool))
top_pairs = upper.stack().sort_values(ascending=False).head(10)
display(top_pairs)


**EDA Write-up (2–3 paragraphs):** Summarize target distribution and outliers; strongest feature relations; geographic patterns; expected modeling challenges (skew/outliers/multicollinearity) and scaling choices.

## Part 2 — Data Cleaning and Preprocessing

### 2.1 Missing Value Analysis

In [None]:

missing = df.isna().sum()
print("Missing values per column:")
display(missing)
# Document handling decisions here if any missing are found.


### 2.2 Outlier Detection and Handling

In [None]:

def detect_outliers_iqr(data, column, factor=1.5):
    # IQR-based boolean mask
    q1 = data[column].quantile(0.25)
    q3 = data[column].quantile(0.75)
    iqr = q3 - q1
    lower, upper = q1 - factor*iqr, q3 + factor*iqr
    return (data[column] < lower) | (data[column] > upper)

def detect_outliers_zscore(data, column, threshold=3.0):
    # Z-score based boolean mask
    vals = data[column].astype(float).values
    mu, sigma = np.mean(vals), np.std(vals)
    if sigma == 0:
        return pd.Series([False]*len(vals), index=data.index)
    z = (vals - mu) / sigma
    return pd.Series(np.abs(z) > threshold, index=data.index)

iqr_flags = pd.DataFrame(index=df.index)
z_flags = pd.DataFrame(index=df.index)
for c in numeric_features:
    iqr_flags[c] = detect_outliers_iqr(df, c, factor=1.5)
    z_flags[c] = detect_outliers_zscore(df, c, threshold=3.0)

print("IQR outlier counts per feature:")
display(iqr_flags.sum())
print("Z-score outlier counts per feature:")
display(z_flags.sum())

# Example before/after hist for the most flagged feature by IQR
focus_feat = iqr_flags.sum().sort_values(ascending=False).index[0]
fig = plt.figure(); plt.hist(df[focus_feat], bins=40); plt.title(f"Before: {focus_feat}"); plt.show()

clean_df = df.copy()
mask_remove = iqr_flags[focus_feat] & z_flags[focus_feat]
clean_df = clean_df.loc[~mask_remove].reset_index(drop=True)

fig = plt.figure(); plt.hist(clean_df[focus_feat], bins=40); plt.title(f"After (example): {focus_feat}"); plt.show()

print("Impact on dataset size (example policy):", df.shape, "->", clean_df.shape)
final_df = df.copy()  # keep full data by default; justify your final policy in write-up.


### 2.3 Feature Engineering

In [None]:

def haversine(lat1, lon1, lat2, lon2):
    # Haversine distance in kilometers
    R = 6371.0
    p1, p2 = np.radians(lat1), np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dl = np.radians(lon2 - lon1)
    a = np.sin(dphi/2)**2 + np.cos(p1)*np.cos(p2)*np.sin(dl/2)**2
    return 2*R*np.arcsin(np.sqrt(a))

aug = final_df.copy()
aug['rooms_per_household'] = aug['AveRooms'] / aug['AveOccup']
aug['bedrooms_per_room'] = aug['AveBedrms'] / aug['AveRooms']
aug['population_per_household'] = aug['Population'] / aug['AveOccup']

aug['distance_to_LA'] = haversine(aug['Latitude'], aug['Longitude'], 34.0522, -118.2437)
aug['distance_to_SF'] = haversine(aug['Latitude'], aug['Longitude'], 37.7749, -122.4194)
aug['coastal_proximity'] = (aug['Longitude'] > -121).astype(int)

def income_category(v):
    if v < 3: return "Low"
    if v < 6: return "Medium"
    if v < 9: return "High"
    return "Very High"

def house_age_category(v):
    if v < 10: return "New"
    if v < 30: return "Medium"
    return "Old"

aug['income_category'] = aug['MedInc'].apply(income_category)
aug['house_age_category'] = aug['HouseAge'].apply(house_age_category)
display(aug.head())


## Part 3 — Custom k-NN Implementation

### 3.1 Distance Metrics

In [None]:

def euclidean_distance(point1, point2):
    # Euclidean distance between two 1D arrays
    diff = point1 - point2
    return float(np.sqrt(np.dot(diff, diff)))

def manhattan_distance(point1, point2):
    # Manhattan (L1) distance
    return float(np.sum(np.abs(point1 - point2)))

def minkowski_distance(point1, point2, p=2):
    # Minkowski distance of order p
    return float(np.power(np.sum(np.abs(point1 - point2)**p), 1.0/p))

# Quick checks
a = np.array([0.,0.]); b = np.array([3.,4.])
print("Euclidean((0,0),(3,4)) =", euclidean_distance(a,b))
print("Manhattan((0,0),(3,4)) =", manhattan_distance(a,b))
print("Minkowski p=3 =", minkowski_distance(a,b,p=3))


### 3.2 k-NN Class

In [None]:

class CustomKNN:
    # Custom k-NN regressor supporting Euclidean, Manhattan, or Minkowski distance and uniform or distance weights.
    def __init__(self, k=5, distance_metric='euclidean', weights='uniform', minkowski_p=2):
        self.k = int(k)
        self.distance_metric = distance_metric
        self.weights = weights
        self.minkowski_p = minkowski_p
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        # Store training data
        self.X_train = np.asarray(X, dtype=float)
        self.y_train = np.asarray(y, dtype=float)
        if self.X_train.ndim != 2: raise ValueError("X must be 2D")
        if self.y_train.ndim != 1: raise ValueError("y must be 1D")
        if len(self.X_train) != len(self.y_train): raise ValueError("X and y length mismatch")
        return self

    def _calculate_distance(self, p1, p2):
        # Compute distance according to selected metric
        if self.distance_metric == 'euclidean':
            return euclidean_distance(p1, p2)
        if self.distance_metric == 'manhattan':
            return manhattan_distance(p1, p2)
        if self.distance_metric == 'minkowski':
            return minkowski_distance(p1, p2, p=self.minkowski_p)
        raise ValueError("Unsupported distance_metric")

    def _get_neighbors(self, test_point):
        # Return indices and distances of k nearest neighbors
        dists = np.array([self._calculate_distance(test_point, tr) for tr in self.X_train])
        idx = np.argsort(dists)[:self.k]
        return idx, dists[idx]

    def predict_single(self, test_point):
        # Predict for one point (uniform or distance weights)
        idx, d = self._get_neighbors(test_point)
        y_neighbors = self.y_train[idx]
        if self.weights == 'uniform':
            return float(np.mean(y_neighbors))
        if self.weights == 'distance':
            if np.any(d == 0):  # exact match
                return float(y_neighbors[d == 0][0])
            w = 1.0 / d
            return float(np.sum(w * y_neighbors) / np.sum(w))
        raise ValueError("Unsupported weights")

    def predict(self, X_test):
        X_test = np.asarray(X_test, dtype=float)
        return np.array([self.predict_single(x) for x in X_test])

    def score(self, X_test, y_test):
        y_pred = self.predict(X_test)
        return r2_score(y_test, y_pred)


## Part 4 — Manual Calculations (Proof of Understanding)

### 4.1 Distance Calculations — Fill in by hand and verify below

In [None]:

# After building X later, select three rows by index and print step-by-step math.
def print_manual_distance_steps(p1, p2, p3, feature_names):
    def fmt(v): return "[" + ", ".join(f"{x:.4f}" for x in v) + "]"
    print("Point1:", fmt(p1)); print("Point2:", fmt(p2)); print("Point3:", fmt(p3))
    # Euclidean P1-P2
    dif12 = p1 - p2
    print("\nEuclidean(P1,P2) = sqrt(sum (dif^2))")
    print("Diffs:", fmt(dif12))
    print("Squares:", fmt(dif12**2))
    print("Sum squares:", float(np.sum(dif12**2)))
    print("Euclidean =", float(np.sqrt(np.sum(dif12**2))), " | verify:", euclidean_distance(p1,p2))
    # Manhattan P1-P3
    dif13 = np.abs(p1 - p3)
    print("\nManhattan(P1,P3) = sum |dif|")
    print("Abs diffs:", fmt(dif13))
    print("Sum abs diffs:", float(np.sum(dif13)))
    print("Verify:", manhattan_distance(p1,p3))
    # Minkowski p=3 P2-P3
    dif23 = np.abs(p2 - p3)
    print("\nMinkowski p=3 (P2,P3) = (sum |dif|^3)^(1/3)")
    print("Abs diffs:", fmt(dif23))
    print("Cubes:", fmt(dif23**3))
    print("Sum cubes:", float(np.sum(dif23**3)))
    print("Minkowski p=3 =", float(np.power(np.sum(dif23**3), 1/3)), " | verify:", minkowski_distance(p2,p3,p=3))

print("After feature matrix is built, select 3 indices and call print_manual_distance_steps(...) to include in your PDF.")


### 4.2 k-NN Prediction Walkthrough — Manual Neighbor Finding

In [None]:

def manual_knn_demo(X_train, y_train, test_point, k=5, metric='euclidean'):
    def d(a,b):
        if metric == 'euclidean': return euclidean_distance(a,b)
        if metric == 'manhattan': return manhattan_distance(a,b)
        return minkowski_distance(a,b,p=2)
    dists = np.array([d(test_point, tr) for tr in X_train])
    order = np.argsort(dists)
    print("First 10 distances (idx, dist, y):")
    for i in range(10):
        idx = order[i]
        print(f"{i+1:2d}. idx={idx}, dist={dists[idx]:.6f}, y={y_train[idx]:.4f}")
    k_idx = order[:k]
    k_d = dists[k_idx]
    k_y = y_train[k_idx]
    print("\n5 Nearest Neighbors:")
    for r,(idx,di,yi) in enumerate(zip(k_idx,k_d,k_y), start=1):
        print(f"{r}. idx={idx}, dist={di:.6f}, target={yi:.4f}")
    # Uniform vs distance-weighted
    uniform_pred = float(np.mean(k_y))
    if np.any(k_d == 0): distance_pred = float(k_y[k_d==0][0])
    else:
        w = 1.0/k_d
        distance_pred = float(np.sum(w*k_y)/np.sum(w))
    print("\nUniform Prediction:", uniform_pred)
    print("Distance-weighted Prediction:", distance_pred)
    return uniform_pred, distance_pred


## Part 5 — Model Evaluation and Hyperparameter Tuning

### 5.1 Train–Test Split and Scaling

In [None]:

# Build modeling matrix with engineered features and one-hot categories
X_base = aug.drop(columns=[target_col]).copy()
# One-hot encode categorical features
cat_df = pd.get_dummies(X_base[['income_category','house_age_category']], drop_first=True)
X_num = X_base.drop(columns=['income_category','house_age_category'])
X = pd.concat([X_num, cat_df], axis=1)
y = final_df[target_col].values.astype(float)

X_train, X_test, y_train, y_test = train_test_split(X.values, y, test_size=0.2, random_state=RANDOM_STATE)

scalers = {"standard": StandardScaler(), "minmax": MinMaxScaler()}
scaled_data = {}
for name, sc in scalers.items():
    sc_fit = sc.fit(X_train)
    scaled_data[name] = (sc_fit, sc_fit.transform(X_train), sc_fit.transform(X_test))
print("Prepared StandardScaler and MinMaxScaler versions. Justify your scaling choice in write-up.")


### 5.2 Manual Grid Search (5-fold CV)

In [None]:

def manual_grid_search(X_train, y_train, X_val, y_val, param_grid):
    results = []
    kf = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    for k in param_grid.get('k', [5]):
        for dm in param_grid.get('distance_metric', ['euclidean']):
            for w in param_grid.get('weights', ['uniform']):
                # CV
                cv_scores = []
                for tr_idx, va_idx in kf.split(X_train):
                    Xtr, Xva = X_train[tr_idx], X_train[va_idx]
                    ytr, yva = y_train[tr_idx], y_train[va_idx]
                    model = CustomKNN(k=k, distance_metric=dm, weights=w)
                    model.fit(Xtr, ytr)
                    yva_pred = model.predict(Xva)
                    cv_scores.append(r2_score(yva, yva_pred))
                cv_mean = float(np.mean(cv_scores))
                # Hold-out validation
                model = CustomKNN(k=k, distance_metric=dm, weights=w)
                model.fit(X_train, y_train)
                val_score = float(r2_score(y_val, model.predict(X_val)))
                results.append({'k':k, 'distance_metric':dm, 'weights':w, 'cv_mean_r2':cv_mean, 'val_r2':val_score})
    res_df = pd.DataFrame(results).sort_values(by=['val_r2','cv_mean_r2'], ascending=False)
    best = res_df.iloc[0].to_dict()
    best_params = {'k': int(best['k']), 'distance_metric': best['distance_metric'], 'weights': best['weights']}
    return best_params, res_df

# Use StandardScaler set for tuning
scaler, Xtr_s, Xte_s = scaled_data['standard']
Xtr_g, Xval_g, ytr_g, yval_g = train_test_split(Xtr_s, y_train, test_size=0.2, random_state=RANDOM_STATE)

param_grid = {'k':[3,5,7,9,11,15,21], 'distance_metric':['euclidean','manhattan'], 'weights':['uniform','distance']}
best_params, results_df = manual_grid_search(Xtr_g, ytr_g, Xval_g, yval_g, param_grid)
print("Best params:", best_params)
display(results_df.head(10))

# Simple visualization: CV mean vs k
fig = plt.figure()
plt.plot(results_df['k'], results_df['cv_mean_r2'], 'o')
plt.xlabel("k"); plt.ylabel("CV mean R^2"); plt.title("CV mean R^2 vs k (all combos)")
plt.show()


### 5.3 Performance Analysis

In [None]:

# Final model and metrics
final_model = CustomKNN(**best_params)
final_model.fit(Xtr_s, y_train)
y_pred_test = final_model.predict(Xte_s)
print("Final Test R^2:", r2_score(y_test, y_pred_test))
print("Test RMSE:", mean_squared_error(y_test, y_pred_test, squared=False))

# Learning curves vs k (using test as generalization proxy here)
k_values = [3,5,7,9,11,15,21]
train_scores, test_scores = [], []
for k in k_values:
    m = CustomKNN(k=k, distance_metric=best_params['distance_metric'], weights=best_params['weights'])
    m.fit(Xtr_s, y_train)
    train_scores.append(r2_score(y_train, m.predict(Xtr_s)))
    test_scores.append(r2_score(y_test, m.predict(Xte_s)))

fig = plt.figure()
plt.plot(k_values, train_scores, marker='o', label='Train R^2')
plt.plot(k_values, test_scores, marker='o', label='Test R^2')
plt.xlabel("k"); plt.ylabel("R^2"); plt.title("Learning Curve: R^2 vs k"); plt.legend()
plt.show()

# Distance metric comparison
for dm in ['euclidean','manhattan']:
    m = CustomKNN(k=best_params['k'], distance_metric=dm, weights=best_params['weights'])
    m.fit(Xtr_s, y_train)
    print(f"{dm} -> Test R^2: {r2_score(y_test, m.predict(Xte_s)):.4f}")

# Residual analysis
resid = y_test - y_pred_test
fig = plt.figure(); plt.hist(resid, bins=40); plt.title("Residuals"); plt.xlabel("Residual"); plt.ylabel("Count"); plt.show()
fig = plt.figure(); plt.scatter(y_pred_test, resid, s=6); plt.axhline(0); plt.xlabel("Predicted"); plt.ylabel("Residual"); plt.title("Residuals vs Predicted"); plt.show()

# Neighbor "feature contribution" proxy
def neighbor_feature_contributions(Xref, Xpts, k=5, max_points=200):
    n = Xref.shape[1]
    contrib = np.zeros(n)
    take = min(max_points, len(Xpts))
    for i in range(take):
        x = Xpts[i]
        d = np.linalg.norm(Xref - x, axis=1)
        idx = np.argsort(d)[:k]
        diffs = np.abs(Xref[idx] - x)
        rs = diffs.sum(axis=1, keepdims=True); rs[rs==0] = 1.0
        frac = diffs / rs
        contrib += frac.mean(axis=0)
    return contrib / take

feat_contrib = neighbor_feature_contributions(Xtr_s, Xte_s, k=best_params['k'])
contrib_series = pd.Series(feat_contrib, index=list(X.columns)).sort_values(ascending=False)
display(contrib_series.head(15))


## Part 6 — Comparison and Advanced Analysis

### 6.1 Sklearn Comparison

In [None]:

# Compare custom vs sklearn
metric = 'minkowski' if best_params['distance_metric']=='euclidean' else best_params['distance_metric']
p = 2 if best_params['distance_metric']=='euclidean' else 1
sk = KNeighborsRegressor(n_neighbors=best_params['k'], metric=metric, p=p, weights=best_params['weights'])
sk.fit(Xtr_s, y_train)
sk_pred = sk.predict(Xte_s)
print("CustomKNN Test R^2:", r2_score(y_test, y_pred_test))
print("Sklearn KNN Test R^2:", r2_score(y_test, sk_pred))
print("Absolute R^2 diff:", abs(r2_score(y_test, y_pred_test) - r2_score(y_test, sk_pred)))


### 6.2 Curse of Dimensionality

In [None]:

from sklearn.datasets import make_regression

dims = [2,4,8,16,32,64]
mse_list = []
for d in dims:
    Xsyn, ysyn = make_regression(n_samples=2000, n_features=d, noise=10.0, random_state=RANDOM_STATE)
    Xtr, Xte, ytr, yte = train_test_split(Xsyn, ysyn, test_size=0.2, random_state=RANDOM_STATE)
    sc = StandardScaler().fit(Xtr)
    Xtr_s = sc.transform(Xtr); Xte_s = sc.transform(Xte)
    knn = KNeighborsRegressor(n_neighbors=7, metric='euclidean', weights='distance')
    knn.fit(Xtr_s, ytr)
    pred = knn.predict(Xte_s)
    mse_list.append(mean_squared_error(yte, pred))
    print(f"d={d:2d} -> Test MSE: {mse_list[-1]:.2f}")

fig = plt.figure()
plt.plot(dims, mse_list, marker='o')
plt.xlabel("Dimensions"); plt.ylabel("Test MSE"); plt.title("Curse of Dimensionality for k-NN")
plt.show()


## Conceptual Questions — Answer in your own words

1) Why might Manhattan distance be preferable to Euclidean distance in certain scenarios?

2) How does k affect the bias–variance tradeoff in k-NN?

3) Computational implications of different distance metrics?

4) How would you modify k-NN for categorical features?

## Implementation Decisions

- Alternatives considered; limitations; scaling choice justification.

## Personal Reflection

- Most challenging part; improvements with more time; real-world applications that benefit from this analysis.