In [None]:
import pandas as pd
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import cv2
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import DataLoader
from typing import List, Dict, Tuple, Optional
from torch import Tensor
from collections import OrderedDict
from torchvision.models.detection.rpn import concat_box_prediction_layers
import torchvision
from collections import Counter
import numpy as np

# CSV with the bounding-box annotations; read by load_and_filter_data().
FILE_PATH = 'labels/BBox_List_2017.csv'
# Root directory searched by find_image_path_nested() for <subfolder>/images/<file>.
BASE_DIR = 'xray_data'
# Rows whose 'Finding Label' contains any of these strings are dropped on load.
EXCLUDED_LABELS = ['Nodule', 'Pneumothorax', 'Mass','Pneumonia', 'Infiltrate']

def load_and_filter_data(file_path, exclude_labels):
    """Load the annotation CSV and drop rows that match an excluded finding.

    Args:
        file_path: Path of the CSV file. It must provide the columns
            'Image Index', 'Finding Label', 'Bbox [x', 'y', 'w' and 'h]'.
        exclude_labels: Iterable of patterns. A row is removed when its
            'Finding Label' contains any of them (``Series.str.contains``
            semantics, i.e. each entry is treated as a regular expression).
            Rows with a missing label are kept.

    Returns:
        A new DataFrame, independent of the parsed CSV, with the columns
        'Image Index', 'Finding Label', 'Bbox_x', 'Bbox_y', 'Bbox_w', 'Bbox_h'.
    """
    data = pd.read_csv(file_path)

    # Build one exclusion mask instead of re-filtering the frame per label.
    findings = data['Finding Label']
    excluded = pd.Series(False, index=data.index)
    for label in exclude_labels:
        excluded |= findings.str.contains(label, na=False)

    # Explicit copy: callers add columns to the result, which would raise
    # SettingWithCopyWarning on a frame that is still a view-like slice.
    df_labels = data.loc[
        ~excluded,
        ['Image Index', 'Finding Label', 'Bbox [x', 'y', 'w', 'h]'],
    ].copy()
    df_labels.columns = ['Image Index', 'Finding Label', 'Bbox_x', 'Bbox_y', 'Bbox_w', 'Bbox_h']
    return df_labels

def find_image_path_nested(image_index, base_dir):
    """Locate an image stored as ``<base_dir>/<subfolder>/images/<image_index>``.

    Subfolders of ``base_dir`` are visited in sorted order and the first one
    that contains the file wins.

    Args:
        image_index: File name of the image to look for.
        base_dir: Directory whose entries are scanned.

    Returns:
        The path of the first match, or None when no subfolder holds the file.
    """
    image_dirs = (
        os.path.join(base_dir, entry, 'images')
        for entry in sorted(os.listdir(base_dir))
    )
    for image_dir in image_dirs:
        # Skip entries that have no 'images' directory (e.g. stray files).
        if not (os.path.exists(image_dir) and os.path.isdir(image_dir)):
            continue
        candidate = os.path.join(image_dir, image_index)
        if os.path.exists(candidate):
            return candidate
    return None
    
def preprocess_data(df_labels: pd.DataFrame, base_dir: str) -> pd.DataFrame:
    """Attach image paths and corner-format boxes to the annotation table.

    The input frame is left untouched; a new frame is returned.

    Args:
        df_labels: Annotations with the columns 'Image Index', 'Finding Label',
            'Bbox_x', 'Bbox_y', 'Bbox_w' and 'Bbox_h'.
        base_dir: Root directory handed to ``find_image_path_nested``.

    Returns:
        A DataFrame with the columns 'Image Path', 'Finding Label', 'Bbox_x',
        'Bbox_y', 'Bbox_xmax', 'Bbox_ymax'. Rows whose image could not be
        found on disk are dropped.
    """
    # Resolve every distinct image once; the lookup walks the directory tree,
    # so repeating it for each box of the same image is wasted I/O.
    path_by_index = {
        image_index: find_image_path_nested(image_index, base_dir)
        for image_index in df_labels['Image Index'].unique()
    }

    located = (
        df_labels
        .assign(**{'Image Path': df_labels['Image Index'].map(path_by_index)})
        .dropna(subset=['Image Path'])
    )
    # Convert (x, y, w, h) into (x_min, y_min, x_max, y_max).
    located = located.assign(
        Bbox_xmax=located['Bbox_x'] + located['Bbox_w'],
        Bbox_ymax=located['Bbox_y'] + located['Bbox_h'],
    )
    # Copy so that later cells can add columns without SettingWithCopyWarning.
    return located[['Image Path', 'Finding Label', 'Bbox_x', 'Bbox_y', 'Bbox_xmax', 'Bbox_ymax']].copy()

