# MURA Binary Classification
## Pretrained Model: ResNet - Specialised/Unified Models

# Load dependencies

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt

import random
from PIL import Image
import os

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torch

In [None]:
os.listdir('/kaggle/input/')

# Data preparation

In [None]:
# path = '/kaggle/input/mura-data/MURA-v1.1'

# #train
# train_images = pd.DataFrame(pd.read_csv(path + '/train_image_paths.csv', header = None, names =['image_path']))
# train_studies = pd.DataFrame(pd.read_csv(path + '/train_labeled_studies.csv', header= None, names=['study_path', 'label']))

# #test
# test_images = pd.DataFrame(pd.read_csv(path + '/valid_image_paths.csv', header = None, names =['image_path']))
# test_studies = pd.DataFrame(pd.read_csv(path + '/valid_labeled_studies.csv', header= None, names=['study_path', 'label']))

# train_images['study_label'] = train_images['image_path'].str.rsplit('/', n=2, expand=True)[1].str.rsplit('_', n=1, expand=True)[1]
# test_images['study_label'] = test_images['image_path'].str.rsplit('/', n=2, expand=True)[1].str.rsplit('_', n=1, expand=True)[1]

# train_images['label'] = train_images['study_label'].map({'positive': 1, 'negative': 0})
# test_images['label'] = test_images['study_label'].map({'positive': 1, 'negative': 0})

# train_images['XR'] = train_images['image_path'].str.rsplit('/', n=5, expand=True)[2]
# test_images['XR'] = test_images['image_path'].str.rsplit('/', n=5, expand=True)[2]

# prefix = '/kaggle/input/mura-data/'
# train_images['image_path'] = prefix + train_images['image_path']
# test_images['image_path'] = prefix + test_images['image_path']

# train_images['image_size'] = train_images['image_path'].apply(lambda path: Image.open(path).size)
# test_images['image_size'] = test_images['image_path'].apply(lambda path: Image.open(path).size)

# # train_images.to_csv('train_df.csv', index=False)
# # test_images.to_csv('val_df.csv', index=False)

In [None]:
# Load 2 DataFrames directly instead of rebuilding them from the raw MURA CSVs.
# Presumably these are the files the commented-out preparation cell above would
# write (train_df.csv / val_df.csv) — verify against the attached dataset.
train_data = pd.read_csv('/kaggle/input/load-mura/train_df.csv')
# val_df.csv is used as the held-out *test* set for the rest of the notebook.
test_images = pd.read_csv('/kaggle/input/load-mura/val_df.csv')

In [None]:
# Hold out 20% of the training rows for validation; the fixed seed keeps the split reproducible.
# NOTE(review): the split is made per image row and is not stratified by label.
# Image paths look like .../patientXXXX/studyN_label/imageK.png, so images of one
# study may land in both sets — consider a grouped split if that matters.
train_images, val_images = train_test_split(train_data, test_size=0.2, random_state=42)
print('Number of images in training set:', len(train_images))
print('Number of images in validation set:', len(val_images))
print('Number of images in test set:', len(test_images))

In [None]:
train_images

# EDA

In [None]:
train_images['label'].value_counts()

In [None]:
# prefix = '/kaggle/input/mura-data/'
# image_paths = train_images['image_path'].tolist() 
# image_sizes = [Image.open(prefix + path).size for path in image_paths]
# unique_sizes = set(image_sizes)
# if len(unique_sizes) == 1:
#     print("All images are the same size:", unique_sizes.pop())
# else:
#     print(f"Images have different sizes. Unique sizes found: {unique_sizes}")

In [None]:
img = Image.open('/kaggle/input/mura-data/MURA-v1.1/train/XR_FOREARM/patient09241/study1_positive/image2.png')

In [None]:
# Per-channel statistics used to normalise inputs for pre-trained models
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Deterministic preprocessing: fixed 224x224 size -> tensor -> normalise
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Random augmentations intended for the minor class; the random resized crop
# also brings every image to 224x224 before tensor conversion.
augmentation_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

