# Load data

In [None]:
!pip install datasets

In [None]:
from datasets import load_dataset

# Load the "mllab/alfafood" dataset (presumably a Hugging Face Hub id);
# later cells read its 'train' split.
dataset = load_dataset(path="mllab/alfafood")

# Preprocessing


In [None]:
import os
import cv2
import random
import pathlib
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from typing import Tuple, Dict, List

import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from sklearn.model_selection import train_test_split

import albumentations
from albumentations.pytorch.transforms import ToTensorV2

from PIL import Image, ImageFile, ImageFont, ImageDraw, ImageEnhance
ImageFile.LOAD_TRUNCATED_IMAGES = True

import copy
from time import time

import warnings
warnings.filterwarnings('ignore')

# Size every image is resized to, given as (width, height).
FUSED_SHAPE = (640, 480)
# Size the raw images are assumed to have, in the same (width, height) order;
# used as the reference when rescaling bounding boxes.
ORIGINAL_SHAPE = (4000, 3000)

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
def fused_bbox(bboxes, original_shape, fused_shape):
    """Rescale boxes from the original image size to the resized image size.

    Elements 0 and 2 of every box are multiplied by
    ``fused_shape[0] / original_shape[0]`` and elements 1 and 3 by
    ``fused_shape[1] / original_shape[1]``.

    The boxes are modified in place; the same ``bboxes`` object is returned.
    """
    scale_x = fused_shape[0] / original_shape[0]
    scale_y = fused_shape[1] / original_shape[1]

    for box in bboxes:
        box[0] = box[0] * scale_x
        box[1] = box[1] * scale_y
        box[2] = box[2] * scale_x
        box[3] = box[3] * scale_y

    return bboxes

In [None]:
# Walk the training split once, collecting the resized image and the raw
# annotation of every sample in the same pass.
images = []
objects = []
for sample in dataset['train']:
    images.append(sample['image'].resize(FUSED_SHAPE))
    objects.append(sample['objects'])

# Bring the boxes into the coordinate system of the resized images.
# NOTE(review): this assumes every raw image is ORIGINAL_SHAPE in size —
# confirm against the dataset, otherwise scale by each image's own size.
for annotation in objects:
    if annotation['bbox']:
        annotation['bbox'] = fused_bbox(annotation['bbox'], ORIGINAL_SHAPE, FUSED_SHAPE)

# Prepare Dataset

