## Import and install dependencies

In [None]:
# Download TorchVision repo
!rm -r vision
!git init
!pip install pycocotools --quiet
!pip install torchvision --quiet
!git config --global --unset http.proxy 
!git config --global --unset https.proxy
!git clone https://github.com/pytorch/vision.git

!cp vision/references/detection/utils.py ./
!cp vision/references/detection/transforms.py ./
!cp vision/references/detection/coco_eval.py ./
!cp vision/references/detection/engine.py ./
!cp vision/references/detection/coco_utils.py ./

!rm -r pngImages
!mkdir pngImages

In [None]:
## Basic libraries
import numpy as np
import pandas as pd
import os
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from glob import glob
from sklearn.model_selection import train_test_split
import pydicom
from skimage.transform import resize

## Torchvision libraries
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.tensorboard import SummaryWriter

## Image augmentations
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.transforms as transforms


## Helper libraries
from engine import train_one_epoch, evaluate
import utils
import transforms as T

## Data access and cleaning


In [None]:
## Parent path to data (every other data path below is derived from it)
path = '../path/to/Data/'

boxCSV = 'stage_2_train_labels.csv'
dataCSV = 'stage_2_detailed_class_info.csv'

imageFolders = ['stage_2_test_images', 'stage_2_train_images']

## First class is background.
## An explicit placeholder is used for it instead of IPython's `_` (the last cell
## output): `_` changes with execution history, so `classes.index(...)` could map a
## label to the wrong slot depending on what was displayed before this cell ran.
classes = ['background', 1]

## Image data
dataDF = pd.read_csv(path + dataCSV)
## Box data
boxDF = pd.read_csv(path + boxCSV)

## All PNG files found in any "*images" folder below the data root
dataGlob = glob(path + '*images/*.png')

## Dictionary of image to path (file name without its 4-character ".png" suffix)
imagePaths = {os.path.basename(x)[:-4]: x for x in dataGlob}
## Full image path data table
dataPath = dataDF['patientId'].map(imagePaths.get)

## Full path to images with bounding boxes
boxPaths = boxDF['patientId'].map(imagePaths.get)
## Isolate images with bounding boxes: rows with any missing value are dropped
boxImages = pd.merge(left = boxDF, right = dataDF, left_on = 'patientId', right_on = 'patientId', how = 'inner')
boxImages = boxImages.dropna(axis = 0)
boxDF = boxDF.dropna(axis = 0)

## Bounding box and label data groupings by image. Using boxDF due to inner join duplication.
## The groupby is built once; every array below follows the same patientId order,
## so position i refers to the same image in all of them.
boxGroups = boxDF.groupby('patientId')
xBox = boxGroups['x'].apply(np.array).reset_index()['x'].values
yBox = boxGroups['y'].apply(np.array).reset_index()['y'].values
wBox = boxGroups['width'].apply(np.array).reset_index()['width'].values
hBox = boxGroups['height'].apply(np.array).reset_index()['height'].values
## Group the finding labels together for the varying bounding boxes in each image
boxLabel = boxGroups['Target'].apply(np.array).reset_index()

## Expected PNG location of each image inside the training image folder
boxLabel['paths'] = path + "stage_2_train_images/" + boxLabel['patientId'] + ".png"

print("Number of images: ", len(boxLabel))

In [None]:
## Build a table of PNG image paths, one row per patient ID (no DICOM conversion or file writing happens in this cell)
import imageio
import os
import shutil
from PIL import Image,ExifTags

def create_png_df(labels=None, pngDir='../path/to/Data/stage_2_train_images_png'):
    """Build a table with the PNG path and ID of every image.

    Args:
        labels: DataFrame with a 'patientId' column. Defaults to the
            notebook-level ``boxLabel`` table when omitted.
        pngDir: Directory that holds the PNG files.

    Returns:
        DataFrame with columns 'Directory' (full PNG path) and 'ID'
        (patient ID), one row per row of ``labels`` and in the same order.

    Rows are never skipped: the dataset indexes this table and ``boxLabel``
    with the same position, so a silently dropped row would pair images with
    the wrong boxes. A malformed ID therefore raises instead of being ignored.
    """
    if labels is None:
        labels = boxLabel

    imageNames = list(labels['patientId'])
    imageDict = {
        'Directory': [os.path.join(pngDir, name + '.png') for name in imageNames],
        'ID': imageNames,
    }

    return pd.DataFrame(imageDict)

## Path table for the images listed in boxLabel; the dataset class looks images up here by position
pngImages = create_png_df()

In [None]:
## Number of image paths collected
len(pngImages)