def plot_random_images(df, label, n=5):
    """Plot ``n`` randomly chosen images of one class after augmentation.

    Args:
        df: DataFrame with ``image_path`` and ``label`` columns.
        label: Class value to select; it is also used as each subplot title.
        n: Number of images to draw (sampling fails if fewer rows match).
    """
    matching_paths = df.loc[df['label'] == label, 'image_path']
    chosen_paths = matching_paths.sample(n)

    plt.figure(figsize=(15, 15))
    for position, image_path in enumerate(chosen_paths, start=1):
        # Force RGB so the 3-channel normalisation in the pipeline applies
        rgb_image = Image.open(image_path).convert("RGB")

        # Augment, then go from CxHxW to HxWxC and clip to the displayable [0, 1] range
        chw_pixels = augmentation_transform(rgb_image).numpy()
        hwc_pixels = chw_pixels.transpose((1, 2, 0)).clip(0, 1)

        plt.subplot(1, n, position)
        plt.imshow(hwc_pixels)
        plt.title(label)
        plt.axis('off')
    plt.show()

# Show samples for each class: one row of augmented images per distinct label
# found in the training split.
for label in train_images['label'].unique():
    plot_random_images(train_images, label)

In [None]:
# Show image by index
def plot_one_image(df, index):
    """Show one image next to its preprocessed version.

    Args:
        df: DataFrame with ``image_path`` and ``label`` columns.
        index: Index *label* of the row (looked up with ``df.loc``).
    """
    image_path = df.loc[index, 'image_path']
    image_label = df.loc[index, 'label']

    original = Image.open(image_path)
    # The pipeline normalises three channels, so convert to RGB first
    chw_pixels = transform(original.convert("RGB")).numpy()
    # CxHxW -> HxWxC, clipped to the displayable [0, 1] range
    hwc_pixels = chw_pixels.transpose((1, 2, 0)).clip(0, 1)

    # Left panel: the file as stored on disk
    plt.subplot(1, 2, 1)
    plt.imshow(original)
    plt.axis('off')
    plt.title(f'original image, label:{image_label}')

    # Right panel: what the network receives
    plt.subplot(1, 2, 2)
    plt.imshow(hwc_pixels)
    plt.axis('off')
    plt.title('transformed image')

    plt.show()

In [None]:
# plot_one_image(train_images, 200)

In [None]:
train_images['XR'].value_counts()

In [None]:
# # On separate classes (specialised model: keep only one body part)
# which_class = 'XR_HAND' # XR_WRIST  XR_SHOULDER   XR_HAND  XR_FINGER   XR_ELBOW    XR_FOREARM  XR_HUMERUS 
# train_df = train_images[train_images['XR'] == which_class]
# val_df = val_images[val_images['XR'] == which_class]
# test_df = test_images[test_images['XR'] == which_class]

# On the whole dataset (unified model: every value of the 'XR' column together)
train_df = train_images
val_df = val_images
test_df = test_images

# Class balance of each split, to check for label imbalance before training
print(train_df['label'].value_counts())
print(val_df['label'].value_counts())
print(test_df['label'].value_counts())

# Training

In [None]:
# Per-channel statistics used to normalise inputs for pre-trained models
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Standard transformations: fixed 224x224 size -> tensor -> normalise
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Augmentations for the minor class; the random resized crop also brings
# every image to 224x224 before tensor conversion.
augmentation_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Custom Dataset class to handle loading and transformations
class ImageDataset(Dataset):
    """Dataset that reads the images listed in a DataFrame.

    Intended for the validation and test splits: every image gets the same
    optional ``transform`` and no class balancing is performed.
    """

    def __init__(self, df, transform=None):
        """Keep the table (``image_path`` / ``label`` columns) and the optional transform."""
        self.transform = transform
        self.df = df

    def __len__(self):
        """Return the number of rows, i.e. images, in the table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at *position* ``idx``."""
        record = self.df.iloc[idx]
        img_path, label = record['image_path'], record['label']

        # Always decode to three channels
        picture = Image.open(img_path).convert("RGB")

        if not self.transform:
            return picture, label
        return self.transform(picture), label

