In [1]:
model_hw = 416
frame_w = 1280
frame_h = 720

In [2]:
import sys
sys.path.append('/home/xilinx/')

import time
import math
import numpy as np
import tflite_runtime.interpreter as tflite
import cv2
import matplotlib.pyplot as plt
import pynq

from pynq import Overlay
from pynq.lib.video import *

from tcu_pynq.driver import Driver
from tcu_pynq.util import div_ceil
from tcu_pynq.architecture import ultra96

In [3]:
overlay = Overlay('/home/xilinx/tensil_ultra96v2.bit')
tcu = Driver(ultra96, overlay.axi_dma_0)

In [4]:
cap = cv2.VideoCapture(0)

cap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_w);
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_h);

In [5]:
displayport = DisplayPort()
displayport.configure(VideoMode(frame_w, frame_h, 24), PIXEL_RGB)

In [6]:
tcu.load_model('/home/xilinx/yolov4_tiny_{0}_onnx_ultra96v2.tmodel'.format(model_hw))

In [7]:
interpreter = tflite.Interpreter(model_path='/home/xilinx/yolov4_tiny_{0}_post.tflite'.format(model_hw))
interpreter.allocate_tensors()

In [8]:
with open('/home/xilinx/coco-labels-2014_2017.txt') as f:
    labels_coco = f.read().split('\n')
    
def set_tensor(driver, interpreter, hw_size, data):
    input_details = interpreter.get_input_details()
    input_idxs = [i for i in range(len(input_details))
                  if input_details[i]['shape'][1] == hw_size and input_details[i]['shape'][2] == hw_size]
    inp = input_details[input_idxs[0]]
    data = data.astype(inp['dtype'])
    inner_dim = inp['shape'][-1]
    inner_size = div_ceil(inner_dim, driver.arch.array_size) * driver.arch.array_size
    if inner_size != inner_dim:
        data = data.reshape((-1, inner_size))[:, :inner_dim]
    data = data.reshape(inp['shape'])
    interpreter.set_tensor(inp['index'], data)
    
def filter_and_reshape(boxes, scores, score_threshold=0.4):
    scores_max = np.max(scores, axis=-1)
    mask = scores_max > score_threshold
    
    filtered_boxes = boxes[mask]
    filtered_scores = scores[mask]
    
    filtered_boxes = np.reshape(filtered_boxes, [scores.shape[0], -1, filtered_boxes.shape[-1]])    
    filtered_scores = np.reshape(filtered_scores, [scores.shape[0], -1, filtered_scores.shape[-1]])

    return filtered_boxes, filtered_scores


def non_maximum_suppression(boxes, iou_threshold=0.4):
    if len(boxes) == 0:
        return boxes
    
    area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    ll_x = np.maximum.outer(boxes[:, 0], boxes[:, 0])
    ll_y = np.maximum.outer(boxes[:, 1], boxes[:, 1])
    ur_x = np.minimum.outer(boxes[:, 2], boxes[:, 2])
    ur_y = np.minimum.outer(boxes[:, 3], boxes[:, 3])
    intersection_x = np.maximum(0, ur_x - ll_x)
    intersection_y = np.maximum(0, ur_y - ll_y)
    intersection = intersection_x * intersection_y
    
    iou = intersection / area - np.identity(area.shape[-1])
    p = iou >= iou_threshold
    p = p & p.T
    n =  p.shape[-1]
    
    no_needs_merge = set()
    for i in range(n):
        if not p[i].any():
            no_needs_merge.add(i)
    
    needs_merge = set()
    for i in range(n):
        for j in range(n):
            if p[i, j]:
                needs_merge.add(tuple(sorted((i, j))))

    def merge(needs_merge):
        result = set()
        discarded = set()
        for indices in needs_merge:
            idx = indices[0]
            if idx not in discarded:
                result.add(indices[0])
            discarded.add(indices[1])
            if indices[1] in result:
                result.remove(indices[1])
        return result

    return sorted(list(no_needs_merge) + list(merge(needs_merge)))