## Data preprocessing

In [None]:
## Create a dictionary object for each image with its bounding box coords and labels
class LungData(torch.utils.data.Dataset):
    """Detection dataset pairing each X-ray PNG with its bounding boxes.

    The class reads the notebook-level tables ``pngImages`` (image paths),
    ``boxLabel`` (per-image 'Target' arrays), ``xBox``/``yBox``/``wBox``/``hBox``
    (per-image box geometry in original-image pixels) and ``classes``. All of
    them are addressed with the same positional index.

    Args:
        height: Height in pixels every image is resized to.
        width: Width in pixels every image is resized to.
        transforms: Optional callable invoked as
            ``transforms(image=..., bboxes=..., labels=...)`` that returns a
            mapping with 'image' and 'bboxes' entries.
    """

    def __init__(self, height, width, transforms = None):
        self.height = height
        self.width = width
        self.len = len(boxLabel)
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, target)`` for the image at position ``index``.

        ``image`` is a grayscale float32 array scaled to [0, 1] (or whatever
        ``transforms`` returns for it). ``target`` is a dict with 'boxes'
        ([xMin, yMin, xMax, yMax] in resized-image pixels), 'labels', 'area',
        'iscrowd', 'image_id' and 'image_path'.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        imagePath = pngImages['Directory'].iloc[index]

        ## Read the image; cv2.imread returns None instead of raising on failure
        origImage = cv2.imread(imagePath)
        if origImage is None:
            raise FileNotFoundError(f"Could not read image: {imagePath}")

        image = cv2.cvtColor(origImage, cv2.COLOR_BGR2GRAY).astype(np.float32)
        ## interpolation must be passed by keyword: the third positional
        ## parameter of cv2.resize is `dst`, not the interpolation flag
        image = cv2.resize(image, (self.width, self.height), interpolation=cv2.INTER_AREA)
        image = image / 255.0

        ## Original image shape, used to rescale boxes to the resized image
        Himage, Wimage = origImage.shape[:2]

        ## Combine all boxes for an image together
        imageTargets = boxLabel['Target'].iloc[index]
        boxes = []
        labels = []
        for member in range(len(imageTargets)):
            labels.append(classes.index(imageTargets[member]))

            xMin = xBox[index][member]
            xMax = xMin + wBox[index][member]
            yMin = yBox[index][member]
            yMax = yMin + hBox[index][member]

            boxes.append([(xMin/Wimage) * self.width,
                          (yMin/Himage) * self.height,
                          (xMax/Wimage) * self.width,
                          (yMax/Himage) * self.height])

        ## reshape keeps an (N, 4) layout even when an image has no boxes,
        ## so the column indexing below stays valid
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)

        ## Calculate area of boxes
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        ## Suppose all instances are not crowd
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        labels = torch.as_tensor(labels, dtype=torch.int64)

        ## Create output dictionary
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["area"] = area
        target["iscrowd"] = iscrowd
        ## image_id is stored as the plain integer index (not a tensor)
        target["image_id"] = index
        target["image_path"] = imagePath

        # Apply data transformations to reduce overfitting
        if self.transforms:
            sample = self.transforms(image = image,
                                     bboxes = target['boxes'],
                                     labels = labels)

            image = sample['image']
            target['boxes'] = torch.Tensor(sample['bboxes']).reshape(-1, 4)

        return image, target

    ## Overwrite function to return length of dataset
    def __len__(self):
        """Return the number of images, fixed when the dataset was created."""
        return self.len


## Check dataset length
## The dataset is built without transforms and resizes every image to 256x256
dataset = LungData(256, 256)
print("Length of dataset: ", len(dataset), "\n")

## Sample index to show image dictionary output
## NOTE(review): 878 is a hard-coded index; it assumes the dataset holds at least 879 images
image, target = dataset[878]
print("Image shape: ", image.shape, "\n")
print("Image dictionary object: ", target)

## Visualize an X-Ray image with bounding box