# Perform data augmentation on training dataset
class BalancedImageDataset(Dataset):
    """Training dataset that oversamples label 1 until it matches label 0.

    Every original row is kept. When label 1 has fewer rows than label 0, only
    the missing number of rows is drawn (with replacement, fixed seed) from the
    label-1 rows and appended, so no original minority image is lost. When
    label 1 already has at least as many rows as label 0, nothing is resampled.

    Images with label 1 go through ``augment_transform`` (when given); all
    other images go through ``transform`` (when given).

    Attributes:
        df_major: Rows with label 0.
        df_minor: Rows with label 1 (originals only).
        augmented_minor_df: Label-1 rows after oversampling.
        balanced_df: ``df_major`` followed by ``augmented_minor_df``, re-indexed.
    """

    def __init__(self, df, transform=None, augment_transform=None):
        """Build the balanced table.

        Args:
            df: DataFrame with ``image_path`` and ``label`` (0/1) columns.
            transform: Callable applied to images that are not augmented.
            augment_transform: Callable applied to label-1 images.

        Raises:
            ValueError: If label 0 has rows but label 1 has none, so there is
                nothing to oversample from.
        """
        self.df_major = df[df['label'] == 0]  # Majority class (label=0)
        self.df_minor = df[df['label'] == 1]  # Minority class (label=1)
        self.transform = transform
        self.augment_transform = augment_transform

        # Number of extra label-1 rows needed to reach the label-0 count
        deficit = len(self.df_major) - len(self.df_minor)
        if deficit > 0:
            if self.df_minor.empty:
                raise ValueError("Cannot balance the dataset: no rows with label 1 to oversample.")
            # Draw only the missing rows so every original minority row is kept
            extra_rows = self.df_minor.sample(n=deficit, replace=True, random_state=42)
            self.augmented_minor_df = pd.concat([self.df_minor, extra_rows])
        else:
            self.augmented_minor_df = self.df_minor

        # Combine the majority and augmented minority dataframes
        self.balanced_df = pd.concat([self.df_major, self.augmented_minor_df]).reset_index(drop=True)

    def __len__(self):
        """Return the number of rows after balancing."""
        return len(self.balanced_df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        row = self.balanced_df.iloc[idx]  # single positional lookup for both fields
        img_path = row['image_path']
        label = row['label']

        # Load image
        image = Image.open(img_path).convert("RGB")

        # Apply augmentations to the minor class, the plain transform otherwise
        if label == 1 and self.augment_transform:
            image = self.augment_transform(image)
        elif self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Create dataset and data loader
# Training currently uses the plain ImageDataset (no class balancing and no
# augmentation); the balanced/augmented variant is kept commented out below.
# train_dataset = BalancedImageDataset(train_df, transform=transform, augment_transform=augmentation_transform)
train_dataset = ImageDataset(train_df, transform=transform)
# Only the training loader shuffles
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)  # Adjust batch_size as needed

val_dataset = ImageDataset(val_df, transform=transform)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)  # Adjust batch_size as needed

