# Load models

In [1]:
import json
import trt_pose.coco

# Load the pose description (keypoint names + skeleton links) from disk.
with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

# Topology object consumed later by ParseObjects / DrawObjects.
topology = trt_pose.coco.coco_category_to_topology(human_pose)

# Part names, indexed by part number; kpoints() uses this list to label each
# detected part.
# NOTE(review): presumably mirrors human_pose['keypoints'] in the same order -
# confirm against human_pose.json.
keypoints = ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder",
    "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist",
    "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle", "neck"]

### Pose Classifier

In [2]:
import pickle

# Load the trained pose classifier used by pose_class() (posemodel.predict).
# SECURITY NOTE: pickle.load can execute arbitrary code embedded in the file;
# only load "best.pkl" from a trusted source.
# The context manager guarantees the handle is closed even if unpickling fails.
with open("best.pkl", 'rb') as file:
    posemodel = pickle.load(file)

### Box Detector

In [3]:
import torch
# Load the custom-trained YOLOv5 box detector from the local weights file 'box.pt'
# via torch.hub (uses the cached ultralytics/yolov5 repo when available).
boxmodel = torch.hub.load('ultralytics/yolov5', 'custom', path='box.pt')
# Detection confidence setting on the hub model.
# NOTE(review): presumably detections scoring below 0.4 are discarded - confirm
# against the YOLOv5 hub documentation.
boxmodel.conf = 0.4

Using cache found in /home/mscai/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2021-8-10 torch 1.8.0 CUDA:0 (NVIDIA Tegra X1, 3964.1328125MB)

Fusing layers... 
Model Summary: 224 layers, 7056607 parameters, 0 gradients
Adding AutoShape... 


### Pose Estimator

In [4]:
import trt_pose.models

# Network output sizes are derived from the pose description loaded earlier.
num_parts = len(human_pose['keypoints'])
num_links = len(human_pose['skeleton'])

# PyTorch pose network, moved to the GPU and put in eval mode.
# NOTE(review): in the visible cells inference is done with `model_trt`, not
# `model`; no weights are loaded into `model` here - confirm whether this cell
# is still needed.
model = trt_pose.models.resnet18_baseline_att(num_parts, 2 * num_links).cuda().eval()

# Input resolution used when frames are resized for pose inference
# (see execute / execute_vid).
WIDTH = 224
HEIGHT = 224

# All-zero dummy input batch of shape (1, 3, HEIGHT, WIDTH) on the GPU.
# NOTE(review): not referenced again at module level in the visible cells -
# presumably a leftover from TensorRT conversion; confirm.
data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

In [5]:
from torch2trt import TRTModule
# Path of the pre-converted TensorRT weights for the pose network.
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'
# model_trt is the module actually called for pose inference in
# execute() / execute_vid().
model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

<All keys matched successfully>

In [6]:
import cv2
import torchvision.transforms as transforms
import PIL.Image
import numpy as np

# Per-channel normalisation constants applied in preprocess(); kept on the GPU
# so they can be combined with the GPU image tensor.
# NOTE(review): these look like the standard ImageNet mean/std - confirm they
# match what the pose network was trained with.
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
# Target device for input tensors (preprocess() rebinds this global as well).
device = torch.device('cuda')

In [21]:
from datetime import datetime
# Module-level timestamp read (and reassigned) by execute() / execute_vid()
# for their FPS read-out; must exist before either function is called.
now = datetime.now()

# Functions

In [18]:
def preprocess(image):
    """Convert a BGR frame into a normalised, batched GPU tensor.

    The frame is converted BGR -> RGB, turned into a tensor with
    ``to_tensor``, moved to the CUDA device, normalised in place with the
    module-level ``mean`` / ``std`` and given a leading batch axis.

    Args:
        image: frame accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.
            The caller is responsible for resizing it to the network input size.

    Returns:
        The normalised tensor with one extra leading dimension.

    Side effects:
        Rebinds the module-level ``device`` to ``torch.device('cuda')``.
    """
    global device
    device = torch.device('cuda')

    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pil_frame = PIL.Image.fromarray(rgb_frame)
    tensor = transforms.functional.to_tensor(pil_frame).to(device)

    # In-place per-channel normalisation; the [:, None, None] indexing
    # broadcasts the three channel constants over the two spatial axes.
    tensor.sub_(mean[:, None, None]).div_(std[:, None, None])

    # Prepend the batch axis.
    return tensor[None, ...]

