The purpose of this notebook is to train a PyTorch (torchvision) Mask R-CNN network
for detecting Russian car license plates.

In [None]:
!pip install pyheif

In [None]:
import torch
import os
import pyheif
import json
from PIL import Image
import numpy as np
import random
import torchvision
from torchvision.io import read_image
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
import skimage
from skimage import draw
from torchvision.utils import draw_segmentation_masks, draw_bounding_boxes
from torchvision import transforms as T
from torch.utils.data import Dataset, DataLoader
from torchvision.ops import box_iou
from tqdm import tqdm

## Mount the data storage (Google Drive)

In [None]:
# Mount Google Drive inside the Colab VM; the dataset path used below lives
# under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Root folder of the car-plate dataset on the mounted drive; the 'train/' and
# 'val/' subfolders are joined onto it when the datasets are created.
data_path = '/content/drive/MyDrive/Carplate_dataset2/'

## Description of the data format for Mask R-CNN

The only requirement is that the dataset's `__getitem__` returns:

> image: a PIL Image of size (H, W)

> target: a dict containing the following fields:

1.  boxes (FloatTensor[N, 4]): the ground-truth boxes in [x1, y1, x2, y2] format, with 0 <= x1 < x2 <= W and 0 <= y1 < y2 <= H. 
2.  labels (Int64Tensor[N]): the class label for each ground-truth box
3.  masks (UInt8Tensor[N, H, W]): the segmentation binary masks for each instance



        

        



## Dataset class