test_dataset = ImageDataset(test_df, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Adjust batch_size as needed

In [None]:
# Show the transformed images
def show_image(img_tensor, title=None):
    """
    Render one CxHxW PyTorch tensor as an image, with an optional title.

    The values are plotted as they are (no de-normalisation or clipping).
    """
    # matplotlib wants channels last: CxHxW -> HxWxC
    hwc_pixels = img_tensor.numpy().transpose((1, 2, 0))
    plt.imshow(hwc_pixels)
    if title:
        plt.title(title)
    plt.axis('off')
    plt.show()

# Access an image and its label (index 1 is positional: ImageDataset reads rows with iloc)
img_tensor, label = train_dataset[1]

# Display the image. The tensor has already been normalised by `transform`,
# so the plot shows the network input rather than the original intensities.
show_image(img_tensor, title=f"Label: {label}")
print(img_tensor.shape)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class SimpleCNN(nn.Module):  # 70s for 1 epoch
    """Small two-convolution baseline that outputs one logit per image.

    Each convolution is followed by ReLU and 2x2 max pooling; the first fully
    connected layer expects 64 * 56 * 56 features, which corresponds to
    3x224x224 inputs after the two poolings.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, stride=2, padding=0)
        self.fc1 = nn.Linear(64 * 56 * 56, 128)
        self.fc2 = nn.Linear(128, 1)
        # Registered but not applied in forward()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return raw logits of shape (batch, 1)."""
        # conv -> ReLU -> pool, twice
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        # Flatten to (batch, 64 * 56 * 56) for the dense layers
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

class BinaryImageClassifierCNN(nn.Module):
    """Four-stage CNN followed by a two-layer head that emits one logit.

    Every stage is Conv2d(3x3, padding 1) -> ReLU -> MaxPool2d(2). The first
    dense layer expects 14 * 14 * 256 features, which corresponds to
    3x224x224 inputs after the four poolings.
    """

    def __init__(self):
        super().__init__()

        # Convolutional layers: channel widths from one stage to the next
        channel_plan = (3, 32, 64, 128, 256)
        stages = []
        for c_in, c_out in zip(channel_plan[:-1], channel_plan[1:]):
            stages += [
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.conv_layers = nn.Sequential(*stages)

        # Fully connected layers (no dropout between them)
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(14 * 14 * 256, 512),
            nn.ReLU(),
            nn.Linear(512, 1),
        )

    def forward(self, x):
        """Return raw logits of shape (batch, 1)."""
        return self.fc_layers(self.conv_layers(x))

In [None]:
from torchvision import models
# NOTE(review): num_classes is not referenced anywhere else in the visible code;
# the head below emits a single logit for binary classification.
num_classes = 2  
# model = BinaryImageClassifierCNN() #SimpleCNN()

# ResNet-Deeper-----------------------------------
# Start from a pre-trained ResNet-18 and replace its final fully connected layer.
# NOTE(review): newer torchvision releases prefer the `weights=` argument over
# `pretrained=` — confirm against the installed version.
resnet = models.resnet18(pretrained=True)
num_features = resnet.fc.in_features
print(num_features)
# NOTE(review): there is no activation between the Linear layers, so apart from
# dropout the head is a composition of linear maps — confirm this is intended.
resnet.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_features, 256), 
    nn.Dropout(0.5),
    nn.Linear(256, 128),
    nn.Linear(128, 32),
    nn.Linear(32, 1),  # Binary classification
    # nn.Sigmoid()  # To output probabilities
)
# -----------------------------------------
print(resnet.fc)
# # ResNet-----------------------------------
# resnet = models.resnet18(pretrained=True)
# num_features = resnet.fc.in_features
# print(num_features)
# resnet.fc = nn.Sequential(
#     nn.Dropout(0.5),
#     nn.Linear(num_features, 256), 
#     nn.Dropout(0.5),
#     nn.Linear(256, 64),
#     nn.Linear(64, 1),  # Binary classification
#     # nn.Sigmoid()  # To output probabilities
# )
# # -----------------------------------------

# # Load model
# resnet = torch.load('/kaggle/input/resnet18-on-wrist-acc80/resnet18_model.pth') # Load model trained on WRIST

