In [None]:
# Import necessary libraries for file handling, data manipulation, and visualization
import os
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Import libraries for working with images and transformations
from PIL import Image
import cv2 as cv

# Import PyTorch modules for model building, data handling, and evaluation
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.models.quantization as quant_models
from torch.utils.data import Dataset, DataLoader, Subset
from torchinfo import summary

# Import libraries for machine learning metrics and model evaluation
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import roc_auc_score, confusion_matrix, roc_curve, mean_squared_error, mean_absolute_error, r2_score
import torchmetrics
from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')

# Set the seed.
seed = 42
torch.manual_seed(seed)

In [2]:
data_dir="/Users/romerocruzsa/Workspace/Projects/Research/cp-anemia-detection/data/cp-anemia/"
weights_dir="/Users/romerocruzsa/Workspace/Projects/Research/cp-anemia-detection/notebooks/weights"
anemic_dir=data_dir+"/Anemic"
non_anemic_dir=data_dir+"/Non-anemic"
signature = "02042024"

In [None]:
data_sheet_path = data_dir+"Anemia_Data_Collection_Sheet.csv"
data_sheet = pd.read_csv(data_sheet_path)
display(data_sheet)

In [None]:
# Mapping diagnosis to severity
severity_mapping = {
    "Non-Anemic": 0,
    "Mild": 1,
    "Moderate": 2,
    "Severe": 3,
}

data_sheet['Severity'] = data_sheet['Severity'].map(severity_mapping)
display(data_sheet)

