In [1]:
# !pip install -r requirements.txt
# !pip list

import numpy as np
import local_utils
from torchvision import datasets
from torch.utils.data import DataLoader, Sampler
from torchvision.transforms import ToTensor
from torch import nn

from distutils.util import strtobool
from utils.utils import *
from utils.datasets import *
from utils.parse_config import *

from copy import deepcopy
import os
from re import compile as compile_re
from glob import glob as glob_glob

from collections import namedtuple
import tqdm
from typing import Dict

from models import *

import json

### 1. The eval data
Define the test dataset and its data loader.

In [None]:
# Folder with the test split of the PST900 RGB-T dataset (relative to the working directory).
test_path = "data/PST900_RGBT_Dataset/test"
# One sample per batch, in dataset order (no shuffling).
# NOTE(review): RGBTFolderP is not defined in this notebook — presumably it comes from
# one of the star imports (utils.datasets?); confirm.
test_loader = torch.utils.data.DataLoader(RGBTFolderP(test_path), batch_size=1, shuffle=False)

Save eval data in .npz format for later evaluation.

In [None]:
# Collect every batch of the test loader so the whole split can be stored as two arrays.
quantization_data, quantization_labels = [], []

for _, img, target in test_loader:
    quantization_labels.append(target)
    quantization_data.append(img)

# Shape of a single batch; later cells pass it to quantize() as `input_shape`.
batch_shape = quantization_data[0].numpy().shape
print(batch_shape)

# Concatenate the per-batch tensors along dim 0 and convert them to NumPy arrays.
test_X = torch.cat(quantization_data).numpy()
test_Y = torch.cat(quantization_labels).numpy()

# Archive keys 'data' and 'targets' are the ones EvalLoader reads back.
np.savez('eval_PST900.npz', data=test_X, targets=test_Y)

print('Done')

### 2. Init the float model
Create the model and load the weights.

In [None]:
# Everything in this notebook runs on the CPU.
device = torch.device("cpu")
model_config_path = "config/yolov3.cfg"
# NOTE(review): the extension ".weightd" looks like a typo for ".weights" —
# confirm against the actual file name on disk before changing it.
weights_path = "weights/float_133.weightd"

# Build the network from the config file, then load the trained float weights.
net = Darknet(model_config_path)
net.load_weights_dict(weights_path)
net.to(device)

# Switch to inference mode before evaluation/quantization.
net.eval()

print(device)
# Dump the textual representation of the network for later inspection.
with open("log_net.txt", "w") as f:
    f.write(str(net))