# Loss function and optimizer
# BCEWithLogitsLoss takes raw logits, which is why the head has no Sigmoid
# layer; callers apply torch.sigmoid themselves when they need probabilities.
criterion =  nn.BCEWithLogitsLoss() #For Binary classification we use logit loss# For multi-class: nn.CrossEntropyLoss()
# optimizer = optim.Adam(resnet.parameters(), lr=0.001)
# All parameters are optimised (backbone and new head); weight_decay adds L2 regularisation.
optimizer = optim.SGD(resnet.parameters(), lr=0.001, momentum=0.8, weight_decay=0.001)

# # Try L2 in optimizer
# params = [
#     {"params": [param for name, param in resnet.named_parameters() if "bias" not in name], "weight_decay": 0.01},
#     {"params": [param for name, param in resnet.named_parameters() if "bias" in name], "weight_decay": 0.0}
# ]
# optimizer = optim.Adam(params, lr=0.001)

# Use the GPU when one is available; the training and evaluation functions read this global.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def evaluate_model(model, dataloader):
    """Compute, print and return the binary accuracy of ``model`` on ``dataloader``.

    The model is switched to evaluation mode and run without gradient
    tracking. A sample is predicted positive when the sigmoid of its logit
    exceeds 0.5. Batches are moved to the device the model's parameters live
    on, so the function does not depend on a global ``device``.

    Args:
        model: Module producing one logit per sample with shape (batch, 1).
        dataloader: Iterable of ``(images, labels)`` batches with 0/1 labels.

    Returns:
        float: Accuracy in percent (0-100), or 0.0 when there are no samples.
    """
    model.eval()  # Set the model to evaluation mode

    # Run on whatever device the model is on (CPU for a parameter-free module)
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    correct = 0
    total = 0
    with torch.no_grad():  # No need to calculate gradients during evaluation
        for images, labels in dataloader:
            images, labels = images.to(run_device), labels.to(run_device)
            outputs = model(images).squeeze(1)
            predicted = (torch.sigmoid(outputs) > 0.5).long()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError
    if total == 0:
        print("No samples to evaluate.")
        return 0.0

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

In [None]:
class EarlyStopping:
    """Track the validation loss, checkpoint the best model and signal when to stop.

    Call the instance once per epoch with the current validation loss and the
    model. The model is saved whenever the loss improves by at least ``delta``;
    after ``patience`` consecutive calls without such an improvement,
    ``early_stop`` is set to True.
    """

    def __init__(self, patience=5, delta=0, path='checkpoint.pt', verbose=False):
        """
        Args:
            patience (int): How many consecutive non-improving calls to tolerate.
            delta (float): Minimum decrease of the loss that counts as an improvement.
            path (str): Path to save the best model.
            verbose (bool): Print the patience counter and checkpoint messages.
        """
        self.patience = patience
        self.delta = delta
        self.path = path
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # float("inf") instead of np.Inf: that alias was removed in NumPy 2.0
        self.val_loss_min = float("inf")

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss and update the stopping state."""
        # Work with a score where larger is better
        score = -val_loss

        if self.best_score is None:
            # First call: always checkpoint
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            # Not enough improvement: use up one unit of patience
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improvement: checkpoint and reset the patience counter
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Save the whole model object to ``path`` and remember its validation loss."""
        if self.verbose:
            print(f"Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model...")
        # The full module is pickled (not just the state_dict), so it must be
        # loaded back with a plain torch.load of the same path.
        torch.save(model, self.path)
        self.val_loss_min = val_loss

# class EarlyStopping:
#     """Not save model"""
#     def __init__(self, patience=5, delta=0, verbose=False):
#         """
#         Args:
#             patience (int): How long to wait after the last improvement.
#             delta (float): Minimum change to qualify as an improvement.
#             verbose (bool): Print a message when stopping.
#         """
#         self.patience = patience
#         self.delta = delta
#         self.verbose = verbose
#         self.counter = 0
#         self.best_score = None
#         self.early_stop = False
#         self.val_loss_min = np.Inf

#     def __call__(self, val_loss):
#         score = -val_loss

