In [1]:
import datetime
import os
import time

import presets
import torch
import torch.utils.data
import torchvision
import torchvision.models.detection
import torchvision.models.detection.mask_rcnn
import utils
from coco_utils import get_coco, get_coco_kp
from engine import evaluate, train_one_epoch
from group_by_aspect_ratio import create_aspect_ratio_groups, GroupedBatchSampler
from torchvision.transforms import InterpolationMode
from transforms import SimpleCopyPaste

In [2]:
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor


def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained on COCO
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")

    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # now get the number of input features for the mask classifier
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    # and replace the mask predictor with a new one
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)

    return model

In [3]:
import transforms as T


def get_transform(train):
    transforms = []
    transforms.append(T.PILToTensor())
    transforms.append(T.ConvertImageDtype(torch.float))
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)

In [4]:
from dataset import SignsDataset

model = get_model_instance_segmentation(2)
dataset = SignsDataset('data/sign_dataset/train/', get_transform(train=True))
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4,
    collate_fn=utils.collate_fn)
# For Training
images, targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]
output = model(images, targets)  # Returns losses and detections
# For inference
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
predictions = model(x)

In [5]:
from engine import train_one_epoch, evaluate
import utils


def train(epochs: int = 3):
    # train on the GPU or on the CPU, if a GPU is not available
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # our dataset has two classes only - background and person
    num_classes = 2
    # use our dataset and defined transformations
    dataset = SignsDataset('data/sign_dataset/train/', get_transform(train=True))
    dataset_test = SignsDataset('data/sign_dataset/val/', get_transform(train=False))

    # split the dataset in train and test set
    indices = torch.randperm(len(dataset)).tolist()
    dataset = torch.utils.data.Subset(dataset, indices[:-50])
    dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])

    # define training and validation data loaders
    data_loader = torch.utils.data.DataLoader(
        dataset, batch_size=1, shuffle=True, num_workers=2,
        collate_fn=utils.collate_fn)

    data_loader_test = torch.utils.data.DataLoader(
        dataset_test, batch_size=1, shuffle=False, num_workers=2,
        collate_fn=utils.collate_fn)

    # get the model using our helper function
    model = get_model_instance_segmentation(num_classes)

    # move model to the right device
    model.to(device)

    # construct an optimizer
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)
    # and a learning rate scheduler
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=3,
                                                   gamma=0.1)

    for epoch in range(epochs):
        # train for one epoch, printing every 10 iterations
        train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
        # update the learning rate
        lr_scheduler.step()
        # evaluate on the test dataset
        evaluate(model, data_loader_test, device=device)

    print("That's it!")
    return model

In [6]:
model = train()