In [None]:
class AlfaFoodDataset(Dataset):
    """Detection dataset pairing images with box/label annotations.

    Each annotation is a dict with a ``'bbox'`` list and a ``'categories'``
    list. On construction the annotations are deep-copied and every box has
    element 0 added to element 2 and element 1 added to element 3, i.e.
    incoming boxes are treated as ``[x, y, width, height]`` and stored as
    ``[x_min, y_min, x_max, y_max]``.

    Args:
        images: objects exposing ``.convert('RGB')`` (PIL images in this file).
        objects: one annotation dict per image.
        transform: optional callable invoked as
            ``transform(image=ndarray, bboxes=..., labels=...)`` that returns a
            dict with ``'image'`` (channels-first), ``'bboxes'`` and
            ``'labels'`` — e.g. an albumentations ``Compose`` ending in
            ``ToTensorV2``.
    """

    def __init__(self, images: List, objects: List[Dict[str, List]], transform=None) -> None:
        self.images = images
        # Deep copy so the box conversion below never touches the caller's data.
        self.annotations = copy.deepcopy(objects)
        self.transform = transform
        self.num_classes = len({label for ob in objects for label in ob['categories']})
        # Kept for backward compatibility: marks the indices that have been
        # transformed at least once. It does not gate the transform.
        self.list_transforms = np.zeros(shape=(len(self.images),))

        # [x, y, width, height] -> [x_min, y_min, x_max, y_max]
        for annotation in self.annotations:
            for bbox in annotation['bbox']:
                bbox[2] += bbox[0]
                bbox[3] += bbox[1]

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Return one sample as ``(image, target)``.

        ``image`` is a float32 tensor in H x W x C layout holding the raw pixel
        values. ``target`` has ``'boxes'`` (float, N x 4) and ``'labels'``
        (int64, N). When a sample has no boxes or labels, a single tiny
        placeholder box and/or a single label 0 is returned instead.
        """
        # The transform works on NumPy arrays, so the pixels stay an
        # H x W x 3 array until the optional transform has run.
        pixels = np.array(self.images[index].convert('RGB'))
        bboxes = self.annotations[index]['bbox']
        labels = self.annotations[index]['categories']

        # Applied on every access so augmentation does not depend on how often
        # (or in which worker process) an index has been requested before.
        if self.transform:
            transformed = self.transform(image=pixels, bboxes=bboxes, labels=labels)
            # The pipeline returns channels-first; restore H x W x C.
            pixels = np.asarray(transformed['image']).transpose(1, 2, 0)
            bboxes = transformed['bboxes']
            labels = transformed['labels']
            self.list_transforms[index] = 1

        image = torch.as_tensor(np.ascontiguousarray(pixels), dtype=torch.float32)

        target = {
            'boxes': torch.as_tensor(bboxes, dtype=torch.float),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
        }

        # numel() also covers empty results that arrive with shape (0, 4).
        if target['boxes'].numel() == 0:
            target['boxes'] = torch.tensor([[0, 0, 1e-10, 1e-10]], dtype=torch.float)
        if target['labels'].numel() == 0:
            target['labels'] = torch.zeros(size=(1,), dtype=torch.int64)
        return image, target

    def __len__(self) -> int:
        """Return the total number of samples."""
        return len(self.images)

# Visualizing bounding boxes with labels

In [None]:
# Dataset over every image, built without a transform.
data = AlfaFoodDataset(images=images, objects=objects)

# One sample to preview: the image tensor and its boxes/labels dict.
img, target = data[8]

In [None]:

def show_image_with_objects(image, bboxes, labels=None):
    """Draw bounding boxes, and optionally their labels, on an image.

    Args:
        image: tensor exposing ``.numpy()``; its values are cast to uint8 and
            handed to ``PIL.Image.fromarray`` (presumably H x W x 3 — confirm
            against the caller).
        bboxes: iterable of four-element boxes, used as the two corner points
            of each rectangle.
        labels: optional sequence indexed like ``bboxes``. When ``None`` only
            the rectangles are drawn.

    Returns:
        The PIL image with one randomly coloured rectangle per box.
    """
    image = Image.fromarray(image.numpy().astype(np.uint8))
    # One drawing context is enough for all boxes.
    draw = ImageDraw.Draw(image)

    for i, bbox in enumerate(bboxes):
        # A fresh colour per box, so the number of boxes is not capped.
        color = (random.randint(40, 240), random.randint(40, 255), random.randint(60, 255))
        x_min, y_min, x_max, y_max = (float(value) for value in bbox)
        draw.rectangle((x_min, y_min, x_max, y_max), outline=color, width=2)

        if labels is None:
            continue

        # Dark backdrop behind the caption so it stays readable.
        caption = f"{labels[i]}"
        text_box = draw.textbbox((x_min, y_min), caption)
        draw.rectangle((text_box[0]-2, text_box[1]-2, text_box[2]+2, text_box[3]+2), fill=(30, 20, 20))
        draw.text((x_min, y_min), caption, color)
    return image

# Left as the last expression so the notebook can display the returned image.
show_image_with_objects(image=img, bboxes=target['boxes'], labels=target['labels'])

# Prepare the transformation for augmentations

In [None]:
# FUSED_SHAPE is stored as (width, height).
target_width, target_height = FUSED_SHAPE

# Training pipeline: resize, random flip/rotation, then conversion to a tensor.
# Boxes are passed in 'pascal_voc' format with their classes in the 'labels' field.
train_transform = albumentations.Compose(
    [
        albumentations.Resize(height=target_height, width=target_width),
        albumentations.HorizontalFlip(p=0.8),
        albumentations.Rotate(p=0.5),
        ToTensorV2(),
    ],
    bbox_params=albumentations.BboxParams(format='pascal_voc', label_fields=['labels']),
)

# Evaluation pipeline: the same resize and tensor conversion, no augmentation.
test_transform = albumentations.Compose(
    [
        albumentations.Resize(height=target_height, width=target_width),
        ToTensorV2(),
    ],
    bbox_params=albumentations.BboxParams(format='pascal_voc', label_fields=['labels']),
)

# Splitting the data into training, validation and test sets

In [None]:
# Hold out 30% of the (image, annotation) pairs, then divide that hold-out
# pool into disjoint validation (70%) and test (30%) parts so that no sample
# appears in more than one of the three sets.
samples = list(zip(images, objects))
train_split, holdout_split = train_test_split(samples, test_size=0.3, shuffle=True, random_state=42)
val_split, test_split = train_test_split(holdout_split, test_size=0.3, shuffle=True, random_state=42)

train_data = AlfaFoodDataset(images=[image for image, _ in train_split],
                             objects=[annotation for _, annotation in train_split],
                             transform=train_transform)

test_data = AlfaFoodDataset(images=[image for image, _ in test_split],
                            objects=[annotation for _, annotation in test_split],
                            transform=test_transform)

val_data = AlfaFoodDataset(images=[image for image, _ in val_split],
                           objects=[annotation for _, annotation in val_split],
                           transform=test_transform)

# Create data loaders

In [None]:
def collate_fn(batch):
    """Regroup a batch of per-sample tuples into one tuple per field.

    ``[(a1, b1), (a2, b2)]`` becomes ``((a1, a2), (b1, b2))``; nothing is
    stacked, so the samples may differ in size.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# os.cpu_count() returns None when the CPU count cannot be determined; fall
