<a href="https://www.kaggle.com/code/miltiadesgeneral/resnetmodel?scriptVersionId=117681479" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
!pip install -qU wandb
!pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116

## Import Libraries
* Will use pytorch to build and run our model
* Use pandas to aggregate our data

In [None]:
import torch.nn as nn
import torch
from torch import Tensor
import torchvision
import torchvision.transforms as T
from torchvision.utils import make_grid
from torchvision.io import ImageReadMode
from torch.utils.data import DataLoader, Dataset, random_split

import tensorflow as tf

from typing import Type
# from torchvision.models import resnet18

import pydicom as dicom
from PIL import Image
import imageio

import numpy as np 
import pandas as pd
from pathlib import Path
import os
import math

%matplotlib inline

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from joblib import Parallel, delayed

In [None]:
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
class CFG:
    # file paths
    train_data = "/kaggle/input/rsna-breast-cancer-detection/train.csv"
    test_data = "/kaggle/input/rsna-breast-cancer-detection/test.csv"
    train_images = "/kaggle/input/rsna-mammography-images-as-pngs/images_as_pngs_512/train_images_processed_512"
    test_images = "/kaggle/input/rsna-breast-cancer-detection/test_images/10008"
    
    # wandb
    project= user_secrets.get_secret("PROJECT")
    entity= user_secrets.get_secret("ENTITY")
    
    # Device config
    device = "GPU"
    
    # image 
    image_size = 256
    
    # batching 
    batch_size = 64
    
    random_seed = 123
    num_workers = 2
    random = True
    
    # loss, optimizer, epochs
    loss_fn = nn.CrossEntropyLoss()
    epochs = 10
    
    

## Device Configuration

In [None]:
if CFG.device == "GPU":
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

## Initialize Weights and Biases

In [None]:
import wandb
try:
    api_key = user_secrets.get_secret("WANDB")

    wandb.login(key=api_key)
    anonymous = None
except:
    anonymous = "must"
    print('To use your W&B account,\nGo to Add-ons -> Secrets and provide your W&B access token. Use the Label name as WANDB. \nGet your W&B access token from here: https://wandb.ai/authorize')
wandb.init(project=CFG.project, entity=CFG.entity)
wandb.config = {
  "learning_rate": 0.01,
  "epochs": CFG.epochs,
  "batch_size": CFG.batch_size
}

## Import train data

In [None]:
TRAIN_DATA_PATH = CFG.train_data
TRAIN_IMG_PATH = CFG.train_images
TEST_DATA_PATH = CFG.test_data
TEST_IMAGE_PATH = CFG.test_images

train_df = pd.read_csv(f'{TRAIN_DATA_PATH}')
train_df['image_path'] = f'{TRAIN_IMG_PATH}'\
                    + '/' + train_df.patient_id.astype(str)\
                    + '/' + train_df.image_id.astype(str)\
                    + '.png'
print('Train:')
display(train_df.head(2))

In [None]:
data = train_df["cancer"].value_counts().values
fig, ax = plt.subplots(figsize=(16, 8), subplot_kw=dict(aspect="equal"))
def func(pct, allvals):
    absolute = int(np.round(pct/100.*np.sum(allvals)))
    return "{:.1f}%\n({:d} g)".format(pct, absolute)

wedges, texts, autotexts = ax.pie(data, autopct=lambda pct: func(pct, data),
                                  textprops=dict(color="w"))
values = ["No Cancer", "Cancer"]
ax.legend(wedges, values,
          title="Diagnosis",
          loc="center left",
          bbox_to_anchor=(1, 0, 0.5, 1))
plt.setp(autotexts, size=8, weight="bold")

ax.set_title("Patient Distribution")

plt.show()

The data is very asymmetrical with 97.9% of patients presenting negative and 2.1% of patients presenting positive.

In [None]:
from sklearn.utils import resample
cancer_df = train_df[train_df["cancer"] == 1]
clear_df = train_df[train_df["cancer"] == 0]

cancer_df = resample(cancer_df,
                    replace=True,
                    n_samples=len(clear_df),
                    random_state=123)
train_df = cancer_df.merge(clear_df, how="outer")

train_df

In [None]:
train_df = train_df.sample(frac=0.5)
train_df = train_df.reset_index()

In [None]:
data = train_df["cancer"].value_counts().values
fig, ax = plt.subplots(figsize=(16, 8), subplot_kw=dict(aspect="equal"))
def func(pct, allvals):
    absolute = int(np.round(pct/100.*np.sum(allvals)))
    return "{:.1f}%\n({:d} g)".format(pct, absolute)

wedges, texts, autotexts = ax.pie(data, autopct=lambda pct: func(pct, data),
                                  textprops=dict(color="w"))
values = ["No Cancer", "Cancer"]
ax.legend(wedges, values,
          title="Diagnosis",
          loc="center left",
          bbox_to_anchor=(1, 0, 0.5, 1))
plt.setp(autotexts, size=8, weight="bold")

ax.set_title("Patient Distribution")

plt.show()

