In [1]:
# !pip install imantics
# !pip install patchify

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os

path = "/content/drive/MyDrive/parts_train_data_inter/test_images_labels_targets.tar"
for root, dirs, files in os.walk(path):
    for file in files:
        print(os.path.join(root, file))

In [4]:
!tar -xf /content/drive/MyDrive/parts_train_data_inter/test_images_labels_targets.tar -C /content/

In [5]:
import os

extracted_path = '/content/'
for root, dirs, files in os.walk(extracted_path):
    for name in files[:10]:
        print(os.path.join(root, name))
    break

import os

labels_path = '/content/test/images'

for name in os.listdir(labels_path)[:10]:
    print(name)


mexico-earthquake_00000113_pre_disaster.png
hurricane-florence_00000087_post_disaster.png
hurricane-michael_00000089_pre_disaster.png
socal-fire_00000325_pre_disaster.png
midwest-flooding_00000354_pre_disaster.png
socal-fire_00000885_post_disaster.png
socal-fire_00001138_pre_disaster.png
palu-tsunami_00000163_pre_disaster.png
hurricane-florence_00000077_post_disaster.png
midwest-flooding_00000332_post_disaster.png


In [6]:
import os
import glob

image_dir = '/content/test/images'
target_dir = '/content/test/targets'
label_dir = '/content/test/labels'

train_dict = {}

# Loop through target masks
for target_path in glob.glob(os.path.join(target_dir, '*_target.png')):
    filename = os.path.basename(target_path)

    # Remove _pre_disaster_target.png or _post_disaster_target.png
    if '_pre_disaster_target.png' in filename:
        base_id = filename.replace('_pre_disaster_target.png', '')
    elif '_post_disaster_target.png' in filename:
        base_id = filename.replace('_post_disaster_target.png', '')
    else:
        continue  # skip invalid names

    pre_img = os.path.join(image_dir, base_id + '_pre_disaster.png')
    post_img = os.path.join(image_dir, base_id + '_post_disaster.png')
    label_json = os.path.join(label_dir, base_id + '_post_disaster.json')

    if os.path.exists(pre_img) and os.path.exists(post_img):
        train_dict[base_id] = {
            'pre_image': pre_img,
            'post_image': post_img,
            'target_mask': target_path,
            'label_json': label_json if os.path.exists(label_json) else None
        }
    else:
        print(f"Missing: {base_id}")
        if not os.path.exists(pre_img):
            print(" - Missing pre:", pre_img)
        if not os.path.exists(post_img):
            print(" - Missing post:", post_img)

# Preview one example
from pprint import pprint
pprint(dict(list(train_dict.items())[:1]))


{'socal-fire_00000000': {'label_json': '/content/test/labels/socal-fire_00000000_post_disaster.json',
                         'post_image': '/content/test/images/socal-fire_00000000_post_disaster.png',
                         'pre_image': '/content/test/images/socal-fire_00000000_pre_disaster.png',
                         'target_mask': '/content/test/targets/socal-fire_00000000_pre_disaster_target.png'}}


In [7]:
from tensorflow.keras.models import load_model
import tensorflow as tf



@tf.keras.utils.register_keras_serializable(package="Custom")
def bulinding_iou(y_true, y_pred):
    y_pred = tf.cast(y_pred > 0.5, tf.float32)

    # Only keep patches that have some damage
    mask = tf.reduce_sum(y_true, axis=[1,2,3]) > 0
    y_true = tf.boolean_mask(y_true, mask)
    y_pred = tf.boolean_mask(y_pred, mask)

    # If no damage patches exist, return 0
    def compute_iou():
        intersection = tf.reduce_sum(y_true * y_pred, axis=[1,2,3])
        union = tf.reduce_sum(y_true, axis=[1,2,3]) + tf.reduce_sum(y_pred, axis=[1,2,3]) - intersection
        return tf.reduce_mean(intersection / (union + 1e-7))

    return tf.cond(tf.size(y_true) > 0, compute_iou, lambda: 0.0)

@tf.keras.utils.register_keras_serializable(package="Custom")
def bulinding_f1_per_class(y_true, y_pred):
    y_pred = tf.cast(y_pred > 0.5, tf.float32)

    # Only damage class
    intersection = tf.reduce_sum(y_true * y_pred)
    precision = intersection / (tf.reduce_sum(y_pred) + 1e-7)
    recall = intersection / (tf.reduce_sum(y_true) + 1e-7)
    return 2 * precision * recall / (precision + recall + 1e-7)
