In [None]:
#===============================================================
# DEBUG TOGGLE SWITCH
def d(*args, **kwargs):
    if d.ON:
        print(*args, **kwargs)
d.ON = True
#===============================================================

#================================================
if __name__ == "__main__":
  try:
    import segmentation_models_pytorch as smp
  except ImportError:
    !pip install -q segmentation_models_pytorch timm
    import segmentation_models_pytorch as smp
#============================================================0

#=============================================================
import os
import numpy as np
import torch
import torch.nn.functional as F
from torchvision import transforms as T
from PIL import Image
from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score, roc_auc_score, roc_curve
import pandas as pd
from tqdm import tqdm
#===================================================================


#=====================================================================00
# PATHS SETTING
root_dir = "/content/drive/MyDrive/MAGISTRALE/ANNO 1/Computer Vision/Project/RoadObstacleDetection"
frames_dir = os.path.join(root_dir, "Datasets/RoadAnomaly_jpg")
split_file = os.path.join(root_dir, "ProjectWorkspace/splits/roadAnomaly_valid_pairs.txt")
ckpt_path = os.path.join(root_dir, "ProjectWorkspace/ckpts/updated_loss_model.pth")
lambda_path = os.path.join(root_dir, "ProjectWorkspace/src/eval/lambda_hat_3rdattempt.txt")
output_csv = os.path.join(root_dir, "ProjectWorkspace/eval/roadAnomaly_metrics.csv")
#================================================================================0

#==================================================================================
# NAVIGATION TO GET TO THE FOLDER WITH STUFF INSIDE
import sys
sys.path.append("/content/drive/MyDrive/MAGISTRALE/ANNO 1/Computer Vision/Project/RoadObstacleDetection/ProjectWorkspace/src")

#==============================================================================
# MODEL LOADING
from network.deeplab_dualhead import get_model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = get_model().to(device)
model.load_state_dict(torch.load(ckpt_path, map_location = device))
model.eval()
#=================================================================================

#================================================================================
# CALIB LOADING
with open(lambda_path, "r") as f:
    lambda_hat = float(f.read().strip())
d(f"Loaded lambda_hat = {lambda_hat}")
#===================================================================================


#===================================================================================
# PREPROCESSING
transform = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225])
])
#==================================================================================0


#======================================================================
# INFERENCE
threshold_objectness = 0.51
results = []
all_gt_pixels = []
all_scores = []

with open(split_file, "r") as f:
  rel_paths = f.read().splitlines()

for rel_path in tqdm(rel_paths):
  rgb_path = os.path.join(frames_dir, rel_path)
  label_path = rgb_path.replace(".jpg", ".labels/labels_semantic.png")

  rgb_img = Image.open(rgb_path).convert("RGB")
  input_tensor = transform(rgb_img).unsqueeze(0).to(device)

  with torch.no_grad():
    seg_logits, obj_logits = model(input_tensor)
    seg_logits = F.interpolate(seg_logits, size=rgb_img.size[::-1], mode="bilinear", align_corners=False)
    obj_logits = F.interpolate(obj_logits, size=rgb_img.size[::-1], mode="bilinear", align_corners=False)

    softmax = torch.softmax(seg_logits, dim=1)
    objectness = torch.sigmoid(obj_logits)

    conf_score, _ = torch.max(softmax, dim=1)
    nonconformity = 1.0 - conf_score
    unknown_mask = (nonconformity > lambda_hat).squeeze(0).cpu().numpy()
    object_mask = (objectness > threshold_objectness).squeeze().cpu().numpy()
    obstacle_mask = (unknown_mask & object_mask).astype(np.uint8)
    obstacle_score = objectness.squeeze().cpu().numpy() * nonconformity.squeeze().cpu().numpy()

  gt_mask = np.array(Image.open(label_path))
  gt_anomaly = (gt_mask == 2).astype(np.uint8)

  pred_flat = obstacle_mask.flatten()
  gt_flat = gt_anomaly.flatten()
  score_flat = obstacle_score.flatten()

  precision = precision_score(gt_flat, pred_flat, zero_division=0)
  recall = recall_score(gt_flat, pred_flat, zero_division=0)
  f1 = f1_score(gt_flat, pred_flat, zero_division=0)
  iou = jaccard_score(gt_flat, pred_flat, zero_division=0)

  results.append({
      "image": rel_path,
      "precision": precision,
      "recall": recall,
      "f1": f1,
      "iou": iou
  })

  all_gt_pixels.append(gt_flat)
  all_scores.append(score_flat)