def scalepose(pose):
    """Standardise a skeleton so each coordinate column has zero mean and unit std.

    Args:
        pose: 2-D numpy array whose first two columns are the coordinates of
            each keypoint (one row per keypoint).

    Returns:
        The standardised array with the same shape as ``pose``. If either
        column has zero standard deviation (e.g. an empty skeleton), ``pose``
        is returned untouched to avoid dividing by zero.
    """
    first_col, second_col = pose[:, 0], pose[:, 1]
    centre = np.array([np.mean(first_col), np.mean(second_col)])
    spread = np.array([np.std(first_col), np.std(second_col)])

    # Bail out on an empty / degenerate skeleton.
    if np.count_nonzero(spread) != 2:
        return pose

    # Row-wise broadcasting: each column is shifted and scaled by its own stats.
    return (pose - centre) / spread

def pose_class(peaks, counts, threshold, parts, bounds):  # TODO extract left and right wrist
    """Classify every sufficiently complete skeleton in the frame.

    Args:
        peaks: tensor of peak coordinates; ``peaks.numpy()[0]`` is sliced as
            ``[:, i]`` to obtain the candidate skeleton for index ``i``.
        counts: sequence whose first element is the number of detections.
        threshold: minimum number of non-zero coordinate values a candidate
            must contain to be classified.
        parts: per-person keypoint dict (see ``kpoints``), passed to
            ``box_interact``.
        bounds: detected boxes (see ``bounding_boxes``), passed to
            ``box_interact``.

    Returns:
        ``(skpresence, y_pred, skeletons)`` where ``skpresence`` has one bool
        per detection, ``skeletons`` holds the flattened, scaled skeletons
        that were classified, and ``y_pred`` holds one label per classified
        skeleton (``[-1]`` when nothing was classified). Classifier output 1
        becomes 2 when the person touches a ``'heavy_box'`` and stays 1
        otherwise; classifier output 2 becomes 3; any other value is passed
        through unchanged.

    NOTE(review): indexing the peak array by detection index assumes peak
    slot ``i`` belongs to person ``i`` - confirm this holds for multi-person
    frames.
    """
    frame_peaks = peaks.numpy()[0]
    skeletons, skpresence, boxtype = [], [], []

    for idx in range(counts[0]):
        candidate = frame_peaks[:, idx]
        if np.count_nonzero(candidate) < threshold:
            # Too few keypoints found: record the detection but skip it.
            skpresence.append(False)
            continue
        skeletons.append(scalepose(candidate).flatten())
        skpresence.append(True)
        boxtype.append(box_interact(parts, bounds, idx))

    # Nothing usable in this frame.
    if len(skeletons) == 0:
        return skpresence, [-1], skeletons

    raw_labels = posemodel.predict(skeletons)
    y_pred = []
    for label, touched in zip(raw_labels, boxtype):
        if label == 1:
            # Incorrect lift: severity depends on the box being handled
            # (regular box and no box are treated alike).
            y_pred.append(2 if touched == 'heavy_box' else 1)
        elif label == 2:
            y_pred.append(3)
        else:
            y_pred.append(label)
    return skpresence, y_pred, skeletons

# Class labels returned in y_pred by pose_class():
# -1 = No pose classified
#  0 = Correct
#  1 = Incorrect, light box (or no box)
#  2 = Incorrect, heavy box
#  3 = Nonlifting

def box_interact(parts, bounds, i):
    """Return the name of the first box that contains one of person ``i``'s wrists.

    Args:
        parts: mapping ``person index -> {keypoint name: {'x': .., 'y': ..}}``
            as produced by ``kpoints``; missing keypoints have ``None``
            coordinates.
        bounds: mapping ``box index -> {'name', 'x1', 'y1', 'x2', 'y2'}`` as
            produced by ``bounding_boxes``. Wrist and box coordinates must be
            in the same coordinate system.
        i: index of the person to test.

    Returns:
        The ``'name'`` of the first box (in iteration order) that strictly
        contains the left or right wrist, or ``None`` if no box does.

    Raises:
        KeyError: if ``parts`` has no entry for ``i`` or lacks a wrist keypoint.

    Malformed boxes (missing keys or non-numeric coordinates) are skipped.
    Previously a bare ``except`` both hid every error and reset an already
    found match back to ``None``.
    """
    def _inside(point, box):
        # A wrist that was not detected can never be inside a box.
        x, y = point
        if x is None or y is None:
            return False
        return box['x1'] < x < box['x2'] and box['y1'] < y < box['y2']

    person = parts[i]
    wrists = [
        (person[side]['x'], person[side]['y'])
        for side in ('left_wrist', 'right_wrist')
    ]

    for box in bounds.values():
        try:
            if any(_inside(wrist, box) for wrist in wrists):
                return box['name']
        except (KeyError, TypeError):
            # Skip only the malformed box; keep checking the remaining ones.
            continue
    return None

