# Detecting Breast Tissue Abnormalities in Mammograms through Deep Learning
DS 6050: Project Milestone II  
Project Group 4: Stephanie Landas (sfl7ck), Kristen Rose (krr4de), Michelle Wu (mw3ef)

In [None]:
# IMPORTS
import os
import math
import pickle
from tqdm import tqdm
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from sklearn import metrics

import torch
from torch import optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as tr
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = "cpu"
if torch.cuda.is_available():
  device = "cuda"

## Load in CSV

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive so that the
# dataset and checkpoint paths used in this notebook resolve.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Dataset layout on the mounted Drive: case-description CSVs live under
# <root>/csv and the JPEG images under <root>/jpeg.
root = "/content/drive/MyDrive/DS6050 (Deep Learning) Project/Data"
_csv_dir = os.path.join(root, "csv")
images = os.path.join(root, "jpeg")
train_csv_path = os.path.join(_csv_dir, "mass_case_description_train_set.csv")
test_csv_path = os.path.join(_csv_dir, "mass_case_description_test_set.csv")

In [None]:
# Read the train and test case-description tables into DataFrames.
train_csv, test_csv = (
  pd.read_csv(csv_path) for csv_path in (train_csv_path, test_csv_path)
)

### View one image of each type

In [None]:
def _load_example_image(pathology):
  """Return (csv image path, RGB PIL image) for the first training row with `pathology`.

  The second-to-last component of the CSV's 'image file path' names the
  directory under `images` that holds the JPEG. Raises IndexError when no
  training row has the requested pathology.
  """
  matching_rows = train_csv.loc[train_csv['pathology'] == pathology]
  csv_img_path = matching_rows.iloc[0]['image file path']
  img_dir = os.path.join(images, csv_img_path.split("/")[-2])
  # os.listdir returns entries in arbitrary order; sort so the same file is
  # picked on every run.
  file_name = sorted(os.listdir(img_dir))[0]
  return csv_img_path, Image.open(os.path.join(img_dir, file_name)).convert("RGB")


def _show_example(img, title):
  """Display `img` with matplotlib under the given title."""
  plt.imshow(img)
  plt.title(title)
  plt.show()


# One example per pathology value present in the training CSV.
benign_img_path, benign_img = _load_example_image("BENIGN")
_show_example(benign_img, "benign")

benign_no_callback_img_path, benign_no_callback_img = _load_example_image("BENIGN_WITHOUT_CALLBACK")
_show_example(benign_no_callback_img, "benign no callback")

malignant_img_path, malignant_img = _load_example_image("MALIGNANT")
_show_example(malignant_img, "malignant")


## Datasets

In [None]:
def _tagged_train_path(row):
  """Return `<images>/<image dir>/<pathology>_<file name>` for one CSV row.

  The pathology is prefixed to the file name so the label travels with the
  path; the resulting path is therefore not a real file on disk until the
  prefix is stripped again.
  """
  img_dir = os.path.join(images, row['image file path'].split('/')[-2])
  # os.listdir returns entries in arbitrary order; sort so the same file is
  # picked on every run.
  file_name = sorted(os.listdir(img_dir))[0]
  return os.path.join(img_dir, f"{row['pathology']}_{file_name}")


tagged_paths = [_tagged_train_path(row) for _, row in train_csv.iterrows()]

# First 85% of the rows (in CSV order) train the model, the rest validate it.
# The boundary is computed once and applied by position, so the split does not
# depend on the DataFrame's index labels.
split_at = int(np.round(len(train_csv) * 0.85))
train_data = tagged_paths[:split_at]
val_data = tagged_paths[split_at:]

print(f"train data: {len(train_data)}, val data: {len(val_data)}")

In [None]:
class ClassifierDataset(Dataset):
  """Image dataset whose labels are encoded in each path's file name.

  Every entry of `image_paths` has the form `<dir>/<PATHOLOGY>_<file name>`,
  where `<dir>/<file name>` is the image actually stored on disk.

  Args:
    image_paths: sequence of pathology-tagged paths as described above.
    class_dict: mapping with the keys "cancer" and "not_cancer" giving the
      integer label returned for each class.
    transforms: callable applied to the RGB PIL image; its result is returned
      as the sample.
  """

  def __init__(self, image_paths, class_dict, transforms):
    self.paths = image_paths
    self.class_dict = class_dict
    self.transforms = transforms

  def __len__(self):
    return len(self.paths)

  @staticmethod
  def _split_tagged_path(path):
    """Return (path of the real image file, class name) for a tagged path.

    The text after the last underscore of the file name is the real file
    name; everything before it is the pathology. Pathologies starting with
    "BENIGN" (this includes "BENIGN_WITHOUT_CALLBACK") map to "not_cancer";
    every other value maps to "cancer".
    """
    directory, tagged_name = os.path.split(path)
    pathology, _, file_name = tagged_name.rpartition("_")
    label_name = "not_cancer" if pathology.startswith("BENIGN") else "cancer"
    return os.path.join(directory, file_name), label_name

  def __getitem__(self, idx):
    """Return (transformed image, integer label) for sample `idx`."""
    actual_path, label_name = self._split_tagged_path(self.paths[idx])
    pil_img = Image.open(actual_path).convert("RGB")
    tensor = self.transforms(pil_img)
    return tensor, self.class_dict[label_name]