df = pd.DataFrame(results)
df.to_csv(output_csv, index=False)
d(f"Saved metrics to {output_csv}")
#=============================================================================


#===========================================================================0
# AGGREGATED METRICS — FULL vs. TRIMMED
metrics = ["precision", "recall", "f1", "iou"]

# GLOBAL
global_stats = df[metrics].agg(["mean", "std", "min", "max", "median"])
d("\n== GLOBAL METRICS (all images) ==")
d(global_stats)

# GLOBAL SCORE-BASED
all_gt_pixels = np.concatenate(all_gt_pixels)
all_scores = np.concatenate(all_scores)

# AUROC
auroc = roc_auc_score(all_gt_pixels, all_scores)
# FPR@95TPR
fpr, tpr, thresholds = roc_curve(all_gt_pixels, all_scores)
fpr95 = fpr[np.argmax(tpr >= 0.95)]

d(f"\n== GLOBAL SCORE-BASED METRICS ==")
d(f"AUROC = {auroc:.6f}")
d(f"FPR@95TPR = {fpr95:.6f}")
#==============================================================================


# ONE SIDED LOWER ALPHA-TRIMMED EVALUATION  (about IoU):
#we noticed a lot of images are not reliable (RoadAnomaly is highly extreme in terms of OOD data wrt to training data we had to use) to perform inference, based on Cityscapes training, for example rural areas looking very different from cityscapes
alpha = 0.30
q_alpha = df["iou"].quantile(alpha)
df_trimmed = df[df["iou"] > q_alpha]
d(f"\nExcluded {len(df) - len(df_trimmed)} samples (bottom {int(alpha*100)}% IoU)")
trimmed_stats = df_trimmed[metrics].agg(["mean", "std", "min", "max", "median"])
d(f"== TRIMMED METRICS (top {100 - int(alpha*100)}% samples) ==")
d(trimmed_stats)
#=====================================================================



In [None]:
#============================================================
# DISPLAY PREDICTIONS FOR PRESENTATION
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import LinearSegmentedColormap

save_outputs = True # toggling until they're good to be saved

metrics_df = pd.read_csv(output_csv)

q_alpha = metrics_df["iou"].quantile(alpha) #visibility as variable, high cohesion low coupling
q_top10 = metrics_df["iou"].quantile(0.9)

top_samples = set(metrics_df[metrics_df["iou"] > q_top10]["image"])
good_samples = set(metrics_df[metrics_df["iou"] > q_alpha]["image"])

save_dir_root = os.path.join(root_dir, "ProjectWorkspace/eval/RoadAnomalyInferenceImages")
save_top_root = os.path.join(save_dir_root, "TopSamples")
save_trimmed_root = os.path.join(save_dir_root, "TrimmedSamples")
os.makedirs(save_dir_root, exist_ok=True)
os.makedirs(save_top_root, exist_ok=True)
os.makedirs(save_trimmed_root, exist_ok=True)