In [5]:
class EvalLoader:
    """Minimal batch loader over an ``.npz`` archive of evaluation samples.

    The archive must contain the arrays ``data`` and ``targets``; both are
    loaded eagerly into memory. Batch ``i`` is the contiguous slice
    ``[i * batch_size, (i + 1) * batch_size)`` along axis 0. When the number
    of samples is not a multiple of ``batch_size`` the last batch is shorter
    instead of being dropped.

    The object supports ``len()``, integer indexing (including negative
    indices) and plain ``for`` iteration, yielding ``(data, targets)`` tuples
    of NumPy arrays.
    """

    def __init__(self,
                 batch_size: int = 1,
                 npz_path: str = 'eval_PST900.npz') -> None:
        """
        :param batch_size: number of samples per batch, must be >= 1
        :param npz_path: path to the ``.npz`` archive with ``data``/``targets``
        :raises ValueError: if ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        # NpzFile keeps the archive open until closed; read both arrays inside
        # the context manager so the file handle is released right away.
        with np.load(npz_path) as data:
            self.data = data['data'].astype(np.float32)
            self.targets = data['targets']
        self.batch_size = batch_size

    def __getitem__(self, i):
        """Return batch ``i`` as ``(data, targets)``.

        :raises IndexError: if ``i`` is outside ``[-len(self), len(self))``.
            IndexError (not StopIteration) is what the sequence protocol
            expects, and it still terminates ``for`` loops over the loader.
        """
        num_batches = len(self)
        if i < 0:
            i += num_batches
        if not 0 <= i < num_batches:
            raise IndexError("batch index out of range")

        beg = i * self.batch_size
        # Clamp so the final batch may be shorter than batch_size.
        end = min(beg + self.batch_size, self.data.shape[0])

        return self.data[beg:end, ...], self.targets[beg:end]

    def __len__(self):
        """Number of batches, counting a trailing partial batch."""
        # Ceiling division without floats.
        return -(-self.data.shape[0] // self.batch_size)

In [None]:
# Smoke-test the loader: fetch only the first batch and report its shapes.
loader = EvalLoader()
first_batch = next(iter(enumerate(loader)), None)
if first_batch is not None:
    batch_i, (imgs, targets) = first_batch
    print("Batch:", batch_i)
    print("Img:", imgs.shape)
    print("Target:", targets.shape)
    # Keep the sample batch available under short names.
    img, target = imgs, targets

### 3. Eval the float model
Ensure the model was loaded properly.

In [13]:
@torch.no_grad()
def evaluation(model: nn.Module,
               data_loader: torch.utils.data.DataLoader,
               conf_thres: float = 0.5,
               nms_thres: float = 0.4
               ):
    """Compute per-class average precision (AP) and the mean AP of ``model``.

    ``model`` must return the three raw head outputs in the order
    large / medium / small anchors; the YOLO decoding layers are applied here
    because the network itself does not contain them.

    Each target row is read as ``(label, x_center, y_center, width, height)``
    with coordinates relative to the image size; rows whose last column is
    not positive are ignored. Each prediction row is read as box in columns
    0-3, score in column 4 and class label in the last column.

    :param model: network returning a sequence of three head tensors.
    :param data_loader: iterable of ``(imgs, targets)`` batches given as NumPy
        arrays (they are converted with ``torch.from_numpy``), e.g. ``EvalLoader``.
    :param conf_thres: confidence threshold forwarded to ``non_max_suppression``.
    :param nms_thres: NMS threshold forwarded to ``non_max_suppression``.
    :return: ``(logger, mAP)``; ``logger`` maps every class index to its AP and
        the key ``"mAP"`` to the mean over all classes. Classes without any
        annotation contribute an AP of 0 to that mean.
    """

    num_classes = 4
    img_size = 416
    iou_thres = 0.5

    # YOLO layers needed for evaluation, because the net was split before and does not contain the YOLO layer
    yoloL = YOLOLayer([(116, 90), (156, 198), (373, 326)], num_classes, img_size)
    yoloM = YOLOLayer([(30, 61), (62, 45), (59, 119)], num_classes, img_size)
    yoloS = YOLOLayer([(10, 13), (16, 30), (33, 23)], num_classes, img_size)

    print("Compute mAP...")

    # One entry per image; each entry is a list with one array per class.
    all_detections = []
    all_annotations = []

    for imgs, targets in tqdm.tqdm(data_loader, desc="Detecting objects"):

        imgs = torch.from_numpy(imgs)
        targets = torch.from_numpy(targets)

        # Gradients are already disabled by the decorator.
        head_outputs = model(imgs)
        outputs = torch.cat(
            [yoloL(head_outputs[0]), yoloM(head_outputs[1]), yoloS(head_outputs[2])], 1)
        outputs = non_max_suppression(outputs, num_classes, conf_thres=conf_thres, nms_thres=nms_thres)

        for image_output, annotations in zip(outputs, targets):
            all_detections.append([np.array([]) for _ in range(num_classes)])
            if image_output is not None:
                # Get predicted boxes, confidence scores and labels
                pred_boxes = image_output[:, :5].cpu().numpy()
                scores = image_output[:, 4].cpu().numpy()
                pred_labels = image_output[:, -1].cpu().numpy()

                # Order by confidence, highest first: the greedy matching below
                # walks detections in this order, so the most confident box must
                # get the first chance to claim a ground-truth annotation.
                sort_i = np.argsort(-scores)
                pred_labels = pred_labels[sort_i]
                pred_boxes = pred_boxes[sort_i]

                for label in range(num_classes):
                    all_detections[-1][label] = pred_boxes[pred_labels == label]

            all_annotations.append([np.array([]) for _ in range(num_classes)])
            # Rows with a non-positive last column carry no annotation.
            valid = annotations[:, -1] > 0
            if valid.any():

                annotation_labels = annotations[valid, 0].cpu().numpy()
                _annotation_boxes = annotations[valid, 1:].cpu().numpy()

                # Reformat to x1, y1, x2, y2 and rescale to image dimensions
                annotation_boxes = np.empty_like(_annotation_boxes)
                annotation_boxes[:, 0] = _annotation_boxes[:, 0] - _annotation_boxes[:, 2] / 2
                annotation_boxes[:, 1] = _annotation_boxes[:, 1] - _annotation_boxes[:, 3] / 2
                annotation_boxes[:, 2] = _annotation_boxes[:, 0] + _annotation_boxes[:, 2] / 2
                annotation_boxes[:, 3] = _annotation_boxes[:, 1] + _annotation_boxes[:, 3] / 2
                annotation_boxes *= img_size

                for label in range(num_classes):
                    all_annotations[-1][label] = annotation_boxes[annotation_labels == label, :]

    average_precisions = {}
    for label in range(num_classes):
        true_positives = []
        scores = []
        num_annotations = 0

        for i in tqdm.tqdm(range(len(all_annotations)), desc=f"Computing AP for class '{label}'"):
            detections = all_detections[i][label]
            annotations = all_annotations[i][label]

            num_annotations += annotations.shape[0]
            # Indices of annotations already matched by a detection of this image.
            detected_annotations = set()

            for *bbox, score in detections:
                scores.append(score)

                if annotations.shape[0] == 0:
                    true_positives.append(0)
                    continue

                overlaps = bbox_iou_numpy(np.expand_dims(bbox, axis=0), annotations)
                assigned_annotation = int(np.argmax(overlaps, axis=1)[0])
                max_overlap = overlaps[0, assigned_annotation]

                # A detection is a true positive only for the first match of an annotation.
                if max_overlap >= iou_thres and assigned_annotation not in detected_annotations:
                    true_positives.append(1)
                    detected_annotations.add(assigned_annotation)
                else:
                    true_positives.append(0)

        # no annotations -> AP for this class is 0
        if num_annotations == 0:
            average_precisions[label] = 0
            continue

        true_positives = np.array(true_positives)
        false_positives = np.ones_like(true_positives) - true_positives
        # sort by score
        indices = np.argsort(-np.array(scores))
        false_positives = false_positives[indices]
        true_positives = true_positives[indices]

        # compute false positives and true positives
        false_positives = np.cumsum(false_positives)
        true_positives = np.cumsum(true_positives)

        # compute recall and precision
        recall = true_positives / num_annotations
        precision = true_positives / np.maximum(true_positives + false_positives, np.finfo(np.float64).eps)

        # compute average precision
        average_precision = compute_ap(recall, precision)
        average_precisions[label] = average_precision

    logger = {}
    print("Average Precisions:")
    for c, ap in average_precisions.items():
        print(f"+ Class '{c}' - AP: {ap}")
        logger[c] = ap

    mAP = np.mean(list(average_precisions.values()))
    logger["mAP"] = mAP
    print(f"mAP: {mAP}")

    return logger, mAP

In [None]:
# Evaluate the float model once with a confidence threshold of 0.8 and an NMS threshold of 0.5.
evaluation(net, loader, conf_thres=0.8, nms_thres=0.5)

In [None]:
# Threshold grid to sweep: 0.1, 0.2, ..., 0.9 for confidence (x) and NMS (y).
x = np.arange(0.1, 1.0, 0.1)
y = np.arange(0.1, 1.0, 0.1)

# Not needed by the sweep itself; kept because other cells may rely on X/Y
# (e.g. for plotting) — TODO confirm.
X, Y = np.meshgrid(x, y)

# data[i, j] holds the mAP for confidence threshold x[i] and NMS threshold y[j].
data = np.zeros((len(x), len(y)))

# Index the result matrix by grid position instead of deriving the position
# from the float threshold (int(conf * 10 - 1)), which depends on rounding.
# Loop order matches the original sweep: NMS outer, confidence inner.
for j, nms in enumerate(y):
    for i, conf in enumerate(x):
        print("Conf:", conf, "NMS:", nms)
        _, mAP = evaluation(net, loader, conf, nms)
        data[i, j] = mAP

with open('mAP_tests.json', 'w') as f:
    json.dump(data.tolist(), f)

### 4. Quantize PTQ (Post Training Quantization)

Stage 1: Calibration of Vitis AI Quantizer

Stage 2: Evaluation

In [16]:
def quantize(float_model:torch.nn.Module, 
             input_shape:tuple,
             quant_dir:str, 
             quant_mode:str, 
             device:torch.device,
             dataloader,
             conf = 0.1,
             nms = 0.1):
    """
    Run one stage of post-training quantization with the Vitis AI quantizer.

    The quantized model is always pushed through ``evaluation`` on
    ``dataloader``; afterwards the quantization config is exported in 'calib'
    mode, or the xmodel is exported in 'test' mode. If the quantizer cannot be
    created, the exception is printed and the function returns early.

    :param float_model: float model with loaded weights
    :param input_shape: shape handed to ``torch.randn`` to build the dummy
        input for the quantizer (originally documented as (CH,W,H))
    :param quant_dir: path to directory with quantized model components
    :param quant_mode: quant_mode in ['calib', 'test']
    :param device: device the model is moved to and the quantizer runs on
    :param dataloader: batches used for the evaluation pass; must support
        ``len()`` - for 'calib' must be batch_size == 1
    :param conf: confidence threshold forwarded to ``evaluation``
    :param nms: NMS threshold forwarded to ``evaluation``
    :return: None
    """
    tm = local_utils.TimeMeasurement("Quantization", len(dataloader))
    with tm:
        # available in docker or after packaging 
        # vitis-AI-tools/..../pytorch../pytorch_nndct
        # and installing the package, hence the import inside the function
        print("before import")
        from pytorch_nndct.apis import torch_quantizer
        # model to device
        print("Before device")
        model = float_model.to(device)

        # Dummy input the quantizer receives together with the model.
        rand_in = torch.randn(input_shape)
        print("get qunatizer start")
        try:
            quantizer = torch_quantizer(
                quant_mode, model, rand_in, output_dir=quant_dir, device=device)
        except Exception as e:
            # Best effort: report the failure and give up without raising.
            print("exception:")
            print(e)
            return
        print("get qunatizer end")

        print("get quantized model start")
        quantized_model = quantizer.quant_model
        print("get quantized model end")

        # evaluate
        # IMPORTANT! if the data is not passed through the model, the model
        # will not be quantized - the evaluation pass below takes care of that.
        print("testing st")
        evaluation(quantized_model, dataloader, conf, nms)
        print("testing end")

        # export config
        if quant_mode == 'calib':
            print("export config")
            quantizer.export_quant_config()
            print("export config end")
        # export model
        if quant_mode == 'test':
            print("export xmodel")
            quantizer.export_xmodel(deploy_check=False, output_dir=quant_dir)
            print("export xmodel end")
    print(tm)

In [None]:
# Stage 1 - calibration: run the quantizer in 'calib' mode over the eval loader
# (batch size 1); conf/nms keep their defaults of 0.1.
quantize(float_model=net, 
         input_shape=batch_shape,
         quant_dir='quant_dir',
         quant_mode='calib',
         device=device,
         dataloader=loader
         )

In [None]:
# Stage 2 - evaluation: run the quantizer in 'test' mode, which evaluates the
# quantized model and then exports the xmodel into quant_dir.
quantize(float_model=net, 
         input_shape=batch_shape,
         quant_dir='quant_dir', # directory for quantizer results
         quant_mode='test',
         device=device,
         dataloader=loader
         )

### 5. Build the model

Compile the quantized xmodel with the Vitis AI compiler.

In [None]:
# Compile the quantized xmodel with the Vitis AI compiler (vai_c_xir).
# The input is read from 'quant_dir', the directory the 'test' quantization stage
# exports to; the target is described by arch.json and the result goes to 'build'.
# NOTE(review): the file name 'Darknet_int.xmodel' is presumably chosen by the
# quantizer export — confirm it matches what is actually written to quant_dir.
!vai_c_xir --xmodel 'quant_dir/Darknet_int.xmodel' --arch arch.json --net_name Darknet_qu --output_dir build