## Data Pipeline

Split the train data into a train, val and test dataset
comprised of 80% train, 10% val, 10% test

In [None]:
# train test split for the train, val
from sklearn.model_selection import train_test_split
train_dataset, val_dataset = train_test_split(train_df, train_size=math.floor(0.80*(len(train_df))), random_state=CFG.random_seed)
val_dataset, test_dataset = train_test_split(val_dataset, train_size=math.floor(0.50*(len(val_dataset))), random_state=CFG.random_seed)

# reset index
train_dataset.reset_index(inplace=True)
val_dataset.reset_index(inplace=True)
test_dataset.reset_index(inplace=True)

# display the length
display(len(train_dataset))
display(len(val_dataset))
display(len(test_dataset))

In [None]:
class buildDataset():
    def __init__(self, X, y, test=False):
        self.X = X
        self.y = y
        self.target_size = [CFG.image_size, CFG.image_size]
        self.test = test
        
    def __getitem__(self, index):
        """generates one sample of the data"""
        # Select sample
        target_size = self.target_size
        path = self.X[index]
        y = self.y[index]
        img = torchvision.io.read_image(path, mode = ImageReadMode.RGB)
        X = self.transform(img) if not self.test else self.test_transform(img)
        return X, y
    
    transform = T.Compose([
        T.ToPILImage(),
        T.Resize(CFG.image_size),
        T.RandomRotation(45),
        T.AutoAugment(),
        T.ToTensor()])
    
    test_transform = T.Compose([
        T.ToPILImage(),
        T.Resize(CFG.image_size),
        T.ToTensor()])

    def __len__(self):
        """denotes number of samples"""
        return len(self.X)

In [None]:
# Define the X, y values for the train and val datasets
X_train = train_dataset["image_path"]
y_train = train_dataset["cancer"]

X_val = val_dataset["image_path"]
y_val = val_dataset["cancer"]

In [None]:
# Construct the pytorch datasets for the train and val data
train_dataset = buildDataset(X_train, y_train)
val_dataset = buildDataset(X_val, y_val)

# Construct the pytorch dataloaders for the train and val data
train_dl = DataLoader(train_dataset, CFG.batch_size, shuffle=CFG.random, num_workers=CFG.num_workers, pin_memory=True)
val_dl = DataLoader(val_dataset, CFG.batch_size, shuffle=CFG.random, num_workers=CFG.num_workers, pin_memory=True)

In [None]:
def show_images(image, label, nmax=64):
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xticks([]); ax.set_yticks([])
    ax.set_title(label)
    ax.imshow(make_grid((image.detach()[:nmax]), nrow=8).permute(1, 2, 0))
def show_batch(dl, nmax=64):
    for images, labels in dl:
        show_images(images, labels, nmax)
        break

In [None]:
for image, label in train_dl:
    print(len(label))
    print(len(image))
    break

In [None]:
show_batch(train_dl)

In [None]:
show_batch(val_dl)

## Pytorch Model

class ResNet18(nn.Module):
    def __init__(self, num_classes=2):
        super(ResNet18, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True)
        )
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True)
        )
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        
        
    def save(self, model_path):
        torch.save(self.state_dict(), model_path)


In [None]:
torch.cuda.empty_cache()

In [None]:
def build_model():
    # Create a ResNet-18 model
    model = torchvision.models.resnet18(pretrained=True)

    # Replace the last fully connected layer with a new layer for binary classification
    model.fc = nn.Linear(512, 2)

    model = model.to(device)
    return model

    
    

In [None]:
model = build_model()
#wandb magic
wandb.watch(model, log_freq=100)

# Training

In [None]:
class Train:
    def __init__(self):
        self = self
    # Training Function 
    def train(self):
        num_epochs = CFG.epochs
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

        
        best_accuracy = 0.0 

        print("Begin training...") 
        for epoch in range(1, num_epochs+1): 
            running_train_loss = 0.0 
            running_accuracy = 0.0 
            running_vall_loss = 0.0 
            total = 0 

            # Training Loop 
            for i, data in enumerate(train_dl, 0): 
            #for data in enumerate(train_loader, 0): 
                inputs, outputs = data  # get the input and real species as outputs; data is a list of [inputs, outputs]
                inputs = torch.as_tensor(inputs, device="cuda")
                outputs = torch.as_tensor(outputs, device="cuda")
                optimizer.zero_grad()   # zero the parameter gradients          
                predicted_outputs = model(inputs)   # predict output from the model 
                train_loss = CFG.loss_fn(predicted_outputs, outputs)   # calculate loss for the predicted output  
                train_loss.backward()   # backpropagate the loss 
                optimizer.step()        # adjust parameters based on the calculated gradients 
                running_train_loss +=train_loss.item()  # track the loss value 
                wandb.log({"loss": running_train_loss})
                if i % 100 == 99:    # print every 100 mini-batches
                    print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_train_loss / 2000:.3f}')
                    running_loss = 0.0

            # Calculate training loss value 
            train_loss_value = running_train_loss/len(train_dl) 



            # Validation Loop 
            with torch.no_grad(): 
                model.eval() 
                for data in val_dl:
                    inputs, outputs = data
                    inputs = torch.as_tensor(inputs, device="cuda")
                    outputs = torch.as_tensor(outputs, device="cuda")
                    predicted_outputs = model(inputs) 
                    val_loss = CFG.loss_fn(predicted_outputs, outputs) 

                    # The label with the highest value will be our prediction 
                    _, predicted = torch.max(predicted_outputs, 1) 
                    running_vall_loss += val_loss.item()  
                    total += outputs.size(0) 
                    running_accuracy += (predicted == outputs).sum().item() 

            # Calculate validation loss value 
            val_loss_value = running_vall_loss/len(val_dl) 

            # Calculate accuracy as the number of correct predictions in the validation batch divided by the total number of predictions done.  
            accuracy = (100 * running_accuracy / total)     

            # Save the model if the accuracy is the best 
            if accuracy > best_accuracy: 
                self.saveModel() 
                best_accuracy = accuracy 

            # Print the statistics of the epoch 
            print('Completed training batch', epoch, 'Training Loss is: %.4f' %train_loss_value, 'Validation Loss is: %.4f' %val_loss_value, 'Accuracy is %d %%' % (accuracy))
    
    def saveModel(self): 
        path = "./ResNet18Model2.pth" 
        torch.save(model.state_dict(), path)
    