# Annotation table: filter the CSV, then resolve image paths and box corners.
df_labels = preprocess_data(
    load_and_filter_data(FILE_PATH, EXCLUDED_LABELS),
    BASE_DIR,
)

# Ratio between the most and the least frequent finding label.
class_counts = df_labels['Finding Label'].value_counts()
most_common, least_common = class_counts.max(), class_counts.min()
imbalance_ratio = most_common / least_common
print("Imbalance Ratio:", imbalance_ratio)



In [None]:
# Bar chart with one bar per finding label (height = number of annotated rows).
label_counts = df_labels['Finding Label'].value_counts()
bar_positions = np.arange(len(label_counts)) + 0.5

fig, ax1 = plt.subplots(1, 1, figsize=(12, 8))
ax1.bar(bar_positions, label_counts)
ax1.set_xticks(bar_positions)
_ = ax1.set_xticklabels(label_counts.index, rotation=90)

In [None]:

# Derive width, height and area from the corner coordinates. `.assign` returns
# a new frame, so re-running the cell is idempotent and no column is written
# into a slice of another frame (which triggers SettingWithCopyWarning).
df_labels = df_labels.assign(
    Bbox_width=lambda frame: frame['Bbox_xmax'] - frame['Bbox_x'],
    Bbox_height=lambda frame: frame['Bbox_ymax'] - frame['Bbox_y'],
    # Keyword arguments are evaluated in order, so the two columns above exist here.
    Bbox_area=lambda frame: frame['Bbox_width'] * frame['Bbox_height'],
)

# Mean box area per finding label.
average_bbox_size_per_class = df_labels.groupby('Finding Label')['Bbox_area'].mean()

print("Average Bounding Box Size (Area) per Class:")
print(average_bbox_size_per_class)


In [None]:
# Integer id per finding label, in order of first appearance in the table.
label_to_idx = {label: idx for idx, label in enumerate(df_labels['Finding Label'].unique())}
print(label_to_idx)

train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "split ratios must sum to 1"

# Shuffle the rows once (fixed seed for reproducibility).
df_shuffled = df_labels.sample(frac=1, random_state=42).reset_index(drop=True)

# Split by IMAGE, not by row: the table has one row per bounding box, so an
# image with several boxes could otherwise end up in both the training and the
# evaluation subsets and leak information between them.
image_paths = (
    df_labels['Image Path']
    .drop_duplicates()
    .sample(frac=1, random_state=42)
    .tolist()
)

n_total = len(image_paths)            # number of distinct images
n_train = int(n_total * train_ratio)
n_val = int(n_total * val_ratio)

train_images = image_paths[:n_train]
val_images = image_paths[n_train:n_train + n_val]
test_images = image_paths[n_train + n_val:]  # remainder, roughly test_ratio of the images

row_image = df_shuffled['Image Path']
train_data = df_shuffled[row_image.isin(train_images)].reset_index(drop=True)
val_data = df_shuffled[row_image.isin(val_images)].reset_index(drop=True)
test_data = df_shuffled[row_image.isin(test_images)].reset_index(drop=True)

# Every row must land in exactly one subset.
assert len(train_data) + len(val_data) + len(test_data) == len(df_labels)

