In [4]:
import numpy as np
import os
from typing import Tuple
import io
import time
import cv2
import torch
import torchvision
from PIL import Image, ImageOps
from my_mobilenet_v2_tsm import MobileNetV2
from onnx_tf.backend import prepare
import onnx
import tensorflow as tf
from onnx import version_converter

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])



For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
If you depend on functionality not listed there, please file an issue.



In [5]:
from mobilenet_v2_tsm2 import MobileNetV2

from my_mobilenet_v2_tsm import MobileNetV2 as MobilenetTsmLstm

In [3]:


SOFTMAX_THRES = 0
HISTORY_LOGIT = True
REFINE_OUTPUT = True

# scale


def get_executor(use_gpu=True):
    torch_inputs = (torch.rand([1, 3, 224, 224]),
                    torch.zeros([1, 3, 56, 56]),
                    torch.zeros([1, 4, 28, 28]),
                    torch.zeros([1, 4, 28, 28]),
                    torch.zeros([1, 8, 14, 14]),
                    torch.zeros([1, 8, 14, 14]),
                    torch.zeros([1, 8, 14, 14]),
                    torch.zeros([1, 12, 14, 14]),
                    torch.zeros([1, 12, 14, 14]),
                    torch.zeros([1, 20, 7, 7]),
                    torch.zeros([1, 20, 7, 7]))
    torch_module = MobileNetV2(n_class=27)
    input_names = []
    input_shapes = {}
    torch_module.eval()
    if not os.path.exists("mobilenetv2_jester_online.pth.tar"):  # checkpoint not downloaded
        print('Downloading PyTorch checkpoint...')
        import urllib.request
        url = 'https://hanlab.mit.edu/projects/tsm/models/mobilenetv2_jester_online.pth.tar'
        urllib.request.urlretrieve(url, './mobilenetv2_jester_online.pth.tar')
    #print(torch.load("mobilenetv2_jester_online.pth.tar"))
    torch_module.load_state_dict(torch.load("mobilenetv2_jester_online.pth.tar"), strict=False)
    with torch.no_grad():
        for index, torch_input in enumerate(torch_inputs):
            name = "i" + str(index)
            input_names.append(name)
            input_shapes[name] = torch_input.shape
        buffer = io.BytesIO()
        torch.onnx.export(torch_module, torch_inputs, buffer, input_names=input_names, output_names=["o" + str(i) for i in range(len(torch_inputs))])

        outs = torch_module(*torch_inputs)
        buffer.seek(0, 0)
        onnx_model = onnx.load(buffer)
        #onnx.checker.check_model(onnx_model)
        #onnx_model = onnx.load("BERT.onnx")  # load onnx model
        tf_rep = prepare(onnx_model)  # prepare tf representation
    return tf_rep 


def transform(frame: np.ndarray):
    # 480, 640, 3, 0 ~ 255
    frame = cv2.resize(frame, (224, 224))  # (224, 224, 3) 0 ~ 255
    frame = frame / 255.0  # (224, 224, 3) 0 ~ 1.0
    frame = np.transpose(frame, axes=[2, 0, 1])  # (3, 224, 224) 0 ~ 1.0
    frame = np.expand_dims(frame, axis=0)  # (1, 3, 480, 640) 0 ~ 1.0
    return frame


class GroupScale(object):
    """ Rescales the input PIL.Image to the given 'size'.
    'size' will be the size of the smaller edge.
    For example, if height > width, then image will be
    rescaled to (size * height / width, size)
    size: size of the smaller edge
    interpolation: Default: PIL.Image.BILINEAR
    """

    def __init__(self, size, interpolation=Image.BILINEAR):
        self.worker = torchvision.transforms.Resize(size, interpolation)

    def __call__(self, img_group):
        return [self.worker(img) for img in img_group]


class GroupCenterCrop(object):
    def __init__(self, size):
        self.worker = torchvision.transforms.CenterCrop(size)

    def __call__(self, img_group):
        return [self.worker(img) for img in img_group]


class Stack(object):

    def __init__(self, roll=False):
        self.roll = roll

    def __call__(self, img_group):
        if img_group[0].mode == 'L':
            return np.concatenate([np.expand_dims(x, 2) for x in img_group], axis=2)
        elif img_group[0].mode == 'RGB':
            if self.roll:
                return np.concatenate([np.array(x)[:, :, ::-1] for x in img_group], axis=2)
            else:
                return np.concatenate(img_group, axis=2)