def kpoints(objects, counts, peaks):
    """Build a per-person dictionary of named keypoint coordinates.

    Args:
        objects: parser output; ``objects[0][i][j]`` is the peak index of
            part ``j`` for person ``i``, negative when the part is missing.
        counts: sequence whose first element is the number of people.
        peaks: parser output; ``peaks[0][j][k]`` holds ``(y, x)`` for peak
            ``k`` of part ``j``.

    Returns:
        ``{person index: {keypoint name: {'x': float|None, 'y': float|None}}}``
        with names taken from the module-level ``keypoints`` list.
    """
    kpoint_total = {}
    for person_idx in range(counts[0]):
        human = objects[0][person_idx]
        kpoint_local = {}
        for part_idx in range(human.shape[0]):
            peak_idx = int(human[part_idx])
            if peak_idx < 0:
                # Part not detected for this person.
                coords = {'x': None, 'y': None}
            else:
                peak = peaks[0][part_idx][peak_idx]
                # Peaks are stored (y, x); expose them as named x / y.
                coords = {'x': float(peak[1]), 'y': float(peak[0])}
            kpoint_local[keypoints[part_idx]] = coords
        kpoint_total[person_idx] = kpoint_local
    return kpoint_total

def execute(change):
    """Camera callback: detect boxes and poses on a frame and show the result.

    Intended for ``camera.observe(execute, names='value')`` or a direct call
    with ``{'new': frame}``. The annotated frame is written to the
    ``image_w`` widget and an FPS read-out is printed into ``status``.

    Args:
        change: mapping whose ``'new'`` entry is the captured frame
            (presumably BGR, since ``preprocess`` converts BGR -> RGB).

    Fixes over the previous version:
        * ``now`` is declared ``global``; assigning it locally made
          ``if now:`` raise ``UnboundLocalError``.
        * FPS is computed from ``timedelta.total_seconds()``; ``1/timedelta``
          raises ``TypeError``.
        * The FPS message is a real f-string.
        * ``cv2.resize`` receives ``(WIDTH, HEIGHT)``, the order it documents.
    """
    global now  # module-level FPS timestamp, rebound below

    # Outputs same size as input, resizes copy for inference.
    image = cv2.flip(change['new'], 0)

    # Find boxes (channel axis reversed for the detector)
    boxResults = boxmodel([np.flip(image, 2)])
    bounds = bounding_boxes(boxResults)

    # Find Poses
    data = preprocess(cv2.resize(image, (WIDTH, HEIGHT)))
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)
    parts = kpoints(objects, counts, peaks)

    # Classify Poses with boxes
    skpresence, y_pred, skeletons = pose_class(peaks, counts, 6, parts, bounds)
    #statusupdate(status, skpresence, y_pred, skeletons)

    # Annotate Image
    draw_objects(image, counts, objects, peaks, skpresence, y_pred)
    image = bounding_box_draw(image, bounds)

    # FPS read-out; skipped entirely when `now` is falsy (e.g. set to None).
    if now:
        elapsed = (datetime.now() - now).total_seconds()
        fps = 1 / elapsed if elapsed > 0 else float('inf')
        # Replace the previous read-out instead of appending one line per frame.
        status.clear_output(wait=True)
        with status: print(f'FPS = {fps:.1f}')
        now = datetime.now()

    # Output (mirrored horizontally for display)
    image_w.value = bgr8_to_jpeg(image[:, ::-1, :])

    