print(f"Training set size: {len(train_data)}")
print(f"Validation set size: {len(val_data)}")
print(f"Test set size: {len(test_data)}")

class_counts_train = train_data['Finding Label'].value_counts()
print(class_counts_train)





In [None]:
import torch
from torchvision.transforms import functional as F
import numpy as np
from PIL import Image

import random

class XRayDataset(Dataset):
    """Detection dataset that yields one image and one bounding box per row.

    Args:
        label_to_idx: Mapping from 'Finding Label' strings to integer class ids.
        dataframe: Table with the columns 'Image Path', 'Finding Label',
            'Bbox_x', 'Bbox_y', 'Bbox_xmax' and 'Bbox_ymax'.
        transforms: Optional callable. An instance of a class named
            ``RandomRotationAndFlip`` is called as ``transforms(image, boxes)``
            and must return ``(image, boxes)``; anything else is called as
            ``transforms(image)`` and leaves the boxes unchanged.
    """

    # Name of the transform class that also updates the boxes. It is matched
    # by name because no such class is defined next to this dataset; a direct
    # isinstance() check raised NameError as soon as any transform was set.
    _JOINT_TRANSFORM_NAME = 'RandomRotationAndFlip'

    def __init__(self, label_to_idx, dataframe, transforms=None):
        # Keep the mapping on the instance instead of relying on a global
        # that happens to share the parameter's name.
        self.label_to_idx = label_to_idx
        self.dataframe = dataframe
        self.transforms = transforms

    def __len__(self):
        """Number of rows, i.e. annotated boxes, in the dataset."""
        return len(self.dataframe)

    def _transform_handles_boxes(self):
        """True when the transform is (a subclass of) the joint image+box transform."""
        return any(
            cls.__name__ == self._JOINT_TRANSFORM_NAME
            for cls in type(self.transforms).__mro__
        )

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the row at position ``idx``.

        ``target`` is a dict with "boxes" (float32 tensor of shape (1, 4) in
        x_min, y_min, x_max, y_max order) and "labels" (int64 tensor of
        shape (1,)).
        """
        row = self.dataframe.iloc[idx]

        image = Image.open(row['Image Path']).convert("RGB")

        boxes = torch.tensor(
            [[row['Bbox_x'], row['Bbox_y'], row['Bbox_xmax'], row['Bbox_ymax']]],
            dtype=torch.float32,
        )
        labels = torch.tensor([self.label_to_idx[row['Finding Label']]], dtype=torch.int64)

        if self.transforms:
            if self._transform_handles_boxes():
                image, boxes = self.transforms(image, boxes)
            else:
                image = self.transforms(image)

        target = {
            "boxes": boxes,
            "labels": labels
        }

        return image, target



           




In [None]:
import torchvision.transforms as transforms


# Note: Augmentation caused decrease in performance, so it is disabled.
# Kept for reference:
# train_transforms = transforms.Compose([
#     transforms.ColorJitter(brightness=0.2, contrast=0.2),
#     transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 2.0)),
#     transforms.ToTensor(),
# ])

# Image-only transform shared by all three subsets.
val_transforms = transforms.Compose([
    transforms.ToTensor(),
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

])


train_dataset = XRayDataset(label_to_idx,train_data, transforms=val_transforms)
val_dataset = XRayDataset(label_to_idx,val_data, transforms=val_transforms)
test_dataset = XRayDataset(label_to_idx,test_data, transforms=val_transforms)


# Reshuffle the training data every epoch. With shuffle=False the batches were
# identical in every epoch and drop_last always discarded the same trailing
# samples. The seeded generator keeps the batch order reproducible.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = 16,
    shuffle = True,
    drop_last = True,
    generator = torch.Generator().manual_seed(42),
)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size = 16)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size = 24)

# Sanity check: look at the first collated training batch only.
for images, targets in train_loader:
    print("Training Image Shape:", images.shape)
    print("Bounding Boxes (First Sample):", targets['boxes'][0])
    print("Labels (First Sample):", targets['labels'][0])
    break




In [None]:

import math
import torch
import torchvision
from torchvision.models.detection import RetinaNet_ResNet50_FPN_Weights

def initialize_retinanet(num_classes, pretrained_weights=True):
    """Build a RetinaNet (ResNet50-FPN) whose classification layer fits ``num_classes``.

    Only the classification and regression heads are left trainable; every
    other parameter of the model is frozen.

    Args:
        num_classes: Number of output classes of the new classification layer.
        pretrained_weights: When True the model is created with
            ``RetinaNet_ResNet50_FPN_Weights.DEFAULT``; otherwise ``weights=None``
            is passed.

    Returns:
        The modified model (not yet moved to any device).
    """
    weights = RetinaNet_ResNet50_FPN_Weights.DEFAULT if pretrained_weights else None
    model = torchvision.models.detection.retinanet_resnet50_fpn(weights=weights)

    cls_head = model.head.classification_head

    # New final classification layer. The input width is read from the layer
    # being replaced instead of hard-coding 256.
    cls_logits = torch.nn.Conv2d(
        in_channels=cls_head.cls_logits.in_channels,
        out_channels=cls_head.num_anchors * num_classes,
        kernel_size=3,
        stride=1,
        padding=1
    )

    # Small random weights plus a bias chosen so that the initial foreground
    # probability sigmoid(bias) equals prior_probability.
    prior_probability = 0.01
    torch.nn.init.normal_(cls_logits.weight, std=prior_probability) # cls weights
    torch.nn.init.constant_(cls_logits.bias, -math.log((1 - prior_probability) / prior_probability)) # cls bias

    # Swap the layer in and keep the head's bookkeeping consistent.
    cls_head.num_classes = num_classes
    cls_head.cls_logits = cls_logits

    # Freeze every parameter of the model ...
    model.requires_grad_(False)

    # ... then re-enable training for the two detection heads only.
    cls_head.requires_grad_(True)
    model.head.regression_head.requires_grad_(True)

    return model




In [None]:
def _explicit_or_global(name, explicit):
    """Return ``explicit``, or the module-level variable ``name`` when it is None."""
    if explicit is not None:
        return explicit
    return globals().get(name)


def _summed_sample_losses(model, device, images, targets):
    """Run every image of a collated batch through the model and add up the losses.

    ``images`` is indexed along its first axis and ``targets`` is a dict whose
    "boxes" and "labels" entries are indexed the same way.
    """
    summed = 0
    for j in range(len(images)):
        image = images[j].unsqueeze(0).to(device)
        target = [{
            "boxes": targets["boxes"][j].to(device),
            "labels": targets["labels"][j].to(device),
        }]
        loss_dictionary = model(image, target)
        summed = summed + sum(loss_dictionary.values())
    return summed


def _log_progress(epoch, iteration, loss_value):
    """Print one progress line in the notebook's usual format."""
    shown_epoch = epoch + 1 if epoch is not None else "?"
    print("epoch:", shown_epoch, "iteration:", iteration + 1, "loss:", loss_value)