#         if self.best_score is None:
#             self.best_score = score
#             self.val_loss_min = val_loss  # Track the best validation loss
#         elif score < self.best_score + self.delta:
#             self.counter += 1
#             if self.verbose:
#                 print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
#             if self.counter >= self.patience:
#                 self.early_stop = True
#         else:
#             self.best_score = score
#             self.val_loss_min = val_loss  # Update the best validation loss
#             self.counter = 0

In [None]:
# model = model.to(device)
resnet = resnet.to(device)

import time

# Training function
def train_model(model, train_dataloader,val_dataloader, criterion, optimizer, num_epochs=10):
    """Train ``model`` with per-epoch validation and early stopping.

    Each epoch runs one optimisation pass over ``train_dataloader`` and one
    gradient-free pass over ``val_dataloader``. Predictions are obtained by
    thresholding the sigmoid of the logits at 0.5. An ``EarlyStopping``
    instance (patience 5) checkpoints the model on validation-loss
    improvement and ends training early when patience runs out.

    Args:
        model: Module returning logits of shape (batch, 1).
        train_dataloader: Loader of ``(images, labels)`` training batches.
        val_dataloader: Loader of ``(images, labels)`` validation batches.
        criterion: Loss called as ``criterion(logits, float_labels)``.
        optimizer: Optimiser over the model's parameters.
        num_epochs: Maximum number of epochs.

    Returns:
        Four lists with one entry per completed epoch:
        ``(train_loss, train_acc, val_loss, val_acc)``.
    """
    train_loss, train_acc, val_loss, val_acc = [], [], [], []
    # Initialize EarlyStopping
    stopper = EarlyStopping(patience=5, verbose=True)

    for epoch in range(num_epochs):
        epoch_started = time.time()

        # ---- training pass ----
        model.train()
        loss_sum = 0.0
        epoch_preds, epoch_targets = [], []
        for images, label in train_dataloader:
            images = images.to(device)
            label = label.to(device).float()

            optimizer.zero_grad()

            # Forward pass: (batch, 1) logits -> (batch,)
            logits = model(images).squeeze(1)
            batch_loss = criterion(logits, label)
            loss_sum += batch_loss.item()

            # Backward pass and parameter update
            batch_loss.backward()
            optimizer.step()

            # Hard 0/1 predictions for the accuracy metric
            hard_pred = (torch.sigmoid(logits) > 0.5).long()
            epoch_preds.append(hard_pred.detach().cpu().numpy())
            epoch_targets.append(label.detach().cpu().numpy())

        # Epoch accuracy over all samples, epoch loss as the mean of the batch losses
        train_acc.append(accuracy_score(np.hstack(epoch_targets), np.hstack(epoch_preds)))
        train_loss.append(loss_sum / len(train_dataloader))

        # ---- validation pass ----
        model.eval()
        loss_sum = 0
        epoch_preds, epoch_targets = [], []
        with torch.no_grad():
            for images, label in val_dataloader:
                images = images.to(device)
                label = label.to(device).float()

                logits = model(images).squeeze(1)
                loss_sum += criterion(logits, label).item()

                hard_pred = (torch.sigmoid(logits) > 0.5).long()  # For binary classification
                epoch_preds.append(hard_pred.detach().cpu().numpy())
                epoch_targets.append(label.detach().cpu().numpy())

        val_acc.append(accuracy_score(np.hstack(epoch_targets), np.hstack(epoch_preds)))
        val_loss.append(loss_sum / len(val_dataloader))

        # Check early stopping (this also checkpoints the model on improvement)
        stopper(val_loss[-1], model)
        if stopper.early_stop:
            print("Early stopping")
            break

        elapsed = time.time() - epoch_started
        print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {train_loss[-1]:.4f}, Validation Loss: {val_loss[-1]:.4f}. Spent {elapsed:.4f}s.")

    print("Training complete.")
    return train_loss, train_acc, val_loss, val_acc