model = load_model(
    "/content/drive/MyDrive/bulinding_seg_resnet34.keras",
    custom_objects={
        "bulinding_iou": bulinding_iou,
        "bulinding_f1_per_class": bulinding_f1_per_class
    },
    compile=False  # safer for custom losses
)

In [8]:

# import tensorflow as tf

# @tf.keras.utils.register_keras_serializable()
# class SparseCategoricalFocalLoss(tf.keras.losses.Loss):
#     def __init__(self, gamma=2.0, class_weights=None, label_smoothing=0.05,
#                  reduction=tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE,
#                  name="sparse_categorical_focal_loss", **kwargs):
#         super().__init__(reduction=reduction, name=name, **kwargs)
#         self.gamma = gamma
#         self.label_smoothing = label_smoothing
#         self.class_weights = tf.constant(class_weights, dtype=tf.float32) if class_weights is not None else None

#     def call(self, y_true, y_pred):
#         """
#         y_true: (batch,) integer class labels
#         y_pred: (batch, num_classes) softmax probabilities
#         """
#         y_true = tf.cast(y_true, tf.int32)
#         y_pred = tf.clip_by_value(y_pred, 1e-8, 1.0 - 1e-8)

#         num_classes = tf.shape(y_pred)[-1]
#         y_true_one_hot = tf.one_hot(y_true, depth=num_classes, dtype=tf.float32)

#         # --- Apply label smoothing ---
#         if self.label_smoothing > 0:
#             smooth_positives = 1.0 - self.label_smoothing
#             smooth_negatives = self.label_smoothing / tf.cast(num_classes - 1, tf.float32)
#             y_true_one_hot = y_true_one_hot * smooth_positives + smooth_negatives

#         # --- Compute focal loss ---
#         pt = tf.reduce_sum(y_true_one_hot * y_pred, axis=-1)
#         focal_factor = tf.pow(1.0 - pt, self.gamma)
#         ce_loss = -tf.reduce_sum(y_true_one_hot * tf.math.log(y_pred), axis=-1)
#         loss = focal_factor * ce_loss

#         # --- Apply class weights (optional) ---
#         if self.class_weights is not None:
#             weights = tf.gather(self.class_weights, y_true)
#             loss = loss * weights

#         return tf.reduce_mean(loss)

#     def get_config(self):
#         config = super().get_config()
#         config.update({
#             "gamma": self.gamma,
#             "label_smoothing": self.label_smoothing,
#             "class_weights": self.class_weights.numpy().tolist() if self.class_weights is not None else None,
#         })
#         return config
# #class_weights = [0.28, 4.27, 2.20, 1.67]
# #class_weights = [0.05, 0.8, 0.6, 0.3]
# #class_weights = [0.1, 0.4, 0.3, 0.2]
# class_weights= [0.05041411, 0.43280042, 0.29408234, 0.22270313]


import tensorflow as tf

@tf.keras.utils.register_keras_serializable()
class HybridWeightedFocalLoss(tf.keras.losses.Loss):
    def __init__(self,
                 class_weights=None,
                 label_smoothing=0.0,
                 gamma=2.0,
                 lam=0.5,
                 reduction=tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE,
                 name="hybrid_weighted_focal_loss",
                 **kwargs):
        """
        lam in [0,1] mixes between WCCE and Focal:
          loss = (1-lam) * WCCE + lam * FocalWeighted
        gamma is focal gamma.
        class_weights: list/array-like of len=num_classes or None.
        """
        super().__init__(reduction=reduction, name=name, **kwargs)
        self.class_weights = tf.constant(class_weights, dtype=tf.float32) if class_weights is not None else None
        self.label_smoothing = float(label_smoothing)
        self.gamma = float(gamma)
        self.lam = float(lam)

    def call(self, y_true, y_pred):
        # y_true: (batch,) integer labels
        # y_pred: (batch, num_classes) softmax probabilities
        y_true = tf.cast(y_true, tf.int32)
        y_pred = tf.clip_by_value(y_pred, 1e-8, 1.0 - 1e-8)
        num_classes = tf.shape(y_pred)[-1]
        y_true_one_hot = tf.one_hot(y_true, depth=num_classes, dtype=tf.float32)

        # label smoothing
        if self.label_smoothing > 0:
            smooth_pos = 1.0 - self.label_smoothing
            smooth_neg = self.label_smoothing / tf.cast(num_classes - 1, tf.float32)
            y_true_one_hot = y_true_one_hot * smooth_pos + smooth_neg

        # standard cross-entropy per sample
        ce_per_sample = -tf.reduce_sum(y_true_one_hot * tf.math.log(y_pred), axis=-1)

        # class weights per sample (gather by int label)
        if self.class_weights is not None:
            sample_weights = tf.gather(self.class_weights, y_true)
        else:
            sample_weights = 1.0

        # Weighted Categorical Cross-Entropy (scalar per sample)
        wce = ce_per_sample * sample_weights

        # Focal part: focal factor computed w.r.t. true class probability pt
        pt = tf.reduce_sum(y_true_one_hot * y_pred, axis=-1)  # p_t
        focal_factor = tf.pow(1.0 - pt, self.gamma)
        focal_loss = focal_factor * ce_per_sample

        # apply same class weights to focal part (keeps balance)
        focal_loss = focal_loss * sample_weights

        # Mix them
        loss = (1.0 - self.lam) * wce + self.lam * focal_loss

        return tf.reduce_mean(loss)

    def get_config(self):
        cfg = super().get_config()
        cfg.update({
            "class_weights": self.class_weights.numpy().tolist() if self.class_weights is not None else None,
            "label_smoothing": self.label_smoothing,
            "gamma": self.gamma,
            "lam": self.lam,
        })
        return cfg