In [None]:
class Carplate_Dataset(Dataset):
    """Car-plate detection/segmentation dataset for torchvision Mask R-CNN.

    Annotations are read from ``via_region_data.json`` inside ``data_path``
    (its ``'_via_img_metadata'`` entry). Only files that are both present in
    the folder and listed in the annotations are used. Every region that
    carries polygon points (``all_points_x`` / ``all_points_y``) becomes one
    instance with label ``1``; regions without polygon points are ignored
    consistently for boxes, labels and masks, so all three always describe
    the same N instances.

    Args:
        data_path: Folder holding the images and ``via_region_data.json``.
        transforms: Callable ``(image, target) -> (image, target)`` or ``None``.
    """

    def __init__(self, data_path, transforms):
        self.data_path = data_path
        annotation_file = os.path.join(data_path, 'via_region_data.json')
        self.annotations = self.get_annotations(annotation_file)['_via_img_metadata']
        # Sorted so that the idx -> image mapping (and therefore image_id) is
        # identical on every run; a bare set has no stable order.
        self.images = sorted(set(os.listdir(data_path)) & set(self.annotations))
        self.transforms = transforms

    def __len__(self):
        """Return the number of annotated images found in ``data_path``."""
        return len(self.images)

    def get_annotations(self, path_to_file):
        """Load and return the parsed JSON annotation file."""
        # Context manager so the file handle is closed deterministically.
        with open(path_to_file, encoding='utf-8') as annotation_file:
            return json.load(annotation_file)

    def _get_polygons(self, idx):
        """Return the shape attributes of all regions of image ``idx`` that have polygon points."""
        regions = self.annotations[self.images[idx]]['regions']
        shapes = (region['shape_attributes'] for region in regions)
        # Missing or empty point lists cannot produce a box or a mask.
        return [shape for shape in shapes
                if shape.get('all_points_x') and shape.get('all_points_y')]

    def get_bboxes_and_labels(self, idx):
        """Return ``(boxes, labels)`` for image ``idx``.

        Returns:
            boxes: float32 tensor of shape ``[N, 4]`` in ``[x1, y1, x2, y2]``
                format, the axis-aligned bounds of each polygon. The shape is
                ``[0, 4]`` when the image has no polygon regions.
            labels: int64 tensor of shape ``[N]`` filled with ``1``.
        """
        boxes = [[min(polygon['all_points_x']), min(polygon['all_points_y']),
                  max(polygon['all_points_x']), max(polygon['all_points_y'])]
                 for polygon in self._get_polygons(idx)]
        # reshape keeps the trailing dimension of 4 even for an empty list.
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones(boxes.size(0), dtype=torch.int64)
        return boxes, labels

    def get_masks(self, idx, w, h):
        """Return a bool tensor ``[N, h, w]`` with one rasterised polygon per instance.

        The instance order matches ``get_bboxes_and_labels``.
        """
        polygons = self._get_polygons(idx)
        masks = np.zeros([len(polygons), h, w], dtype=np.uint8)
        for i, polygon in enumerate(polygons):
            # The shape argument clips the polygon to the image bounds.
            rr, cc = draw.polygon(polygon['all_points_y'], polygon['all_points_x'], (h, w))
            masks[i, rr, cc] = 1
        return torch.tensor(masks, dtype=torch.bool)

    def _load_image(self, idx):
        """Open image ``idx`` from ``data_path`` and return it as an RGB PIL image."""
        # os.path.join works whether or not data_path ends with a separator.
        img_path = os.path.join(self.data_path, self.images[idx])
        with Image.open(img_path) as img:
            return img.convert('RGB')

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a dict with the keys ``'boxes'``, ``'labels'``,
        ``'image_id'`` and ``'masks'``. If ``transforms`` was given it is
        applied to the pair before returning.
        """
        img = self._load_image(idx)
        width, height = img.size

        boxes, labels = self.get_bboxes_and_labels(idx)
        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor([idx], dtype=torch.int64),
            'masks': self.get_masks(idx, width, height),
        }

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def vizualize_masks(self, idx):
        """Show image ``idx`` with its ground-truth masks drawn on top."""
        img = self._load_image(idx)
        img_tensor = T.PILToTensor()(img)

        masks = self.get_masks(idx, img.size[0], img.size[1])

        annotated_img = T.ToPILImage()(draw_segmentation_masks(img_tensor, masks))
        annotated_img.show()

    def vizualize_boxes(self, idx):
        """Show image ``idx`` with its ground-truth bounding boxes drawn on top."""
        img = self._load_image(idx)
        img_tensor = T.PILToTensor()(img)

        boxes, _ = self.get_bboxes_and_labels(idx)

        annotated_img = T.ToPILImage()(draw_bounding_boxes(img_tensor, boxes))
        annotated_img.show()
        

## Define transformations

In [None]:
class Compose():
    """Chain several ``(image, target) -> (image, target)`` transforms."""

    def __init__(self, transforms):
        # Sequence of callables, applied in the order given.
        self.transforms = transforms

    def __call__(self, image, target):
        """Feed the pair through every transform in turn and return the result."""
        sample = (image, target)
        for step in self.transforms:
            new_image, new_target = step(*sample)
            sample = (new_image, new_target)
        return sample
    
class ImageToTensor():
    """Paired transform that converts only the image; the target is passed through."""

    def __call__(self, image, target):
        """Return ``(T.ToTensor()(image), target)``."""
        to_tensor = T.ToTensor()
        return to_tensor(image), target

In [None]:
# The pipeline currently has a single step: convert the PIL image to a tensor.
# Targets pass through unchanged.
data_transforms = [ImageToTensor()]
transforms_sequence = Compose(data_transforms)

## Create Datasets and Dataloaders

In [None]:
# Both splits share the same transform pipeline. Each split folder must hold
# its images together with its own 'via_region_data.json', which
# Carplate_Dataset reads from the folder it is given.
train_dataset = Carplate_Dataset(os.path.join(data_path, 'train/'), transforms_sequence)
val_dataset = Carplate_Dataset(os.path.join(data_path, 'val/'), transforms_sequence)

In [None]:
def collate_function(batch):
    """Regroup a list of samples into one tuple per field.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes
    ``((img0, img1), (tgt0, tgt1))``. Nothing is stacked, so images of
    different sizes can share a batch.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [None]:
# Training batches are reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True,
                        collate_fn=collate_function)
# Validation runs in a fixed order: shuffling brings no benefit at evaluation
# time and would make the per-batch metrics logged during evaluation differ
# between epochs for reasons unrelated to the model.
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False,
                        collate_fn=collate_function)

## Train function

In [None]:
!pip install neptune-client

In [None]:
from torch.optim import lr_scheduler
import torch.optim as optim
from tqdm import trange
from tqdm import tqdm
import neptune.new as neptune