def execute_vid(image):
    """Run box detection and pose classification on one video frame.

    Args:
        image: frame as read by OpenCV (presumably BGR, since ``preprocess``
            converts BGR -> RGB).

    Returns:
        The annotated frame encoded by ``bgr8_to_jpeg``, suitable for
        ``image_w.value``.

    Side effects:
        Prints an FPS read-out into the ``status`` widget and rebinds the
        module-level ``now`` timestamp.

    Fixes over the previous version:
        * ``now`` is declared ``global``; assigning it locally made
          ``if now:`` raise ``UnboundLocalError``.
        * FPS is computed from ``timedelta.total_seconds()``; ``1/timedelta``
          raises ``TypeError``.
        * The FPS message is a real f-string.
        * ``cv2.resize`` receives ``(WIDTH, HEIGHT)``, the order it documents.
    """
    global now  # module-level FPS timestamp, rebound below

    # Outputs same size as input, resizes copy for inference.
    image = cv2.flip(image, 1)

    # Find boxes (channel axis reversed for the detector)
    boxResults = boxmodel([np.flip(image, 2)])
    bounds = bounding_boxes(boxResults)

    # Find Poses
    data = preprocess(cv2.resize(image, (WIDTH, HEIGHT)))
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    parts = kpoints(objects, counts, peaks)

    # Classify Poses with boxes (stricter keypoint threshold than the live path)
    skpresence, y_pred, skeletons = pose_class(peaks, counts, 8, parts, bounds)
    #statusupdate(status, skpresence, y_pred, skeletons)

    # Annotate Image
    image = bounding_box_draw(image, bounds)
    draw_objects(image, counts, objects, peaks, skpresence, y_pred)

    # FPS read-out; skipped entirely when `now` is falsy (e.g. set to None).
    if now:
        elapsed = (datetime.now() - now).total_seconds()
        fps = 1 / elapsed if elapsed > 0 else float('inf')
        # Replace the previous read-out instead of appending one line per frame.
        status.clear_output(wait=True)
        with status: print(f'FPS = {fps:.1f}')
        now = datetime.now()

    # Return Image (mirrored horizontally, undoing the flip above)
    return bgr8_to_jpeg(image[:, ::-1, :])
    
def statusupdate(status, skpresence, y_pred, skeletons):
    """Write a textual summary of the current frame's poses into ``status``.

    Args:
        status: output widget supporting ``clear_output()`` and use as a
            context manager that captures ``print``.
        skpresence: one bool per detected pose (True when it was classified).
        y_pred: one class label per classified pose; ``pose_class`` returns
            ``[-1]`` when nothing was classified.
        skeletons: the classified skeletons, aligned with ``y_pred``.

    Fixes over the previous version:
        * A missing comma merged "Incorrect - Heavy Box" and "Nonlifting"
          into one string, so label 3 raised ``IndexError``.
        * ``y_pred`` / ``skeletons`` were indexed with the ``skpresence``
          index although they only contain the classified poses.
        * The ``-1`` sentinel no longer indexes the last label, nor is it
          counted as a classified pose.
    """
    labels = ["Correct", "Incorrect", "Incorrect - Heavy Box", "Nonlifting"]

    # Pair each prediction with its skeleton and drop negative sentinels.
    results = [
        (int(label), skeleton)
        for label, skeleton in zip(y_pred, skeletons)
        if int(label) >= 0
    ]

    status.clear_output()
    with status:
        print(f"Poses detected = {len(skpresence)}\nPoses Classified = {len(results)}")
        if len(skpresence) < 1:
            print("No poses detected.")
            return
        for label, skeleton in results:
            name = labels[label] if label < len(labels) else f"Unknown ({label})"
            print(name + "\n")
            print(skeleton)

def bounding_boxes(detections):
    """Convert detector results for the first image into a plain dictionary.

    Args:
        detections: result object exposing ``names`` (class index -> name)
            and ``xyxyn`` (per-image detections; each row is read as
            ``x1, y1, x2, y2, <unused>, class index``).

    Returns:
        ``{row index: {'name', 'x1', 'y1', 'x2', 'y2'}}`` with coordinates
        exactly as stored in ``xyxyn``.
    """
    class_names = detections.names
    rows = detections.xyxyn[0].tolist()
    return {
        idx: {
            'name': class_names[int(row[5])],
            'x1': row[0],
            'y1': row[1],
            'x2': row[2],
            'y2': row[3],
        }
        for idx, row in enumerate(rows)
    }

def bounding_box_draw(image, bounds):
    """Draw every detected box onto ``image``.

    Args:
        image: array with shape ``(height, width, channels)``.
        bounds: mapping as returned by ``bounding_boxes``; box coordinates are
            multiplied by the image width / height to obtain pixel positions.

    Returns:
        The image after all rectangles were drawn (the input itself when
        ``bounds`` is empty). ``'heavy_box'`` uses colour ``(0, 0, 255)``,
        everything else ``(0, 255, 0)``.
    """
    for box in bounds.values():
        h, w, _ = image.shape
        color = (0, 0, 255) if box['name'] == 'heavy_box' else (0, 255, 0)
        # Scale fractional coordinates to pixel positions.
        top_left = (int(box['x1'] * w), int(box['y1'] * h))
        bottom_right = (int(box['x2'] * w), int(box['y2'] * h))
        image = cv2.rectangle(image, top_left, bottom_right, color=color, thickness=5)
    return image

