In [None]:
import os
import csv
import numpy as np
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt

# Module for Google Drive
from google.colab import drive

drive.mount('/content/drive')
%cd './drive/MyDrive/XrayObjectDetection/codes/'

Mounted at /content/drive
[Errno 2] No such file or directory: './drive/MyDrive/XrayObjectDetection/codes/'
/content


In [None]:
import matplotlib.patches as patches
from PIL import Image
from xml.dom.minidom import parse
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print('Current device is: {}'.format(device))

Current device is: cuda


In [None]:
class XrayDataset(Dataset):
    """X-ray detection dataset: one image plus its annotated boxes and labels.

    Image file names are listed from ``<data_path>/<phase>/`` and each image's
    annotation is read from ``<data_path>/Annotation/<name>.xml``, where
    ``<name>`` is the image file name up to its first dot.

    Every item is a tuple ``(img, boxes, labels)``:
      * ``img``    -- normalised float tensor of shape
        ``(3, IMAGE_SIZE, IMAGE_SIZE)``;
      * ``boxes``  -- list of ``[xmin, ymin, xmax, ymax]`` floats, expressed
        in pixel coordinates of the *resized* image;
      * ``labels`` -- list of integer class ids taken from ``class_to_idx``.
    """

    # Side length, in pixels, of the square image returned by __getitem__.
    # Boxes are rescaled to this same frame so they stay aligned with it.
    IMAGE_SIZE = 300

    def __init__(self, data_path, phase):
        """Index the images of one split.

        Args:
            data_path: dataset root containing the split folders and the
                ``Annotation`` folder.
            phase: name of the split sub-folder (the notebook uses
                ``"train"`` and ``"val"``).
        """
        self.data_path = data_path
        self.phase = phase
        self.images_path = os.path.join(self.data_path, self.phase)
        # Sorted so that an index always refers to the same file.
        self.image_names = sorted(os.listdir(self.images_path))
        self.class_to_idx = {"Gun": 1, "Knife": 2, "Wrench": 3, "Pliers": 4}

    def __len__(self):
        """Return the number of images in the split."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Load image ``idx`` and its targets.

        Raises:
            KeyError: if the annotation names a class missing from
                ``class_to_idx``.
        """
        image_name = self.image_names[idx]
        # Force three channels: the normalisation below uses per-channel
        # statistics for exactly three channels.
        img = Image.open(os.path.join(self.images_path, image_name)).convert("RGB")

        boxes, labels = self._load_targets(image_name)

        side = self.IMAGE_SIZE
        img = torchvision.transforms.functional.resize(img, (side, side))
        img = torchvision.transforms.functional.to_tensor(img)
        mean, std = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)
        img = torchvision.transforms.functional.normalize(img, mean, std)

        return img, boxes, labels

    def _load_targets(self, image_name):
        """Parse the XML annotation of ``image_name``.

        Returns:
            ``(boxes, labels)`` where the boxes have already been rescaled
            from the annotated image size to the resized
            ``IMAGE_SIZE`` x ``IMAGE_SIZE`` frame.
        """
        dom = parse("{}/Annotation/{}.xml".format(self.data_path, image_name.split(".")[0]))
        data = dom.documentElement

        # Size of the image the box coordinates refer to, as recorded in the XML.
        width = float(data.getElementsByTagName('width')[0].childNodes[0].nodeValue)
        height = float(data.getElementsByTagName('height')[0].childNodes[0].nodeValue)

        boxes = []
        labels = []
        for obj in data.getElementsByTagName('object'):
            name = obj.getElementsByTagName('name')[0].childNodes[0].nodeValue
            bndbox = obj.getElementsByTagName('bndbox')[0]
            corners = [
                bndbox.getElementsByTagName(tag)[0].childNodes[0].nodeValue
                for tag in ('xmin', 'ymin', 'xmax', 'ymax')
            ]
            labels.append(self.class_to_idx[name])
            # Scale to the resized image, not to the original image size:
            # the returned tensor is IMAGE_SIZE x IMAGE_SIZE, so the boxes
            # must live in that coordinate frame too.
            boxes.append(self.resize_bbox(corners, (height, width),
                                          (self.IMAGE_SIZE, self.IMAGE_SIZE)))
        return boxes, labels

    def resize_bbox(self, bbox, in_size, out_size):
        """Rescale one box from an image of ``in_size`` to one of ``out_size``.

        Args:
            bbox: ``[xmin, ymin, xmax, ymax]``; each entry may be a number or
                a numeric string.
            in_size: ``(height, width)`` of the image the box refers to.
            out_size: ``(height, width)`` of the target image.

        Returns:
            ``[xmin, ymin, xmax, ymax]`` as floats in the target image.
        """
        y_scale = float(out_size[0]) / in_size[0]
        x_scale = float(out_size[1]) / in_size[1]
        xmin = x_scale * float(bbox[0])
        ymin = y_scale * float(bbox[1])
        xmax = x_scale * float(bbox[2])
        ymax = y_scale * float(bbox[3])
        return [xmin, ymin, xmax, ymax]


