## Test

In [None]:
from pathlib import Path

# Directory holding the input videos handed to TestDataset below.
# NOTE(review): absolute, machine-specific path — adjust for your environment.
data_dir = Path("/home/jasseur/Downloads/videos")
# List the directory contents as a quick sanity check.
!ls $data_dir

In [None]:
import albumentations as A

# Target frame size stored as (width, height): index 0 is used as the width
# and index 1 as the height in the resize below.
IMAGE_SIZE = (1088, 608)

# Evaluation-time pipeline: a single, always-applied resize to IMAGE_SIZE.
# Bounding boxes use the "pascal_voc" format and carry their labels in the
# "person_ids" field.
val_transform = A.Compose(
    [
        A.Resize(height=IMAGE_SIZE[1], width=IMAGE_SIZE[0], p=1.0),
    ],
    p=1.0,
    bbox_params=A.BboxParams(
        format="pascal_voc",
        min_area=0,
        min_visibility=0,
        label_fields=["person_ids"]
    )
)

In [None]:
from src.test_dataset import TestDataset
%load_ext autoreload
%autoreload 2

# Build the test dataset from the video directory and the validation transform.
test_dataset = TestDataset(data_dir, val_transform)
# Show the dataset's `data_groups` attribute as the cell output.
test_dataset.data_groups

In [None]:
def get_color(idx):
    """Derive a deterministic colour triple from an identifier.

    The identifier is tripled and converted to float; each channel is then
    that value times a fixed multiplier (37, 17 and 29 respectively),
    reduced modulo 255.

    Args:
        idx: Numeric identifier (e.g. a track id).

    Returns:
        A 3-tuple of floats, each in the range [0, 255).
    """
    scaled = float(idx * 3)
    return tuple((multiplier * scaled) % 255 for multiplier in (37, 17, 29))

In [None]:
def draw_dashed_rectangle(image, x1y1, x2y2, color, thickness=2, dash=8):
    """Paint a dashed rectangle outline onto ``image`` in place.

    The corner coordinates are clamped to the image bounds. Dashes are
    anchored to absolute pixel coordinates: a pixel along an edge is painted
    when ``coordinate % (2 * dash) < dash``, so the pattern does not shift
    with the rectangle position.

    Args:
        image: Array indexed as ``image[row, column, channel]``; modified in
            place.
        x1y1: ``(x1, y1)`` integer pixel coordinates of the first corner.
        x2y2: ``(x2, y2)`` integer pixel coordinates of the opposite corner.
        color: Value broadcast over the channel axis (e.g. a 3-tuple).
        thickness: Each edge is painted as a band covering
            ``[line - thickness, line + thickness)``, clipped to the image.
        dash: Length in pixels of each painted segment and of each gap.

    Returns:
        The same ``image`` object that was passed in.
    """
    height, width, _ = image.shape
    x1, y1 = x1y1
    x2, y2 = x2y2
    # Clamp the corners into the image; the second corner can never precede
    # the first one after clamping.
    x1 = max(0, min(x1, width - 1))
    x2 = max(x1, min(x2, width - 1))
    y1 = max(0, min(y1, height - 1))
    y2 = max(y1, min(y2, height - 1))
    xs = [i for i in range(x1, x2 + 1) if (i % (2 * dash)) < dash]
    ys = [i for i in range(y1, y2 + 1) if (i % (2 * dash)) < dash]

    def band(line):
        # A negative slice start would be interpreted as "from the end" and
        # yield an empty slice, dropping edges that touch the top/left border.
        # Clamp it at 0; an overshooting stop is already clipped by slicing.
        return slice(max(0, line - thickness), line + thickness)

    image[ys, band(x1), :] = color
    image[ys, band(x2), :] = color
    image[band(y1), xs, :] = color
    image[band(y2), xs, :] = color
    return image

In [None]:
import sys
sys.path.append("../..")
from person_detection.centernet.src.pose_dla_dcn import get_pose_net as get_dla_dcn
from src.trainer import Trainer
%load_ext autoreload
%autoreload 2