In [8]:
from gdp_drawobjects import DrawObjects
from trt_pose.parse_objects import ParseObjects

# Callables used by execute() / execute_vid(): parse_objects turns the network
# outputs (cmap, paf) into (counts, objects, peaks); draw_objects annotates a
# frame with the parsed poses and their predicted classes.
# NOTE: both must be defined before execute() / execute_vid() are called.
parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

## Establish Camera

In [30]:
from jetcam.csi_camera import CSICamera
from jetcam.utils import bgr8_to_jpeg

# Alternative: capture directly at the network input size.
#camera = CSICamera(width=WIDTH, height=HEIGHT, capture_fps=30)
# Capture at 1280x720; execute() resizes a copy of each frame for inference.
camera = CSICamera(width=1280, height=720, capture_fps=30)

# Start capturing so that camera.value is updated and observers are notified.
# NOTE(review): the saved output shows "Could not initialize camera" - the
# camera may still be held by another process/kernel; confirm before re-running.
camera.running = True

RuntimeError: Could not initialize camera.  Please see error trace.

## Output Display

In [15]:
import ipywidgets
from IPython.display import display

# image_w shows the annotated JPEG frame; status receives the text printed by
# execute() / execute_vid() / statusupdate().
image_w = ipywidgets.Image(format='jpeg')
status = ipywidgets.Output(layout={'border': '1px solid black'})

display(image_w)
display(status)

Image(value=b'', format='jpeg')

Output(layout=Layout(border='1px solid black'))

## Run

In [24]:
# Single Frame: run the pipeline once on the camera's current frame, using the
# same {'new': frame} shape that execute() reads.

execute({'new': camera.value})

In [26]:
# Continuous: call execute() every time the camera's `value` changes.

camera.observe(execute, names='value')

In [27]:
# Stop Continuous: detach every observer registered on the camera.

camera.unobserve_all()

In [28]:
# Release Camera: stop capturing. Do this when you are done, otherwise the
# camera may fail to initialise the next time it is started.

camera.running = False

## Pre-recorded Video Inference

In [11]:
import os
from jetcam.utils import bgr8_to_jpeg
import matplotlib.pyplot as plt
# Directory holding the pre-recorded test videos.
vidpath = '../../../video'
# List its entries, skipping hidden files (names starting with '.').
# `startswith` replaces the former `vid[0] is not '.'`, which compared object
# identity with a string literal (implementation-dependent, and a
# SyntaxWarning on Python 3.8+).
# The directory path is now taken from `vidpath` instead of being repeated.
vids = [vid for vid in os.listdir(vidpath) if not vid.startswith('.')]
vids

['TS008_01.mp4',
 'TS004_01.mp4',
 'TS005_01.mp4',
 'inferred.avi',
 'TS003_02.mp4',
 'TS005_02.mp4',
 'TS007_02.mp4',
 'TS001_01.mp4',
 'TS006_01.mp4',
 'TS002_01.mp4',
 'TS001_02.mp4',
 'TS007_01.mp4',
 'TS003_01.mp4',
 'video.ipynb']

In [34]:
# Run the video pipeline on a single still image and show the result.
# NOTE(review): cv2.imread presumably yields None when the path cannot be
# read, which would make execute_vid fail - confirm the file exists.
image_w.value = execute_vid(cv2.imread('../../../training_images/incorrect/0010.jpeg'))

{0: {'name': 'box', 'x1': 0.44218748807907104, 'y1': 0.40414267778396606, 'x2': 0.91796875, 'y2': 0.8484786152839661}}


In [31]:
# Video to run inference on (one of the entries of `vidpath`).
vidfile = 'TS008_01.mp4'

vid = cv2.VideoCapture(os.path.join(vidpath,vidfile))
# NOTE(review): the writer below is disabled; `output` is JPEG-encoded bytes
# (see execute_vid), so it could not be passed to VideoWriter.write as-is.
#out = cv2.VideoWriter(os.path.join(vidpath,'inferred.avi'),cv2.VideoWriter_fourcc(*'DIVX'), 25, (1280,720))

# try/finally guarantees the capture is released when the video ends, when an
# error occurs, or when the cell is interrupted from the notebook.
try:
    while True:
        ret_val, frame = vid.read()
        if not ret_val:
            break  # end of stream (or the file could not be read)
        output = execute_vid(frame)
        image_w.value = output
        #out.write(output)
        if cv2.waitKey(1) == 27:
            break  # esc to quit
finally:
    vid.release()

#out.release()