In [9]:
output = cv2.VideoWriter('/home/xilinx/recording.mp4',cv2.VideoWriter_fourcc(*"MJPG"),1,(frame_w,frame_h))

In [10]:
for _ in range(60):
    start = time.time()
    
    cap_frame = displayport.newframe()
    cap.read(cap_frame)
    
    crop_h = int(max(0, (frame_h - frame_w) / 2))
    crop_w = int(max(0, (frame_w - frame_h) / 2))
    ratio_h = (frame_h - crop_h * 2)/model_hw
    ratio_w = (frame_w - crop_w * 2)/model_hw

    x_frame = cap_frame    
    x_frame=x_frame[crop_h:frame_h - crop_h, crop_w:frame_w - crop_w]
    x_frame=cv2.resize(x_frame, (model_hw, model_hw), interpolation=cv2.INTER_LINEAR)
    x_frame=cv2.cvtColor(x_frame, cv2.COLOR_BGR2RGB)    
    x_frame = x_frame.astype('float32') / 255
    x_frame = np.pad(x_frame, [(0, 0), (0, 0), (0, tcu.arch.array_size - 3)], 'constant', constant_values=0)
    
    inputs = {'x:0': x_frame}    
    outputs = tcu.run(inputs)
    
    set_tensor(tcu, interpreter, model_hw / 32, np.array(outputs['model/conv2d_17/BiasAdd:0']))
    set_tensor(tcu, interpreter, model_hw / 16, np.array(outputs['model/conv2d_20/BiasAdd:0']))

    interpreter.invoke()

    output_details = interpreter.get_output_details()
    scores, boxes_xywh = [interpreter.get_tensor(output_details[i]['index']) for i in range(len(output_details))]

    boxes_xywh, scores = filter_and_reshape(boxes_xywh, scores)
    
    boxes_xy, boxes_wh = np.split(boxes_xywh, (2,), axis=-1)
    boxes_x0y0x1y1 = np.concatenate([boxes_xy - boxes_wh/2, boxes_xy + boxes_wh/2], axis=-1)
    
    box_indices = non_maximum_suppression(boxes_x0y0x1y1[0])

    latency = (time.time() - start)
    fps = 1/latency
    
    for i in box_indices:
        category_idx = np.argmax(scores, axis=-1)[0, i]
        category_conf = np.max(scores, axis=-1)[0, i]
        text = f'{labels_coco[category_idx]} = {category_conf:.2}'
        
        box_x0y0x1y1 = boxes_x0y0x1y1[0, i]        
        box_x0y0x1y1[0] *= ratio_w
        box_x0y0x1y1[1] *= ratio_h
        box_x0y0x1y1[2] *= ratio_w
        box_x0y0x1y1[3] *= ratio_h
        box_x0y0x1y1[0] += crop_w
        box_x0y0x1y1[1] += crop_h
        box_x0y0x1y1[2] += crop_w
        box_x0y0x1y1[3] += crop_h
        box_x0y0x1y1 = box_x0y0x1y1.astype('int')
        
        cap_frame = cv2.rectangle(cap_frame, (crop_w, crop_h), (frame_w - crop_w, frame_h - crop_h), (255, 0, 0), 1)
        cap_frame = cv2.rectangle(cap_frame, (box_x0y0x1y1[0], box_x0y0x1y1[1]), (box_x0y0x1y1[2], box_x0y0x1y1[3]), (0, 0, 255), 1)
        cap_frame = cv2.putText(cap_frame, text, (box_x0y0x1y1[0] + 2, box_x0y0x1y1[1] - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255))
            
    
    cap_frame = cv2.putText(cap_frame, f'{fps:.2}fps', (2, frame_h - 2), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0))
    output.write(cap_frame.copy())
    displayport.writeframe(cap_frame)    

In [11]:
output.release()
displayport.close()
cap.release()
tcu.close()