In [6]:
import torch
import torch.nn as nn

NUM_LANDMARKS = 8


class UNet(nn.Module):
    """U-Net style network that regresses one scalar per output channel.

    Four encoder stages (64, 128, 256 and 512 channels) are separated by 2x2
    max pooling and followed by a 1024-channel bottleneck. Four decoder stages
    upsample with transposed convolutions and concatenate the matching encoder
    features. A 1x1 convolution maps the last decoder features to
    ``out_channels`` maps, which are globally average-pooled so that the
    network returns a tensor of shape ``(batch, out_channels)``.
    """

    def __init__(self, in_channels=3, out_channels=NUM_LANDMARKS):
        super().__init__()

        def double_conv(n_in, n_out):
            # Two (3x3 conv -> batch norm -> ReLU) stages; padding=1 keeps H and W.
            layers = []
            for stage_in in (n_in, n_out):
                layers += [
                    nn.Conv2d(stage_in, n_out, kernel_size=3, padding=1),
                    nn.BatchNorm2d(n_out),
                    nn.ReLU(inplace=True),
                ]
            return nn.Sequential(*layers)

        def upsample(n_in, n_out):
            # Transposed convolution that doubles the spatial resolution.
            return nn.ConvTranspose2d(n_in, n_out, kernel_size=2, stride=2)

        # Contracting path.
        self.encoder1 = double_conv(in_channels, 64)
        self.encoder2 = double_conv(64, 128)
        self.encoder3 = double_conv(128, 256)
        self.encoder4 = double_conv(256, 512)

        # ceil_mode rounds odd sizes up, so the decoder may overshoot by one
        # pixel after upsampling; `crop` trims that excess before concatenation.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True)

        self.bottleneck = double_conv(512, 1024)

        # Expanding path: each decoder sees upsampled + skip features, hence
        # twice the channel count of its output.
        self.upconv4 = upsample(1024, 512)
        self.decoder4 = double_conv(1024, 512)
        self.upconv3 = upsample(512, 256)
        self.decoder3 = double_conv(512, 256)
        self.upconv2 = upsample(256, 128)
        self.decoder2 = double_conv(256, 128)
        self.upconv1 = upsample(128, 64)
        self.decoder1 = double_conv(128, 64)

        # Regression head: per-channel map followed by a global average.
        self.final_conv = nn.Conv2d(64, out_channels, kernel_size=1)
        self.global_pool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        """Map a ``(batch, in_channels, H, W)`` input to ``(batch, out_channels)``."""
        # Contracting path: remember every stage's output for the skip connections.
        skips = []
        features = x
        for encoder in (self.encoder1, self.encoder2, self.encoder3, self.encoder4):
            features = encoder(features)
            skips.append(features)
            features = self.pool(features)

        features = self.bottleneck(features)

        # Expanding path: deepest skip first.
        decoder_stages = (
            (self.upconv4, self.decoder4),
            (self.upconv3, self.decoder3),
            (self.upconv2, self.decoder2),
            (self.upconv1, self.decoder1),
        )
        for (upconv, decoder), skip in zip(decoder_stages, reversed(skips)):
            upsampled = self.crop(upconv(features), skip)
            features = decoder(torch.cat([upsampled, skip], dim=1))

        # Collapse each output map to a single value and drop the 1x1 spatial dims.
        pooled = self.global_pool(self.final_conv(features))
        return pooled.view(pooled.size(0), -1)

    def crop(self, source, target):
        """Trim ``source`` to the height and width of the 4-D tensor ``target``."""
        rows, cols = target.size()[2:]
        return source[:, :, :rows, :cols]
    
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the architecture on `device`, then restore the saved weights onto it.
model = UNet(in_channels=3).to(device)
model.load_state_dict(
    torch.load(
        "/Users/cilvosimon/Codes/fetal_ultrasound/unet_epoch_iteration_1.pth",
        map_location=device,
    )
)

# Switch to evaluation mode; as the last expression of the cell this also
# displays the module summary.
model.eval()

