In [None]:
from dataclasses import dataclass, field
from glob import glob
import os

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision.utils import make_grid, draw_bounding_boxes
import torchvision
from torchvision import transforms
from torchvision.io import read_image
import tqdm
#import wandb

from utils import Detect, MultiBoxLoss, od_collate_fn


In [None]:
@dataclass
class ModelParameters:
    """Hyper-parameters and paths shared by the cells of this notebook.

    A single instance is created right after this definition and read by the
    dataset, dataloader, model, optimizer and training cells.
    """
    # Batch size handed to both the training and the validation DataLoader.
    batch_size: int = 16
    # Initial learning rate of the Adam optimizer.
    lr: float = 0.001
    # Name of the scheduler; not read by the visible cells (ReduceLROnPlateau is built directly).
    scheduler_type: str = 'ReduceLROnPlateau'
    # `patience` argument of ReduceLROnPlateau.
    lr_scheduler_patience: int = 10
    # Number of passes over the training loader done by train_model.
    epochs: int = 100
    # Class names; default_factory gives every instance its own list.
    classes: list = field(default_factory=lambda: ['face'])
    # Side of the square network input; 256 selects BlazeFace(back_model=True).
    image_size: int = 128
    # Not read by the visible cells -- presumably a score threshold for inference; confirm.
    detection_threshold: float = 0.5
    # Not read by the visible cells -- presumably the BlazeFace channel width; confirm.
    blazeface_channels: int = 32
    # Forwarded to MultiBoxLoss as its `focal` argument.
    focal_loss: bool = False
    # Destination of the state_dict written at the end of every epoch.
    model_path: str = 'weights/blazeface128.pt'
    #use_wandb: bool = False
    # Filled with `augment.to_dict()` once the augmentation pipeline is defined.
    augmentation: dict = None

In [None]:
# Shared configuration instance built from the defaults; later cells read it
# and the augmentation cell stores its serialized pipeline on it.
model_params = ModelParameters()

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Detection dataset built from YOLO-style label files and matching JPEG images.

    Every non-empty file found in ``labels_path`` is one sample. Each row of a
    label file is read as ``class cx cy w h`` separated by single spaces
    (coordinates are assumed to be normalized to [0, 1] -- confirm against the
    dataset). The image path is derived from the label path by replacing
    ``'labels'`` with ``'images'`` and using a ``.jpg`` extension.

    ``__getitem__`` returns ``(image, target)`` where ``image`` is the
    normalized tensor produced by ``self.transform`` and ``target`` is a float
    array of shape ``(n_boxes, 5)`` with rows ``[x1, y1, x2, y2, class]``,
    box coordinates being relative to the padded square image.
    """

    def __init__(self, labels_path, image_size: int, augment: "A.Compose" = None):
        """
        Args:
            labels_path: directory holding one label file per image.
            image_size: side of the square image fed to the network.
            augment: optional albumentations pipeline called with
                ``image=`` and ``bboxes=``; ``None`` disables augmentation.
        """
        self.labels_path = labels_path
        # Empty label files (no annotated object) are skipped.
        self.labels = [
            path for path in sorted(glob(f'{labels_path}/*'))
            if os.stat(path).st_size != 0
        ]
        self.augment = augment
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
            transforms.Resize((image_size, image_size))
        ])
        self.image_size = image_size

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image_tensor, target_array)``."""
        # Image shares the label file's stem; note that *every* 'labels'
        # substring of the path is replaced by 'images'.
        label_path = self.labels[idx]
        img_path = os.path.splitext(label_path.replace('labels', 'images'))[0] + '.jpg'
        img = plt.imread(img_path)
        if img.ndim == 2:
            # Grayscale without channel axis -> replicate to 3 channels.
            img = np.stack((img,) * 3, axis=-1)
        elif img.shape[2] == 1:
            # Grayscale with an explicit channel axis: repeat it instead of
            # stacking, which would produce a 4-D array.
            img = np.repeat(img, 3, axis=2)
        elif img.shape[2] == 4:
            # Drop the alpha channel.
            img = img[:, :, :3]
        rescale_output = self.resize_and_pad(img, self.image_size)
        img = rescale_output['image']
        annotations = pd.read_csv(label_path, header=None, sep=' ')
        target = self._yolo_to_corners(annotations.values, rescale_output)
        if self.augment is not None:
            augmented = self.augment(image=img, bboxes=target)
            img = augmented['image']
            target = augmented['bboxes']

        # copy(): augmentations such as flips can return non-contiguous views.
        return self.transform(img.copy()), self._clip_boxes(target)

    def __len__(self):
        return len(self.labels)

    @staticmethod
    def _yolo_to_corners(annotations, rescale_output):
        """Turn ``class cx cy w h`` rows into ``[x1, y1, x2, y2, class]`` rows.

        Coordinates are remapped with the ratios/offsets returned by
        ``resize_and_pad`` and clipped to [0, 1]; the class id is left untouched.
        """
        values = np.asarray(annotations, dtype=float)
        labels = values[:, 0]
        cx, cy, w, h = values[:, 1], values[:, 2], values[:, 3], values[:, 4]
        x_ratio, x_offset = rescale_output['x_ratio'], rescale_output['x_offset']
        y_ratio, y_offset = rescale_output['y_ratio'], rescale_output['y_offset']
        corners = np.stack(
            [
                (cx - w / 2) * x_ratio + x_offset,
                (cy - h / 2) * y_ratio + y_offset,
                (cx + w / 2) * x_ratio + x_offset,
                (cy + h / 2) * y_ratio + y_offset,
            ],
            axis=1,
        ).clip(0., 1.)
        return np.concatenate([corners, labels.reshape(-1, 1)], axis=1)

    @staticmethod
    def _clip_boxes(target):
        """Return ``target`` as an ``(n, 5)`` float array with only the box columns clipped.

        An empty list of boxes (all boxes dropped by augmentation) becomes a
        ``(0, 5)`` array so that column indexing keeps working downstream.
        """
        boxes = np.array(target, dtype=float).reshape(-1, 5)
        boxes[:, :4] = boxes[:, :4].clip(0., 1.)
        return boxes

    @staticmethod
    def resize_and_pad(img, target_size=128):
        """Resize ``img`` to fit a ``target_size`` square and pad it with grey.

        The aspect ratio is preserved; the shorter side is centred and padded
        with the constant value (128, 128, 128).

        Returns:
            dict with the padded ``image`` and the ``x_ratio``/``y_ratio``
            (resized side / target size) and ``x_offset``/``y_offset``
            (left/top padding / target size) needed to remap normalized
            box coordinates into the padded image.
        """
        height, width = img.shape[0], img.shape[1]
        if height > width:
            new_y = target_size
            # Keep at least one pixel so extreme aspect ratios do not reach
            # cv2.resize with a zero-sized dimension.
            new_x = max(1, int(target_size * width / height))
        else:
            new_y = max(1, int(target_size * height / width))
            new_x = target_size
        output_img = cv2.resize(img, (new_x, new_y))
        top = max(0, new_x - new_y) // 2
        bottom = target_size - new_y - top
        left = max(0, new_y - new_x) // 2
        right = target_size - new_x - left
        output_img = cv2.copyMakeBorder(
            output_img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(128, 128, 128)
        )
        # Factors used to remap normalized label coordinates.
        x_ratio = new_x / target_size
        y_ratio = new_y / target_size
        x_offset = left / target_size
        y_offset = top / target_size

        return {'image': output_img, 'x_ratio': x_ratio, 'x_offset': x_offset, 'y_ratio': y_ratio, 'y_offset': y_offset}