In [None]:
def fit_epoch(model, train_loader, optimizer, lr_scheduler, scaler=None):
    """Train ``model`` for one pass over ``train_loader``.

    The forward pass runs under ``torch.cuda.amp.autocast``. Every batch logs
    the summed loss and its five components to the module-level Neptune
    ``run`` object, which must exist before this function is called.

    Args:
        model: Detection model that returns a dict of losses in train mode.
        train_loader: Iterable of ``(images, targets)`` batches.
        optimizer: Optimizer stepped once per batch.
        lr_scheduler: Per-iteration scheduler stepped after every optimizer
            step, or ``None``.
        scaler: Optional ``torch.cuda.amp.GradScaler``. When given, the loss
            is scaled before ``backward`` so that half-precision gradients
            produced under autocast do not underflow. ``None`` (default)
            keeps the plain, unscaled backward pass.

    Returns:
        Mean of the per-batch summed losses, or ``0.0`` if the loader yielded
        no batches.
    """
    model.train()
    # Follow the model's device instead of hard-coding CUDA, so the inputs
    # always land where the weights are.
    device = next(model.parameters()).device
    num_batches = 0
    total_loss = 0.0
    torch.cuda.empty_cache()
    for images, targets in tqdm(train_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        with torch.cuda.amp.autocast():
            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())
        optimizer.zero_grad()
        if scaler is None:
            losses.backward()
            optimizer.step()
        else:
            scaler.scale(losses).backward()
            scaler.step(optimizer)
            scaler.update()
        if lr_scheduler is not None:
            lr_scheduler.step()
        num_batches += 1
        # Convert once: every float() on a CUDA tensor forces a device sync.
        batch_loss = float(losses)
        run["train/loss"].log(batch_loss)
        for loss_name in ('loss_classifier', 'loss_box_reg', 'loss_mask',
                          'loss_objectness', 'loss_rpn_box_reg'):
            run[f"train/{loss_name}"].log(float(loss_dict[loss_name]))
        total_loss += batch_loss

    if num_batches == 0:
        # Empty loader: nothing was trained, avoid a ZeroDivisionError.
        return 0.0
    return total_loss / num_batches

In [None]:
def get_IoU_metric(target_masks, predicted_masks, threshold=0.5):
    """Return the mask IoU between one ground-truth instance and the predictions.

    Predicted soft masks are binarised with ``>= threshold``. Intersection and
    union pixel counts are then summed over all predicted masks and divided,
    which gives an aggregate IoU over every prediction for the image.

    Args:
        target_masks: Tensor ``[1, H, W]`` with the ground-truth mask
            (bool or 0/1 values).
        predicted_masks: Tensor ``[K, H, W]`` with predicted masks, either
            probabilities in the 0-1 range or already binary.
        threshold: Probability at or above which a predicted pixel counts as
            foreground.

    Returns:
        IoU as a float in ``[0, 1]``. ``0.0`` is returned when there are no
        predictions, when the union is empty, or when the image does not have
        exactly one ground-truth instance.
    """
    # NOTE(review): images with several ground-truth instances are still
    # scored as 0, as before; this function does no instance matching, and
    # those images pull the averaged metric down.
    if target_masks.size(0) != 1:
        return 0.0
    if predicted_masks.size(0) == 0:
        # No detections at all: defined as zero overlap instead of 0/0 = NaN.
        return 0.0

    target = target_masks.to(torch.bool)
    predicted = predicted_masks.to(torch.float32) >= threshold

    # The target broadcasts against all K predictions.
    intersection = float((target & predicted).sum())
    # A true union (logical OR); adding the two areas would count the overlap
    # twice and cap the metric at 0.5.
    union = float((target | predicted).sum())
    if union == 0:
        return 0.0
    return intersection / union

