# Model Training & Evaluation

This notebook trains a Multimodal Regression Model (Satellite Images + Tabular Data) to predict property prices.

In [1]:
import pandas as pd
import numpy as np
import os
import cv2
import tensorflow as tf
from tensorflow.keras import layers, models, applications
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# Constants
IMAGE_DIR = "satellite_images"  # Folder holding one "<id>.jpg" image per property id
IMG_SIZE = (224, 224)  # Image size in pixels; matches the 224x224 input the model is built for
BATCH_SIZE = 32  # Number of samples per generator batch
EPOCHS = 10  # Number of training epochs; adjustable

ModuleNotFoundError: No module named 'pandas'

In [None]:
# Load Data
train_df = pd.read_excel('train.xlsx')
test_df = pd.read_excel('test.xlsx')

# Tabular features fed to the numerical branch of the model
numerical_cols = [
    'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors',
    'waterfront', 'view', 'condition', 'grade', 'sqft_above',
    'sqft_basement', 'yr_built', 'yr_renovated', 'lat', 'long',
    'sqft_living15', 'sqft_lot15',
]

# Missing numerical values are replaced with 0 in both frames
for frame in (train_df, test_df):
    frame[numerical_cols] = frame[numerical_cols].fillna(0)

# Standardise using statistics learned from the training frame only,
# then apply the same transform to the test frame
scaler = StandardScaler()
scaler.fit(train_df[numerical_cols])
X_train_num = scaler.transform(train_df[numerical_cols])
X_test_num = scaler.transform(test_df[numerical_cols])

# Regression target
y_train = train_df['price'].values

