<a href="https://colab.research.google.com/github/zachmurphy1/facemask-faster-rcnn/blob/main/Faster_R_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Faster R-CNN
This notebook implements and trains the Faster R-CNN network.

## Input
Images and annotations from the train, oversampled train (OS), val, and test sets:
```
Train:
facemask_data/train/images
facemask_data/train/annotations

Train (OS):
facemask_data/train/oversampling/images
facemask_data/train/oversampling/annotations

Val:
facemask_data/val/images
facemask_data/val/annotations

Test:
facemask_data/test/images
facemask_data/test/annotations
```

## Output
Pickled `TrainingModel` instance (model, optimizer, and loss history), saved under:
```
Models/
```

# Imports

In [None]:
# Imports
import pickle
import sys, os
import numpy as np
import pandas as pd

from PIL import Image
from bs4 import BeautifulSoup
import torch, torchvision
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Mount data

In [None]:
# Mount data directory
# Colab only: mount Google Drive and make the project folder the working
# directory, so that the relative paths defined below resolve.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
%cd /content/gdrive/My\ Drive/facemask-faster-rcnn/

# Root of the dataset; MaskDataset appends the split name ('train', 'val', ...) to it.
DATADIR = 'facemask_data'
# NOTE(review): ANNDIR and IMGDIR are not referenced anywhere else in the
# visible notebook (MaskDataset builds its own per-split paths) - confirm
# whether they are still needed.
ANNDIR = DATADIR + '/annotations'
IMGDIR = DATADIR + '/images'

# Pickled SR model; loaded by MaskDataset when it is created with sr=True.
SRPATH = 'sr_training/sr_model.pkl'

Mounted at /content/gdrive
/content/gdrive/My Drive/DL Final Project


## SR network architecture
These class definitions must be in scope so that the pickled SR (super-resolution) network can be loaded.

In [None]:
# Set upscaling factor
# MaskDataset multiplies bounding boxes and image sizes by this factor when
# sr=True. SRNetwork below chains two Upscale stages, each using
# PixelShuffle(2), i.e. 2 * 2 = 4x spatially, so keep the two in sync.
sr_scale=4

class Bblock(nn.Module):
  """Residual block on 64-channel feature maps.

  Applies conv -> batch norm -> PReLU -> conv -> batch norm and adds the
  block input back onto the result. Both convolutions are 3x3 with stride 1
  and padding 1, so the spatial size of the input is preserved.
  """

  def __init__(self):
    super().__init__()
    channels = 64
    conv_args = dict(kernel_size=3, stride=1, padding=1)

    # Attribute names and creation order are kept as-is so that existing
    # pickles / state dicts of this class still match.
    self.conv1 = nn.Conv2d(channels, channels, **conv_args)
    self.bn1 = nn.BatchNorm2d(channels)
    self.prelu = nn.PReLU(channels)
    self.conv2 = nn.Conv2d(channels, channels, **conv_args)
    self.bn2 = nn.BatchNorm2d(channels)

  def forward(self, x):
    """Return the residual branch output plus the unchanged input ``x``."""
    residual = self.prelu(self.bn1(self.conv1(x)))
    residual = self.bn2(self.conv2(residual))
    return residual + x


class Upscale(nn.Module):
  """2x spatial upsampling stage.

  A 3x3 convolution expands 64 channels to 256, PixelShuffle(2) rearranges
  them into 64 channels at twice the height and width, and a single-parameter
  PReLU is applied to the result.
  """

  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1)
    self.pixelShuffle = nn.PixelShuffle(upscale_factor=2)
    self.prelu = nn.PReLU()

  def forward(self, x):
    """Return ``x`` upsampled by a factor of two in both spatial dimensions."""
    return self.prelu(self.pixelShuffle(self.conv1(x)))