In [None]:
def eval_epoch(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` and return mean mask and box IoU.

    For every image the mask score comes from ``get_IoU_metric``. The box
    score is the sum of all pairwise IoUs between predicted and ground-truth
    boxes divided by the number of predicted boxes. An image for which the
    model predicts nothing scores ``0.0`` on both metrics. Per-batch means are
    logged to the module-level Neptune ``run`` object, which must exist before
    this function is called.

    Args:
        model: Detection model that returns a list of prediction dicts with
            ``'boxes'`` and ``'masks'`` in eval mode.
        val_loader: Iterable of ``(images, targets)`` batches.

    Returns:
        ``(mean_mask_iou, mean_box_iou)``, each averaged over batches;
        ``(0.0, 0.0)`` if the loader yielded no batches.
    """
    model.eval()
    # Follow the model's device instead of hard-coding CUDA.
    device = next(model.parameters()).device
    num_batches = 0
    total_iou_masks = 0
    total_iou_boxes = 0
    torch.cuda.empty_cache()
    for images, targets in tqdm(val_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        with torch.no_grad():
            output = model(images)

        mask_ious = []
        box_ious = []
        for prediction, target in zip(output, targets):
            num_detections = prediction['boxes'].size(0)
            if num_detections == 0:
                # Dividing by zero detections would give NaN and poison the
                # running averages; an empty prediction scores 0.
                mask_ious.append(0.0)
                box_ious.append(0.0)
                continue
            # Predicted masks are [K, 1, H, W]; drop the channel dimension.
            mask_ious.append(get_IoU_metric(target['masks'], prediction['masks'][:, 0, :, :]))
            pairwise_iou = box_iou(prediction['boxes'], target['boxes'])
            box_ious.append(float(torch.sum(pairwise_iou) / num_detections))

        iou_masks_per_batch = np.mean(mask_ious)
        iou_boxes_per_batch = np.mean(box_ious)
        run['eval/IoU_for_masks'].log(iou_masks_per_batch)
        run['eval/IoU_for_boxes'].log(iou_boxes_per_batch)
        total_iou_masks += iou_masks_per_batch
        total_iou_boxes += iou_boxes_per_batch
        num_batches += 1

    if num_batches == 0:
        # Empty loader: avoid a ZeroDivisionError.
        return 0.0, 0.0
    return total_iou_masks/num_batches, total_iou_boxes/num_batches

In [None]:
def train(train_dataloader, val_dataloader, model, epochs, opt, lr_scheduler):
    """Run the full train/evaluate loop and log per-epoch metrics.

    During the first epoch a linear learning-rate warm-up is applied per
    iteration on top of ``opt``. ``lr_scheduler`` is stepped once after every
    epoch. Epoch-level loss and IoU values are logged to the module-level
    Neptune ``run`` object, which must exist before this function is called.

    Args:
        train_dataloader: Loader passed to ``fit_epoch``.
        val_dataloader: Loader passed to ``eval_epoch``.
        model: Model to train; it is moved to the CUDA device.
        epochs: Number of epochs to run.
        opt: Optimizer used for training and for the warm-up scheduler.
        lr_scheduler: Epoch-level learning-rate scheduler.
    """
    device = torch.device("cuda")
    model.to(device)
    for epoch in trange(epochs, desc="Epoch:"):
        linearLR_scheduler = None
        if epoch == 0:
            warmup_factor = 1.0 / 1000
            warmup_iters = min(1000, len(train_dataloader) - 1)
            # With a single-batch loader warmup_iters is 0; LinearLR would then
            # scale the rate down on construction and never ramp it back up,
            # so warm-up is skipped in that case.
            if warmup_iters > 0:
                # Must wrap the optimizer that is actually being trained
                # (`opt`), not whatever global happens to be named `optimizer`.
                linearLR_scheduler = torch.optim.lr_scheduler.LinearLR(
                    opt, start_factor=warmup_factor, total_iters=warmup_iters
                )
        train_loss = fit_epoch(model, train_dataloader, opt, linearLR_scheduler)
        lr_scheduler.step()
        iou_masks, iou_boxes = eval_epoch(model, val_dataloader)
        run['train/epoch/loss'].log(train_loss)
        run['eval/epoch/iou_masks'].log(iou_masks)
        run['eval/epoch/iou_boxes'].log(iou_boxes)

## Load model and change number of classes

In [None]:
# Start from torchvision's Mask R-CNN (ResNet-50 FPN v2) with its DEFAULT
# pretrained weights; the prediction heads are replaced in the next cell.
model = torchvision.models.detection.maskrcnn_resnet50_fpn_v2(weights=torchvision.models.detection.MaskRCNN_ResNet50_FPN_V2_Weights.DEFAULT)

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
# Swap the box predictor for a new one with the same input width but only
# num_classes outputs.
in_features = model.roi_heads.box_predictor.cls_score.in_features
# presumably background + car plate: the dataset labels every instance as 1
num_classes = 2
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Do the same for the mask predictor, keeping its input channel count.
hidden_layer = 256
in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)
# Optional: resume from a previously saved checkpoint (disabled).
#model.load_state_dict(torch.load('/content/drive/MyDrive/mask-rcnn_pretrainedv2_1_carplate_dataset.pt', map_location=torch.device('cpu')))

## Training model

In [None]:
# Optimise only the parameters that require gradients.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.003,
                            momentum=0.9, weight_decay=0.0005)
# train() steps this scheduler once per epoch, so the learning rate is
# multiplied by 0.1 every 2 epochs.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=2,
                                                   gamma=0.1)
num_epochs = 7

In [None]:
# Neptune run used as a global by fit_epoch, eval_epoch and train for metric
# logging. The API token is read from the NEPTUNE_API_TOKEN environment
# variable rather than being hard-coded in the notebook.
run = neptune.init(
    api_token= os.getenv('NEPTUNE_API_TOKEN'),
    project='misha/carplate-segmentation'
)

In [None]:
# Run the whole training loop; requires the Neptune `run` cell above to have
# been executed first.
train(train_loader, val_loader, model, num_epochs, optimizer, lr_scheduler)

In [None]:
# Save only the weights (state_dict) to the mounted drive.
torch.save(model.state_dict(), '/content/drive/MyDrive/mask-rcnn_pretrainedv2_2_carplate_dataset.pt')