# back to loading in the main process (0 workers) in that case.
num_workers = os.cpu_count() or 0

# Only the training loader shuffles; all three use the same batch size and
# the tuple-regrouping collate function.
train_dataloader = torch.utils.data.DataLoader(dataset=train_data,
                                               batch_size=4,
                                               shuffle=True,
                                               num_workers=num_workers,
                                               collate_fn=collate_fn)
val_dataloader = torch.utils.data.DataLoader(dataset=val_data,
                                             batch_size=4,
                                             shuffle=False,
                                             num_workers=num_workers,
                                             collate_fn=collate_fn)
test_dataloader = torch.utils.data.DataLoader(dataset=test_data,
                                              batch_size=4,
                                              shuffle=False,
                                              num_workers=num_workers,
                                              collate_fn=collate_fn)

# Create model

In [None]:
class FasterRCNN_ResNet50(torch.nn.Module):
    """Pretrained torchvision Faster R-CNN (ResNet-50 FPN) with a new box head.

    The box predictor is replaced by a ``FastRCNNPredictor`` with
    ``num_classes + 1`` outputs, and every top-level child of the wrapped
    model except the last one is frozen.

    NOTE(review): the extra output is presumably the background class that
    torchvision reserves as label 0 — confirm the dataset labels start at 1.

    Args:
        num_classes: number of object classes, not counting the extra output.
    """

    def __init__(self, num_classes: int = 127) -> None:
        super().__init__()

        # NOTE(review): newer torchvision releases prefer the `weights=` /
        # `weights_backbone=` arguments over these flags.
        self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True, pretrained_backbone=True)
        in_features = self.model.roi_heads.box_predictor.cls_score.in_features
        self.model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes + 1)

        # Freeze all top-level children of the wrapped model but the last one.
        for child in list(self.model.children())[:-1]:
            for param in child.parameters():
                param.requires_grad = False

    def forward(self, images: List[torch.Tensor], annotation: List[Dict[str, torch.Tensor]] = None):
        """Forward ``images`` (and optional targets) to the wrapped model.

        Args:
            images: list of image tensors.
            annotation: optional list of target dicts, one per image. When it
                is omitted, ``None`` is passed on as the targets.

        Returns:
            Whatever the wrapped model returns for these arguments.
        """
        return self.model(images, annotation)