def train_one_epoch(model, device, train_loader, batch_size, optimizer=None, epoch=None):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Detection model that returns a dict of losses when called as
            ``model(image, [target])`` in training mode.
        device: Device the per-sample tensors are moved to.
        train_loader: Iterable of ``(images, targets)`` batches.
        batch_size: Kept for backward compatibility; the actual number of
            images in each batch is used for averaging.
        optimizer: Optimizer to step. Falls back to a module-level
            ``optimizer`` variable when omitted.
        epoch: Zero-based epoch number, used for logging only. Falls back to a
            module-level ``epoch`` variable when omitted.

    Returns:
        Mean per-sample loss over the epoch (0.0 for an empty loader).

    Raises:
        ValueError: If no optimizer is passed and none is defined globally.
    """
    optimizer = _explicit_or_global("optimizer", optimizer)
    if optimizer is None:
        raise ValueError(
            "train_one_epoch needs an optimizer: pass optimizer=... "
            "or define a module-level `optimizer`"
        )
    epoch = _explicit_or_global("epoch", epoch)

    model.train()
    train_loss = 0.0
    total_samples = 0
    for i, (images, targets) in enumerate(train_loader):
        n_images = len(images)
        if n_images == 0:
            continue

        # Mean loss of the batch; one optimizer step per batch.
        batch_loss = _summed_sample_losses(model, device, images, targets) / n_images

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        train_loss += batch_loss.item() * n_images
        total_samples += n_images

        if i % 10 == 9:
            # Report the batch mean rather than the loss of the last sample.
            _log_progress(epoch, i, batch_loss.item())

    return train_loss / total_samples if total_samples else 0.0


def val_one_epoch(model, device, val_loader, batch_size, epoch=None):
    """Compute the mean validation loss over ``val_loader`` without updating weights.

    Args:
        model: Detection model that returns a dict of losses when called as
            ``model(image, [target])`` in training mode.
        device: Device the per-sample tensors are moved to.
        val_loader: Iterable of ``(images, targets)`` batches.
        batch_size: Kept for backward compatibility; the average is taken over
            the real number of samples, so a smaller last batch no longer
            skews the result.
        epoch: Zero-based epoch number, used for logging only. Falls back to a
            module-level ``epoch`` variable when omitted.

    Returns:
        Mean per-sample loss (0.0 for an empty loader).
    """
    epoch = _explicit_or_global("epoch", epoch)

    # Training mode is intentional: torchvision detection models only return
    # the loss dict in training mode (eval mode returns detections). The model
    # is left in training mode afterwards, as before.
    model.train()
    val_loss = 0.0
    total_samples = 0
    # No gradients are needed for validation; this avoids building autograd graphs.
    with torch.no_grad():
        for i, (images, targets) in enumerate(val_loader):
            n_images = len(images)
            if n_images == 0:
                continue

            summed = _summed_sample_losses(model, device, images, targets).item()
            val_loss += summed
            total_samples += n_images

            if i % 10 == 9:
                _log_progress(epoch, i, summed / n_images)

    return val_loss / total_samples if total_samples else 0.0

    
    

In [None]:
# Model setup: the classification layer gets one output more than there are labels.
num_classes = len(label_to_idx) + 1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = initialize_retinanet(num_classes)
model.to(device)

# Optimise only the parameters that were left trainable.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr = 0.005, momentum = 0.9, weight_decay = 0.0005)

epochs = 10
batch_size = 16

# Per-epoch history, used by the loss plot.
train_losses, val_losses, epoch_number = [], [], []

# NOTE: the loop variable must stay a module-level `epoch`; the epoch
# functions read it for their progress output.
for epoch in range(epochs):
    train_loss = train_one_epoch(model, device, train_loader, batch_size)
    train_losses.append(train_loss)
    print("Train loss:", train_loss)

    val_loss = val_one_epoch(model, device, val_loader, batch_size)
    val_losses.append(val_loss)
    print("Val loss: ", val_loss)

    epoch_number.append(epoch + 1)
    
    
 

In [None]:
# Training vs. validation loss per epoch.
loss_plot_path = 'results_retinanet/loss_retina.png'
# savefig does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(loss_plot_path), exist_ok=True)

plt.plot(epoch_number, train_losses, label = "Training Loss")
plt.plot(epoch_number, val_losses, label = "Validation Loss")
plt.legend(['Train', 'Validation'], loc = 'upper right')
plt.title("Training vs Validation Loss")
plt.xlabel('epoch')
plt.ylabel('loss')
plt.savefig(loss_plot_path, dpi=300)
plt.show()

In [None]:
# Inverse mapping: class id -> finding label.
idx_to_label = {idx: label for label, idx in label_to_idx.items()}

model.eval()
results = []

# Run on whatever device the model lives on. A hard-coded .cuda() crashes on
# CPU-only machines even though the model was placed with a CPU fallback.
model_device = next(model.parameters()).device

with torch.no_grad():
    for images, targets in test_loader:
        images = images.to(model_device)
        predictions = model(images)

        for i, prediction in enumerate(predictions):

            # Keep only the highest-scoring detection of each image.
            scores = prediction["scores"].cpu().numpy()
            if len(scores) > 0:
                max_score_idx = scores.argmax()
                best_score = scores[max_score_idx]
                best_box = prediction["boxes"][max_score_idx].cpu().numpy()
                best_label_idx = prediction["labels"][max_score_idx].item()

                # Class ids without a known label are reported as "Unknown".
                best_label = idx_to_label.get(best_label_idx, "Unknown")
            else:
                best_score = None
                best_box = None
                best_label = None

            true_boxes = targets["boxes"][i].cpu().numpy()
            true_labels = [idx_to_label[label.item()] for label in targets["labels"][i]]

            results.append({
                # Running index over the whole test set; the in-batch position
                # restarted at 0 for every batch and produced duplicate ids.
                "image_index": len(results),
                "best_score": best_score,
                "predicted_box": best_box,
                "predicted_label": best_label,
                "true_boxes": true_boxes,
                "true_labels": true_labels
            })



In [None]:

# Print each stored prediction next to its ground truth, one field per line.
for result in results:
    for caption, key in (
        ("Image Index", "image_index"),
        ("Best Score", "best_score"),
        ("Predicted Box", "predicted_box"),
        ("Predicted Label", "predicted_label"),
        ("True Boxes", "true_boxes"),
        ("True Labels", "true_labels"),
    ):
        print(f"{caption}: {result[key]}")


In [None]:
from sklearn.metrics import precision_recall_fscore_support

def calculate_iou(box1, box2):
    """Intersection over union of two boxes given as (x_min, y_min, x_max, y_max).

    Args:
        box1: First box; only the first four elements are read.
        box2: Second box; only the first four elements are read.

    Returns:
        intersection / union, or 0 when the union area is zero.
    """
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Clamp at zero so that disjoint boxes give an empty intersection.
    overlap_width = max(0, right - left)
    overlap_height = max(0, bottom - top)
    intersection = overlap_width * overlap_height

    first_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    second_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = first_area + second_area - intersection

    return intersection / union if union != 0 else 0



# Class-agnostic localisation metrics: only the single best-scoring box of
# each image is compared with the ground-truth boxes; labels are not checked.
true_positives = 0
false_positives = 0
false_negatives = 0

iou_threshold = 0.5
iou_theshold = iou_threshold  # alias for the original (misspelled) variable name
ious = []


for result in results:
    true_boxes = result["true_boxes"]
    predicted_box = result["predicted_box"]

    if predicted_box is None:
        # Nothing was predicted: every ground-truth box was missed.
        false_negatives += len(true_boxes)
        continue

    iou_scores = [calculate_iou(predicted_box, true_box) for true_box in true_boxes]
    max_iou = max(iou_scores) if iou_scores else 0
    ious.append(max_iou)

    if max_iou > iou_threshold:
        true_positives += 1
        # One prediction can match at most one ground-truth box.
        false_negatives += len(true_boxes) - 1
    else:
        false_positives += 1
        # The prediction missed, so the ground truth is unmatched too. Without
        # this, false negatives only counted images with no prediction at all
        # and recall was inflated.
        false_negatives += len(true_boxes)


precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0


mean_iou = sum(ious) / len(ious) if ious else 0

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Mean IoU: {mean_iou:.4f}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report


# Collect (true label, predicted label) pairs. Both lists are filled together
# so they always have the same length and stay aligned. Filtering them
# independently dropped images without a usable prediction from one list only.
true_labels_all = []
predicted_labels_all = []
skipped_without_prediction = 0

for result in results:
    predicted_label = result["predicted_label"]
    if predicted_label not in label_to_idx:
        # No detection (None) or a class id without a known label ("Unknown").
        skipped_without_prediction += len(result["true_labels"])
        continue
    for true_label in result["true_labels"]:
        if true_label in label_to_idx:
            true_labels_all.append(true_label)
            predicted_labels_all.append(predicted_label)

if skipped_without_prediction:
    print(f"Skipped {skipped_without_prediction} ground-truth labels without a usable prediction")

true_indices = [label_to_idx[label] for label in true_labels_all]
predicted_indices = [label_to_idx[label] for label in predicted_labels_all]

labels = list(label_to_idx.keys())
# Pin the class order so the matrix always has one row/column per known label,
# even when a class does not occur in the evaluated samples.
class_indices = [label_to_idx[label] for label in labels]

conf_matrix = confusion_matrix(true_indices, predicted_indices, labels=class_indices)

disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=labels)

fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(ax=ax, cmap="Blues", xticks_rotation=45)
plt.title("Confusion Matrix")
# savefig does not create missing directories.
os.makedirs('results_retinanet', exist_ok=True)
plt.savefig('results_retinanet/cm_retina.png', dpi=300)
plt.show()

report = classification_report(
    true_indices,
    predicted_indices,
    labels=class_indices,
    target_names=labels,
    zero_division=0,
)
print("Classification Report:\n", report)


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

def plot_results(image, true_boxes, true_labels, predicted_box, predicted_label, predicted_score):
    """Show an image with its ground-truth boxes (green) and predicted box (red).

    Note: the original version of this function was generated with ChatGPT-4.

    Args:
        image: Tensor that is permuted with (1, 2, 0) before display.
        true_boxes: Iterable of (x_min, y_min, x_max, y_max) boxes.
        true_labels: Captions for ``true_boxes``, indexed in the same order.
        predicted_box: (x_min, y_min, x_max, y_max) or None to draw no prediction.
        predicted_label: Caption of the predicted box.
        predicted_score: Score shown with two decimals next to the prediction.
    """
    def caption_style():
        # Fresh dict per call so no style object is shared between texts.
        return dict(fontsize=10, bbox=dict(facecolor="white", alpha=0.5))

    def outline(box, color, legend_label):
        # Draw the rectangle and hand back its top-left corner for the caption.
        left, top, right, bottom = box
        ax.add_patch(patches.Rectangle(
            (left, top), right - left, bottom - top,
            linewidth=2, edgecolor=color, facecolor='none', label=legend_label
        ))
        return left, top

    pixels = image.permute(1, 2, 0).cpu().numpy()

    fig, ax = plt.subplots(1, figsize=(10, 10))
    ax.imshow(pixels)

    for i, box in enumerate(true_boxes):
        # Only the first ground-truth box gets a legend entry.
        left, top = outline(box, 'green', "True" if i == 0 else "")
        ax.text(left, top - 5, f"True: {true_labels[i]}", color="green", **caption_style())

    if predicted_box is not None:
        left, top = outline(predicted_box, 'red', "Predicted")
        ax.text(
            left, top - 20,
            f"Pred: {predicted_label} ({predicted_score:.2f})",
            color="red",
            **caption_style()
        )

    ax.legend()
    plt.axis('off')
    plt.show()



# Plot the best prediction of every test image next to its ground truth.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
with torch.no_grad():
    for images, targets in test_loader:
        # Use the computed device; a hard-coded .cuda() fails on CPU-only machines.
        images = images.to(device)
        predictions = model(images)

        for i, prediction in enumerate(predictions):

            # Keep only the highest-scoring detection of each image.
            scores = prediction["scores"].cpu().numpy()
            if len(scores) > 0:
                max_score_idx = scores.argmax()
                best_score = scores[max_score_idx]
                best_box = prediction["boxes"][max_score_idx].cpu().numpy()
                best_label_idx = prediction["labels"][max_score_idx].item()

                best_label = idx_to_label.get(best_label_idx, "Unknown")
            else:
                best_score = None
                best_box = None
                best_label = None

            true_boxes = targets["boxes"][i].cpu().numpy()
            true_labels = [idx_to_label[label.item()] for label in targets["labels"][i]]

            plot_results(images[i], true_boxes, true_labels, best_box, best_label, best_score)
        