for sample in results:
  rel_path = sample["image"]
  rgb_path = os.path.join(frames_dir, rel_path)
  label_path = rgb_path.replace(".jpg", ".labels/labels_semantic.png")
  name = os.path.splitext(os.path.basename(rgb_path))[0]

  is_top = rel_path in top_samples
  is_good = rel_path in good_samples
  should_save = save_outputs
  if should_save:
    if is_top:
      save_dir = os.path.join(save_top_root, name) #save_dir assumes this value for top samples (need em to show in which conditions the model performs well and the uncertainty is low)
    elif is_good:
      save_dir = os.path.join(save_dir_root, name) #save_dir assumes this value for good samples
    else:
      save_dir = os.path.join(save_trimmed_root, name) #save_dir assumes this value for trimmed samples (need em as well to be shown in the presentation and justify the choice)
    os.makedirs(save_dir, exist_ok=True)

  rgb_img = Image.open(rgb_path).convert("RGB")

  gt_mask = np.array(Image.open(label_path))
  gt_anomaly = (gt_mask == 2).astype(np.uint8)

  input_tensor = transform(rgb_img).unsqueeze(0).to(device)
  with torch.no_grad():
      seg_logits, obj_logits = model(input_tensor)
      seg_logits = F.interpolate(seg_logits, size=rgb_img.size[::-1], mode="bilinear", align_corners=False)
      obj_logits = F.interpolate(obj_logits, size=rgb_img.size[::-1], mode="bilinear", align_corners=False)
      softmax = torch.softmax(seg_logits, dim=1)
      objectness = torch.sigmoid(obj_logits)
      conf_score, _ = torch.max(softmax, dim=1)
      nonconformity = 1.0 - conf_score
      unknown_mask = (nonconformity > lambda_hat).squeeze(0).cpu().numpy()
      object_mask = (objectness > threshold_objectness).squeeze().cpu().numpy()
      obstacle_mask = (unknown_mask & object_mask).astype(np.uint8)
      conf_map = conf_score.squeeze().cpu().numpy()
      obstacle_score = (objectness.squeeze().cpu().numpy()) * (nonconformity.squeeze().cpu().numpy())


  # VISUAL DISPLAY (always)
  fig, axs = plt.subplots(1, 7, figsize=(42, 6))
  axs[0].imshow(rgb_img)
  axs[0].set_title("RGB Image")
  axs[1].imshow(gt_anomaly, cmap="gray")
  axs[1].set_title("GT Anomaly")
  axs[2].imshow(unknown_mask, cmap="gray")
  axs[2].set_title("Unknown Mask")
  axs[3].imshow(object_mask, cmap="gray")
  axs[3].set_title("Objectness Mask")
  axs[4].imshow(obstacle_mask, cmap="gray")
  axs[4].set_title("Obstacle = Unknown AND Object")

  # CONFIDENCE MAP USING ONLY SOFTMAX
  im1 = axs[5].imshow(conf_map, cmap="viridis", vmin=0.0, vmax=1.0)
  axs[5].set_title("Confidence Map (Max Softmax)")
  cbar1 = fig.colorbar(im1, ax=axs[5], fraction=0.046, pad=0.04)
  cbar1.set_label("Confidence scale")
  # CONFIDENCE MAP USING SOFTMAX AND SIGMOID HEAD COMBINED
  im2 = axs[6].imshow(obstacle_score, cmap="viridis_r", vmin=0.0, vmax=1.0)  #doing viridis"_r" in order to revert the behaviour of the viridis heatmap and return an image prone to be compared to the other confidence map
  axs[6].set_title("Obstacle Score = Objectness combined with Nonconformity")
  cbar2 = fig.colorbar(im2, ax=axs[6], fraction=0.046, pad=0.04)
  cbar2.set_label("Obstacle Score")

  for ax in axs:
      ax.axis("off")
  plt.suptitle(name, fontsize=18)
  plt.tight_layout()
  plt.show()

  # PER-IMAGE SAVING (only if save_outputs and in top samples)
  if should_save:
      rgb_img.save(os.path.join(save_dir, "rgb.png"))

      #Softmax confidence map saving
      fig, ax = plt.subplots(figsize=(6, 5))
      im = ax.imshow(conf_map, cmap="viridis", vmin=0.0, vmax=1.0)
      ax.axis("off")
      cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
      cbar.set_label("Confidence scale")
      plt.savefig(os.path.join(save_dir, "confidence_map.png"), dpi=200)
      plt.close()

      #Softmax and sigmoid confidence map saving
      fig, ax = plt.subplots(figsize=(6, 5))
      im = ax.imshow(obstacle_score, cmap="viridis_r", vmin=0.0, vmax=1.0)
      ax.axis("off")
      cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
      cbar.set_label("Obstacle Score")
      plt.savefig(os.path.join(save_dir, "obstacle_score.png"), dpi=200)
      plt.close()

      if is_good: #additional masks only for good samples
        Image.fromarray((gt_anomaly * 255).astype(np.uint8)).save(os.path.join(save_dir, "gt_anomaly.png"))
        Image.fromarray((unknown_mask * 255).astype(np.uint8)).save(os.path.join(save_dir, "unknown_mask.png"))
        Image.fromarray((object_mask * 255).astype(np.uint8)).save(os.path.join(save_dir, "object_mask.png"))
        Image.fromarray((obstacle_mask * 255).astype(np.uint8)).save(os.path.join(save_dir, "obstacle_mask.png"))
#===============================================================
