## Model Evaluation for Sewer Defect Classification

This notebook implements comprehensive evaluation procedures for the FINE-TUNED sewer defect classification model, including confusion matrices and specialized metrics.

### 1. Library Imports and Setup

In [None]:
import os
import torch
import torch.nn as nn
import torchvision.models as models
import pandas as pd
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, Subset
from PIL import Image
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.nn import BCEWithLogitsLoss
from sklearn.metrics import precision_recall_fscore_support
import wandb
from torchvision.utils import save_image
import numpy as np
from sklearn.model_selection import train_test_split

### 2. Dataset Preparation

In [None]:
# Custom dataset class
class CustomDataset(Dataset):
    """Multi-label image dataset backed by an annotation CSV.

    The first CSV column is read as the image file name (resolved against
    ``img_dir``); all remaining columns are read as float32 label values.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        """Read the annotation table and remember where the images live."""
        self.annotations = pd.read_csv(csv_file)
        self.img_dir, self.transform = img_dir, transform

    def __len__(self):
        """Return the number of annotated samples (one per CSV row)."""
        return self.annotations.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, labels)`` for row ``idx`` of the annotation table."""
        file_name = self.annotations.iloc[idx, 0]
        image = Image.open(f"{self.img_dir}/{file_name}").convert("RGB")

        label_values = self.annotations.iloc[idx, 1:].astype('float32').values
        labels = torch.tensor(label_values)

        # Without a transform the raw PIL image is handed back unchanged.
        if not self.transform:
            return image, labels
        return self.transform(image), labels

In [None]:
# Deterministic preprocessing applied to every test image: resize to 224x224,
# convert to a tensor, then normalize each of the three channels.
# NOTE(review): the mean/std values are presumably the statistics used by the
# training pipeline — confirm they match the training-time transform.
inference_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.523, 0.453, 0.345], std=[0.210, 0.199, 0.154])
])

In [None]:
# Test split: labels are read from the CSV, images from the matching folder.
# The '{YOUR_PROJECT_ROOT}' text is a literal placeholder (these are not
# f-strings) and must be replaced with a real path before running.
test_dataset = CustomDataset(csv_file='{YOUR_PROJECT_ROOT}/data/fine_tuning/annotations/test/test_labels.csv', 
                            img_dir='{YOUR_PROJECT_ROOT}/data/fine_tuning/images/test', transform=inference_transform)

# shuffle=False keeps prediction rows in CSV row order; export_tp_fp_fn_csv
# later maps prediction row i to the file name in CSV row i, so this matters.
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=8, pin_memory=True)

### 3. Model Evaluation Function

In [None]:
import torch
from sklearn.metrics import precision_recall_fscore_support