In [None]:
# Define data augmentations or transformations
transform = transforms.Compose([
    transforms.Resize((256, 192)),
    transforms.RandomHorizontalFlip(p=np.random.rand()),
    transforms.RandomVerticalFlip(p=np.random.rand()),
    transforms.RandomRotation(degrees=np.random.randint(0, 360)),
    transforms.RandomAffine(degrees=np.random.randint(0, 360)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Custom dataset class
class CPAnemiCDataset(Dataset):
    def __init__(self, dir, df, transform=None):
        self.dir = dir
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_id = row['IMAGE_ID']
        img_folder = row['REMARK']
        img_path = os.path.join(self.dir, img_folder, img_id + ".png")
        img = Image.open(img_path).convert('RGB')

        if self.transform:
            img = self.transform(img)

        multiclass_label = torch.tensor(row['Severity'])
        hb_level = torch.tensor(row['HB_LEVEL'])

        return img, multiclass_label, hb_level

    # Load the dataset
image_dataset = CPAnemiCDataset(data_dir, data_sheet, transform=transform)
train_dataset, test_dataset = train_test_split(image_dataset, test_size=0.20, shuffle=True)

print(f"Image Dataset Size (All): {len(image_dataset)}, \
        Train Size: {len(train_dataset)}, \
        Test Size: {len(test_dataset)}")

BATCH_SIZE = 32
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Default device
device = torch.device('cpu')

# Check for CUDA availability
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    print("CUDA is not available, using CPU.")

print(f"Selected device: {device}")

In [22]:
def get_model_size(mdl):
    torch.save(mdl.state_dict(), "tmp.pt")
    model_size = "Model Size: %.2f MB" %(os.path.getsize("tmp.pt")/1e6)
    os.remove('tmp.pt')
    return model_size

# Static Weighting Function. Set eta_class to desired importance (Classification > .5, Regression < .5, Equal == .5)
def sw_loss(loss_class, loss_reg, eta_class=0.5):
    eta_reg = 1 - eta_class
    total_loss = (eta_class * loss_class) + (eta_reg * loss_reg)
    return total_loss

In [23]:
def train(dataloader, model, loss_fn_class, loss_fn_reg, optimizer):
    model.train()
    total_loss = 0
    correct = 0
    total_samples = 0

    # print("prob_class_0 | prob_class_1 | prob_class_2 | prob_class_3 | highest_prob_class | hb_estimate")

    for _, (img, multiclass, hb_level) in enumerate(dataloader):
        img = img.to(device)
        multiclass = multiclass.to(device)
        hb_level = hb_level.to(device).unsqueeze(1).float()

        optimizer.zero_grad()

        # Forward pass (model outputs both classification and regression)
        class_pred, reg_pred = model(img)

        # Compute losses
        loss_class = loss_fn_class(class_pred, multiclass)
        loss_reg = loss_fn_reg(reg_pred, hb_level)
        loss = sw_loss(loss_class, loss_reg, 0.7)

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Track total loss
        total_loss += loss.item()

        # Compute probabilities
        class_probs = F.softmax(class_pred, dim=1)
        highest_prob_class = torch.argmax(class_probs, dim=1)

        # Track accuracy
        correct += (highest_prob_class == multiclass).sum().item()
        total_samples += multiclass.size(0)

        # Print probabilities, predicted class, and Hb estimate
        # for i in range(class_probs.size(0)):
        #     print(f"{class_probs[i, 0]:.4f} | {class_probs[i, 1]:.4f} | {class_probs[i, 2]:.4f} | {class_probs[i, 3]:.4f} | {highest_prob_class[i].item()} | {reg_pred[i].item():.2f}")

    avg_loss = total_loss / len(dataloader)
    accuracy = correct / total_samples

    return avg_loss, accuracy

In [24]:
def eval(dataloader, model, loss_fn_class, loss_fn_reg):
    model.eval()
    total_loss = 0
    correct = 0
    total_samples = 0

    # print("prob_class_0 | prob_class_1 | prob_class_2 | prob_class_3 | highest_prob_class | hb_estimate")

    with torch.no_grad():
        for _, (img, multiclass, hb_level) in enumerate(dataloader):
            img = img.to(device)
            multiclass = multiclass.to(device)
            hb_level = hb_level.to(device).unsqueeze(1).float()

            # Forward pass (model outputs both classification and regression)
            class_pred, reg_pred = model(img)

            # Compute losses
            loss_class = loss_fn_class(class_pred, multiclass)
            loss_reg = loss_fn_reg(reg_pred, hb_level)
            loss = sw_loss(loss_class, loss_reg, 0.7)

            # Track total loss
            total_loss += loss.item()

            # Compute probabilities
            class_probs = F.softmax(class_pred, dim=1)
            highest_prob_class = torch.argmax(class_probs, dim=1)

            # Track accuracy
            correct += (highest_prob_class == multiclass).sum().item()
            total_samples += multiclass.size(0)

            # # Print probabilities, predicted class, and Hb estimate
            # for i in range(class_probs.size(0)):
            #     print(f"{class_probs[i, 0]:.4f} | {class_probs[i, 1]:.4f} | {class_probs[i, 2]:.4f} | {class_probs[i, 3]:.4f} | {highest_prob_class[i].item()} | {reg_pred[i].item():.2f}")

    avg_loss = total_loss / len(dataloader)
    accuracy = correct / total_samples

    return avg_loss, accuracy

In [25]:
class MobileNetMultiOutput(nn.Module):
    def __init__(self):
        super(MobileNetMultiOutput, self).__init__()
        self.mobilenet = models.mobilenet_v2(pretrained=False)
        num_ftrs = self.mobilenet.classifier[1].in_features
        self.mobilenet.classifier = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(num_ftrs, 128),
            nn.ReLU(),
            nn.Linear(128, 5)  # 4-class classification + 1 regression output
        )

    def forward(self, x):
        output = self.mobilenet(x)
        class_output = output[:, :4]  # First 4 values = class probabilities
        reg_output = output[:, 4]  # Last value = Hb level estimate
        return class_output, reg_output  # Return as two separate outputs

# Load the modified MobileNet
model = MobileNetMultiOutput().to(device)

In [None]:
# Training parameters
BATCH_SIZE = 32
EPOCHS = 3
FOLDS = 5

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Ensure dataset exists (Modify as needed)
image_dataset = train_dataset  # Assuming train_dataset is defined

# Initialize model and loss functions
model = MobileNetMultiOutput().to(device)

loss_fn_class = torch.nn.CrossEntropyLoss()  # Multi-class classification loss
loss_fn_reg = torch.nn.MSELoss()  # Regression loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# 5-Fold Cross Validation
kf = KFold(n_splits=FOLDS, shuffle=True, random_state=42)

# Directory to save the best model
weights_dir = "model_weights"
os.makedirs(weights_dir, exist_ok=True)

best_val_acc = -float('inf')  # Track best validation accuracy
train_metrics_df = []
val_metrics_df = []

# Training loop
for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")
    fold = 1

    for train_idx, val_idx in kf.split(range(len(image_dataset))):  # FIX: Ensure correct splitting
        train_subset = Subset(image_dataset, train_idx)
        val_subset = Subset(image_dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False)

        if fold == FOLDS:
            # Validation phase
            avg_val_loss, val_acc = eval(val_loader, model, loss_fn_class, loss_fn_reg)

            print(f"\nValidation: Fold {fold} - Loss: {avg_val_loss:.4f}, Accuracy: {val_acc:.4f}")

            # Save model with the best validation accuracy
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                val_metrics_dict = {"Loss": avg_val_loss, "Accuracy": val_acc}
                val_metrics_df.append(val_metrics_dict)
                torch.save(model.state_dict(), f"{weights_dir}/model_best_accuracy.pth")
                print(f"Best model saved with Accuracy: {best_val_acc:.4f}")

        else:
            # Training phase
            avg_train_loss, train_acc = train(train_loader, model, loss_fn_class, loss_fn_reg, optimizer)

            print(f"Training: Fold {fold} - Loss: {avg_train_loss:.4f}, Accuracy: {train_acc:.4f}")

            train_metrics_dict = {"Loss": avg_train_loss, "Accuracy": train_acc}
            train_metrics_df.append(train_metrics_dict)

        fold += 1  # Move to next fold

# Ensure `get_model_size()` exists or remove this line
print(get_model_size(model))