# Arguments for the network factory imported above as get_dla_dcn.
num_layers = 34
# Output heads mapped to their channel counts.
# NOTE(review): presumably hm = centre heatmap, wh = box size, reg = centre
# offset, id = re-identification embedding — confirm against get_pose_net.
heads = {"hm": 1, "wh": 4, "reg": 2, "id": 128}
head_conv = 256
net = get_dla_dcn(num_layers, heads, head_conv)
# Alternative checkpoint, kept for reference:
# model_path = Path("/home/jasseur/Downloads/crowdhuman_dla34.pth")
# NOTE(review): absolute, machine-specific path — adjust for your environment.
model_path = Path("/home/jasseur/Downloads/fairmot_dla34.pth")
# Wrap the network in the project Trainer, which is later used for
# predict_video(), .net and .pre_detector. Requires a CUDA device.
trainer = Trainer(
    net, image_size=IMAGE_SIZE, device="cuda", checkpoint_dir="checkpoint",
    nID=1000, model_path=model_path
)

In [None]:
# Label tables for the per-track overlay text: write_video indexes GENDERS
# with track.getGender() and AGES with track.getAge().
GENDERS = ["M", "F"]
# NOTE(review): entries look like age ranges in years — confirm against the
# model that produces the age index.
AGES = ["1-2", "3-9", "10-20", "21-25", "26-27", "28-31", "32-36", "37-45", "46-54", "55-65", "66-116"]

In [None]:
from tqdm.notebook import tqdm
import numpy as np
import cv2
from src.track import Track
%load_ext autoreload
%autoreload 2

def write_video(video, output_path):
    """Run the tracker over ``video`` and save an annotated copy.

    Each frame yielded by ``trainer.predict_video`` is multiplied by 255,
    cast to uint8 and converted from RGB to BGR. Every track is drawn as a
    rectangle (line width 2 when its state is ``TRACKED``, 1 otherwise) with
    an ``id_gender_age`` label, and the frame time is written near the
    top-left corner.

    Relies on the notebook globals ``test_dataset`` (for the frame rate),
    ``trainer``, ``IMAGE_SIZE``, ``GENDERS``, ``AGES`` and ``get_color``.

    Args:
        video: Video object accepted by ``trainer.predict_video``.
        output_path: Destination file path; converted with ``str()``.
    """
    fps = test_dataset.desired_fps
    writer = cv2.VideoWriter(
        str(output_path),
        cv2.VideoWriter_fourcc(*"MP4V"),  # alternative codec tried: "MJPG"
        fps,
        IMAGE_SIZE,
    )
    text_color = (0, 0, 255)  # red, since the canvas is BGR
    try:
        for rgb_frame, timestamp, tracks in trainer.predict_video(video):
            canvas = cv2.cvtColor(
                (255 * rgb_frame).astype(np.uint8), cv2.COLOR_RGB2BGR
            )
            for track in tracks:
                left, top, right, bottom = (int(value) for value in track.bbox)
                color = get_color(track.id)
                # Confirmed tracks get a heavier outline than the others.
                is_tracked = track.state == Track.TrackState.TRACKED
                cv2.rectangle(
                    canvas, (left, top), (right, bottom), color,
                    2 if is_tracked else 1, lineType=cv2.LINE_AA
                )
                # Disabled overlay of the track's state bbox:
                # (sx1, sy1, sx2, sy2) = map(int, track.get_state_bbox())
                # draw_dashed_rectangle(
                #     frame, (sx1, sy1), (sx2, sy2), color, 2
                # )
                label = f"{track.id}_{GENDERS[track.getGender()]}_{AGES[track.getAge()]}"
                cv2.putText(
                    canvas, label, (left, top + 30), cv2.FONT_HERSHEY_PLAIN,
                    1, text_color, 2
                )
            cv2.putText(
                canvas, "%.2f" % timestamp, (10, 10), cv2.FONT_HERSHEY_PLAIN,
                1, text_color, 2
            )
            writer.write(canvas)
    finally:
        # Always finalise the file, even when prediction fails midway.
        writer.release()

# Take the first dataset item, which unpacks into a name and a video object,
# and write its annotated copy to test.mp4 in the working directory.
video_name, video = test_dataset[0]
output_path = Path("test.mp4")
write_video(video, output_path)

## ONNX export

In [None]:
from torch import nn
import torch

class MyModel(nn.Module):
    """Export wrapper that flattens the network output into a 4-tuple.

    The wrapped ``model`` must return a sequence whose first element is a
    mapping with the keys ``"hm"``, ``"wh"``, ``"reg"`` and ``"id"``.
    ``pre_detector`` is applied to the ``"hm"`` entry only.
    """

    def __init__(self, model, pre_detector):
        """Store the network and the callable applied to its ``"hm"`` output."""
        super().__init__()
        self.model = model
        self.pre_detector = pre_detector

    @torch.no_grad()
    def forward(self, image):
        """Return ``(pre_detector(hm), wh, reg, id)`` for ``image``.

        Runs with gradient tracking disabled.
        """
        first_output = self.model(image)[0]
        heatmap, size, offset, embedding = (
            first_output[key] for key in ("hm", "wh", "reg", "id")
        )
        return self.pre_detector(heatmap), size, offset, embedding