def evaluate_model(model, dataloader, threshold=0.5, save_images=False):
    """Run ``model`` over ``dataloader`` and print multi-label metrics.

    The model is expected to emit raw logits, one per label column. Logits
    are passed through a sigmoid and binarized with ``threshold``. An extra
    "ND" (no defect) column is appended to both targets and predictions; it
    is 1 for rows whose other columns are all 0.

    Per-class and micro-averaged precision / recall / F1 are printed.

    Args:
        model: ``torch.nn.Module`` producing logits of shape
            ``(batch, num_labels)``.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        threshold: Probability above which a label counts as predicted.
        save_images: Accepted for backward compatibility; it has no effect
            (no image export is implemented in this function).

    Returns:
        ``(y_true, y_pred)`` as NumPy arrays of shape
        ``(num_samples, num_labels + 1)`` holding 0/1 values, with the
        derived ND column last.
    """
    model.eval()

    # Feed inputs to whichever device the model already lives on instead of
    # relying on a notebook-global ``device`` defined in some other cell.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    label_batches = []
    logit_batches = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            logits = model(inputs.to(run_device))
            # Labels are only needed for CPU-side metrics, so they are never
            # sent to the accelerator.
            label_batches.append(labels.cpu())
            logit_batches.append(logits.cpu())

    # Concatenate all batches
    all_labels = torch.cat(label_batches)
    all_logits = torch.cat(logit_batches)

    # Logits -> probabilities -> hard 0/1 decisions
    all_preds = (torch.sigmoid(all_logits) > threshold).float()

    # Derive "ND" (no defect): a row with no positive label at all
    labels_nd = (all_labels.sum(dim=1) == 0).float().unsqueeze(1)
    preds_nd = (all_preds.sum(dim=1) == 0).float().unsqueeze(1)

    # Append ND as the last column
    labels_ext = torch.cat([all_labels, labels_nd], dim=1)
    preds_ext = torch.cat([all_preds, preds_nd], dim=1)

    y_true = labels_ext.numpy()
    y_pred = preds_ext.numpy()

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average=None, zero_division=0
    )
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='micro', zero_division=0
    )

    # The ND column is always the last one; its index follows the label count
    # rather than being hard-coded.
    nd_index = labels_ext.shape[1] - 1
    for i, (p, r, f) in enumerate(zip(precision, recall, f1)):
        class_label = f'Class {i}' if i < nd_index else f'Class {nd_index} (ND)'
        print(f'{class_label} - Precision: {p:.4f}, Recall: {r:.4f}, F1 Score: {f:.4f}')

    print(f'\nOverall - Precision: {overall_precision:.4f}, Recall: {overall_recall:.4f}, F1 Score: {overall_f1:.4f}')
    return y_true, y_pred

### 4. Model Loading and Testing

In [None]:
import torch
from torch.utils.data import DataLoader
from sklearn.metrics import precision_recall_fscore_support

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model structure (same as during training)
# weights=None: no pretrained weights are requested here; all parameters
# come from the checkpoint loaded below.
model = models.resnet101(weights=None)

# Replace the classifier head with a 17-output linear layer. Wrapping it in
# Sequential makes its parameters appear under "fc.0.*" in the state dict.
# NOTE(review): presumably the checkpoint was saved with this same wrapper —
# confirm the key names match.
num_ftrs = model.fc.in_features
model.fc = torch.nn.Sequential(
    torch.nn.Linear(num_ftrs, 17),
    # No Sigmoid layer: the head emits raw logits (evaluate_model applies
    # torch.sigmoid itself).
)

# Load the saved model weights
# The path contains literal '{...}' placeholders that must be filled in.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a
# trusted source, or pass weights_only=True where the installed PyTorch
# version supports it.
model.load_state_dict(torch.load("{YOUR_PROJECT_ROOT}/checkpoint/fine_tuning/{FINE_TUNED_MODEL_WEIGHT}", map_location=device))

# Move model to GPU if available

model = model.to(device)


In [None]:
# Prints per-class and micro-averaged metrics; the returned arrays hold
# thresholded 0/1 values with the derived ND column appended last.
y_true, y_pred = evaluate_model(model, test_loader)

### 5. Advanced Metric Analysis
- Class Importance Weighted (CIW) scores
- Per-class F2 scores
- Normal class F1 score (ND)
- Mean Average Precision

In [None]:
from metrics import evaluation

# Class-importance weights (CIW), one per defect class, in the order given by
# the trailing comments (17 entries; ND has no weight here).
# The values are not normalized — the largest is 1.0 and they do not sum to 1.
# NOTE(review): confirm these match the CIW values published with Sewer-ML.
ciw_weights = np.array([
    1.0000,  # RB
    0.5518,  # OB
    0.2896,  # PF
    0.1622,  # DE
    0.6419,  # FS
    0.1847,  # IS
    0.3559,  # RO
    0.3131,  # IN
    0.0811,  # AF
    0.2275,  # BE
    0.2477,  # FO
    0.0901,  # GR
    0.4167,  # PH
    0.4167,  # PB
    0.9009,  # OS
    0.3829,  # OP
    0.4396   # OK
])