import tensorflow as tf
import numpy as np

# ---------------- Multi-class Precision ---------------- #
@tf.keras.utils.register_keras_serializable()
class MultiClassPrecision(tf.keras.metrics.Metric):
    def __init__(self, num_classes=4, name="precision", **kwargs):
        super().__init__(name=name, **kwargs)
        self.num_classes = num_classes
        self.tp = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)
        self.fp = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
        y_pred = tf.cast(tf.argmax(y_pred, axis=-1), tf.int32)

        cm = tf.math.confusion_matrix(y_true, y_pred, num_classes=self.num_classes, dtype=tf.float32)
        tp = tf.linalg.diag_part(cm)
        fp = tf.reduce_sum(cm, axis=0) - tp

        self.tp.assign_add(tp)
        self.fp.assign_add(fp)

    def result(self):
        precision = self.tp / (self.tp + self.fp + 1e-8)
        return precision  # <-- return per-class precision

    def reset_state(self):
        for v in self.variables:
            v.assign(tf.zeros_like(v))


# ---------------- Multi-class Recall ---------------- #
@tf.keras.utils.register_keras_serializable()
class MultiClassRecall(tf.keras.metrics.Metric):
    def __init__(self, num_classes=4, name="recall", **kwargs):
        super().__init__(name=name, **kwargs)
        self.num_classes = num_classes
        self.tp = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)
        self.fn = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
        y_pred = tf.cast(tf.argmax(y_pred, axis=-1), tf.int32)

        cm = tf.math.confusion_matrix(y_true, y_pred, num_classes=self.num_classes, dtype=tf.float32)
        tp = tf.linalg.diag_part(cm)
        fn = tf.reduce_sum(cm, axis=1) - tp

        self.tp.assign_add(tp)
        self.fn.assign_add(fn)

    def result(self):
        recall = self.tp / (self.tp + self.fn + 1e-8)
        return recall  # <-- return per-class recall

    def reset_state(self):
        for v in self.variables:
            v.assign(tf.zeros_like(v))


# ---------------- Per-Class Harmonic Mean ---------------- #
import tensorflow as tf

@tf.keras.utils.register_keras_serializable()
class PerClassHarmonicMean(tf.keras.metrics.Metric):
    def __init__(self, num_classes=4, name="harmonic_mean", **kwargs):
        super().__init__(name=name, **kwargs)
        self.num_classes = num_classes
        self.eps = 1e-8

        # Store confusion components
        self.tp = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)
        self.fp = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)
        self.fn = self.add_weight(shape=(num_classes,), initializer="zeros", dtype=tf.float32)

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Flatten
        y_true = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
        y_pred = tf.cast(tf.argmax(y_pred, axis=-1), tf.int32)
        y_pred = tf.reshape(y_pred, [-1])

        # Compute confusion matrix
        cm = tf.math.confusion_matrix(y_true, y_pred, num_classes=self.num_classes, dtype=tf.float32)
        tp = tf.linalg.diag_part(cm)
        fp = tf.reduce_sum(cm, axis=0) - tp
        fn = tf.reduce_sum(cm, axis=1) - tp

        # Accumulate totals
        self.tp.assign_add(tp)
        self.fp.assign_add(fp)
        self.fn.assign_add(fn)

    def result(self):
        precision = self.tp / (self.tp + self.fp + self.eps)
        recall = self.tp / (self.tp + self.fn + self.eps)
        harmonic = 2.0 / ((1.0 / (precision + self.eps)) + (1.0 / (recall + self.eps)))

        # Return macro-average harmonic mean (mean of all classes)
        return tf.reduce_mean(harmonic)

    def reset_state(self):
        for v in self.variables:
            v.assign(tf.zeros_like(v))



