# Multimodal House Price Valuation — Explainability & Economic Insight

This notebook focuses on **explaining** our multimodal models, not just scoring them.

We will:
- Use **Grad-CAM (or similar)** to highlight which regions of satellite tiles drive predictions.
- Compare attention patterns for **high vs. low priced** properties and high-error cases.
- Cross-check with **tabular feature attributions** (e.g., SHAP) to understand how visual and tabular signals interact.
- Reject models whose attention maps look **random or economically meaningless**.



## Setup and Assumptions

This notebook assumes you have already run:

- `preprocessing.ipynb` to create leakage-aware `train.parquet` / `val.parquet`.
- `data_fetcher.py` to download Sentinel tiles and `data/satellite/image_metadata.csv`.

Here we:

- Train a **lightweight image-only regressor** (frozen ResNet backbone + small regression head) on a subset of the data.
- Use **Grad-CAM** on the last convolutional block to highlight regions most influential for price predictions.
- Compare attention patterns across:
  - High vs low priced properties.
  - High-error vs low-error cases.

This image-only model is used **only for interpretability**, not as the production predictor (which may be multimodal). The goal is to see whether it focuses on economically meaningful regions (water, greenery, roads, density).


In [None]:
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms

from PIL import Image
import cv2

plt.style.use("seaborn-v0_8")

PROJECT_ROOT = Path("..").resolve()
PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"
SATELLITE_DIR = PROJECT_ROOT / "data" / "satellite"
REPORTS_DIR = PROJECT_ROOT / "reports"

for d in [REPORTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Load processed training data
train_path = PROCESSED_DIR / "train.parquet"
val_path = PROCESSED_DIR / "val.parquet"

if not train_path.exists():
    raise FileNotFoundError(f"train.parquet not found at {train_path}; run preprocessing first.")

train_df = pd.read_parquet(train_path)
val_df = pd.read_parquet(val_path) if val_path.exists() else None

TARGET_COL = "log_price" if "log_price" in train_df.columns else "price"
ID_COL = "id" if "id" in train_df.columns else "Id" if "Id" in train_df.columns else None

if ID_COL is None:
    raise ValueError("ID column not found in processed train data.")

print("TARGET_COL =", TARGET_COL)
print("ID_COL =", ID_COL)

meta_path = SATELLITE_DIR / "image_metadata.csv"
if not meta_path.exists():
    raise FileNotFoundError(
        f"Satellite metadata not found at {meta_path}; run data_fetcher.py first."
    )

meta_df = pd.read_csv(meta_path)
meta_df = meta_df[meta_df["status"].isin(["ok", "cached"])]
if ID_COL != "id" and "id" in meta_df.columns:
    meta_df = meta_df.rename(columns={"id": ID_COL})

meta_df = meta_df[meta_df["image_path"].apply(lambda p: Path(p).exists())]

print("Usable satellite images:", len(meta_df))

# Merge train data with imagery

train_img_df = train_df.merge(meta_df[[ID_COL, "image_path"]], on=ID_COL, how="inner")

print("Train with images:", train_img_df.shape)

# To keep runtime manageable on CPU, we can subsample for Grad-CAM training
max_train_images = 5000
if len(train_img_df) > max_train_images:
    train_img_df = train_img_df.sample(max_train_images, random_state=42).reset_index(drop=True)
    print(f"Subsampled to {len(train_img_df)} images for Grad-CAM model training.")

IMG_SIZE = 224

img_transform = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


class ImagePriceDataset(Dataset):
    def __init__(self, df, id_col, target_col, transform=None):
        self.df = df.reset_index(drop=True)
        self.id_col = id_col
        self.target_col = target_col
        self.transform = transform

    def __len__(self):  # noqa: D401
        """Number of image-price pairs."""
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img = Image.open(row["image_path"]).convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        target = float(row[self.target_col])
        sample_id = row[self.id_col]
        return img, target, sample_id


train_dataset = ImagePriceDataset(train_img_df, id_col=ID_COL, target_col=TARGET_COL, transform=img_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)


In [None]:
# Define a lightweight image-only regressor (frozen backbone)

try:
    base_resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
except AttributeError:
    base_resnet = models.resnet18(pretrained=True)

for param in base_resnet.parameters():
    param.requires_grad = False

# We will unfreeze only the final fully-connected head
feature_dim = base_resnet.fc.in_features

class ResNetRegressor(nn.Module):
    def __init__(self, backbone, feature_dim):
        super().__init__()
        self.backbone = backbone
        self.backbone.fc = nn.Identity()
        self.head = nn.Linear(feature_dim, 1)

    def forward(self, x):
        feats = self.backbone(x)
        out = self.head(feats)
        return out.squeeze(-1), feats


model = ResNetRegressor(base_resnet, feature_dim=feature_dim).to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.head.parameters(), lr=1e-3)