# y_true and y_pred come from evaluate_model: NumPy arrays whose last column
# is the derived ND flag. Only the first 17 (defect) columns are passed on,
# matching the 17 CIW weights above.
#
# y_pred is already binarized (sigmoid followed by thresholding inside
# evaluate_model); it contains 0/1 values, not logits or probabilities.
# NOTE(review): `evaluation` comes from the project-local `metrics` module and
# is not visible here. If it derives mAP from ranking scores, feeding it
# binarized predictions will distort that number — confirm whether it expects
# probabilities instead.
y_true_defects = y_true[:, :17]
y_pred_defects = y_pred[:, :17]


# Argument order is (predictions, targets, weights).
new_metrics, main_metrics, aux_metrics = evaluation(y_pred_defects, y_true_defects, ciw_weights, threshold=0.5)
f1_normal = new_metrics["F1_Normal"]
print("F1-score for ND (Normal):", f1_normal)
print("Main metrics:", main_metrics)
print("Class-weighted F2 (CIW-F2):", new_metrics["F2"])
print("Per-class F2:", new_metrics["F2_class"])
print("Macro F1:", main_metrics["MF1"])
print("Micro F1:", main_metrics["mF1"])
print("Mean Average Precision (mAP):", main_metrics["mAP"])
print("Exact Match Accuracy:", main_metrics["EMAcc"])

### 6. Confusion Matrix Visualization
- Focused TP/FP/FN view

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np

def _as_binary_int(values, name):
    """Return ``values`` as an int array, rejecting anything but 0/1 entries."""
    arr = np.asarray(values)
    if not np.isin(arr, (0, 1)).all():
        raise ValueError(f"{name} must contain only 0/1 indicator values")
    return arr.astype(int)


def create_single_large_confusion_matrix(y_true, y_pred, class_names, include_nd=True):
    """Build one 2x2 confusion matrix per class for multi-label predictions.

    Args:
        y_true: Binary indicator array of shape ``(n_samples, n_classes)``.
        y_pred: Binary indicator array of the same shape as ``y_true``.
        class_names: Sequence with one name per column.
        include_nd: When True, append a derived "ND (No Defect)" class that
            is 1 for rows whose columns are all 0.

    Returns:
        ``(confusion_matrices, class_names_extended)``: a list of 2x2 integer
        arrays laid out as ``[[TN, FP], [FN, TP]]`` (one per class, ND last
        when requested) and the matching list of class names.

    Raises:
        ValueError: If either array contains values other than 0 and 1.
    """
    y_true_extended = _as_binary_int(y_true, "y_true")
    y_pred_extended = _as_binary_int(y_pred, "y_pred")
    class_names_extended = list(class_names)

    if include_nd:
        nd_true = (y_true_extended.sum(axis=1) == 0).astype(int).reshape(-1, 1)
        nd_pred = (y_pred_extended.sum(axis=1) == 0).astype(int).reshape(-1, 1)
        y_true_extended = np.concatenate([y_true_extended, nd_true], axis=1)
        y_pred_extended = np.concatenate([y_pred_extended, nd_pred], axis=1)
        class_names_extended.append('ND (No Defect)')

    # Fixing labels=[0, 1] makes every matrix 2x2, even for a class that is
    # entirely absent (or entirely present) in both targets and predictions.
    confusion_matrices = [
        confusion_matrix(y_true_extended[:, col], y_pred_extended[:, col], labels=[0, 1])
        for col in range(y_true_extended.shape[1])
    ]

    return confusion_matrices, class_names_extended

def _precision_recall_f1(tp, fp, fn):
    """Return ``(precision, recall, f1)`` from counts, using 0 for undefined ratios."""
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1