import matplotlib.pyplot as plt

# ---------------- Per-Class Metrics Callback ---------------- #
@tf.keras.utils.register_keras_serializable()
class PerClassMetrics(tf.keras.callbacks.Callback):
    def __init__(self, x_val, y_val, num_classes=4, store_history=True):
        super().__init__()
        self.x_val = x_val
        self.y_val = y_val
        self.num_classes = num_classes
        self.store_history = store_history
        self.history = []  # store per-class metrics over epochs
        self.last_recall = None  # store last per-class recall for external logic

    def on_epoch_end(self, epoch, logs=None):
        y_pred = self.model.predict(self.x_val, verbose=0)
        y_pred = np.argmax(y_pred, axis=1)
        y_true = np.array(self.y_val).reshape(-1)

        cm = tf.math.confusion_matrix(y_true, y_pred, num_classes=self.num_classes).numpy()
        tp = np.diag(cm)
        fp = np.sum(cm, axis=0) - tp
        fn = np.sum(cm, axis=1) - tp

        precision = tp / (tp + fp + 1e-8)
        recall = tp / (tp + fn + 1e-8)
        harmonic = 2.0 / ((1.0 / (precision + 1e-8)) + (1.0 / (recall + 1e-8)))

        print(f"\nEpoch {epoch+1} â€” Per-class metrics:")
        for i in range(self.num_classes):
            print(f"  Class {i}: Precision = {precision[i]:.3f}, Recall = {recall[i]:.3f}, Harmonic = {harmonic[i]:.3f}")
        print("-" * 60)

        # store results
        if self.store_history:
            self.history.append({
                'epoch': epoch + 1,
                'precision': precision.tolist(),
                'recall': recall.tolist(),
                'harmonic': harmonic.tolist()
            })
        self.last_recall = recall

    def get_config(self):
        return {
            "num_classes": self.num_classes,
            "store_history": self.store_history
        }

from tensorflow.keras.applications.resnet50 import preprocess_input

custom_objects = {
    "MultiClassPrecision": MultiClassPrecision,
    "MultiClassRecall": MultiClassRecall,
    "PerClassHarmonicMean": PerClassHarmonicMean,
    "preprocess_input": preprocess_input  # <-- add the lambda function here
}

dc_model = tf.keras.models.load_model("/content/drive/MyDrive/updated_efficientNETB0_1111.keras", custom_objects=custom_objects)


In [9]:
import os
import cv2
import numpy as np
from PIL import Image
from imantics import Mask
from tensorflow.keras.applications.efficientnet import preprocess_input

# ----------------- Config -----------------
PATCH_SIZE = 256
STEP_SIZE = 128      # 50% overlap
BATCH_SIZE = 8
CROP_SIZE = 224
PAD_RATIO = 0.15

# ----------------- Utilities -----------------
def load_image_rgb(path):
    """Load image as RGB array"""
    return np.array(Image.open(path).convert("RGB"))

