In [1]:
import pandas as pd
import numpy as np
import cv2
import os
import re
import random

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torch
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SequentialSampler

from matplotlib import pyplot as plt

import pickle
import json
import sys
sys.path.append('..')
import utils

IMAGE_SIZE = 1024

In [2]:
DATASET_PATH = '../mvtec_anomaly_detection_data'

annotations = utils.annotate_dataset(DATASET_PATH)
utils.create_annotation_files(annotations)

Found 15 detection objects.
Detection objects: ['bottle', 'cable', 'capsule', 'carpet', 'grid', 'hazelnut', 'leather', 'metal_nut', 'pill', 'screw', 'tile', 'toothbrush', 'transistor', 'wood', 'zipper']
Annotating: bottle (1/15)
Found 3 classes in ../mvtec_anomaly_detection_data\bottle\ground_truth
Classes: ['broken_large', 'broken_small', 'contamination']
Annotating: cable (2/15)
Found 8 classes in ../mvtec_anomaly_detection_data\cable\ground_truth
Classes: ['bent_wire', 'cable_swap', 'combined', 'cut_inner_insulation', 'cut_outer_insulation', 'missing_cable', 'missing_wire', 'poke_insulation']
Annotating: capsule (3/15)
Found 5 classes in ../mvtec_anomaly_detection_data\capsule\ground_truth
Classes: ['crack', 'faulty_imprint', 'poke', 'scratch', 'squeeze']
Annotating: carpet (4/15)
Found 5 classes in ../mvtec_anomaly_detection_data\carpet\ground_truth
Classes: ['color', 'cut', 'hole', 'metal_contamination', 'thread']
Annotating: grid (5/15)
Found 5 classes in ../mvtec_anomaly_detecti

In [3]:
dataset = utils.load_annotation_file('3')
random.shuffle(dataset)
train_ds, test_ds, val_ds = utils.train_test_split_annotations(dataset)

In [4]:
train_ds[0]

('3',
 (0.336, 0.314, 0.129, 0.116),
 '../mvtec_anomaly_detection_data\\carpet\\test\\metal_contamination\\006.png')

In [5]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [6]:
class MVTEC_Dataset(Dataset):
    def __init__(self, dataset, transforms=None):
        super().__init__()       
        
        self.dataset = dataset
        self.transforms = transforms
        
    def __getitem__(self, index: int):
        data = self.dataset[index]
        
        image = cv2.imread(data[2])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0
        
        bbox = list(data[1])
        bbox[0] = bbox[0] * IMAGE_SIZE
        bbox[1] = bbox[1] * IMAGE_SIZE
        bbox[2] = bbox[2] * IMAGE_SIZE + bbox[0]
        bbox[3] = bbox[3] * IMAGE_SIZE + bbox[1]
        bbox = [int(x) for x in bbox]
        area = bbox[2] * bbox[3]
        
        labels = torch.as_tensor([int(data[0])], dtype=torch.int64)
        
        iscrowd = torch.zeros([0], dtype=torch.int64)
        
        target = {}
        target['boxes'] = torch.Tensor([bbox])
        target['labels'] = labels
        # target['masks'] = None
        target['image_id'] = torch.tensor([index])
        target['area'] = torch.Tensor([area])
        target['iscrowd'] = iscrowd
        
        if self.transforms:
            sample = {
                'image': image,
                'bboxes': target['boxes'],
                'labels': labels
            }
            sample = self.transforms(**sample)
            image = sample['image']
            
            target['boxes'] = torch.stack(tuple(map(torch.tensor, zip(*sample['bboxes'])))).permute(1, 0)
        else:
            transform = torchvision.transforms.Compose([
                torchvision.transforms.ToTensor()
            ])
            image = transform(image)
        
        return image, target, index
    
    def __len__(self) -> int:
        return len(dataset)

In [7]:
# Albumentations
def get_train_transform():
    return A.Compose([
        A.Flip(0.5),
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

def get_valid_transform():
    return A.Compose([
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

In [8]:
# load a model; pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

In [9]:
num_classes = 5

# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features

# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [10]:
class Averager:
    def __init__(self):
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        if self.iterations == 0:
            return 0
        else:
            return 1.0 * self.current_total / self.iterations

    def reset(self):
        self.current_total = 0.0
        self.iterations = 0.0

In [19]:
train_ds

[('3',
  (0.336, 0.314, 0.129, 0.116),
  '../mvtec_anomaly_detection_data\\carpet\\test\\metal_contamination\\006.png'),
 ('4',
  (0.0, 0.51, 0.345, 0.49),
  '../mvtec_anomaly_detection_data\\carpet\\test\\thread\\017.png'),
 ('2',
  (0.136, 0.564, 0.194, 0.202),
  '../mvtec_anomaly_detection_data\\carpet\\test\\hole\\005.png'),
 ('4',
  (0.134, 0.598, 0.699, 0.317),
  '../mvtec_anomaly_detection_data\\carpet\\test\\thread\\008.png'),
 ('2',
  (0.182, 0.574, 0.13, 0.136),
  '../mvtec_anomaly_detection_data\\carpet\\test\\hole\\014.png'),
 ('0',
  (0.25, 0.573, 0.068, 0.083),
  '../mvtec_anomaly_detection_data\\carpet\\test\\color\\018.png'),
 ('3',
  (0.447, 0.651, 0.103, 0.111),
  '../mvtec_anomaly_detection_data\\carpet\\test\\metal_contamination\\015.png'),
 ('1',
  (0.358, 0.389, 0.247, 0.236),
  '../mvtec_anomaly_detection_data\\carpet\\test\\cut\\003.png'),
 ('3',
  (0.208, 0.279, 0.116, 0.074),
  '../mvtec_anomaly_detection_data\\carpet\\test\\metal_contamination\\012.png'),
 ('

In [11]:
def collate_fn(batch):
    return tuple(zip(*batch))

train_dataset = MVTEC_Dataset(train_ds, None)
test_dataset = MVTEC_Dataset(test_ds, None)
val_dataset = MVTEC_Dataset(val_ds, None)

train_data_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_fn
)

test_data_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_fn
)

valid_data_loader = DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_fn
)

In [12]:
#images, targets, image_ids = next(iter(train_data_loader))
#images = list(image.to(device) for image in images)
#targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

In [13]:
_, target, _ = next(iter(train_data_loader))
target

({'boxes': tensor([[344., 321., 476., 440.]]),
  'labels': tensor([3]),
  'image_id': tensor([0]),
  'area': tensor([209440.]),
  'iscrowd': tensor([], dtype=torch.int64)},
 {'boxes': tensor([[   0.,  522.,  353., 1024.]]),
  'labels': tensor([4]),
  'image_id': tensor([1]),
  'area': tensor([361472.]),
  'iscrowd': tensor([], dtype=torch.int64)})

In [14]:
target = [{k: v.to(device) for k, v in t.items()} for t in target]

In [15]:
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
lr_scheduler = None

num_epochs = 2

In [16]:
loss_hist = Averager()
itr = 1

for epoch in range(num_epochs):
    loss_hist.reset()
    
    for images, targets, image_ids in train_data_loader:
        
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)

        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()

        loss_hist.send(loss_value)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if itr % 50 == 0:
            print(f"Iteration #{itr} loss: {loss_value}")

        itr += 1
    
    # update the learning rate
    if lr_scheduler is not None:
        lr_scheduler.step()

    print(f"Epoch #{epoch} loss: {loss_hist.value}")   

IndexError: list index out of range