def plot_single_large_confusion_matrix(y_true, y_pred, class_names, include_nd=True, figsize=(25, 20)):
    """Plot a 2x2 confusion matrix per class and print a summary table.

    Args:
        y_true: Binary indicator array of shape ``(n_samples, n_classes)``.
        y_pred: Binary indicator array of the same shape as ``y_true``.
        class_names: List with one name per column.
        include_nd: Forwarded to ``create_single_large_confusion_matrix``;
            when True a derived "ND (No Defect)" class is added.
        figsize: Size of the whole figure, forwarded to ``plt.subplots``.

    Returns:
        None. The figure is shown with ``plt.show()`` and the table is
        written to stdout.
    """
    confusion_matrices, class_names_extended = create_single_large_confusion_matrix(
        y_true, y_pred, class_names, include_nd
    )

    # Six panels per row; the row count grows with the number of classes so
    # no class is ever left out of the figure (18 classes -> 3 x 6 grid).
    n_classes = len(class_names_extended)
    n_cols = 6
    n_rows = max(1, (n_classes + n_cols - 1) // n_cols)
    # squeeze=False always yields a 2-D axes array, even for a single row.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=figsize, squeeze=False)
    axes = axes.flatten()

    for ax, cm, class_name in zip(axes, confusion_matrices, class_names_extended):
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                    xticklabels=['Pred: No', 'Pred: Yes'],
                    yticklabels=['True: No', 'True: Yes'],
                    cbar=False)

        precision, recall, f1 = _precision_recall_f1(cm[1, 1], cm[0, 1], cm[1, 0])

        ax.set_title(f'{class_name}\nP:{precision:.3f} R:{recall:.3f} F1:{f1:.3f}',
                     fontsize=10, pad=10)
        ax.set_xlabel('Predicted', fontsize=8)
        ax.set_ylabel('Actual', fontsize=8)
        ax.tick_params(labelsize=8)

    # Hide the subplots that have no class assigned
    for ax in axes[n_classes:]:
        ax.set_visible(False)

    plt.suptitle('Multi-Label Confusion Matrix - All Classes', fontsize=16, y=0.98)
    plt.tight_layout()
    plt.show()

    # Summary statistics table
    print("\n" + "="*100)
    print("MULTI-LABEL CONFUSION MATRIX SUMMARY")
    print("="*100)
    print(f"{'Class':<20} {'TN':<8} {'FP':<8} {'FN':<8} {'TP':<8} {'Precision':<10} {'Recall':<10} {'F1-Score':<10}")
    print("-"*100)

    total_tp = total_fp = total_fn = total_tn = 0

    for cm, class_name in zip(confusion_matrices, class_names_extended):
        tn, fp, fn, tp = cm[0, 0], cm[0, 1], cm[1, 0], cm[1, 1]
        precision, recall, f1 = _precision_recall_f1(tp, fp, fn)

        print(f"{class_name:<20} {tn:<8} {fp:<8} {fn:<8} {tp:<8} {precision:<10.4f} {recall:<10.4f} {f1:<10.4f}")

        total_tp += tp
        total_fp += fp
        total_fn += fn
        total_tn += tn

    # Overall metrics: counts pooled over all classes before computing ratios
    overall_precision, overall_recall, overall_f1 = _precision_recall_f1(total_tp, total_fp, total_fn)

    print("-"*100)
    print(f"{'OVERALL':<20} {total_tn:<8} {total_fp:<8} {total_fn:<8} {total_tp:<8} {overall_precision:<10.4f} {overall_recall:<10.4f} {overall_f1:<10.4f}")
    print("="*100)


# Defect classes: the 17 defect codes, listed in the same order as the
# per-class comments on ciw_weights. ND (no defect) is not listed because the
# helpers derive it from rows with no positive label.
# NOTE(review): presumably this is also the label-column order of the
# annotation CSV — confirm.
class_names = [
    'RB', 'OB', 'PF', 'DE', 'FS', 'IS', 'RO', 'IN', 
    'AF', 'BE', 'FO', 'GR', 'PH', 'PB', 'OS', 'OP', 'OK'
]