In [None]:
## Plot an image and overlay its bounding box on top of it
def plotImage(image, target, color):
    """Show ``image`` with every box of ``target`` drawn over it.

    Args:
        image: Image accepted by ``Axes.imshow`` (displayed with a gray colormap).
        target: Mapping with 'boxes' ([xMin, yMin, xMax, yMax] per box) and
            'labels' (indices into the notebook-level ``classes`` list).
        color: Matplotlib color used for both the box outline and its label text.
    """
    fig, ax = plt.subplots(1, 1)
    fig.set_size_inches(10, 10)
    ax.imshow(image, cmap='gray')

    ## Plot each box on the axes created above
    for i in range(len(target['boxes'])):
        xMin, yMin, xMax, yMax = (float(value) for value in target['boxes'][i])
        ## Create rectangle patch for bounding box
        rect = patches.Rectangle((xMin, yMin), xMax - xMin, yMax - yMin,
                                 linewidth = 2, edgecolor = color, facecolor = 'none')
        ax.add_patch(rect)
        ## Draw the class name at the top-left corner, in the same color as the box
        label = "%s" % (classes[target['labels'][i]])
        ax.text(xMin, yMin, label, color = color)

    ## show the plot
    plt.show()
    
## Plot a sample image
## Index 200 is a hard-coded sample from the un-augmented dataset; 'r' draws the boxes in red
image, target = dataset[200]
plotImage(image, target, 'r')

## Import ResNet-50-FPN model

In [None]:
## Build the detector that will be fine-tuned
def detectionModel(numClasses):
    """Return a pretrained Faster R-CNN (ResNet-50 FPN) with a fresh box head.

    Args:
        numClasses: Number of output classes for the new box predictor.

    Returns:
        The torchvision detection model whose ``roi_heads.box_predictor`` has
        been replaced by a ``FastRCNNPredictor`` sized for ``numClasses``.
    """
    ## Start from pretrained weights to speed up training
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    ## The new head must accept the same feature size the old classifier consumed
    headInputSize = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(headInputSize, numClasses)

    return detector