In [None]:
from model.xray_dataloder import XrayDataset

data_path = "./dataset"
batch_size = 1


def collate_fn(batch):
    """Regroup a list of ``(image, boxes, labels)`` samples field by field.

    Images can carry a different number of boxes, so the samples cannot be
    stacked into one tensor; they are returned as three parallel tuples
    ``(images, boxes, labels)`` instead.

    A concrete tuple is returned rather than a bare ``zip`` object: a zip
    iterator can be consumed only once, has no ``len``, and is passed through
    unchanged by the DataLoader's ``pin_memory`` step, so the image tensors
    inside it would never be pinned.
    """
    return tuple(zip(*batch))


train_dataset = XrayDataset(data_path, "train")
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              pin_memory=True, collate_fn=collate_fn)

val_dataset = XrayDataset(data_path, "val")
val_dataloader = DataLoader(val_dataset, batch_size=batch_size,
                            pin_memory=True, collate_fn=collate_fn)

# Number of training batches per epoch.
print(len(train_dataloader))

ModuleNotFoundError: ignored

In [None]:
# Class id -> display name. Covers every id produced by the XrayDataset
# defined in this notebook (1-4), so no annotated class raises a KeyError.
idx2class = {1: "Gun", 2: "Knife", 3: "Wrench", 4: "Pliers"}

img, boxes, labels = next(iter(train_dataloader))

# Show at most five samples, but never more than the batch actually holds.
n_show = min(5, len(img))
# squeeze=False keeps a 2-D axes array even when only one panel is drawn.
fig, ax = plt.subplots(1, n_show, figsize=(20, 10), squeeze=False)
ax = ax[0]
for i in range(n_show):
    # NOTE(review): if the dataset normalises its images (the XrayDataset
    # defined in this notebook does), imshow clips values outside [0, 1];
    # un-normalise before display if faithful colours matter.
    ax[i].imshow(img[i].permute(1, 2, 0))
    # Pair every box with its own label; boxes[i] and labels[i] are parallel.
    for box, label in zip(boxes[i], labels[i]):
        xmin, ymin, xmax, ymax = (float(v) for v in box)
        rect = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                             fill=False, edgecolor='red', linewidth=1)
        ax[i].add_patch(rect)
        ax[i].text(xmin, ymin - 5, idx2class[int(label)], c='r')

plt.show()


In [None]:
from model.model import *

# NOTE(review): the XrayDataset defined in this notebook emits label ids 1-4;
# confirm whether SSD300 expects n_classes to also count a background class.
n_classes = 4
# Defined here so the cell runs on a fresh kernel: the name is also needed when
# checkpoints are written later in the notebook.
# NOTE(review): this is the only value visible in the notebook - confirm the
# file exists at this location relative to the working directory.
path_pretrained_state_dict = "../best_vgg_state_dict.pth"
model = SSD300(n_classes=n_classes, path_pretrained_state_dict=path_pretrained_state_dict)
model.to(device)

## Training and Validating