def train_image_regressor(num_epochs=3):
    model.train()
    for epoch in range(num_epochs):
        epoch_losses = []
        for imgs, targets, _ in train_loader:
            imgs = imgs.to(device)
            targets = targets.to(device).float()

            optimizer.zero_grad()
            preds, _ = model(imgs)
            loss = criterion(preds, targets)
            loss.backward()
            optimizer.step()

            epoch_losses.append(loss.item())
        print(f"Epoch {epoch + 1}/{num_epochs} - MSE: {np.mean(epoch_losses):.4f}")


# Optionally train (or retrain) the image-only regressor before Grad-CAM
# This can be commented out once a model is trained and saved.
train_image_regressor(num_epochs=3)

# Save model for reuse
image_model_path = REPORTS_DIR / "image_regressor_resnet18.pt"
torch.save(model.state_dict(), image_model_path)
print("Saved image regressor to", image_model_path)


In [None]:
# Grad-CAM implementation for regression

class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        self._register_hooks()

    def _register_hooks(self):
        def forward_hook(module, inputs, output):  # noqa: D401, ANN001
            """Store activations from the target layer."""
            self.activations = output.detach()

        def backward_hook(module, grad_input, grad_output):  # noqa: D401, ANN001
            """Store gradients from the target layer."""
            self.gradients = grad_output[0].detach()

        self.target_layer.register_forward_hook(forward_hook)
        self.target_layer.register_backward_hook(backward_hook)

    def __call__(self, x):
        self.model.eval()
        self.model.zero_grad()

        x = x.requires_grad_(True)
        preds, _ = self.model(x)

        # For regression, backpropagate from the scalar prediction
        preds.mean().backward()

        gradients = self.gradients  # (B, C, H, W)
        activations = self.activations  # (B, C, H, W)

        # Global average pooling over gradients
        weights = gradients.mean(dim=(2, 3), keepdim=True)

        cam = (weights * activations).sum(dim=1, keepdim=True)  # (B, 1, H, W)
        cam = torch.relu(cam)

        # Normalize to [0, 1]
        cam = cam.squeeze(0).squeeze(0).cpu().numpy()
        cam = (cam - cam.min()) / (cam.max() - cam.min() + 1e-8)

        return cam


def overlay_cam_on_image(img_np, cam, alpha=0.4):
    """Overlay a CAM heatmap onto an RGB image.

    Args:
        img_np: HxWx3 uint8 RGB image.
        cam: HxW float32 mask in [0, 1].
    """
    heatmap = (cam * 255).astype(np.uint8)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

    overlay = (1 - alpha) * img_np.astype(float) + alpha * heatmap.astype(float)
    overlay = np.clip(overlay, 0, 255).astype(np.uint8)
    return overlay


In [None]:
# Prepare Grad-CAM object on the last convolutional block

# For ResNet18, layer4 is the last conv block
cam_extractor = GradCAM(model, target_layer=model.backbone.layer4[-1])