In [None]:
trainer = Train()
trainer.train()
print('Finished Training\n') 

# Testing

In [None]:
checkpoint_path = "/kaggle/working/ResNet18Model2.pth"
checkpoint  = torch.load(checkpoint_path)
model.load_state_dict(checkpoint, strict=False)
model.eval()

In [None]:
X_test, y_test = test_dataset["image_path"], test_dataset["cancer"]
test_dataset = buildDataset(X_test, y_test, test=True)
test_dl = DataLoader(test_dataset, CFG.batch_size, shuffle=CFG.random, num_workers=CFG.num_workers, pin_memory=True)

In [None]:
class Test:
    def __init__(self, model, test_dl):
        self.model = model
        self.test_dl = test_dl
    def testAccuracy(self):
        test_dl = self.test_dl
        accuracy = 0.0
        total = 0.0

        with torch.no_grad():
            for data in test_dl:
                images, labels = data
                # run the model on the test set to predict labels
                images = torch.as_tensor(images, device="cuda")
                labels = torch.as_tensor(labels, device="cuda")
                outputs = self.model(images)
                # the label with the highest energy will be our prediction
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                accuracy += (predicted == labels).sum().item()

        # compute the accuracy over all test images
        accuracy = (100 * accuracy / total)
        return(accuracy)
                
    # Function to show the images
    def imageshow(self, img):
        img = img.cpu()
        img = img / 2 + 0.5     # unnormalize
        npimg = img.numpy()
        plt.imshow(np.transpose(npimg, (1, 2, 0)))
        plt.show()


    # Function to test the model with a batch of images and show the labels predictions
    def testBatch(self):
        classes = ["No Cancer", "Cancer"]
        batch_size = CFG.batch_size
        test_dl = self.test_dl
        # get batch of images from the test DataLoader  
        images, labels = next(iter(test_dl))
        images = torch.as_tensor(images, device="cuda")
        labels = torch.as_tensor(labels, device="cuda")

        # show all images as one image grid
        self.imageshow(torchvision.utils.make_grid(images))

        # Show the real labels on the screen 
        print('Real labels: ', ' '.join('%5s' % classes[labels[j]] 
                                   for j in range(batch_size)))

        # Let's see what if the model identifiers the  labels of those example
        outputs = model(images)

        # We got the probability for every 10 labels. The highest (max) probability should be correct label
        _, predicted = torch.max(outputs, 1)

        # Let's show the predicted labels on the screen to compare with the real ones
        print('Predicted: ', ' '.join('%5s' % classes[predicted[j]] 
                                  for j in range(batch_size)))

In [None]:
tester = Test(model, test_dl)
tester.testAccuracy()

In [None]:
tester = Test(model, test_dl)
tester.testBatch()

In [None]:
preds = torch.tensor([]).to(device="cuda")
target = torch.tensor([]).to(device="cuda")
for data in test_dl:
    images, labels = data
    # run the model on the test set to predict labels
    images = torch.as_tensor(images, device="cuda")
    labels = torch.as_tensor(labels, device="cuda")
    outputs = model(images)
    # the label with the highest energy will be our prediction
    _, predicted = torch.max(outputs.data, 1)
    preds = torch.cat((preds, predicted), 0)
    target = torch.cat((target, labels), 0)

In [None]:
from torchmetrics.classification import BinaryConfusionMatrix
metric = BinaryConfusionMatrix().to(device="cuda")
cfn_mtrx = metric(preds, target)

In [None]:
cfn_mtrx = pd.DataFrame(cfn_mtrx.cpu().numpy(), columns=["Positive", "Negative"])
cfn_mtrx