In [None]:
def train(model, criterion, optimizer, images, bboxes, labels, image_size=300):
    """Run one optimisation step on a single batch.

    Args:
        model: detector; ``model(images)`` must return
            ``(pred_locs, pred_scores)``.
        criterion: callable ``(pred_locs, pred_scores, bboxes, labels) -> loss``.
        optimizer: optimizer holding ``model``'s parameters.
        images: sequence of N image tensors of identical shape.
        bboxes: sequence of N per-image box lists, each box being
            ``[xmin, ymin, xmax, ymax]`` in pixels of the resized image.
        labels: sequence of N per-image lists of integer class ids.
        image_size: side length in pixels of the (square) input images; box
            coordinates are divided by it to become fractions of the image.

    Returns:
        The batch loss as a tensor detached from the autograd graph, so the
        caller can accumulate it without keeping graph nodes alive.
    """
    model.train()

    # Send the batch to wherever the model's weights live instead of relying
    # on a notebook-level global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    images = torch.stack(images, 0).to(target_device)
    # Explicit dtype/shape keep images without any object well-formed:
    # boxes become a (0, 4) float tensor and labels an empty integer tensor.
    bboxes = [
        torch.tensor(b, dtype=torch.float32).reshape(-1, 4).to(target_device) / image_size
        for b in bboxes
    ]
    labels = [torch.tensor(l, dtype=torch.long).to(target_device) for l in labels]

    pred_locs, pred_scores = model(images)
    loss = criterion(pred_locs, pred_scores, bboxes, labels)

    # Clear stale gradients, back-propagate, then update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.detach()


In [None]:
def validate(model, criterion, images, bboxes, labels, image_size=300):
    """Evaluate one batch without updating the model.

    Args:
        model: detector; ``model(images)`` must return
            ``(pred_locs, pred_scores)`` and the model must provide
            ``detect_objects(pred_locs, pred_scores, min_score=...,
            max_overlap=..., top_k=...)``.
        criterion: callable ``(pred_locs, pred_scores, bboxes, labels) -> loss``.
        images: sequence of N image tensors of identical shape.
        bboxes: sequence of N per-image box lists, each box being
            ``[xmin, ymin, xmax, ymax]`` in pixels of the resized image.
        labels: sequence of N per-image lists of integer class ids.
        image_size: side length in pixels of the (square) input images; box
            coordinates are divided by it to become fractions of the image.

    Returns:
        ``(loss, det_boxes_batch, det_labels_batch, det_scores_batch)``: the
        batch loss followed by the three values returned by
        ``model.detect_objects``.
    """
    model.eval()

    # Send the batch to wherever the model's weights live instead of relying
    # on a notebook-level global.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    with torch.no_grad():
        images = torch.stack(images, 0).to(target_device)
        # Explicit dtype/shape keep images without any object well-formed:
        # boxes become a (0, 4) float tensor and labels an empty integer tensor.
        bboxes = [
            torch.tensor(b, dtype=torch.float32).reshape(-1, 4).to(target_device) / image_size
            for b in bboxes
        ]
        labels = [torch.tensor(l, dtype=torch.long).to(target_device) for l in labels]

        pred_locs, pred_scores = model(images)
        det_boxes_batch, det_labels_batch, det_scores_batch = model.detect_objects(
            pred_locs, pred_scores, min_score=0.01, max_overlap=0.45, top_k=200)
        loss = criterion(pred_locs, pred_scores, bboxes, labels)

    return loss, det_boxes_batch, det_labels_batch, det_scores_batch



In [None]:


# ---------------------------------------------------------------------------
# Training / validation driver.
# Each epoch: train on every training batch, write a checkpoint, evaluate on
# the validation set (loss and mAP), keep the best-mAP checkpoint, then let
# the scheduler react to the validation loss.
# ---------------------------------------------------------------------------
max_epoch = 50
save_stride = 10  # not used by this cell
print_stride = 1
max_val_mAP = 0

version = 'original_ssd_pretrained'
model_dir = '../results'
tmp_path = '../results/checkpoint_{}.pth'.format(version)

# torch.save does not create missing folders.
os.makedirs(model_dir, exist_ok=True)

# optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)
# scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=2)
# loss function
criterion = MultiBoxLoss(priors_cxcy=model.priors_cxcy).to(device)

train_losses = []
val_losses = []
val_mAPs = []