In [None]:
# Build and draw the single large confusion-matrix figure.
# The ND class can be included (include_nd=True) or left out (include_nd=False).
# Only the first 17 columns are passed: evaluate_model already appended an ND
# column, and the plotting helper derives ND again on its own.
plot_single_large_confusion_matrix(y_true[:, :17], y_pred[:, :17], class_names, include_nd=True)

### 7. Error Analysis Export
- Export TP/FP/FN cases to CSV
- Include filenames for manual inspection
- Categorize errors by defect type

In [None]:
def _with_nd_column(labels, n_defects):
    """Return ``labels`` with an ND column at index ``n_defects``, deriving it if absent."""
    labels = np.asarray(labels)
    if labels.shape[1] > n_defects:
        # Caller already supplied the ND column.
        return labels
    nd = (labels.sum(axis=1) == 0).astype(labels.dtype).reshape(-1, 1)
    return np.concatenate([labels, nd], axis=1)


def export_tp_fp_fn_csv(y_true, y_pred, dataset, class_names, output_path, include_nd=True):
    """Export per-class TP, FP, FN filenames as a CSV.

    Args:
        y_true: Binary indicator array, one row per sample. Columns are the
            classes in ``class_names`` order, optionally followed by an ND
            column.
        y_pred: Binary indicator array laid out like ``y_true``.
        dataset: Object whose ``annotations`` DataFrame holds the file name
            of sample ``i`` in row ``i``, column 0.
        class_names: Sequence of defect class names.
        output_path: Destination CSV path; missing parent directories are
            created.
        include_nd: When True, an "ND (No Defect)" class is reported as well.
            If the arrays carry no ND column it is derived from rows whose
            defect columns are all 0. When False, any columns beyond
            ``len(class_names)`` are ignored.

    Returns:
        DataFrame with columns ``class``, ``filename`` and ``category``
        (the same rows that were written to ``output_path``). True negatives
        are not listed.
    """
    n_defects = len(class_names)
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    class_names_extended = list(class_names)
    if include_nd:
        class_names_extended.append('ND (No Defect)')
        y_true = _with_nd_column(y_true, n_defects)
        y_pred = _with_nd_column(y_pred, n_defects)
    else:
        y_true = y_true[:, :n_defects]
        y_pred = y_pred[:, :n_defects]

    filenames = dataset.annotations.iloc[:, 0].tolist()

    records = []
    for idx, cls_name in enumerate(class_names_extended):
        true_col = y_true[:, idx]
        pred_col = y_pred[:, idx]
        # Row order within a class: all TPs, then FPs, then FNs.
        outcomes = (
            ('True Positive', (true_col == 1) & (pred_col == 1)),
            ('False Positive', (true_col == 0) & (pred_col == 1)),
            ('False Negative', (true_col == 1) & (pred_col == 0)),
        )
        for category, mask in outcomes:
            records.extend(
                {'class': cls_name, 'filename': filenames[i], 'category': category}
                for i in np.where(mask)[0]
            )

    # Explicit columns keep the CSV header even when there is nothing to report.
    df = pd.DataFrame(records, columns=['class', 'filename', 'category'])

    # A bare file name has no directory part; os.makedirs('') would raise.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_csv(output_path, index=False)
    return df

# NOTE(review): this is a hard-coded absolute path, unlike the
# '{YOUR_PROJECT_ROOT}' placeholders used for the other paths in this
# notebook — adjust it to your environment.
csv_path = '/workspace/finetuning/results/tp_fp_fn_examples.csv'
# y_true / y_pred are passed with the ND column that evaluate_model appended,
# which is the column the exporter reads for the 'ND (No Defect)' class.
tp_fp_fn_df = export_tp_fp_fn_csv(y_true, y_pred, test_dataset, class_names, csv_path, include_nd=True)
print(f"Saved TP/FP/FN details to {csv_path}")
# Last expression of the cell: the notebook displays the first rows.
tp_fp_fn_df.head()