Tensor shapes for `torchvision.ops.deform_conv2d` (with a single weight group and a single offset group):
- input: batch_size x in_channels x h x w
- weight: out_channels x in_channels x kernel_size x kernel_size
- offset: batch_size x (2 * kernel_size * kernel_size) x h_out x w_out
- mask: batch_size x (kernel_size * kernel_size) x h_out x w_out
- bias: out_channels

- output: batch_size x out_channels x h_out x w_out

Reference for exporting a custom PyTorch operator to ONNX: https://github.com/onnx/tutorials/blob/master/PyTorchCustomOperator/README.md

In [None]:
from torch.onnx.symbolic_helper import parse_args

@parse_args('v', 'v', 'v', 'v', 'v', 'i', 'i', 'i', 'i', 'i', 'i', 'i', 'i', 'i')
def symbolic_deform_conv2d(
    g, input, weight, offset, mask, bias,
    stride_h, stride_w, pad_h, pad_w, dil_h, dil_w,
    n_weight_grps, n_offset_grps, use_mask
):
    """Emit a ``custom_domain::deform_conv2d`` node for ONNX export.

    The five tensor arguments become the node inputs. Stride, padding and
    dilation are attached as integer attributes, together with
    ``out_channels`` taken from the first dimension of ``bias``.

    ``n_weight_grps``, ``n_offset_grps`` and ``use_mask`` are accepted but
    not forwarded to the emitted node.

    Args:
        g: Graph object providing ``op``.
        input, weight, offset, mask, bias: Graph values for the operator.
        stride_h, stride_w, pad_h, pad_w, dil_h, dil_w: Integer settings.
        n_weight_grps, n_offset_grps, use_mask: Unused here.

    Returns:
        Whatever ``g.op`` returns for the new node.
    """
    node_inputs = (input, weight, offset, mask, bias)
    # The "_i" suffix marks each attribute as an integer.
    node_attributes = {
        "stride_h_i": stride_h,
        "stride_w_i": stride_w,
        "pad_h_i": pad_h,
        "pad_w_i": pad_w,
        "dil_h_i": dil_h,
        "dil_w_i": dil_w,
        "out_channels_i": bias.type().sizes()[0],
    }
    return g.op("custom_domain::deform_conv2d", *node_inputs, **node_attributes)

In [None]:
from torch.onnx import register_custom_op_symbolic
# Register symbolic_deform_conv2d as the export handler for
# torchvision::deform_conv2d at opset version 11 (the opset used for export).
register_custom_op_symbolic("torchvision::deform_conv2d", symbolic_deform_conv2d, 11)

In [None]:
import numpy as np
import torch



# Base name shared by the ONNX file and the .npy fixtures written below.
name = "fairmot_dla34"
# Names given to the exported outputs; also used to name the saved arrays.
output_names = ["hm", "wh", "reg", "id"]
ONNX_FILE = f"{name}.onnx"

# Wrap the trained network and its pre-detector, then move to GPU in eval mode.
my_model = MyModel(trainer.net, trainer.pre_detector)
my_model.cuda()
my_model.eval()
# Random dummy input used for the export and for the reference outputs.
# NOTE(review): 416x416 differs from IMAGE_SIZE (1088, 608) and no dynamic
# axes are declared — confirm this fixed shape is what the consumer expects.
x = torch.rand(1, 3, 416, 416, device="cuda")
# Persist the input so the same tensor can be replayed against the outputs.
np.save(f"{name}_x.npy", x.cpu().numpy())
# Export with opset 11 and declare the custom domain (version 2).
torch.onnx.export(
    my_model, x, ONNX_FILE, verbose=False, opset_version=11,
    custom_opsets={"custom_domain": 2},
    output_names=output_names
)
# Reference outputs from PyTorch for the saved input, one file per output.
with torch.no_grad():
    y = my_model(x)
for i, output_name in enumerate(output_names):
    np.save(f"{name}_y_{output_name}.npy", y[i].cpu().numpy())

In [None]:
!cp $name_*.npy ../../../../iot/nvidia_jetson/tensorrt/test/onnx

In [None]:
!cp $name.onnx ../../../../iot/nvidia_jetson/tensorrt/onnx/