def pad_to_multiple(image, patch_size=PATCH_SIZE, step=STEP_SIZE):
    """Pad image so its dimensions are compatible with patch and step"""
    h, w = image.shape[:2]
    new_h = ((h - patch_size) // step + 1) * step + patch_size
    new_w = ((w - patch_size) // step + 1) * step + patch_size
    pad_h, pad_w = new_h - h, new_w - w
    pad_width = ((0, pad_h), (0, pad_w), (0, 0))
    return np.pad(image, pad_width, mode='constant', constant_values=0), h, w

def gaussian_kernel(size=PATCH_SIZE, sigma=40):
    """Generate 2D Gaussian kernel for soft blending"""
    ax = np.linspace(-(size - 1) / 2., (size - 1) / 2., size)
    xx, yy = np.meshgrid(ax, ax)
    kernel = np.exp(-0.5 * (np.square(xx) + np.square(yy)) / np.square(sigma))
    return kernel / np.max(kernel)

def sharpen_image(image):
    gaussian = cv2.GaussianBlur(image, (3, 3), 1.0)
    return cv2.addWeighted(image, 1.5, gaussian, -0.5, 0)

def apply_clahe(image):
    lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    cl = clahe.apply(l)
    merged = cv2.merge((cl, a, b))
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

# ----------------- Segmentation Inference -----------------
def infer_segmentation_mask(seg_model, image, patch_size=PATCH_SIZE, step=STEP_SIZE):
    """Inference with overlapping patches and Gaussian blending"""
    h, w, c = image.shape
    padded_img, orig_h, orig_w = pad_to_multiple(image, patch_size, step)
    H, W = padded_img.shape[:2]

    pred_accum = np.zeros((H, W), dtype=np.float32)
    weight_accum = np.zeros((H, W), dtype=np.float32)
    g_kernel = gaussian_kernel(size=patch_size, sigma=40)

    for y in range(0, H - patch_size + 1, step):
        for x in range(0, W - patch_size + 1, step):
            patch = padded_img[y:y+patch_size, x:x+patch_size]
            patch_input = np.expand_dims(patch.astype('float32') / 255.0, axis=0)
            pred = seg_model.predict(patch_input, verbose=0)[0]

            if pred.ndim == 3 and pred.shape[-1] == 1:
                pred = np.squeeze(pred, axis=-1)

            pred_accum[y:y+patch_size, x:x+patch_size] += pred * g_kernel
            weight_accum[y:y+patch_size, x:x+patch_size] += g_kernel

    seg_full = pred_accum / np.maximum(weight_accum, 1e-7)
    seg_full = seg_full[:orig_h, :orig_w]
    building_mask = (seg_full > 0.8).astype(np.uint8)
    return building_mask

# ----------------- Crop Polygon With Context -----------------
def crop_polygon_with_context(img_pre, img_post, poly, pad_ratio=PAD_RATIO):
    coords = np.array(poly, dtype=np.int32)
    if coords.ndim == 1:
        coords = coords.reshape(-1, 2)
    if len(coords) < 3:
        return None

    x, y, w, h = cv2.boundingRect(coords)
    pad = int(pad_ratio * max(w, h))
    x1, y1 = max(0, x - pad), max(0, y - pad)
    x2, y2 = min(img_pre.shape[1], x + w + pad), min(img_pre.shape[0], y + h + pad)
    cropped_pre = img_pre[y1:y2, x1:x2]
    cropped_post = img_post[y1:y2, x1:x2]
    return cropped_pre, cropped_post, (x1, y1, x2, y2)

# ----------------- Main Pipeline -----------------
def extract_buildings_and_predict_damage(info, seg_model, dmg_model):
    """Predict building polygons and classify their damage."""
    img_pre = load_image_rgb(info['pre_image'])
    img_post = load_image_rgb(info['post_image'])

    # --- Load ground truth mask ---
    mask_gt = None
    mask_path = info.get('target_mask') or info.get('mask_image')
    if mask_path and os.path.exists(mask_path):
        mask_gt = np.array(Image.open(mask_path).convert('L'))

    # --- Segmentation prediction ---
    building_mask = infer_segmentation_mask(seg_model, img_pre)
    polygons = Mask(building_mask).polygons().points

    predictions = []

    for idx, poly in enumerate(polygons):
        crops = crop_polygon_with_context(img_pre, img_post, poly)
        if crops is None:
            continue
        cropped_pre, cropped_post, (x1, y1, x2, y2) = crops

        # --- Enhancement first ---
        cropped_pre = apply_clahe(sharpen_image(cropped_pre))
        cropped_post = apply_clahe(sharpen_image(cropped_post))

        # --- Resize after enhancement ---
        cropped_pre = cv2.resize(cropped_pre, (CROP_SIZE, CROP_SIZE))
        cropped_post = cv2.resize(cropped_post, (CROP_SIZE, CROP_SIZE))

        # --- Preprocess for EfficientNet ---
        img_pre_p = preprocess_input(np.expand_dims(cropped_pre.astype(np.float32), axis=0))
        img_post_p = preprocess_input(np.expand_dims(cropped_post.astype(np.float32), axis=0))

        # --- Damage prediction (0â€“3) ---
        pred = dmg_model.predict([img_pre_p, img_post_p], verbose=0)
        damage_level_pred = np.argmax(pred, axis=1)[0]   # 0â€“3, same as training
        confidence = np.max(pred)

        # --- True label from mask (shifted 0â€“3) ---
        true_label_shifted = None
        if isinstance(mask_gt, np.ndarray) and mask_gt.ndim == 2:
            mask_crop = mask_gt[y1:y2, x1:x2]
            unique_vals = np.unique(mask_crop)
            unique_vals = unique_vals[unique_vals > 0]  # ignore background
            if len(unique_vals) > 0:
                true_label_shifted = int(np.median(unique_vals)) - 1  # shift 1â€“4 â†’ 0â€“3

        predictions.append({
            "building_id": idx,
            "polygon": poly.tolist(),
            "damage_level_pred": int(damage_level_pred),
            "damage_level_true": true_label_shifted,
            "confidence": float(confidence)
        })

    return predictions

# ------------------ Predict Damage for Test ------------------ #
def predict_damage_classes_for_test(train_dict, seg_model, dmg_model, num_samples=10):
    """Run inference and print Predicted vs True damage."""
    import random
    keys = random.sample(list(train_dict.keys()), min(num_samples, len(train_dict)))
    results = []

    for key in keys:
        info = train_dict[key]
        preds = extract_buildings_and_predict_damage(info, seg_model, dmg_model)

        if len(preds) == 0:
            print(f"{key}: No buildings detected")
            results.append((-1, -1))
            continue

        best_pred = max(preds, key=lambda x: x['confidence'])
        pred_class = best_pred['damage_level_pred']  # 0â€“3
        true_class = best_pred.get('damage_level_true', 'N/A')  # 0â€“3

        # --- For display only (optional) ---
        display_pred = pred_class + 1 if pred_class >= 0 else 'N/A'
        display_true = true_class + 1 if true_class is not None else 'N/A'

        print(f"{key}: Predicted Damage â†’ {display_pred}, True Damage â†’ {display_true}")
        results.append((pred_class, true_class))  # store 0â€“3 for metric evaluation

    return results


# visualization

In [31]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from PIL import Image
import random

# --- Color mapping for damage levels (0â€“3) --- #
DAMAGE_COLORS = {
    0: (0, 255, 0),      # Green â€“ No damage
    1: (255, 255, 0),    # Yellow â€“ Minor
    2: (255, 165, 0),    # Orange â€“ Major
    3: (255, 0, 0)       # Red â€“ Destroyed
}

def load_image_rgb(path):
    """Load image as RGB array"""
    return np.array(Image.open(path).convert("RGB"))

def visualize_predictions_with_gt(info, seg_model, dmg_model):
    """Visualize pre, post, predicted seg mask, GT seg mask, predicted damage, and GT damage."""
    img_pre = load_image_rgb(info['pre_image'])
    img_post = load_image_rgb(info['post_image'])

    # --- Run segmentation prediction separately (binary mask) ---
    pred_seg_mask = infer_segmentation_mask(seg_model, img_pre)

    # --- Ground truth building mask (if exists) ---
    gt_seg_mask = None
    mask_path = info.get('building_mask') or info.get('target_mask') or info.get('mask_image')
    if mask_path and os.path.exists(mask_path):
        gt_seg_mask = np.array(Image.open(mask_path).convert('L'))
        gt_seg_mask = (gt_seg_mask > 0).astype(np.uint8)

    # --- Run damage classification prediction ---
    preds = extract_buildings_and_predict_damage(info, seg_model, dmg_model)
    if len(preds) == 0:
        print("No buildings detected.")
        return

    # ---------------- Predicted damage overlay ---------------- #
    overlay_pred = img_post.copy()
    for p in preds:
        poly = np.array(p["polygon"], dtype=np.int32)
        dmg = p["damage_level_pred"]  # 0â€“3
        color = DAMAGE_COLORS.get(dmg, (255, 255, 255))

        overlay_fill = overlay_pred.copy()
        cv2.fillPoly(overlay_fill, [poly], color)
        overlay_pred = cv2.addWeighted(overlay_pred, 0.7, overlay_fill, 0.3, 0)
        cv2.polylines(overlay_pred, [poly], isClosed=True, color=(0, 0, 0), thickness=1)

    # ---------------- Ground truth damage overlay ---------------- #
    gt_overlay = img_post.copy()
    gt_mask_color = None
    mask_path = info.get('target_mask') or info.get('mask_image')

    if mask_path and os.path.exists(mask_path):
        mask_gt = np.array(Image.open(mask_path))
        if mask_gt.ndim == 3:
            gt_mask_color = mask_gt
            mask_gt = cv2.cvtColor(mask_gt, cv2.COLOR_RGB2GRAY)
            mask_gt_shifted = mask_gt - 1  # 1â€“4 â†’ 0â€“3
        else:
            mask_gt_shifted = mask_gt - 1
            gt_mask_color = np.zeros((*mask_gt.shape, 3), dtype=np.uint8)
            for dmg, color in DAMAGE_COLORS.items():
                gt_mask_color[mask_gt_shifted == dmg] = color

        # Overlay version
        for dmg, color in DAMAGE_COLORS.items():
            mask_binary = (mask_gt_shifted == dmg).astype(np.uint8)
            gt_overlay[mask_binary == 1] = (
                0.6 * gt_overlay[mask_binary == 1] + 0.4 * np.array(color)
            ).astype(np.uint8)

    # ---------------- Plotting (6 horizontally) ---------------- #
    fig, axes = plt.subplots(1, 6, figsize=(35, 7))

    axes[0].imshow(img_pre)
    axes[0].set_title("Pre-Disaster Image")

    axes[1].imshow(img_post)
    axes[1].set_title("Post-Disaster Image")

    axes[2].imshow(pred_seg_mask, cmap='gray')
    axes[2].set_title("Predicted Building Mask")

    if gt_seg_mask is not None:
        axes[3].imshow(gt_seg_mask, cmap='gray')
        axes[3].set_title("Ground Truth Building Mask")
    else:
        axes[3].imshow(np.zeros_like(pred_seg_mask), cmap='gray')
        axes[3].set_title("No GT Building Mask")

    axes[4].imshow(overlay_pred)
    axes[4].set_title("Predicted Damage Overlay")

    axes[5].imshow(gt_overlay)
    axes[5].set_title("Ground Truth Damage Overlay")

    for ax in axes:
        ax.axis('off')

    # Legend for damage levels
    legend_patches = [
        mpatches.Patch(color=np.array(c) / 255, label=f"Damage {i}")
        for i, c in DAMAGE_COLORS.items()
    ]
    plt.legend(
        handles=legend_patches,
        loc='lower center',
        bbox_to_anchor=(1.1, -0.1),
        ncol=4,
        frameon=False
    )

    plt.tight_layout()
    plt.show()


def visualize_random_samples(train_dict, seg_model, dmg_model, num_samples=5):
    keys = list(train_dict.keys())
    random_keys = random.sample(keys, min(num_samples, len(keys)))

    for i, key in enumerate(random_keys, 1):
        print(f"\nðŸ”¹ Visualizing sample {i}/{len(random_keys)} â†’ {key}")
        info = train_dict[key]

        visualize_predictions_with_gt(info, seg_model, dmg_model)


visualize_random_samples(train_dict, model, dc_model, num_samples=15)


Output hidden; open in https://colab.research.google.com to view.

## visualization

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from PIL import Image
import random

# --- Color mapping for damage levels (0â€“3) --- #
DAMAGE_COLORS = {
    0: (0, 255, 0),      # Green
    1: (255, 255, 0),    # Yellow
    2: (255, 165, 0),    # Orange
    3: (255, 0, 0)       # Red
}

def load_image_rgb(path):
    """Load image as RGB array"""
    return np.array(Image.open(path).convert("RGB"))

def visualize_predictions_with_gt(info, seg_model, dmg_model):
    """Visualize pre, post, predicted overlays, ground truth overlay, and GT mask."""
    img_pre = load_image_rgb(info['pre_image'])
    img_post = load_image_rgb(info['post_image'])

    # Run full prediction pipeline
    preds = extract_buildings_and_predict_damage(info, seg_model, dmg_model)
    if len(preds) == 0:
        print("No buildings detected.")
        return

    # ---------------- Predicted overlay ---------------- #
    overlay_pred = img_post.copy()
    for p in preds:
        poly = np.array(p["polygon"], dtype=np.int32)
        dmg = p["damage_level_pred"]  # 0â€“3
        color = DAMAGE_COLORS.get(dmg, (255, 255, 255))

        overlay_fill = overlay_pred.copy()
        cv2.fillPoly(overlay_fill, [poly], color)
        overlay_pred = cv2.addWeighted(overlay_pred, 0.7, overlay_fill, 0.3, 0)
        cv2.polylines(overlay_pred, [poly], isClosed=True, color=(0, 0, 0), thickness=1)

    # ---------------- Ground truth overlay + mask ---------------- #
    gt_overlay = img_post.copy()
    gt_mask_color = None
    mask_path = info.get('target_mask') or info.get('mask_image')

    if mask_path and os.path.exists(mask_path):
        mask_gt = np.array(Image.open(mask_path))
        if mask_gt.ndim == 3:  # already RGB
            gt_mask_color = mask_gt
            mask_gt = cv2.cvtColor(mask_gt, cv2.COLOR_RGB2GRAY)
            mask_gt_shifted = mask_gt - 1  # 1â€“4 â†’ 0â€“3
        else:
            mask_gt_shifted = mask_gt - 1  # shift 1â€“4 â†’ 0â€“3
            gt_mask_color = np.zeros((*mask_gt.shape, 3), dtype=np.uint8)
            for dmg, color in DAMAGE_COLORS.items():
                gt_mask_color[mask_gt_shifted == dmg] = color

        # Overlay ground truth
        for dmg, color in DAMAGE_COLORS.items():
            mask_binary = (mask_gt_shifted == dmg).astype(np.uint8)
            gt_overlay[mask_binary == 1] = (
                0.6 * gt_overlay[mask_binary == 1] + 0.4 * np.array(color)
            ).astype(np.uint8)

    # ---------------- Plotting ---------------- #
    fig, axes = plt.subplots(1, 5, figsize=(25, 6))

    axes[0].imshow(img_pre)
    axes[0].set_title("Pre-Disaster Image")

    axes[1].imshow(img_post)
    axes[1].set_title("Post-Disaster Image")

    axes[2].imshow(overlay_pred)
    axes[2].set_title("Predicted Damage Overlay")

    axes[3].imshow(gt_overlay)
    axes[3].set_title("Ground Truth Overlay")

    if gt_mask_color is not None:
        axes[4].imshow(gt_mask_color)
        axes[4].set_title("Ground Truth Mask (Colored)")
    else:
        axes[4].imshow(np.zeros_like(img_post))
        axes[4].set_title("No Ground Truth Mask")

    for ax in axes:
        ax.axis('off')

    # Legend
    legend_patches = [
        mpatches.Patch(color=np.array(c) / 255, label=f"Damage {i}")
        for i, c in DAMAGE_COLORS.items()
    ]
    plt.legend(
        handles=legend_patches,
        loc='lower center',
        bbox_to_anchor=(1.1, -0.1),
        ncol=4,
        frameon=False
    )

    plt.tight_layout()
    plt.show()


def visualize_random_samples(train_dict, seg_model, dmg_model, num_samples=5):
    keys = list(train_dict.keys())
    random_keys = random.sample(keys, min(num_samples, len(keys)))

    for i, key in enumerate(random_keys, 1):
        print(f"\nðŸ”¹ Visualizing sample {i}/{len(random_keys)} â†’ {key}")
        info = train_dict[key]
        visualize_predictions_with_gt(info, seg_model, dmg_model)


# --- Example Run --- #
visualize_random_samples(train_dict, model, dc_model, num_samples=15)


Output hidden; open in https://colab.research.google.com to view.

In [None]:
def debug_label_offset(train_dict, seg_model, dmg_model, num_samples=5):

    import random
    keys = random.sample(list(train_dict.keys()), min(num_samples, len(train_dict)))

    offset_counts = []  # Store GT - Pred for analysis

    for key in keys:
        info = train_dict[key]
        preds = extract_buildings_and_predict_damage(info, seg_model, dmg_model)

        if len(preds) == 0:
            print(f"{key}: No buildings detected")
            continue

        print(f"\nSample: {key}")
        for p in preds:
            pred_class = p["damage_level_pred"]
            true_class = p["damage_level_true"]
            if true_class is None:
                continue
            offset = pred_class - true_class
            offset_counts.append(offset)
            print(f"Building {p['building_id']}: GT={true_class}, Pred={pred_class}, Offset={offset}")

    # Summary of offsets
    if offset_counts:
        import collections
        offset_summary = collections.Counter(offset_counts)
        print("\n--- Offset Summary ---")
        for k, v in offset_summary.items():
            print(f"Offset {k}: {v} instances")
    else:
        print("No valid GT labels found for comparison.")
# Run debug on 5 random samples
debug_label_offset(train_dict, model, dc_model, num_samples=5)



Sample: mexico-earthquake_00000117
Building 1: GT=0, Pred=0, Offset=0
Building 2: GT=0, Pred=0, Offset=0
Building 4: GT=0, Pred=2, Offset=2
Building 5: GT=0, Pred=0, Offset=0
Building 7: GT=0, Pred=0, Offset=0
Building 8: GT=0, Pred=0, Offset=0
Building 9: GT=0, Pred=0, Offset=0
Building 10: GT=0, Pred=0, Offset=0
Building 11: GT=0, Pred=0, Offset=0
Building 12: GT=0, Pred=2, Offset=2
Building 14: GT=0, Pred=0, Offset=0
Building 15: GT=0, Pred=0, Offset=0
Building 16: GT=0, Pred=0, Offset=0
Building 17: GT=0, Pred=0, Offset=0
Building 18: GT=0, Pred=0, Offset=0
Building 19: GT=0, Pred=0, Offset=0
Building 20: GT=0, Pred=0, Offset=0
Building 21: GT=0, Pred=0, Offset=0
Building 24: GT=0, Pred=3, Offset=3
Building 25: GT=0, Pred=0, Offset=0
Building 26: GT=0, Pred=0, Offset=0
Building 27: GT=0, Pred=3, Offset=3
Building 28: GT=0, Pred=1, Offset=1
Building 29: GT=0, Pred=0, Offset=0
Building 31: GT=0, Pred=0, Offset=0
Building 32: GT=0, Pred=0, Offset=0
Building 34: GT=0, Pred=3, Offset=3

socal-fire_00000325