class ToTorchFormatTensor(object):
    """ Converts a PIL.Image (RGB) or numpy.ndarray (H x W x C) in the range [0, 255]
    to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0] """

    def __init__(self, div=True):
        self.div = div

    def __call__(self, pic):
        if isinstance(pic, np.ndarray):
            # handle numpy array
            
            img = torch.from_numpy(pic).permute(2, 0, 1).contiguous()
        else:
            # handle PIL Image
            img = torch.ByteTensor(torch.ByteStorage.from_buffer(pic.tobytes()))
            img = img.view(pic.size[1], pic.size[0], len(pic.mode))
            # put it from HWC to CHW format
            # yikes, this transpose takes 80% of the loading time/CPU
            img = img.transpose(0, 1).transpose(0, 2).contiguous()
        return img.float().div(255) if self.div else img.float()


class GroupNormalize(object):
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        rep_mean = self.mean * (tensor.size()[0] // len(self.mean))
        rep_std = self.std * (tensor.size()[0] // len(self.std))

        # TODO: make efficient
        for t, m, s in zip(tensor, rep_mean, rep_std):
            t.sub_(m).div_(s)

        return tensor


def get_transform():
    cropping = torchvision.transforms.Compose([
        GroupScale(256),
        GroupCenterCrop(224),
    ])
    transform = torchvision.transforms.Compose([
        cropping,
        Stack(roll=False),
        ToTorchFormatTensor(div=True),
        GroupNormalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    return transform

catigories = [
    "Doing other things",  # 0
    "Drumming Fingers",  # 1
    "No gesture",  # 2
    "Pulling Hand In",  # 3
    "Pulling Two Fingers In",  # 4
    "Pushing Hand Away",  # 5
    "Pushing Two Fingers Away",  # 6
    "Rolling Hand Backward",  # 7
    "Rolling Hand Forward",  # 8
    "Shaking Hand",  # 9
    "Sliding Two Fingers Down",  # 10
    "Sliding Two Fingers Left",  # 11
    "Sliding Two Fingers Right",  # 12
    "Sliding Two Fingers Up",  # 13
    "Stop Sign",  # 14
    "Swiping Down",  # 15
    "Swiping Left",  # 16
    "Swiping Right",  # 17
    "Swiping Up",  # 18
    "Thumb Down",  # 19
    "Thumb Up",  # 20
    "Turning Hand Clockwise",  # 21
    "Turning Hand Counterclockwise",  # 22
    "Zooming In With Full Hand",  # 23
    "Zooming In With Two Fingers",  # 24
    "Zooming Out With Full Hand",  # 25
    "Zooming Out With Two Fingers"  # 26
]


n_still_frame = 0

def process_output(idx_, history):
    # idx_: the output of current frame
    # history: a list containing the history of predictions
    if not REFINE_OUTPUT:
        return idx_, history

    max_hist_len = 20  # max history buffer

    # mask out illegal action
    if idx_ in [7, 8, 21, 22, 3]:
        idx_ = history[-1]

    # use only single no action class
    if idx_ == 0:
        idx_ = 2
    
    # history smoothing
    if idx_ != history[-1]:
        if not (history[-1] == history[-2]): #  and history[-2] == history[-3]):
            idx_ = history[-1]
    

    history.append(idx_)
    history = history[-max_hist_len:]

    return history[-1], history



In [5]:

WINDOW_NAME = 'Video Gesture Recognition'
def main():
    print("Open camera...")
    cap = cv2.VideoCapture(0)
    _, img = cap.read()
    print(type(img), img.shape,type(cap))
    
    
    #print(cap)

    # set a lower resolution for speed up
    #cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)
    #cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)
    
    # env variables
    full_screen = False
    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(WINDOW_NAME, 640, 480)
    cv2.moveWindow(WINDOW_NAME, 0, 0)
    cv2.setWindowTitle(WINDOW_NAME, WINDOW_NAME)


    t = None
    index = 0
    print("Build transformer...")
    transform = get_transform()
    print("Build Executor...")
    model = get_executor()
    buffer = (
        tf.Variable(np.empty((1, 3, 56, 56))),
        tf.Variable(np.empty((1, 4, 28, 28))),
        tf.Variable(np.empty((1, 4, 28, 28))),
        tf.Variable(np.empty((1, 8, 14, 14))),
        tf.Variable(np.empty((1, 8, 14, 14))),
        tf.Variable(np.empty((1, 8, 14, 14))),
        tf.Variable(np.empty((1, 12, 14, 14))),
        tf.Variable(np.empty((1, 12, 14, 14))),
        tf.Variable(np.empty((1, 20, 7, 7))),
        tf.Variable(np.empty((1, 20, 7, 7)))
    )
    """buffer = [
            
            torch.zeros([1, 3, 56, 56], dtype=torch.float32),
            torch.zeros([1, 4, 28, 28], dtype=torch.float32),
            torch.zeros([1, 4, 28, 28], dtype=torch.float32),
            torch.zeros([1, 8, 14, 14], dtype=torch.float32),
            torch.zeros([1, 8, 14, 14], dtype=torch.float32),
            torch.zeros([1, 8, 14, 14], dtype=torch.float32),
            torch.zeros([1, 12, 14, 14], dtype=torch.float32),
            torch.zeros([1, 12, 14, 14], dtype=torch.float32),
            torch.zeros([1, 20, 7, 7], dtype=torch.float32),
            torch.zeros([1, 20, 7, 7], dtype=torch.float32)
        ]"""
    idx = 0
    history = [2]
    history_logit = []
    history_timing = []

    i_frame = -1

    print("Ready!")
    while True:
        i_frame += 1
        _, img = cap.read()  # (480, 640, 3) 0 ~ 255
        print(type(img), img.shape,type(cap))
        if i_frame % 2 == 0:  # skip every other frame to obtain a suitable frame rate   
            t1 = time.time()
            #print(transform([Image.fromarray(img).convert('RGB')]).size())
            img_tran = transform([Image.fromarray(img).convert('RGB')]).unsqueeze(0)
            print(img_tran.shape)
            img_tran = tf.Variable(img_tran)
            print(type(img_tran))
            print(img_tran.shape)
            
            #out = model(img_tran, *buffer)
            
            x = model.run((img_tran,) + buffer)
            print(x)
            #print(torch.argmax(x))
            
            idx_ = torch.argmax(x)
            #idx_ = torch.max(x, 1)[1][0]
            #print(idx_)
            



            idx, history = process_output(idx_, history)
            #idx = idx_

            t2 = time.time()
            print(f"{index} {catigories[idx]}")


            current_time = t2 - t1

        img = cv2.resize(img, (640, 480))
        img = img[:, ::-1]
        height, width, _ = img.shape
        label = np.zeros([height // 10, width, 3]).astype('uint8') + 255

        cv2.putText(label, 'Prediction: ' + catigories[idx],
                    (0, int(height / 16)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7, (0, 0, 0), 2)
        cv2.putText(label, '{:.1f} Vid/s'.format(1 / current_time),
                    (width - 170, int(height / 16)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7, (0, 0, 0), 2)

        img = np.concatenate((img, label), axis=0)
        cv2.imshow(WINDOW_NAME, img)

        key = cv2.waitKey(1)
        if key & 0xFF == ord('q') or key == 27:  # exit
            break
        elif key == ord('F') or key == ord('f'):  # full screen
            print('Changing full screen option!')
            full_screen = not full_screen
            if full_screen:
                print('Setting FS!!!')
                cv2.setWindowProperty(WINDOW_NAME, cv2.WND_PROP_FULLSCREEN,
                                      cv2.WINDOW_FULLSCREEN)
            else:
                cv2.setWindowProperty(WINDOW_NAME, cv2.WND_PROP_FULLSCREEN,
                                      cv2.WINDOW_NORMAL)


        if t is None:
            t = time.time()
        else:
            nt = time.time()
            index += 1
            t = nt

    cap.release()
    cv2.destroyAllWindows()


main()

Open camera...
<class 'numpy.ndarray'> (480, 640, 3) <class 'cv2.VideoCapture'>
Build transformer...
Build Executor...


  c0 = int(copy.deepcopy(c)) // 8


Ready!
<class 'numpy.ndarray'> (480, 640, 3) <class 'cv2.VideoCapture'>
torch.Size([1, 3, 224, 224])
<class 'tensorflow.python.ops.variables.RefVariable'>
(1, 3, 224, 224)


ValueError: setting an array element with a sequence.