In [4]:
import glob
import itertools
import os
import random
import sys
from typing import Callable, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torchvision
import torchvision.models as models
import tqdm
from icecream import ic
from PIL import Image
from sklearn.metrics import accuracy_score, multilabel_confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score, roc_curve, auc
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Loading the data

In [39]:
# Locations of the dataset files, all resolved relative to the current working directory.
DATA_DIR = os.path.join(os.getcwd(), 'dataset')
# The twelve image folders: dataset/images_001/images ... dataset/images_012/images
IMAGE_DIR = [os.path.join(DATA_DIR, f'images_{i:03d}', 'images') for i in range(1, 13)]
LABELS_CSV = os.path.join(DATA_DIR, 'Data_Entry_2017.csv')
BBOX_CSV = os.path.join(DATA_DIR, 'BBox_List_2017.csv')

labels = pd.read_csv(LABELS_CSV)
bbox = pd.read_csv(BBOX_CSV)

# 'Finding Labels' stores every finding of an image as one '|'-separated string;
# split it into a list so it can be exploded into one row per finding below.
labels_expanded = labels.copy()
labels_expanded['Finding Label'] = labels_expanded['Finding Labels'].str.split('|')

# Build the (Patient ID, Follow-up #) key BEFORE exploding so that every row
# derived from the same image carries the same key.
labels_expanded['Id'] = list(zip(labels_expanded['Patient ID'], labels_expanded['Follow-up #']))
labels_expanded = labels_expanded.explode('Finding Label')
data = labels_expanded[['Id','Image Index', 'Finding Label', 'Patient Age', 'Patient Gender', 'View Position']].copy()

# Map every finding to an integer code, numbered in order of first appearance.
unique_diseases = data['Finding Label'].unique()
disease_to_number = {disease: idx for idx, disease in enumerate(unique_diseases)}
disease_codes = data['Finding Label'].map(disease_to_number)

# Extend the key to (Patient ID, Follow-up #, Disease Code). The zip is positional,
# so the duplicated index labels left behind by explode() do not matter here.
data['Id'] = [
    (patient_id, follow_up, code)
    for (patient_id, follow_up), code in zip(data['Id'], disease_codes)
]

# explode() repeats index labels; give the frame a clean 0..n-1 index.
data = data[['Id', 'Image Index', 'Finding Label', 'Patient Age', 'Patient Gender', 'View Position']].reset_index(drop=True)

print(data.head())

# Displaying a sample image

In [40]:
# Index every PNG found under DATA_DIR/images_*/images by its file name so an
# 'Image Index' value from the labels table can be resolved to a path on disk.
image_paths = glob.glob(os.path.join(DATA_DIR, 'images_*', 'images', '*.png'))
image_dict = {os.path.basename(path): path for path in image_paths}


def show_image(img_name: str) -> None:
    """Display one image together with the findings recorded for it.

    Args:
        img_name: File name as it appears in the 'Image Index' column,
            e.g. "00000001_000.png".

    Raises:
        FileNotFoundError: If no file with that name was found by the glob above.
    """
    img_path = image_dict.get(img_name)
    if img_path is None:
        # Fail with a clear message instead of passing None on to the decoder.
        raise FileNotFoundError(f"Image {img_name!r} was not found under {DATA_DIR}")

    findings = data.loc[data['Image Index'] == img_name, 'Finding Label'].unique()

    # The decoded tensor is channel-first; imshow expects the channel axis last.
    img = torchvision.io.decode_image(img_path).permute(1, 2, 0)

    fig, ax = plt.subplots()
    ax.imshow(img, cmap='gray')
    ax.set_title(f"Findings: {', '.join(findings)}", fontsize=14)
    ax.axis('off')
    plt.show()


# To preview a random image instead: show_image(np.random.choice(data['Image Index'].unique()))
show_image("00000001_000.png")

## Creating the Dataset

In [41]:
class NIH_dataset(Dataset):
    '''
    Map-style dataset that yields one (image, finding label) pair per dataframe row.

    params:
    df: Pandas dataframe with at least the columns 'Image Index' (image file name)
        and 'Finding Label' (label string). Rows are addressed by position, so the
        dataframe's index labels do not need to be 0..n-1.
    image_directories: list of image FILE paths (not folders). Each path is indexed
        by its base name and looked up through the 'Image Index' column.
    transform: optional callable applied to the RGB PIL image. The dataset does not
        convert to a tensor itself, so include transforms.ToTensor() in it when
        tensors are needed (e.g. for batching with a DataLoader).
    target_transform: optional callable applied to the label string.
    '''
    def __init__(self, df: pd.DataFrame, image_directories: list[str], transform: Optional[Callable] = None, target_transform: Optional[Callable] = None):
        self.df = df
        self.image_dir = image_directories
        self.transform = transform
        self.target_transform = target_transform
        # file name -> full path; if two paths share a base name the later one wins
        self.image_to_path = {os.path.basename(path): path for path in self.image_dir}

    def __len__(self):
        """
        Returns the number of rows (image/label pairs) in the dataset
        """
        return len(self.df)

    def __getitem__(self, idx: int):
        """
        Returns the (image, label) pair stored at position idx.

        Raises FileNotFoundError when the row's image name has no known path and
        IndexError when idx is outside the dataframe.
        """
        # Positional access: stays correct for frames that were filtered or split
        # without resetting their index.
        image_name = self.df['Image Index'].iloc[idx]
        label: str = self.df['Finding Label'].iloc[idx]

        img_path = self.image_to_path.get(image_name)
        if img_path is None:
            raise FileNotFoundError(f"No image file found for {image_name!r}")

        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(img_path) as raw_image:
            img = raw_image.convert('RGB')

        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            label = self.target_transform(label)
        return img, label  # img is whatever transform returns, label is a string unless target_transform changes it