# Resume ONCE, and only when a checkpoint from an earlier run exists.
# Restoring the optimizer state at the start of every epoch would put back the
# learning rate stored in the checkpoint and so undo every reduction made by
# the scheduler; loading unconditionally would also fail on the very first run.
if os.path.isfile(tmp_path):
    checkpoint = torch.load(tmp_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    print(f'Resumed from {tmp_path}')

for epoch in tqdm(range(max_epoch)):
    ### Train phase
    train_loss = 0.0

    with tqdm(total=len(train_dataloader)) as pbar:
        for images, bboxes, labels in train_dataloader:
            curr_loss = train(model, criterion, optimizer, images, bboxes, labels)
            # Detach before accumulating so the running mean does not keep
            # autograd graph nodes of every batch alive.
            train_loss += curr_loss.detach() / len(train_dataloader)
            pbar.update(1)

    # NOTE(review): 'model' stores a freshly constructed SSD300, not the
    # trained one; the trained weights are in 'model_state_dict'. Kept for
    # compatibility with whatever reads these files - confirm it is needed.
    checkpoint = {
        'model': SSD300(n_classes=n_classes, path_pretrained_state_dict=path_pretrained_state_dict),
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(checkpoint, tmp_path)

    ### Validation phase
    val_loss = 0.0

    det_boxes = list()
    det_labels = list()
    det_scores = list()
    true_boxes = list()
    true_labels = list()

    with tqdm(total=len(val_dataloader)) as pbar:
        for images, bboxes, labels in val_dataloader:
            curr_loss, det_boxes_batch, det_labels_batch, det_scores_batch = validate(
                model, criterion, images, bboxes, labels)
            val_loss += curr_loss / len(val_dataloader)

            # Ground truth in the same fractional coordinates the model is
            # trained against (pixel boxes divided by the 300-pixel side).
            bboxes = [torch.tensor(b).to(device) / 300 for b in bboxes]
            labels = [torch.tensor(l).to(device) for l in labels]

            det_boxes.extend(det_boxes_batch)
            det_labels.extend(det_labels_batch)
            det_scores.extend(det_scores_batch)
            true_boxes.extend(bboxes)
            true_labels.extend(labels)

            pbar.update(1)

    val_APs, val_mAP = calculate_mAP(det_boxes, det_labels, det_scores, true_boxes, true_labels, device)

    # Keep a copy of the checkpoint whenever validation mAP improves.
    if max_val_mAP < val_mAP:
        torch.save(checkpoint, os.path.join(model_dir, 'best_model_{}.pth'.format(version)))
        max_val_mAP = val_mAP
        print(f'new max val mAP: {max_val_mAP}')

    if epoch % print_stride == 0:
        print(f'Epoch: {epoch+1} Training loss: {train_loss} Validation loss: {val_loss}')
        print(f'mAP of the validation set {val_mAP}: \n {val_APs}')

    train_losses.append(train_loss.cpu().detach().numpy())
    val_losses.append(val_loss.cpu().detach().numpy())
    val_mAPs.append(val_mAP)

    # ReduceLROnPlateau lowers the learning rate when this value stops improving.
    scheduler.step(val_loss)

torch.save(train_losses, os.path.join(model_dir, 'train_losses_{}.pth'.format(version)))
torch.save(val_losses, os.path.join(model_dir, 'val_losses_{}.pth'.format(version)))
torch.save(val_mAPs, os.path.join(model_dir, 'val_mAPs_{}.pth'.format(version)))


In [None]:
# Write each per-epoch history list to its own file inside model_dir.
history_files = {
    'train_losses_{}.pth': train_losses,
    'val_losses_{}.pth': val_losses,
    'val_mAPs_{}.pth': val_mAPs,
}
for template, history in history_files.items():
    torch.save(history, os.path.join(model_dir, template.format(version)))

In [None]:
# Validation mAP per epoch; epochs are numbered from 1 on the x axis.
epochs = np.arange(1, len(val_mAPs) + 1)

# Draw on the current axes (created on demand), label them, save, then show.
map_ax = plt.gca()
map_ax.plot(epochs, val_mAPs)
map_ax.set_xlabel('epoch')
map_ax.set_ylabel('map')
map_ax.set_title('mean of average precision')
map_ax.figure.savefig(os.path.join(model_dir, 'map_{}.png'.format(version)))
plt.show()

In [None]:
# Training vs. validation loss per epoch.
# The x values are derived from the loss history itself so this cell does not
# depend on a variable left behind by the previous plotting cell.
loss_epochs = np.arange(1, len(train_losses) + 1)
plt.plot(loss_epochs, train_losses, label="train losses")
plt.plot(loss_epochs, val_losses, label="validation losses")
# Label the axes so the saved figure can be read on its own.
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('training and validation loss')
plt.legend()
plt.savefig(os.path.join(model_dir, 'train_losses_{}.png'.format(version)))
plt.show()