In [None]:
# Training-time augmentation: one photometric step followed by three geometric
# ones. Boxes are passed in the 'albumentations' bbox format.
augment = A.Compose(
    transforms=[
        # Always applied.
        A.RandomBrightnessContrast(brightness_limit=0.2, always_apply=True),
        # Applied to half of the samples.
        A.HorizontalFlip(p=0.5),
        # Crop limits of 0.05 on every border, applied to 90% of the samples.
        A.RandomCropFromBorders(
            crop_left=0.05, crop_right=0.05, crop_top=0.05, crop_bottom=0.05, p=0.9,
        ),
        # Rotation / scale / translation, filled with the same grey as the padding.
        A.Affine(
            rotate=(-30, 30),
            scale=(0.8, 1.1),
            keep_ratio=True,
            translate_percent=(-0.05, 0.05),
            cval=(128, 128, 128),
            p=0.9,
        ),
    ],
    bbox_params=A.BboxParams(format='albumentations'),
)
# Keep a serializable description of the pipeline next to the other parameters.
model_params.augmentation = augment.to_dict()

In [None]:
dataset_path = 'dataset/'
# Training split: samples go through the augmentation pipeline.
train_dataset = MyDataset(
    labels_path=f'{dataset_path}labels/train',
    image_size=model_params.image_size,
    augment=augment,
)
# Validation split: no augmentation.
valid_dataset = MyDataset(
    labels_path=f'{dataset_path}labels/val',
    image_size=model_params.image_size,
    augment=None,
)

In [None]:
# Shuffled training batches; batches are assembled by od_collate_fn from utils.
train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    collate_fn=od_collate_fn,
    num_workers=4,
    shuffle=True,
    batch_size=model_params.batch_size,
)

In [None]:
# Pull a single batch from the (shuffled) training loader for visual inspection.
images, targets = next(iter(train_dataloader))  # first batch yielded by the loader

