In [None]:
import os
import os.path as path
import re

import numpy as np
import pandas as pd
import pycocotools
import torch
import torch.utils.data
import torchvision
import transforms as T
import utils
from engine import evaluate, train_one_epoch
from PIL import Image, ImageDraw
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

up = path.abspath(path.join("FastRCNNPredictor.ipynb", "../../../.."))

In [None]:
working_dir = up + "/mkortas/detect-waste/FastRCNN/"
data_file = up + "/mkortas/detect-waste/annotations/annotat.csv"
data_folder = up + "/TACO-master/data/"
model_path = up + "/mkortas/detect-waste/models/model"
num_classes = 28

In [None]:
os.chdir(working_dir)

In [None]:
def parse_one_annot(path_to_data_file, filename):
    data = pd.read_csv(path_to_data_file)
    print(filename)
    boxes_array = data[data["filename"] == filename][
        ["xmin", "ymin", "xmax", "ymax"]
    ].values
    print(boxes_array)
    return boxes_array

In [None]:
def getListOfFiles(dirName):
    listOfFile = os.listdir(dirName)
    allFiles = list()
    for entry in listOfFile:
        fullPath = os.path.join(dirName, entry)
        if os.path.isdir(fullPath):
            allFiles = allFiles + getListOfFiles(fullPath)
        else:
            fullPath = fullPath.replace(data_folder, "")
            allFiles.append(fullPath)
    return allFiles

In [None]:
class Dataset(torch.utils.data.Dataset):
    def __init__(self, root, data_file, transforms=None):
        self.root = root
        self.transforms = transforms
        files = getListOfFiles(os.path.join(root))
        p = re.compile("batch")
        l2 = [s for s in files if p.match(s)]
        self.imgs = sorted(l2)
        self.path_to_data_file = data_file

    def __getitem__(self, idx):
        img_path = os.path.join(self.root, self.imgs[idx])
        self.imgs[idx] = self.imgs[idx]
        img = Image.open(img_path).convert("RGB")
        box_list = parse_one_annot(self.path_to_data_file, self.imgs[idx])
        boxes = torch.as_tensor(box_list, dtype=torch.float32)
        num_objs = len(box_list)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        return img, target

    def __len__(self):
        return len(self.imgs)

In [None]:
dataset = Dataset(root=data_folder, data_file=data_file)

In [None]:
def get_model(num_classes):
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    return model

In [None]:
def get_transform(train):
    transforms = []
    transforms.append(T.ToTensor())
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)

In [None]:
dataset = Dataset(
    root=data_folder, data_file=data_file, transforms=get_transform(train=True)
)
dataset_test = Dataset(
    root=data_folder, data_file=data_file, transforms=get_transform(train=False)
)

torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()
testing_num = int(len(dataset) * 0.2)
dataset = torch.utils.data.Subset(dataset, indices[:-testing_num])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-testing_num:])
data_loader = torch.utils.data.DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4, collate_fn=utils.collate_fn
)
data_loader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=1,
    shuffle=False,
    num_workers=4,
    collate_fn=utils.collate_fn,
)
print(
    "We have: {} examples, {} are training and {} testing".format(
        len(indices), len(dataset), len(dataset_test)
    )
)

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = get_model(num_classes)
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
num_epochs = 10
for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    lr_scheduler.step()
    evaluate(model, data_loader_test, device=device)

In [None]:
torch.save(model.state_dict(), model_path)

In [None]:
loaded_model = get_model(num_classes=num_classes)
loaded_model.load_state_dict(torch.load(model_path))

In [None]:
idx = 1
img, _ = dataset_test[idx]
label_boxes = np.array(dataset_test[idx][1]["boxes"])
loaded_model.eval()
with torch.no_grad():
    prediction = loaded_model([img])
image = Image.fromarray(img.mul(255).permute(1, 2, 0).byte().numpy())
draw = ImageDraw.Draw(image)
for elem in range(len(label_boxes)):
    draw.rectangle(
        [
            (label_boxes[elem][0], label_boxes[elem][1]),
            (label_boxes[elem][2], label_boxes[elem][3]),
        ],
        outline="green",
        width=3,
    )
for element in range(len(prediction[0]["boxes"])):
    boxes = prediction[0]["boxes"][element].cpu().numpy()
    score = np.round(prediction[0]["scores"][element].cpu().numpy(), decimals=4)
    if score > 0.8:
        draw.rectangle(
            [(boxes[0], boxes[1]), (boxes[2], boxes[3])], outline="red", width=3
        )
        draw.text((boxes[0], boxes[1]), text=str(score))
image