class SRNetwork(nn.Module):
  """SR network: 9x9 stem, 16 residual blocks, two 2x upscale stages, 9x9 head.

  Takes a 3-channel image batch and returns a 3-channel batch whose height
  and width are four times larger (two PixelShuffle(2) stages).
  """

  def __init__(self):
    super().__init__()
    # Attribute names and creation order are kept as-is so that previously
    # pickled models of this class still load.
    self.conv1 = nn.Conv2d(3, 64, kernel_size=9, stride=1, padding=4)
    self.prelu = nn.PReLU()

    # Trunk of 16 identical residual blocks.
    self.Bres = nn.Sequential(*[Bblock() for _ in range(16)])

    self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
    self.bn2 = nn.BatchNorm2d(64)

    self.upscale1 = Upscale()
    self.upscale2 = Upscale()

    self.conv3 = nn.Conv2d(64, 3, kernel_size=9, stride=1, padding=4)

  def forward(self, x):
    """Return the upscaled version of image batch ``x``."""
    stem = self.prelu(self.conv1(x))

    # Long skip connection around the whole residual trunk.
    trunk = self.bn2(self.conv2(self.Bres(stem)))
    merged = trunk + stem

    upsampled = self.upscale2(self.upscale1(merged))
    return self.conv3(upsampled)


# Dataset class