In [None]:
# Visual sanity check: draw the target boxes of one sample of the batch.
# The index is capped so the cell still runs when the batch holds fewer than
# 8 samples (e.g. a smaller `batch_size` in ModelParameters).
idx = min(7, len(images) - 1)
image, target = images[idx], targets[idx]

classes = ['face', 'hand']
labels = [classes[int(label)] for label in target[:, -1]]
img_with_boxes = draw_bounding_boxes(
    # Undo Normalize(0.5, 0.5) and go back to 8-bit pixels.
    ((image * 0.5 + 0.5) * 255).to(torch.uint8),
    # Boxes are stored relative to the image side; scale them to pixels.
    target[:, :-1] * model_params.image_size,
    labels,
)
plt.imshow(img_with_boxes.permute(1, 2, 0).numpy())

In [None]:
# Validation batches. Shuffling is disabled so every epoch evaluates the same
# batches in the same order, which keeps the validation loss fed to the
# learning-rate scheduler comparable from one epoch to the next.
val_dataloader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size=model_params.batch_size,
    shuffle=False,
    num_workers=4,
    collate_fn=od_collate_fn,
)

In [None]:
# Loaders keyed by phase, as expected by train_model.
dataloaders_dict = dict(train=train_dataloader, val=val_dataloader)

# Test with the SSD model

In [None]:
from blazeface import BlazeFace

# A 256-pixel input builds the network with back_model=True; any other size
# uses the default constructor.
model = BlazeFace(back_model=True) if model_params.image_size == 256 else BlazeFace()
model.load_anchors('anchors.npy')

In [None]:
# Prefer the GPU when one is visible.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Loss configured with the model's anchors.
criterion = MultiBoxLoss(
    jaccard_thresh=0.5,
    neg_pos=3,
    device=device,
    focal=model_params.focal_loss,
    dbox_list=model.anchors,
)
optimizer = optim.Adam(params=model.parameters(), lr=model_params.lr)
# Halve the learning rate when the monitored value stops decreasing.
scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=model_params.lr_scheduler_patience,
)

In [None]:
# Smoke test: push one random image through the network and look at the output shape.
output = model(
    torch.randn((1, 3) + (model_params.image_size,) * 2)
)
output.shape