UNet(
  (encoder1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (encoder2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
  )
  (encoder3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=T

In [8]:
import os
import cv2
import torch
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader

# Define constants
# (width, height), the order cv2.resize expects. Ensure this matches the training image size.
IMAGE_SIZE = (800, 540)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def predict_and_save_to_csv(image_folder, output_csv, model, image_size=None, device=None):
    """Run ``model`` on every readable image in a folder and write the outputs to a CSV.

    Each entry of ``image_folder`` is read as grayscale, replicated to three
    channels, resized to ``image_size``, divided by 255 and passed through the
    model as a batch of one. The flattened model output is written unchanged
    next to the file name.

    Args:
        image_folder: Directory whose entries are tried as images. Entries that
            OpenCV cannot decode are reported with a warning and skipped.
        output_csv: Destination path of the CSV file.
        model: Module that returns eight values per image (the CSV has eight
            coordinate columns after ``image_name``).
        image_size: Optional ``(width, height)`` handed to ``cv2.resize``.
            Defaults to the module-level ``IMAGE_SIZE``.
        device: Optional torch device to run on. Defaults to the module-level
            ``DEVICE``.

    NOTE(review): the values are stored exactly as the network emits them.
    Confirm that they are in the same coordinate space as the ground truth
    before computing pixel errors against it.
    """
    # Resolve defaults at call time so later changes to the constants are honoured.
    if image_size is None:
        image_size = IMAGE_SIZE
    if device is None:
        device = DEVICE

    model.to(device)
    model.eval()  # Set model to evaluation mode

    columns = [
        "image_name", "ofd_1_x", "ofd_1_y", "ofd_2_x", "ofd_2_y",
        "bpd_1_x", "bpd_1_y", "bpd_2_x", "bpd_2_y"
    ]
    predictions = []

    # os.listdir returns entries in arbitrary order; sorting makes the CSV
    # row order reproducible across runs and machines.
    for img_name in sorted(os.listdir(image_folder)):
        img_path = os.path.join(image_folder, img_name)

        # Load image in grayscale
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            print(f"Warning: Failed to read image {img_name}")
            continue

        # Convert grayscale to 3-channel RGB
        image_rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

        # Resize image to match the input size expected by the model
        image_resized = cv2.resize(image_rgb, image_size)
        image_resized = image_resized.astype(np.float32) / 255.0  # Normalize

        # HWC -> CHW plus a batch dimension: (1, 3, height, width),
        # i.e. (1, 3, 540, 800) for the default IMAGE_SIZE.
        image_tensor = torch.from_numpy(image_resized).permute(2, 0, 1).unsqueeze(0).to(device)

        # Get the predictions from the model
        with torch.no_grad():
            landmarks = model(image_tensor)  # Output shape: (1, 8)

        # Convert predictions to numpy and flatten
        landmarks_np = landmarks.cpu().numpy().flatten()

        # Append predictions with image name
        predictions.append([img_name] + landmarks_np.tolist())

    # Create a dataframe and save to CSV
    df_predictions = pd.DataFrame(predictions, columns=columns)
    df_predictions.to_csv(output_csv, index=False)
    print(f"Predictions saved to {output_csv}")

# Where the test images live and where the predictions are written.
image_folder = "/Users/cilvosimon/Codes/landmark_detection/test"  # Update with your test folder
output_csv = "predictions_unet.csv"  # Update output file name

# `model` must already be loaded by an earlier cell before this call runs.
predict_and_save_to_csv(
    image_folder=image_folder,
    output_csv=output_csv,
    model=model,
)


Predictions saved to predictions_unet.csv


In [9]:
import pandas as pd

def compare_predictions(pred_csv, ground_truth_csv, image_size=299):
    """Print per-landmark mean absolute errors between two landmark CSV files.

    Both files are joined on ``image_name``. For each of the four landmarks
    (``ofd_1``, ``ofd_2``, ``bpd_1``, ``bpd_2``) the mean absolute x error and
    the mean absolute y error are averaged into one figure, which is reported
    in pixels and as a percentage of ``image_size``. This is an average of the
    per-axis errors, not a Euclidean distance.

    Landmarks whose columns are missing from the merged frame are skipped with
    a warning. The remaining landmarks keep their own numbers in the report,
    so a skipped landmark never shifts the labels of the ones after it.

    Args:
        pred_csv: Path to the CSV of predicted coordinates.
        ground_truth_csv: Path to the CSV of ground-truth coordinates.
        image_size: Length, in pixels, that the relative error is expressed
            against.
            NOTE(review): the default of 299 does not match the (800, 540)
            ``IMAGE_SIZE`` used for inference in this notebook. Confirm which
            dimension the percentage should be relative to.

    Returns:
        None. The report is printed.
    """
    df_pred = pd.read_csv(pred_csv)
    df_gt = pd.read_csv(ground_truth_csv)

    # Inner join on the image name; coordinate columns present in both files
    # receive the _pred / _gt suffixes.
    df_merged = pd.merge(df_pred, df_gt, on="image_name", suffixes=('_pred', '_gt'))

    if df_merged.empty:
        # Without this, every mean below is silently reported as NaN.
        print("Warning: No image_name values are shared by the prediction and ground truth files.")

    # (landmark number, mean error in pixels, relative error in percent)
    results = []

    for number, prefix in enumerate(("ofd_1", "ofd_2", "bpd_1", "bpd_2"), start=1):
        x_pred_col, y_pred_col = f"{prefix}_x_pred", f"{prefix}_y_pred"
        x_gt_col, y_gt_col = f"{prefix}_x_gt", f"{prefix}_y_gt"

        required = (x_pred_col, y_pred_col, x_gt_col, y_gt_col)
        if not all(col in df_merged.columns for col in required):
            print(f"Warning: One or more columns for landmark {number} are missing.")
            continue

        # Mean absolute error per axis, then the average of the two axes.
        mean_x_error = (df_merged[x_pred_col] - df_merged[x_gt_col]).abs().mean()
        mean_y_error = (df_merged[y_pred_col] - df_merged[y_gt_col]).abs().mean()
        mean_error = (mean_x_error + mean_y_error) / 2

        relative_error = (mean_error / image_size) * 100
        results.append((number, mean_error, relative_error))

    # Print average absolute errors
    print("Average Absolute Errors (in pixels):")
    for number, error, _ in results:
        print(f"Landmark {number}: {error:.4f} pixels")

    # Print relative errors
    print("\nRelative Errors (% of image size):")
    for number, _, rel_error in results:
        print(f"Landmark {number}: {rel_error:.2f}%")

# Predicted landmarks (written by the inference cell) and the reference annotations.
pred_csv = "predictions_unet.csv"
ground_truth_csv = "/Users/cilvosimon/Codes/landmark_detection/test.csv"

# Report the per-landmark errors between the two files.
compare_predictions(
    pred_csv=pred_csv,
    ground_truth_csv=ground_truth_csv,
)


Average Absolute Errors (in pixels):
Landmark 1: 243.0951 pixels
Landmark 2: 417.3865 pixels
Landmark 3: 212.1321 pixels
Landmark 4: 451.2837 pixels

Relative Errors (% of image size):
Landmark 1: 81.30%
Landmark 2: 139.59%
Landmark 3: 70.95%
Landmark 4: 150.93%