In [None]:
class MaskDataset(Dataset):
  """Face-mask detection dataset backed by PNG images and XML annotations.

  Each split lives in ``DATADIR/<op>/images`` and ``DATADIR/<op>/annotations``;
  every image ``<stem>.png`` is expected to have an annotation ``<stem>.xml``.

  Args:
    op: Split to read. One of 'train', 'val', 'test', 'train/oversampling'.
    sr: If True, the pickled SR model at ``SRPATH`` is loaded and applied to
      every image; boxes and sizes are multiplied by ``sr_scale``.
    do_transforms: If True, augmentation is applied (only when op == 'train').

  Attributes:
    files: Sorted image file names of the split.
    n: Number of images.
    counts: Number of annotated objects per class
      ('no_mask', 'masked', 'incorrect').

  Raises:
    ValueError: If ``op`` is not one of the supported splits.
  """

  # Supported splits (sub-directories of DATADIR).
  VALID_OPS = ('train', 'val', 'test', 'train/oversampling')

  # Annotation class name -> key in ``self.counts``.
  COUNT_KEYS = {
      'without_mask': 'no_mask',
      'with_mask': 'masked',
      'mask_weared_incorrect': 'incorrect',
  }

  # Annotation class name -> integer label returned in the target dict.
  LABEL_IDS = {
      'without_mask': 1,
      'with_mask': 2,
      'mask_weared_incorrect': 3,
  }

  def __init__(self, op='train', sr=False, do_transforms=True):
    # Set data dir based on op. Fail loudly on an unknown split instead of
    # leaving a half-initialised object behind.
    self.op = op
    if self.op not in self.VALID_OPS:
      raise ValueError(
          "op should be one of {}, got '{}'".format(self.VALID_OPS, op))

    # Get data dirs and metadata
    self.data_dir = DATADIR + '/' + self.op
    self.ann_dir = os.path.join(self.data_dir, 'annotations')
    self.img_dir = os.path.join(self.data_dir, 'images')

    # Sorted so that idx -> file (and therefore image_id) is stable between runs.
    self.files = sorted(next(os.walk(self.img_dir))[2])
    self.n = len(self.files)
    self.do_transforms = do_transforms

    # Get instance counts by class, reading the same annotation files that
    # __getitem__ will read (derived from the image file names).
    counts = {'no_mask': 0,
              'masked': 0,
              'incorrect': 0}
    for fname in self.files:
      ann_path = self.ann_dir + '/' + os.path.splitext(fname)[0] + '.xml'
      with open(ann_path, 'r') as f:
        ann_parsed = BeautifulSoup(f.read(), 'lxml')

      for o in ann_parsed.find_all('object'):
        mask_class = o.find('name').text.strip()
        key = self.COUNT_KEYS.get(mask_class)
        if key is None:
          # Unknown class: warn and leave it out of the counts.
          print('mask label error')
          continue
        counts[key] += 1
    self.counts = counts

    # Set SR conditions
    self.sr = sr
    if self.sr:
      # NOTE: pickle.load can execute arbitrary code; only load trusted files.
      # NOTE(review): the model is used for inference only but is never put
      # in eval() mode here - confirm the mode it was pickled in.
      with open(SRPATH, 'rb') as f:
        self.sr_model = pickle.load(f)

  def __len__(self):
    """Return the number of images in the split."""
    return self.n

  @staticmethod
  def _get_bbox(obj):
    """Return [xmin, ymin, xmax, ymax] of one annotated object."""
    return [int(obj.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]

  @classmethod
  def _get_label(cls, obj):
    """Return the integer label of one annotated object.

    Raises:
      ValueError: If the class name is not a known mask class.
    """
    label = obj.find('name').text.strip()
    if label not in cls.LABEL_IDS:
      raise ValueError("Unknown label '{}'".format(label))
    return cls.LABEL_IDS[label]

  @staticmethod
  def _get_size(obj):
    """Return [width, height] of the image an annotated object belongs to."""
    size_xml = obj.parent.find('size')
    return [int(size_xml.find('width').text), int(size_xml.find('height').text)]

  def __getitem__(self, idx):
    """Return ``(img, ann)`` for image number ``idx``.

    ``img`` is a float image tensor in CxHxW layout. ``ann`` is a dict with:
      'boxes':    float32 tensor (N, 4), [xmin, ymin, xmax, ymax] per object
      'labels':   int64 tensor (N,), 1 = no mask, 2 = mask, 3 = incorrect
      'image_id': tensor holding ``idx``
      'size':     int64 tensor (N, 2), [width, height] repeated per object

    Raises:
      ValueError: If an annotation contains an unknown class name.
    """
    stem = os.path.splitext(self.files[idx])[0]

    # Get image
    img_path = self.img_dir + '/' + stem + '.png'
    img = Image.open(img_path).convert('RGB')

    # Get annotations
    ann_path = self.ann_dir + '/' + stem + '.xml'
    with open(ann_path) as f:
      ann_parsed = BeautifulSoup(f.read(), 'xml')
    objects = ann_parsed.find_all('object')

    # Get ground truth bboxes, labels and image sizes
    bboxes = [self._get_bbox(o) for o in objects]
    labels = [self._get_label(o) for o in objects]
    size = [self._get_size(o) for o in objects]

    # Combine annotations into tensor dict. Explicit dtypes and reshapes keep
    # the tensors well-formed for images without objects; float boxes and
    # int64 labels are the target format torchvision detection models document.
    ann = {
        'boxes': torch.as_tensor(bboxes, dtype=torch.float32).reshape(-1, 4),
        'labels': torch.as_tensor(labels, dtype=torch.int64),
        'image_id': torch.as_tensor([idx]),
        'size': torch.as_tensor(size, dtype=torch.int64).reshape(-1, 2)
    }

    # To tensor
    img = transforms.ToTensor()(img)

    # SR: upscale the image and scale the annotations to match
    if self.sr:
      with torch.no_grad():
        if torch.cuda.is_available():
          img = img.cuda()
        img = self.sr_model(img.unsqueeze(0))[0].cpu()

      ann['boxes'] = ann['boxes'] * sr_scale
      ann['size'] = ann['size'] * sr_scale

    # Transforms if train
    # NOTE(review): this only fires for op == 'train', so a dataset created
    # with op='train/oversampling' is never augmented here - confirm intended.
    if self.do_transforms and self.op == 'train':
      # Color jitter
      # NOTE(review): ColorJitter() with its default arguments looks like a
      # no-op (all jitter ranges are 0) - confirm the intended strengths.
      cj = torchvision.transforms.ColorJitter()
      img = cj(img)

      # Random horiz flip: mirror the image and the x-coordinates of every box
      if np.random.choice([True, False]):
        img = torch.flip(img, [2])
        boxes = ann['boxes']
        if boxes.numel() > 0:
          widths = ann['size'][:, 0].to(boxes.dtype)
          flipped = boxes.clone()
          flipped[:, 0] = widths - boxes[:, 2]
          flipped[:, 2] = widths - boxes[:, 0]
          ann['boxes'] = flipped

    # Return image and target
    return img, ann

In [None]:
# Build the three dataset splits with one shared configuration
do_transforms = True
sr = True

def _make_split(tag, op):
  """Create the MaskDataset for split ``op`` and print its size and class counts."""
  split = MaskDataset(op=op, sr=sr, do_transforms=do_transforms)
  print(tag, len(split), split.counts)
  return split

trainData = _make_split('Train:', 'train/oversampling')
valData = _make_split('Val:', 'val')
testData = _make_split('Test:', 'test')

Train: 511 {'no_mask': 1511, 'masked': 1862, 'incorrect': 1413}
Val: 171 {'no_mask': 117, 'masked': 614, 'incorrect': 30}
Test: 171 {'no_mask': 139, 'masked': 756, 'incorrect': 27}


# Epoch loss

In [None]:
def evalModel(model,dataLoader):
  """Return the mean per-batch loss of ``model`` over ``dataLoader``.

  For every batch the model is called as ``model(images, targets)`` and is
  expected to return a dict of loss tensors; their sum is the batch loss.
  Gradients are disabled, and inputs are moved to CUDA when it is available.
  The model's train/eval mode is left untouched.
  """
  use_cuda = torch.cuda.is_available()
  batch_losses = []
  with torch.no_grad():
    for images, targets in dataLoader:
      # Put on cuda if available
      if use_cuda:
        images = [im.cuda() for im in images]
        targets = [{key: val.cuda() for key, val in t.items()} for t in targets]

      # Sum the individual loss terms of this batch
      loss_terms = model(images, targets)
      batch_losses.append(sum(loss_terms.values()).item())
  return sum(batch_losses)/len(dataLoader)

# Training class

In [None]:
class TrainingModel():
  """Bundles a detection model with its optimizer, data loaders and loss history.

  The whole instance is pickled after evaluated epochs so that training can be
  resumed with ``TrainingModel.load(name).train()``.

  The data loaders are built from the notebook-level ``trainData``, ``valData``
  and ``testData`` datasets, which must exist before instantiation.

  Args:
    model: Model called as ``model(images, targets)`` and returning a dict of
      loss tensors.
    batch_size: Batch size used for all three data loaders.
    max_epochs: Epoch index at which training stops (exclusive).
    optimizer: Optimizer already constructed over the model's parameters.
    performTesting: If True, the test loss is evaluated alongside the val loss.
    print_every: Evaluate, print and save every ``print_every`` epochs.
    saveFile: Checkpoint name (without directory or '.pkl' extension).
  """

  def collate_fn(self,batch):
      """Turn a list of (img, ann) samples into a (imgs, anns) pair of tuples."""
      return tuple(zip(*batch))

  @staticmethod
  def _model_path(name):
    """Return the checkpoint path ``<parent of DATADIR>/Models/<name>.pkl``.

    os.path.dirname also handles a DATADIR without any '/', where slicing at
    ``rfind('/')`` (-1) would just chop off the last character.
    """
    return os.path.join(os.path.dirname(DATADIR), 'Models', name + '.pkl')

  def __init__(self, model, batch_size, max_epochs, optimizer, 
               performTesting,print_every,saveFile):
    self.model = model
    self.batch_size = batch_size
    self.max_epochs = max_epochs
    self.optimizer = optimizer
    self.performTesting = performTesting
    self.print_every = print_every
    self.saveFile = self._model_path(saveFile)
    # Per-evaluation mean losses, one entry per evaluated epoch
    self.losses = {'train':[], 'val':[], 'test':[]}
    # Loss of every training minibatch seen so far
    self.minibatch_losses = []
    # Index of the next epoch to run
    self.epoch = 0

    self.trainLoader = DataLoader(trainData, batch_size=self.batch_size, pin_memory=True, shuffle=True, collate_fn=self.collate_fn)
    self.valLoader = DataLoader(valData, batch_size=self.batch_size, pin_memory=True, shuffle=True, collate_fn=self.collate_fn)
    self.testLoader = DataLoader(testData, batch_size=self.batch_size, pin_memory=True, shuffle=True, collate_fn=self.collate_fn)

  def save(self):
    """Pickle this instance to ``self.saveFile``, creating the folder if needed."""
    save_dir = os.path.dirname(self.saveFile)
    if save_dir:
      os.makedirs(save_dir, exist_ok=True)
    with open(self.saveFile, 'wb') as f:
      pickle.dump(self, f)

  @staticmethod
  def load(file):
    """Load and return the TrainingModel checkpoint named ``file``.

    NOTE: pickle.load can execute arbitrary code; only load trusted checkpoints.
    """
    with open(TrainingModel._model_path(file), 'rb') as f:
      tm = pickle.load(f)
    return tm

  def train(self):
    """Train from ``self.epoch`` up to ``self.max_epochs``.

    Every ``print_every`` epochs the mean train loss of the epoch, the val
    loss and (optionally) the test loss are recorded and printed, and the
    instance is saved.
    """
    # Get if cuda is available
    cuda_available = torch.cuda.is_available()

    # Put model on cuda
    if cuda_available:
      self.model = self.model.cuda()

    # Train mode is also what evalModel relies on to get losses back
    self.model.train()
    print('starting training...')
    n_batches = len(self.trainLoader)
    for epoch in range(self.epoch,self.max_epochs,1):
      self.epoch = epoch
      # For each batch
      for batch_count, (img, ann) in enumerate(self.trainLoader, start=1):
        # Put on cuda if available
        if cuda_available:
          img = list(i.cuda() for i in img)
          ann = [{k:v.cuda() for k,v in a.items()} for a in ann]

        # Get loss, add to loss container for minibatch train loss
        loss = self.model(img, ann)
        loss_sum = sum(l for l in loss.values())
        loss_value = loss_sum.item()
        self.minibatch_losses.append(loss_value)

        # Update status
        sys.stdout.write('\rEpoch {} (Batch {}/{}) Loss: {:.8f}'.format(epoch,batch_count,n_batches, loss_value))
        sys.stdout.flush()

        # Backprop
        self.optimizer.zero_grad()
        loss_sum.backward()
        self.optimizer.step()

      # This epoch is done: a checkpoint saved below must resume at the next
      # epoch instead of repeating the one that just finished.
      self.epoch = epoch + 1

      # Evaluate, print, and save periodically
      with torch.no_grad():
        if epoch % self.print_every == 0:
          # Mean train loss over this epoch's minibatches, then val loss
          self.losses['train'].append(np.sum(self.minibatch_losses[-n_batches:])/n_batches)
          self.losses['val'].append(evalModel(self.model,self.valLoader))

          # If testing, get testing loss and append to container
          if self.performTesting:
            self.losses['test'].append(evalModel(self.model,self.testLoader))
          
          # Print
          if self.performTesting:
            print('Epoch {}:\tTrain loss: {:.4f}\tVal loss: {:.4f}\tTest loss: {:.4f}'.format(epoch, self.losses['train'][-1], self.losses['val'][-1], self.losses['test'][-1]))
          else:
            print('Epoch {}:\tTrain loss: {:.4f}\tVal loss: {:.4f}'.format(epoch, self.losses['train'][-1], self.losses['val'][-1]))

          # Save
          self.save()

## Perform training

In [None]:
# Build a pretrained Faster R-CNN and replace its box predictor with a
# 4-way head (3 mask classes + background)
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    pretrained=True,
    rpn_nms_thresh=0.5,
    min_size=1600,
)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, 3+1)