In [None]:
# Start training: at most num_epochs epochs; the early stopping inside
# train_model may end the run sooner, so the returned lists can be shorter.
num_epochs = 20
train_loss, train_acc, val_loss, val_acc = train_model(resnet, train_dataloader,val_dataloader, criterion, optimizer, num_epochs=num_epochs)

In [None]:
# Loss curves: one line per split, x axis counted in epochs starting at 1
loss_series = (
    (train_loss, 'b-', 'training loss'),
    (val_loss, 'r-', 'validation loss'),
)
for values, line_style, legend_name in loss_series:
    plt.plot(np.arange(1, len(values) + 1), values, line_style, label=legend_name)
plt.xlabel('epochs')
plt.ylabel('losses')
plt.legend(loc='best')
plt.title('The Loss Curves')

In [None]:
# Accuracy curves: one line per split, x axis counted in epochs starting at 1
accuracy_series = (
    (train_acc, 'g-', 'training accuracy'),
    (val_acc, 'm-', 'validation accuracy'),
)
for values, line_style, legend_name in accuracy_series:
    plt.plot(np.arange(1, len(values) + 1), values, line_style, label=legend_name)
plt.xlabel('epochs')
plt.ylabel('accuracies')
plt.legend(loc='best')
plt.title('The Accuracy Curves')

# Evaluation

In [None]:
from sklearn.metrics import cohen_kappa_score
def calculate_kappa(model, dataloader):
    """Compute, print and return Cohen's kappa of ``model`` on ``dataloader``.

    The model is switched to evaluation mode and run without gradient
    tracking. A sample is predicted positive when the sigmoid of its logit
    exceeds 0.5. Images are moved to the device the model's parameters live
    on, so the function does not depend on a global ``device``.

    Args:
        model: Module producing one logit per sample with shape (batch, 1).
        dataloader: Iterable of ``(images, labels)`` batches with 0/1 labels.

    Returns:
        float: Cohen's kappa between the true labels and the predictions.
    """
    model.eval()  # Set the model to evaluation mode

    # Run on whatever device the model is on (CPU for a parameter-free module)
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    all_predictions = []
    all_labels = []
    with torch.no_grad():  # No need to calculate gradients during evaluation
        for images, labels in dataloader:
            outputs = model(images.to(run_device)).squeeze(1)
            predicted = (torch.sigmoid(outputs) > 0.5).long()

            # Collect predictions and true labels as plain Python ints; the
            # labels are only needed on the host, so they are not moved to the device.
            all_predictions.extend(predicted.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    # Calculate Cohen's kappa
    kappa = cohen_kappa_score(all_labels, all_predictions)
    print(f"Cohen's Kappa: {kappa:.2f}")
    return kappa

In [None]:
# Evaluate after training. `resnet` holds the weights from the last epoch that
# ran, which is not necessarily the best checkpoint saved by early stopping.
evaluate_model(resnet, test_dataloader)
calculate_kappa(resnet, test_dataloader)

In [None]:
# Save model: the whole module object is pickled (not just its state_dict),
# in the state it has after the last training epoch.
torch.save(resnet, 'resnet_model.pth')

In [None]:
# Load checkpoint.pt
# EarlyStopping stores the whole pickled model (torch.save(model, path)), so
# weights-only loading has to be switched off explicitly: recent PyTorch
# releases default to weights_only=True and refuse such a file.
# map_location keeps the load working when the checkpoint was written on a
# GPU but the current session only has a CPU.
# NOTE: full-pickle loading can execute arbitrary code — only load
# checkpoints produced by this notebook.
model = torch.load('/kaggle/working/checkpoint.pt', map_location=device, weights_only=False)

# Evaluate the best one with early stopping
evaluate_model(model, test_dataloader)

# Cohen's kappa of the same best checkpoint
calculate_kappa(model, test_dataloader)

In [None]:
# model = torch.load('/kaggle/working/resnet18_model.pth')
# evaluate_model(model, test_dataloader)