# The class count comes from `data`, the dataset built over all images.
model = FasterRCNN_ResNet50(num_classes=data.num_classes)

# Declare functions for training, validation and testing

In [None]:
def _as_model_input(image: torch.Tensor) -> torch.Tensor:
    """Return ``image`` as a floating-point C x H x W tensor scaled to [0, 1].

    torchvision detection models expect channels-first images in the 0-1
    range. A 3-D tensor whose last dimension is 3 and whose first is not is
    treated as H x W x C and permuted. Values above 1 are taken to be on the
    0-255 scale and divided by 255. Tensors already in the expected form pass
    through unchanged.
    """
    if image.dim() == 3 and image.shape[-1] == 3 and image.shape[0] != 3:
        image = image.permute(2, 0, 1)
    if not image.is_floating_point():
        image = image.float()
    if image.numel() and image.max() > 1:
        image = image / 255
    return image


def train_epoch(device: torch.device,
                model: torch.nn.Module,
                optimizer: torch.optim.Optimizer,
                dataloader: torch.utils.data.DataLoader,
                criterion: torch.nn.Module = None):
    """Train ``model`` for one pass over ``dataloader``.

    The model is called as ``model(images, annotations)`` and must return a
    dict of losses; their sum is back-propagated.

    Args:
        device: device the model and every batch are moved to.
        model: module returning a loss dict when given images and targets.
        optimizer: optimizer stepped once per batch.
        dataloader: iterable of ``(images, annotations)`` batches, where
            ``annotations`` is a sequence of dicts of tensors.
        criterion: must be ``None``; an external criterion is not supported.

    Returns:
        ``(mean_loss, elapsed_seconds)``. ``mean_loss`` is ``0.0`` when the
        loader yields no batches.

    Raises:
        NotImplementedError: if ``criterion`` is given.
    """
    if criterion is not None:
        # Fail up front instead of hitting an unbound `loss` inside the loop.
        raise NotImplementedError(
            "train_epoch only supports models that return their own loss dict; "
            "pass criterion=None"
        )

    model.to(device)
    model.train()
    torch.cuda.empty_cache()

    running_loss = 0.0
    num_batches = 0
    time_epoch_start = time()

    for images, annotations in dataloader:
        images = [_as_model_input(image).to(device) for image in images]
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]

        loss_dict = model(images, annotations)
        loss = sum(loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Count batches ourselves: this avoids dividing by zero on an empty loader
    # and works for iterables without __len__.
    train_loss = running_loss / num_batches if num_batches else 0.0
    time_epoch_end = time() - time_epoch_start

    return train_loss, time_epoch_end

def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          val_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          epochs: int,
          device: torch.device):
    """Run ``epochs`` training epochs and print the mean loss after each one.

    Every epoch is delegated to ``train_epoch`` on ``train_dataloader``.
    ``val_dataloader`` is accepted but not used here, and the per-epoch
    duration returned by ``train_epoch`` is discarded. Returns ``None``.
    """
    progress = tqdm(range(epochs))
    for epoch in progress:
        train_loss, _elapsed = train_epoch(device, model, optimizer, train_dataloader)
        print(f"EPOCH: {epoch+1}, LOSS: {train_loss}")

num_epochs = 1
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Only the parameters that still require gradients go to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.Adam(params=params, lr=1e-3, weight_decay=1e-5)

train(model=model,
      train_dataloader=train_dataloader,
      val_dataloader=val_dataloader,
      optimizer=optimizer,
      epochs=num_epochs,
      device=device)