## Dataloader

In [None]:
# Shared loader settings for training and validation.
batch_size = 4
class_dict = {"cancer": 0, "not_cancer": 1}

# Convert to a tensor, resize to 1024x1024, then normalise each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics
# that accompany pretrained torchvision models - confirm.
transforms = tr.Compose([
  tr.ToTensor(),
  tr.Resize((1024, 1024), antialias = True),
  tr.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One shuffled, single-process loader per phase, keyed by phase name.
dataloader_dict = {
  phase: DataLoader(
    dataset=ClassifierDataset(image_paths=phase_paths, class_dict=class_dict, transforms=transforms),
    batch_size=batch_size,
    num_workers=0,
    shuffle=True,
  )
  for phase, phase_paths in (("train", train_data), ("val", val_data))
}
train_dataloader = dataloader_dict["train"]
val_dataloader = dataloader_dict["val"]



## Hyperparameters

In [None]:
# Settings for the training run: number of passes over the training data and
# the learning rate, momentum and weight decay handed to the optimizer.
epochs = 100
learning_rate = 1e-3
momentum = 0.9
weight_decay = 1e-5

## Load in pretrained model

In [None]:
# Start from the pretrained EfficientNetV2-S. `weights` expects a member of
# the weights enum (or None); passing the enum class itself only works through
# torchvision's deprecated legacy handling, so name the member explicitly.
model = efficientnet_v2_s(weights=EfficientNet_V2_S_Weights.IMAGENET1K_V1)

# The pretrained classifier ends in a 1000-way layer. Swap it for one sized to
# this task so that argmax can only yield ids that exist in class_dict.
_pretrained_head = model.classifier[-1]
model.classifier[-1] = nn.Linear(_pretrained_head.in_features, len(class_dict))

model_name = "effnet_classify_breast_cancer"
# Created after the head swap so the new layer's parameters are optimised too.
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
criterion = nn.CrossEntropyLoss()

In [None]:
# Train for `epochs` epochs, validating after each one. Per-epoch loss and
# accuracy are recorded, the model with the lowest validation loss is kept as
# "<model_name>_best_model.pth", and a checkpoint is written for every epoch.
model.to(device)

# Named `history` rather than `metrics` so the `sklearn.metrics` module
# imported at the top of the notebook is not overwritten by this dict.
history = {"epoch": [], "train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

checkpoint_dir = "/content/drive/MyDrive/DS6050 (Deep Learning) Project/model - efficientnet"
# torch.save does not create missing directories.
os.makedirs(os.path.join(checkpoint_dir, "history"), exist_ok=True)

best_loss = np.inf
for epoch in range(epochs):
  print(f"Epoch {epoch}")
  for phase in ["train", "val"]:
    is_train = phase == "train"
    model.train(is_train)  # train(False) is equivalent to eval()
    running_loss = 0.0
    running_correct = 0
    loader = dataloader_dict[phase]
    for inputs, labels in tqdm(loader):
      inputs = inputs.to(device)
      labels = labels.to(device)

      optimizer.zero_grad()
      # Gradients are only tracked while training.
      with torch.set_grad_enabled(is_train):
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        preds = torch.argmax(outputs, dim=1)

      if is_train:
        loss.backward()
        optimizer.step()

      # criterion returns the batch mean, so weight it by the batch size.
      running_loss += loss.item() * inputs.size(0)
      # .item() keeps the counter a plain Python number, so the recorded
      # accuracy pickles as a float instead of a (possibly CUDA) tensor.
      running_correct += (preds == labels).sum().item()

    n_samples = len(loader.dataset)
    epoch_loss = running_loss / n_samples
    epoch_acc = running_correct / n_samples
    print(f"{phase}; Loss: {epoch_loss}, Acc: {epoch_acc}")

    if is_train:
      history["epoch"].append(epoch)
      history["train_loss"].append(epoch_loss)
      history["train_acc"].append(epoch_acc)
    else:
      history["val_loss"].append(epoch_loss)
      history["val_acc"].append(epoch_acc)
      if epoch_loss < best_loss:
        best_loss = epoch_loss
        torch.save(model, f"{checkpoint_dir}/{model_name}_best_model.pth")

  # Once per epoch, after both phases, instead of once per phase: the weights
  # do not change during validation, so the second write was redundant.
  torch.save(model, f"{checkpoint_dir}/history/{model_name}_epoch{epoch}.pth")

  with open(f"{checkpoint_dir}/metrics.pkl", 'wb') as handle:
    pickle.dump(history, handle)



## Inference / Evaluation results

In [None]:
# Evaluation-time settings: same label mapping and preprocessing pipeline as
# used for training (tensor conversion, 1024x1024 resize, per-channel
# normalisation).
batch_size = 16
class_dict = {"cancer": 0, "not_cancer": 1}
transforms = tr.Compose([
  tr.ToTensor(),
  tr.Resize((1024, 1024), antialias = True),
  tr.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


In [None]:
# Build one pathology-tagged path per test row:
# <images>/<image dir>/<pathology>_<file name>.
test_data = []

for _, row in test_csv.iterrows():
  img_dir = os.path.join(images, row['image file path'].split('/')[-2])
  # os.listdir returns entries in arbitrary order; sort so the same file is
  # picked on every run.
  file_name = sorted(os.listdir(img_dir))[0]
  test_data.append(os.path.join(img_dir, f"{row['pathology']}_{file_name}"))


print(f"test data: {len(test_data)}")

# One sample per batch. Shuffling does not affect evaluation metrics, so keep
# CSV order to make the prediction order reproducible.
test_dataloader = DataLoader(dataset=ClassifierDataset(image_paths=test_data, class_dict=class_dict, transforms=transforms),
                             batch_size=1,
                             num_workers=0,
                             shuffle=False)

In [None]:
model_path  = "/content/drive/MyDrive/DS6050 (Deep Learning) Project/model - efficientnet/effnet_classify_breast_cancer_best_model.pth"
# The checkpoint is a whole pickled nn.Module (it was written with
# torch.save(model, ...)), not a state_dict, so full unpickling is required.
# Recent PyTorch releases default to weights_only=True, which rejects such
# files. Unpickling can execute arbitrary code: only load checkpoints that
# this project produced itself.
model = torch.load(model_path, map_location=device, weights_only=False)
# Inference mode for layers that behave differently during training.
model = model.eval()

In [None]:
# Run the model over the test set, collecting predicted and true class ids as
# flat lists of ints (one entry per sample, whatever the batch size).
preds = []
actuals = []
# No gradients are needed for inference; skipping them saves memory.
with torch.no_grad():
  for inputs, labels in tqdm(test_dataloader):
    inputs = inputs.to(device)
    output = model(inputs)
    preds.extend(torch.argmax(output, dim=1).cpu().tolist())
    actuals.extend(labels.tolist())


In [None]:
# Imported under its own name here because the training cell may have rebound
# the notebook-level name `metrics` to a dict of training history.
from sklearn import metrics as sk_metrics

# Micro-averaged over all classes. For single-label predictions micro recall
# and micro precision are both equal to plain accuracy.
recall = sk_metrics.recall_score(actuals, preds, average="micro")
precision = sk_metrics.precision_score(actuals, preds, average="micro")
print(f"recall: {recall}")
print(f"precision: {precision}")

# Scores restricted to the "cancer" class, which is what matters for missed
# or false detections. zero_division=0 avoids a warning when that class is
# never predicted or never present.
cancer_id = class_dict["cancer"]
cancer_recall = sk_metrics.recall_score(actuals, preds, labels=[cancer_id], average="micro", zero_division=0)
cancer_precision = sk_metrics.precision_score(actuals, preds, labels=[cancer_id], average="micro", zero_division=0)
print(f"cancer recall: {cancer_recall}")
print(f"cancer precision: {cancer_precision}")

In [None]:
# Imported under its own name here because the training cell may have rebound
# the notebook-level name `metrics` to a dict of training history.
from sklearn import metrics as sk_metrics

# Pin rows/columns to the ids in class_dict so the matrix always has one
# row/column per display label, in the same order as the label names.
label_ids = list(class_dict.values())
# normalize="true" scales each row (true class) to sum to 1.
confusion_matrix = sk_metrics.confusion_matrix(actuals, preds, labels=label_ids, normalize="true")
disp = sk_metrics.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=list(class_dict.keys()))
disp.plot(cmap="Blues")
plt.title("Confusion Matrix")
plt.show()