# Loading the parameters and dataloaders

In [None]:
# Seed every RNG used below (Python, NumPy, PyTorch) so the split, the weight
# initialisation of the new head and the shuffling are repeatable across runs.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Getting the image paths
image_paths = glob.glob(os.path.join(DATA_DIR, 'images_*', 'images', '*.png'))

# Setting the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
epochs = 100
learning_rate = 0.001
batch_size = 2048
# Both sizes are fractions of `data` handed to train_test_split below.
test_split_size = 0.001
train_size = 0.0005
# num_of_workers = 16

# Defining the model, loss function and optimizer
num_classes = len(disease_to_number)  # one output per distinct finding label
weights = models.ResNet50_Weights.DEFAULT # Pretrained weights
model = models.resnet50(weights=weights)
model.fc = nn.Linear(model.fc.in_features, num_classes) # Replace the head with one sized for our labels
model = model.to(device) # Move the model to the device
# NOTE(review): CrossEntropyLoss treats every row as single-label. Images with several
# findings were exploded into several rows, so the same image can appear with different
# targets -- confirm this is intended.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Try different transforms
transform = transforms.Compose([
    # Fixed (height, width): an int would only resize the shorter edge, and
    # non-square images could then not be stacked into one batch.
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    # transforms.RandomRotation(degrees=30),
    # transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
    # transforms.CenterCrop(224),
    transforms.ToTensor(),
    # NIH_dataset converts every image to RGB, so tensors have 3 channels; use the
    # per-channel ImageNet statistics that the pretrained weights were trained with.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Splitting the dataset
# NOTE(review): `data` has one row per (image, finding), so a row-level split can put
# the same image/patient in both train and test -- consider a grouped split.
train_df, test_df = train_test_split(
    data,
    test_size=test_split_size,
    train_size=train_size,
    random_state=SEED,
)
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_dataset = NIH_dataset(df=train_df, image_directories=image_paths, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


# Training the model

In [None]:
def train(epochs: int, model: torch.nn.Module, dataloader: torch.utils.data.DataLoader, criterion: torch.nn.Module, optimizer: torch.optim.Optimizer, label_to_index: Optional[dict] = None) -> torch.nn.Module:
    """Train `model` for `epochs` passes over `dataloader` and print summary metrics.

    Args:
        epochs: Number of full passes over `dataloader`.
        model: Network to optimise; it is updated in place.
        dataloader: Yields (inputs, labels) batches where `labels` is a sequence
            of label strings.
        criterion: Loss called as criterion(outputs, targets) with integer targets.
        optimizer: Optimizer already bound to the model's parameters.
        label_to_index: Mapping from label string to class index. Defaults to the
            notebook-level `disease_to_number`.

    Returns:
        The same `model` object after training.

    The printed metrics are averaged over every batch of every epoch; they are
    not a per-epoch or final-epoch score.
    """
    if label_to_index is None:
        label_to_index = disease_to_number

    # Run on whatever device the model lives on rather than a hidden global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    # Make sure dropout / batch-norm layers are in training mode even if the
    # model was previously switched to eval().
    model.train()

    losses, precisions, recalls, f1_scores, accuracies = [], [], [], [], []
    true_negatives, false_positives, false_negatives, true_positives = [], [], [], []
    for epoch in tqdm.tqdm(range(epochs)):
        for inputs, labels in tqdm.tqdm(dataloader, leave=False):

            # Move the inputs to the device and encode the label strings as class indices
            inputs = inputs.to(target_device)
            targets = torch.tensor([label_to_index[label] for label in labels], device=target_device)

            # Standard optimisation step
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

            # Copy to host memory once and reuse for every metric
            y_true = targets.cpu().numpy()
            y_pred = outputs.detach().argmax(dim=1).cpu().numpy()

            accuracies.append(accuracy_score(y_true, y_pred))
            precisions.append(precision_score(y_true, y_pred, average='macro', zero_division=0))
            recalls.append(recall_score(y_true, y_pred, average='macro', zero_division=0))
            f1_scores.append(f1_score(y_true, y_pred, average='macro', zero_division=0))

            # One 2x2 matrix per class, summed over classes into a single 2x2
            # matrix whose flattened order is tn, fp, fn, tp.
            tn, fp, fn, tp = multilabel_confusion_matrix(y_true, y_pred).sum(axis=0).ravel()
            true_negatives.append(tn)
            true_positives.append(tp)
            false_positives.append(fp)
            false_negatives.append(fn)

    if not losses:
        # epochs == 0 or an empty dataloader: averaging empty lists would only give NaN.
        print('No batches were processed; no metrics to report.')
        return model

    # Print the metrics
    print(f'{np.mean(recalls)=} ,{np.mean(precisions)=} , {np.mean(f1_scores)=} , {np.mean(accuracies)=}')
    print(
        f'mean loss={np.mean(losses):.4f}, '
        f'TP={int(sum(true_positives))}, FP={int(sum(false_positives))}, '
        f'FN={int(sum(false_negatives))}, TN={int(sum(true_negatives))}'
    )

    return model

resnet_trained = train(epochs, model, train_loader, criterion, optimizer)

In [None]:
# Persist only the learned weights (state_dict); the file name records the
# model class and the hyperparameters of this run.
checkpoint_path = f"{model.__class__.__name__}_{epochs}_epochs_{learning_rate}_lr_{batch_size}_batch_size.pth"
trained_weights = resnet_trained.state_dict()
torch.save(trained_weights, checkpoint_path)