In [None]:
# Data Generator
class MultimodalDataGenerator(tf.keras.utils.Sequence):
    """Batch generator pairing tabular features with satellite images.

    Each batch contains the rows of ``num_data`` selected for that batch and
    the matching images loaded from ``img_dir`` as ``"<id>.jpg"``. Images are
    converted to RGB, resized to ``dim`` and scaled to [0, 1]. An image that
    is missing or unreadable is replaced by an all-zero (black) image.

    Args:
        ids: Array of property ids; ``ids[k]`` names the image file of row k.
        num_data: 2-D array of tabular features, row-aligned with ``ids``.
        labels: Optional array of targets, row-aligned with ``ids``. When
            ``None`` the generator yields inputs only (inference mode).
        batch_size: Number of samples per batch.
        img_dir: Directory containing the ``"<id>.jpg"`` files.
        dim: Target image size as ``(height, width)``.
        shuffle: Whether to reshuffle sample order at construction and after
            every epoch. ``None`` (default) keeps the historical behaviour:
            shuffle if and only if ``labels`` is given.
        **kwargs: Forwarded to the Keras base class (e.g. ``workers``).

    Batch format:
        * with labels: ``((X_num, X_img), y)``
        * without labels: ``((X_num, X_img),)``

        The inputs are wrapped in a tuple because Keras interprets a bare
        2-element batch as ``(inputs, targets)``; returning ``[X_num, X_img]``
        on its own would make ``model.predict`` treat the images as targets
        and feed only the tabular array to a two-input model.
    """

    def __init__(self, ids, num_data, labels=None, batch_size=32, img_dir=IMAGE_DIR,
                 dim=(224, 224), shuffle=None, **kwargs):
        # Keras 3 expects the base-class initialiser to run; harmless on older versions.
        super().__init__(**kwargs)
        self.ids = ids
        self.num_data = num_data
        self.labels = labels
        self.batch_size = batch_size
        self.img_dir = img_dir
        self.dim = dim
        # Default: shuffle only when training/validating with labels, never at inference
        # (prediction order must stay aligned with the ids).
        self.shuffle = (labels is not None) if shuffle is None else bool(shuffle)
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.ids) / self.batch_size))

    def __getitem__(self, index):
        """Build batch number ``index``; see the class docstring for the format."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        ids_temp = [self.ids[k] for k in indexes]

        X_img, X_num = self.__data_generation(ids_temp, indexes)
        inputs = (X_num, X_img)  # Same order as the model inputs: numerical, image

        if self.labels is not None:
            return inputs, self.labels[indexes]
        # 1-tuple so Keras reads this as "inputs only" rather than (x, y).
        return (inputs,)

    def on_epoch_end(self):
        """Reset the sample order, reshuffling it when shuffling is enabled."""
        self.indexes = np.arange(len(self.ids))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, ids_temp, indexes):
        """Load the images for ``ids_temp`` and slice the matching tabular rows.

        Returns:
            ``(X_img, X_num)`` where ``X_img`` has shape
            ``(len(ids_temp), height, width, 3)`` and dtype float32.
        """
        height, width = self.dim
        # Zero-initialised so that any image we fail to load stays black.
        X_img = np.zeros((len(ids_temp), height, width, 3), dtype=np.float32)
        X_num = self.num_data[indexes]

        for i, ID in enumerate(ids_temp):
            img_path = os.path.join(self.img_dir, f"{ID}.jpg")
            img = cv2.imread(img_path) if os.path.exists(img_path) else None
            if img is None:
                continue  # Black image fallback
            # OpenCV decodes to BGR; convert so the array displays correctly as RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            # cv2.resize expects (width, height), the reverse of the array shape order.
            img = cv2.resize(img, (width, height))
            X_img[i] = img / 255.0  # Normalize to [0, 1]

        return X_img, X_num

In [None]:
# Split Train/Val
# Hold out 20% of the training rows for validation; features, targets and ids
# are split together so they stay row-aligned.
split_arrays = train_test_split(
    X_train_num,
    y_train,
    train_df['id'].values,
    test_size=0.2,
    random_state=42,
)
X_num_train, X_num_val, y_train_split, y_val_split, id_train, id_val = split_arrays

train_gen = MultimodalDataGenerator(
    ids=id_train, num_data=X_num_train, labels=y_train_split, batch_size=BATCH_SIZE
)
val_gen = MultimodalDataGenerator(
    ids=id_val, num_data=X_num_val, labels=y_val_split, batch_size=BATCH_SIZE
)

In [None]:
# Build Model
def create_multimodal_model(num_features=None, img_shape=None):
    """Build and compile the two-branch (tabular + image) regression model.

    Args:
        num_features: Width of the tabular input. Defaults to the number of
            columns of the global ``X_train_num``.
        img_shape: Image input shape as ``(height, width, channels)``.
            Defaults to ``(*IMG_SIZE, 3)``.

    Returns:
        A compiled Keras model taking ``[numerical, image]`` inputs (in that
        order) and producing a single linear output, trained with MSE loss
        and reporting MAE.
    """
    if num_features is None:
        num_features = X_train_num.shape[1]
    if img_shape is None:
        img_shape = (*IMG_SIZE, 3)

    # Numerical Branch
    input_num = layers.Input(shape=(num_features,))
    x_num = layers.Dense(128, activation='relu')(input_num)
    x_num = layers.Dropout(0.3)(x_num)
    x_num = layers.Dense(64, activation='relu')(x_num)

    # Image Branch
    input_img = layers.Input(shape=tuple(img_shape))
    # Using a simple CNN for speed, can swap with EfficientNetB0
    x_img = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
    x_img = layers.MaxPooling2D((2, 2))(x_img)
    x_img = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x_img)
    x_img = layers.MaxPooling2D((2, 2))(x_img)
    x_img = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x_img)
    x_img = layers.Flatten()(x_img)
    x_img = layers.Dense(64, activation='relu')(x_img)

    # Fusion: concatenate both 64-d embeddings and regress a single value
    combined = layers.concatenate([x_num, x_img])
    z = layers.Dense(128, activation='relu')(combined)
    z = layers.Dense(64, activation='relu')(z)
    output = layers.Dense(1, activation='linear')(z)

    model = models.Model(inputs=[input_num, input_img], outputs=output)
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

model = create_multimodal_model()
model.summary()

In [None]:
# Train
# Fit on the training generator and evaluate on the validation generator;
# the returned object is kept in `history` for later inspection.
history = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS)

In [None]:
# Generate Predictions
# No labels are passed, so the generator keeps the rows in their original order
# and the predictions line up with the test ids.
test_ids = test_df['id']
test_gen = MultimodalDataGenerator(
    test_ids.values,
    X_test_num,
    labels=None,
    batch_size=BATCH_SIZE,
    dim=(224, 224),
)
predictions = model.predict(test_gen)

# One row per test property: its id and the predicted price
submission_df = pd.DataFrame({
    'id': test_ids,
    'predicted_price': predictions.flatten(),
})
submission_df.to_csv('enrollno_final.csv', index=False)
print("Saved predictions to enrollno_final.csv")

In [None]:
# Grad-CAM Implementation
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for one sample of the multimodal model.

    Args:
        img_array: Two-element sequence ``[numerical_batch, image_batch]`` in
            the model's input order (despite the name, it is not a single
            image array). Only the first sample of the batch is visualised.
        model: The two-input Keras model.
        last_conv_layer_name: Name of the convolutional layer whose
            activations are explained.
        pred_index: Index of the output unit to explain; defaults to 0, the
            model's single regression output.

    Returns:
        2-D NumPy array with the spatial size of the chosen conv layer and
        values in [0, 1]. It is all zeros when no location contributes
        positively to the prediction.
    """
    if pred_index is None:
        pred_index = 0  # Single output

    # The image branch cannot be cut out of the functional graph on its own,
    # so we build a model over the full graph that returns both the chosen
    # conv layer's activations and the final prediction.
    grad_model = models.Model(
        [model.inputs[0], model.inputs[1]],
        [model.get_layer(last_conv_layer_name).output, model.output],
    )

    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model([img_array[0], img_array[1]])
        class_channel = preds[:, pred_index]

    # Gradient of the prediction w.r.t. the conv feature map, averaged over
    # batch and spatial axes to get one importance weight per channel.
    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weighted sum of the first sample's feature-map channels
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Clip negatives first, then normalise by the clipped maximum. Dividing by
    # the un-clipped maximum gives NaN/negative-zero maps when nothing is
    # positive; divide_no_nan returns zeros in that case instead.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap.numpy()