In [None]:
def train_model(
        net,
        dataloaders_dict,
        criterion,
        optimizer,
        scheduler,
        model_params,
        device,
):
    """Train ``net`` and validate it after every epoch.

    Args:
        net: module to optimize; it is moved to ``device``.
        dataloaders_dict: mapping with a ``'train'`` and a ``'val'`` entry, each
            a sized iterable yielding ``(images, targets)`` where ``images``
            and every element of ``targets`` support ``.to(device)``.
        criterion: callable ``criterion(outputs, targets)`` returning two
            scalar losses, logged as the "loc" and "class" terms.
        optimizer: optimizer updating the parameters of ``net``.
        scheduler: scheduler stepped once per epoch with the mean validation
            loss (``scheduler.step(val_loss)``).
        model_params: object exposing ``epochs`` and ``model_path``.
        device: device on which the computation runs.

    Side effects:
        Prints the losses of each epoch and overwrites ``model_params.model_path``
        with the current ``state_dict`` at the end of every epoch.
    """
    net = net.to(device)

    # torch.save does not create missing parent directories; make sure the
    # first checkpoint is not lost after a full epoch of training.
    save_dir = os.path.dirname(model_params.model_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    for epoch in range(model_params.epochs):
        curr_lr = scheduler.optimizer.param_groups[0]['lr']
        # Train
        # The validation pass below switches the network to eval mode, so
        # training mode has to be restored at the start of every epoch.
        net.train()
        running_loss = 0.
        running_loc_loss = 0.
        running_class_loss = 0.
        for images, targets in tqdm.tqdm(dataloaders_dict['train']):
            images = images.to(device)
            targets = [ann.to(device) for ann in targets]
            optimizer.zero_grad()
            outputs = net(images)
            loss_l, loss_c = criterion(outputs, targets)
            loss = loss_l + loss_c
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            running_loc_loss += loss_l.item()
            running_class_loss += loss_c.item()

        # Eval
        net.eval()
        val_loss = 0.
        val_loc_loss = 0.
        val_class_loss = 0.
        with torch.no_grad():
            for images, targets in dataloaders_dict['val']:
                images = images.to(device)
                targets = [ann.to(device) for ann in targets]
                outputs = net(images)
                loss_l, loss_c = criterion(outputs, targets)
                loss = loss_l + loss_c
                val_loss += loss.item()
                val_loc_loss += loss_l.item()
                val_class_loss += loss_c.item()

        # Per-batch averages.
        n_train = len(dataloaders_dict['train'])
        n_val = len(dataloaders_dict['val'])
        train_loss = running_loss / n_train
        train_loc_loss = running_loc_loss / n_train
        train_class_loss = running_class_loss / n_train
        val_loss = val_loss / n_val
        val_loc_loss = val_loc_loss / n_val
        val_class_loss = val_class_loss / n_val
        print(f'[{epoch + 1}] train loss: {train_loss:.3f} | val loss: {val_loss:.3f} | lr: {curr_lr:.2e}')
        print(f'train loc loss: {train_loc_loss:.3f} | train class loss: {train_class_loss:.3f}')
        print(f'val loc loss: {val_loc_loss:.3f} | val class loss: {val_class_loss:.3f}')
        scheduler.step(val_loss)
        # Save model (overwritten every epoch)
        torch.save(net.state_dict(), model_params.model_path)



# Start training

In [None]:
# Quick look at the anchors: per-column minimum, per-column maximum and shape.
(
    model.anchors.min(axis=0),
    model.anchors.max(axis=0),
    model.anchors.shape,
)

In [None]:
# Launch the training run with the objects built in the cells above.
train_model(
    net=model,
    dataloaders_dict=dataloaders_dict,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    model_params=model_params,
    device=device,
)

### Convert model

In [None]:
import torch
import tensorflow as tf
import onnx
import onnx_tf

In [None]:
# Reload the weights to convert. map_location='cpu' lets the checkpoint be
# read on a machine without the device it was saved from (e.g. a GPU-trained
# checkpoint on a CPU-only host).
model.load_state_dict(torch.load('original_blazeface_128.pt', map_location='cpu'))
# Inference mode and CPU placement for the export steps below.
model.eval()
model.to('cpu')

In [None]:
# One RGB image at the configured resolution; the same shape is reused for the export.
input_shape = (1, 3) + (model_params.image_size,) * 2
output = model(torch.randn(*input_shape))
output

In [None]:
# Base name shared by the exported files: <model_name>.onnx, <model_name>.tf, <model_name>.tflite.
model_name = 'original_blazeface_128'

In [None]:
# Step 1: PyTorch -> ONNX, using a random input of the export shape.
torch.onnx.export(
    model,
    torch.randn(*input_shape),
    model_name + '.onnx',
    opset_version=11,
)

In [None]:
# Read the exported graph back so it can be patched and converted.
onnx_model = onnx.load(model_name + '.onnx')

In [None]:
# Step 2: build the TensorFlow representation of the ONNX graph.
# NOTE(review): the graph inputs are renamed in a later cell; a representation
# prepared before that rename presumably still carries the old input name --
# confirm whether this call has to run after the rename.
tf_model = onnx_tf.backend.prepare(onnx_model)

In [None]:
# Fix for error from https://stackoverflow.com/questions/76839366/tf-rep-export-graphtf-model-path-keyerror-input-1
from onnx import helper

# Mapping from the exported input names to the replacement names.
name_map = {"x.1": "x_1"}

# Rebuild the list of graph inputs, renaming those listed in name_map.
new_inputs = []
for inp in onnx_model.graph.input:
    if inp.name in name_map:
        # New ValueInfoProto with the new name and the same element type and
        # static dimensions as the original input.
        new_inp = helper.make_tensor_value_info(
            name_map[inp.name],
            inp.type.tensor_type.elem_type,
            [dim.dim_value for dim in inp.type.tensor_type.shape.dim],
        )
        new_inputs.append(new_inp)
    else:
        new_inputs.append(inp)

# Swap the old inputs for the new ones.
onnx_model.graph.ClearField("input")
onnx_model.graph.input.extend(new_inputs)

# Point every node that consumed an old input name at the new name.
for node in onnx_model.graph.node:
    for i, input_name in enumerate(node.input):
        if input_name in name_map:
            node.input[i] = name_map[input_name]
onnx.save(onnx_model, f'{model_name}.onnx')

# Rebuild the TensorFlow representation from the patched graph. The instance
# created before the rename was prepared from the old input names, so
# exporting it would not benefit from this fix when the notebook is run top
# to bottom.
tf_model = onnx_tf.backend.prepare(onnx_model)

In [None]:
# Write the TensorFlow representation to disk as <model_name>.tf.
tf_model.export_graph(model_name + '.tf')

In [None]:
### 3/ TF to TFLite
model_converter = tf.lite.TFLiteConverter.from_saved_model(f'{model_name}.tf')
model_converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS, # enable TensorFlow Lite ops.
  tf.lite.OpsSet.SELECT_TF_OPS # enable TensorFlow ops.
]
#model_converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = model_converter.convert()
# Context manager so the file is flushed and closed as soon as it is written.
with open(f'{model_name}.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)

That's all :)