## Data transformations during training to reduce overfit
def createTransform(train):
    """Build the Albumentations pipeline applied to an image and its boxes.

    Args:
        train: When truthy, a random horizontal flip (probability 0.5) is
            applied before the tensor conversion; otherwise only the tensor
            conversion runs.

    Returns:
        An ``A.Compose`` pipeline using 'pascal_voc' boxes with the label
        field 'labels'.
    """
    steps = [ToTensorV2(p=1.0)]
    if train:
        ## The probability is passed by keyword: in older Albumentations releases the
        ## first positional parameter is `always_apply`, which would turn a
        ## positional 0.5 into "always flip"
        steps.insert(0, A.HorizontalFlip(p=0.5))

    return A.Compose(steps, bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

## Split training and test data

In [None]:
## Initialize train and test objects
## NOTE(review): both datasets are built with createTransform(False), so the
## training set receives no flip augmentation - confirm this is intended.
dataTrain = LungData(256, 256, createTransform(False))
dataTest = LungData(256, 256, createTransform(False))

## Split the dataset into train and test sets
## The indices below are sequential (the random permutation is commented out),
## so the last `dataSplit` fraction of images always forms the test set.
torch.manual_seed(1)
# indices = torch.randperm(len(dataTrain)).tolist()
indices = torch.arange(len(dataTrain)).tolist()
dataSplit = 0.2
testSize = int(len(dataTrain)*dataSplit)
## NOTE(review): if testSize is 0, indices[:-0] is empty and indices[-0:] is the
## whole list, i.e. the train set would be empty and the test set would hold everything.
dataTrain = torch.utils.data.Subset(dataTrain, indices[:-testSize])
dataTest = torch.utils.data.Subset(dataTest, indices[-testSize:])

## Define training and validation data loaders
## utils.collate_fn comes from the copied torchvision reference script utils.py;
## only the training loader shuffles.
dataTrainLoader = torch.utils.data.DataLoader(dataTrain, batch_size=10, shuffle=True, num_workers=4, collate_fn=utils.collate_fn)
dataTestLoader = torch.utils.data.DataLoader(dataTest, batch_size=10, shuffle=False, num_workers=4, collate_fn=utils.collate_fn)

## Initialize model

In [None]:
## Run on the GPU when one is available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Build the detector with two output classes and move it to the chosen device
numClasses = 2
model = detectionModel(numClasses)
model.to(device)

## Optimize only the parameters that require gradients
params = list(filter(lambda weight: weight.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.0055,
    momentum=0.9,
    weight_decay=0.0005,
)
## Multiply the learning rate by 0.1 every 3 scheduler steps
lrScheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

## Train the model

In [None]:
## Choose number of training epochs
numEpochs = 5

## Tensorboard writer (currently disabled)
# writer = SummaryWriter()

## Train one epoch at a time for numEpochs
## train_one_epoch and evaluate are imported from the copied torchvision reference script engine.py
for epoch in range(numEpochs):
    train_one_epoch(model, optimizer, dataTrainLoader, device, epoch, print_freq=10)
    ## Update the learning rate
    lrScheduler.step()
    ## Evaluate on the test dataset
    evaluate(model, dataTestLoader, device=device)
    
    ## Save a checkpoint of the weights after every epoch (file names are 1-based)
    torch.save(model.state_dict(), f"../path/to/save/epoch_{epoch+1}")

## Implement bounding box threshold

In [None]:
## Keep only the predicted boxes whose confidence score reaches a threshold
def thresholdBB(prediction, thresh):
    """Filter a prediction down to the detections scoring at least ``thresh``.

    Args:
        prediction: Mapping with 'boxes', 'scores' and 'labels' entries that
            share their first dimension (one row per detection).
        thresh: Minimum score a detection needs in order to be kept.

    Returns:
        A new dict with 'boxes', 'scores' and 'labels' filtered. Any other
        entries are carried over unchanged. ``prediction`` itself is not
        modified, so it can be thresholded again with a different value.
    """
    ## Boolean mask over detections; works for an empty prediction as well
    keep = prediction['scores'] >= thresh

    ## Shallow copy so the caller's dictionary keeps its unfiltered entries
    newPrediction = dict(prediction)
    for key in ('boxes', 'scores', 'labels'):
        newPrediction[key] = prediction[key][keep]
    return newPrediction

## Turn a torch image tensor into an RGB PIL image
def torchToPIL(image):
    """Convert ``image`` with torchvision's ``ToPILImage`` and return it in RGB mode."""
    toPil = transforms.ToPILImage()
    pilImage = toPil(image)
    return pilImage.convert('RGB')

## Test model on test dataset

In [None]:
## Fixed sample (index 57) from the test subset
image, target = dataTest[57]
## Switch to evaluation mode and run the forward pass without gradient tracking
model.eval()
with torch.no_grad():
    ## The model is called with a list of images; [0] takes the result for this single image
    prediction = model([image.to(device)])[0]
    
print('Predicted number of boxes: ', len(prediction['labels']))
print('True number of boxes: ', len(target['labels']))

## Visualize predicted vs. GT

In [None]:
## Draw a set of captioned boxes on an axes
def _drawBoxes(ax, boxes, captions, color):
    """Add one rectangle per [xMin, yMin, xMax, yMax] box, captioned at its top-left corner."""
    for box, caption in zip(boxes, captions):
        xMin, yMin, xMax, yMax = (float(value) for value in box)
        rect = patches.Rectangle((xMin, yMin), xMax - xMin, yMax - yMin,
                                 linewidth = 2, edgecolor = color, facecolor = 'none')
        ax.add_patch(rect)
        ax.text(xMin, yMin, caption, color = color)


## Plot both the predicted and ground truth boxes on the image
def plotPredictGT(image,width, height, targetGT, targetPredict, colorGT, colorPredict, isWP = False):
    """Show ``image`` with ground-truth and predicted boxes overlaid.

    Args:
        image: Image data holding ``height * width`` pixels per channel.
        width: Image width in pixels.
        height: Image height in pixels.
        targetGT: Mapping with ground-truth 'boxes' and 'labels'.
        targetPredict: Mapping with predicted 'boxes', 'labels' and 'scores'
            (tensors; they are moved to the CPU before drawing).
        colorGT: Matplotlib color for ground-truth boxes and captions.
        colorPredict: Matplotlib color for predicted boxes and captions.
        isWP: Unused; kept so existing calls keep working.
    """
    fig, ax = plt.subplots(1, 1)
    fig.set_size_inches(10, 10)
    ## Rows come first in an image array, so the target shape is (height, width, channels)
    image = image.reshape(height, width, -1)
    ax.imshow(image, cmap='gray')

    ## Ground truth: caption is the class name
    captionsGT = ["%s" % (classes[label]) for label in targetGT['labels']]
    _drawBoxes(ax, targetGT['boxes'], captionsGT, colorGT)

    ## Predictions: caption is the class name followed by the confidence score
    labelsPredict = targetPredict['labels'].cpu()
    scoresPredict = targetPredict['scores'].cpu()
    captionsPredict = ["%s   %0.2f" % (classes[label], score)
                       for label, score in zip(labelsPredict, scoresPredict)]
    _drawBoxes(ax, targetPredict['boxes'].cpu(), captionsPredict, colorPredict)

    ## show the plot
    plt.show()
    
## Plot a sample image. Blue is predicted, red is GT
## Only predictions with a score of at least 0.55 are drawn; 256, 256 match the size the datasets were created with
threshPredict = thresholdBB(prediction, 0.55)
plotPredictGT(image.cpu(), 256, 256, target, threshPredict, 'r', 'b')