# Find last conv layer: collect every layer whose name contains 'conv2d'
# and keep the final one (None when the model has no such layer)
conv_layer_names = [lyr.name for lyr in model.layers if 'conv2d' in lyr.name]
last_conv_layer = conv_layer_names[-1] if conv_layer_names else None
print(f"Using layer {last_conv_layer} for Grad-CAM")

if last_conv_layer:
    # Visualize for a sample; any failure is reported instead of stopping the notebook
    try:
        sample_idx = 0
        sample_slice = slice(sample_idx, sample_idx + 1)  # Keeps the batch axis

        # First validation batch: inputs are ordered (numerical, image)
        X_batch, y_batch = val_gen[0]
        sample_num = X_batch[0][sample_slice]  # Shape (1, 17)
        sample_img = X_batch[1][sample_slice]  # Shape (1, 224, 224, 3)

        # Generate Heatmap
        heatmap = make_gradcam_heatmap([sample_num, sample_img], model, last_conv_layer)

        # Display the input image and its heatmap side by side
        fig, (ax_image, ax_heatmap) = plt.subplots(1, 2, figsize=(10, 5))
        ax_image.imshow(sample_img[0])
        ax_image.set_title("Original Image")
        ax_heatmap.imshow(heatmap)
        ax_heatmap.set_title("Grad-CAM Heatmap")
        plt.show()
    except Exception as e:
        print(f"Grad-CAM Error: {e}")