# Optimise only the parameters that require gradients
trainable_params = [p for p in model.parameters() if p.requires_grad]
adam = torch.optim.Adam(trainable_params, lr=1e-6, weight_decay=1e-4)

fasterRCNN = TrainingModel(
    model=model,
    batch_size=4,
    max_epochs=50,
    optimizer=adam,
    performTesting=True,
    print_every=1,
    saveFile='final_os_sr',
)

# Run the training loop
fasterRCNN.train()

starting training...
Epoch 0 (Batch 128/128) Loss: 0.48878950Epoch 0:	Train loss: 1.0491	Val loss: 0.4096	Test loss: 0.4506
Epoch 1 (Batch 128/128) Loss: 0.35971051Epoch 1:	Train loss: 0.5122	Val loss: 0.3063	Test loss: 0.3498
Epoch 2 (Batch 128/128) Loss: 0.33409783Epoch 2:	Train loss: 0.4601	Val loss: 0.2705	Test loss: 0.3129
Epoch 3 (Batch 128/128) Loss: 0.49742040Epoch 3:	Train loss: 0.4306	Val loss: 0.2469	Test loss: 0.2902
Epoch 4 (Batch 128/128) Loss: 0.63708031Epoch 4:	Train loss: 0.4073	Val loss: 0.2328	Test loss: 0.2712
Epoch 5 (Batch 128/128) Loss: 0.24576114Epoch 5:	Train loss: 0.3867	Val loss: 0.2211	Test loss: 0.2565
Epoch 6 (Batch 128/128) Loss: 0.40656734Epoch 6:	Train loss: 0.3694	Val loss: 0.2099	Test loss: 0.2537
Epoch 7 (Batch 128/128) Loss: 0.28973192Epoch 7:	Train loss: 0.3496	Val loss: 0.2022	Test loss: 0.2386
Epoch 8 (Batch 128/128) Loss: 0.17637819Epoch 8:	Train loss: 0.3334	Val loss: 0.1939	Test loss: 0.2322
Epoch 9 (Batch 128/128) Loss: 0.20392938Epoch 9:	Tra

### Resume training if interrupted

In [None]:
# fasterRCNN = TrainingModel.load('final_sr')
# fasterRCNN.train()