def visualize_cam_for_row(row, title=None):
    img_path = row["image_path"]
    img = Image.open(img_path).convert("RGB")
    img_resized = img.resize((IMG_SIZE, IMG_SIZE))
    img_np = np.array(img_resized)

    x = img_transform(img).unsqueeze(0).to(device)
    cam = cam_extractor(x)

    # Resize CAM to match image
    cam_resized = cv2.resize(cam, (IMG_SIZE, IMG_SIZE))
    overlay = overlay_cam_on_image(img_np, cam_resized)

    fig, axes = plt.subplots(1, 3, figsize=(12, 4))
    axes[0].imshow(img_np)
    axes[0].set_title("Original")
    axes[0].axis("off")

    axes[1].imshow(cam_resized, cmap="jet")
    axes[1].set_title("Grad-CAM mask")
    axes[1].axis("off")

    axes[2].imshow(overlay)
    axes[2].set_title("Overlay")
    axes[2].axis("off")

    if title is not None:
        fig.suptitle(title)

    plt.tight_layout()
    plt.show()


## Grad-CAM: High vs Low Prices and High-Error Cases

We now inspect Grad-CAM maps for three types of properties:

1. **High-price decile** – do maps focus on water, greenery, low-density areas, or premium amenities?
2. **Low-price decile** – do maps highlight busy roads, high-density housing, or industrial areas?
3. **High-error cases** – where the image-only model is most wrong; does it attend to misleading artefacts (clouds, borders, irrelevant land)?

We visually compare these patterns to validate whether the model is using economically meaningful signals.


In [None]:
# Compute predictions and residuals for train_img_df

model.eval()

all_preds = []
all_targets = []
all_ids = []

with torch.no_grad():
    for imgs, targets, ids in DataLoader(train_dataset, batch_size=32, shuffle=False, num_workers=0):
        imgs = imgs.to(device)
        targets = targets.to(device).float()
        preds, _ = model(imgs)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())
        all_ids.extend(list(ids))

preds = np.array(all_preds)
targets = np.array(all_targets)

residuals = targets - preds

results_df = pd.DataFrame(
    {
        ID_COL: all_ids,
        "target": targets,
        "pred": preds,
        "residual": residuals,
    }
)

results_df = results_df.merge(train_img_df[[ID_COL, "image_path"]], on=ID_COL, how="left")

# High- and low-price deciles (based on target)

high_price = results_df.sort_values("target", ascending=False).head(12)
low_price = results_df.sort_values("target", ascending=True).head(12)

# High-error cases (absolute residuals)

high_error = results_df.assign(abs_res=lambda d: d["residual"].abs()).sort_values(
    "abs_res", ascending=False
).head(12)

print("High price examples:", len(high_price))
print("Low price examples:", len(low_price))
print("High error examples:", len(high_error))


def visualize_group(df, group_name):
    print(f"\n=== {group_name} ===")
    for _, row in df.iterrows():
        price = row["target"]
        pred = row["pred"]
        resid = row["residual"]
        title = f"{group_name} | ID={row[ID_COL]} | target={price:.1f}, pred={pred:.1f}, resid={resid:.1f}"
        visualize_cam_for_row(row, title=title)


# Uncomment to generate Grad-CAM visualizations (can be many figures)
# visualize_group(high_price, "High price")
# visualize_group(low_price, "Low price")
# visualize_group(high_error, "High error")


## Tabular Feature Importance and SHAP (Optional but Recommended)

To understand how **tabular and image-derived features** interact in the final multimodal model:

- Load the best-performing tabular or fusion model from `model_training.ipynb` (e.g., via `joblib.load`).
- Use **SHAP** on the tabular feature space (and, if feasible, image embedding dimensions) to quantify feature contributions.
- Compare the relative impact of classical drivers (`sqft_living`, `grade`, `waterfront`, etc.) vs. image features.

This complementary view helps ensure that any **visual premiums** (e.g., water, greenery) align with both Grad-CAM attention and tabular attributions, rather than being artefacts of a single model.