Epoch: [0]  [   0/2004]  eta: 5:31:23  lr: 0.000010  loss: 5.7623 (5.7623)  loss_classifier: 1.1565 (1.1565)  loss_box_reg: 0.0339 (0.0339)  loss_mask: 4.4528 (4.4528)  loss_objectness: 0.1134 (0.1134)  loss_rpn_box_reg: 0.0057 (0.0057)  time: 9.9219  data: 2.2623  max mem: 1387
Epoch: [0]  [  10/2004]  eta: 0:37:18  lr: 0.000060  loss: 4.4693 (3.8723)  loss_classifier: 1.1565 (1.1591)  loss_box_reg: 0.0610 (0.0675)  loss_mask: 2.9912 (2.5504)  loss_objectness: 0.0621 (0.0916)  loss_rpn_box_reg: 0.0028 (0.0037)  time: 1.1224  data: 0.2086  max mem: 1681
Epoch: [0]  [  20/2004]  eta: 0:22:47  lr: 0.000110  loss: 1.7457 (2.6054)  loss_classifier: 0.7276 (0.8059)  loss_box_reg: 0.0610 (0.0782)  loss_mask: 0.7491 (1.6306)  loss_objectness: 0.0573 (0.0849)  loss_rpn_box_reg: 0.0036 (0.0058)  time: 0.2276  data: 0.0030  max mem: 1684
Epoch: [0]  [  30/2004]  eta: 0:17:34  lr: 0.000160  loss: 0.9975 (2.0581)  loss_classifier: 0.1895 (0.5949)  loss_box_reg: 0.0759 (0.0914)  loss_mask: 0.5314 (

In [None]:
# torch.save(model.state_dict(), 'checkpoints/maskecnn.pth')

In [6]:
checkpoint = torch.load('checkpoints/maskecnn.pth')
model = get_model_instance_segmentation(2)
model.load_state_dict(checkpoint)

<All keys matched successfully>

In [7]:
dataset_test = SignsDataset('data/sign_dataset/val/', get_transform(train=False))
data_loader_test = torch.utils.data.DataLoader(
    dataset_test, batch_size=1, shuffle=False, num_workers=2,
    collate_fn=utils.collate_fn)

In [40]:
evaluate(model.to('cuda'), data_loader_test, device='cuda')

creating index...
index created!
Test:  [  0/127]  eta: 0:03:10  model_time: 0.2665 (0.2665)  evaluator_time: 0.0196 (0.0196)  time: 1.5030  data: 1.2129  max mem: 7402
Test:  [100/127]  eta: 0:00:03  model_time: 0.0820 (0.0855)  evaluator_time: 0.0090 (0.0104)  time: 0.1012  data: 0.0018  max mem: 7402
Test:  [126/127]  eta: 0:00:00  model_time: 0.0931 (0.0889)  evaluator_time: 0.0460 (0.0247)  time: 0.3019  data: 0.0649  max mem: 7402
Test: Total time: 0:00:18 (0.1451 s / it)
Averaged stats: model_time: 0.0931 (0.0889)  evaluator_time: 0.0460 (0.0247)
Accumulating evaluation results...
DONE (t=0.02s).
Accumulating evaluation results...
DONE (t=0.01s).
IoU metric: bbox
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.657
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.922
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.815
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.568
 

<coco_eval.CocoEvaluator at 0x233b11276a0>

In [9]:
dataset = SignsDataset('data/sign_dataset/val/', get_transform(train=True))
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=4, shuffle=True, num_workers=2,
    collate_fn=utils.collate_fn)

In [20]:
from typing import Union
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt


def draw_images(images: list, targets: list, n_rows: int, th: Union[None, float] = None, name: str = 'default'):
    maskes_imgs = list()
    for i in range(len(images)):
        maskes_imgs.append(get_image(images[i], targets[i], th=th))
    fig, axes = plt.subplots(nrows=n_rows, ncols=1, figsize=(15, 15))
    ind_img = 0
    for col in range(min(n_rows, len(maskes_imgs))):
        axes[col].imshow(maskes_imgs[ind_img])
        ind_img += 1
    fig.savefig(f'saved/{name}.png')


def get_image(image, target, th: float = None):
    image = (image * 255).to(torch.uint8)
    masks = target['masks'].to(torch.bool)
    boxes = target['boxes']
    if th is not None:
        masks = masks[target['scores'] > th]
        boxes = boxes[target['scores'] > th]
    image = torchvision.utils.draw_segmentation_masks(image, masks)
    image = torchvision.utils.draw_bounding_boxes(image, boxes, width=3, colors=(0, 125, 255))
    image = torch.permute(image, (1, 2, 0))
    image = np.array(image, dtype=np.uint8)
    return Image.fromarray(image)

images, true_targets = list(next(iter(data_loader)))
draw_images(images, true_targets, n_rows=4, th=None)

In [13]:
model.eval()
model.to('cpu')
images = list(image.to('cpu') for image in images)
pred_target = model(images)
for i in range(len(pred_target)):
    pred_target[i]['masks'] = pred_target[i]['masks'][:, 0, :, :]
targets = [{k: v.to('cpu') for k, v in t.items()} for t in targets]
draw_images(images, pred_target, th=0.9, name='predict', n_rows=4)

In [21]:
import os

def open_image(file_path: str):
    img = Image.open(file_path).convert("RGB")
    img = torchvision.transforms.PILToTensor()(img)
    img = torchvision.transforms.ConvertImageDtype(torch.float)(img)
    return img

path = 'C:\\MySpace\\Projects\\GenerImages\\lab3\\data\\images'
fp_images = os.listdir(path)
images = [open_image(f'{path}\\{fp}') for fp in fp_images]
pred_target = model(images)
for i in range(len(pred_target)):
    pred_target[i]['masks'] = pred_target[i]['masks'][:, 0, :, :]
targets = [{k: v.to('cpu') for k, v in t.items()} for t in targets]
draw_images(images, pred_target, th=0.9, name='real_predict', n_rows=4)