In [1]:
import os
import codecs
import requests
import base64
import sys
import json

from io import BytesIO

import param
import PIL
import panel as pn
import numpy as np
import cv2 as cv
import pandas as pd
import holoviews as hv

from PIL import ImageDraw

# Select the 'bokeh' plotting backend for HoloViews and load the Panel
# extension; both calls run once, before any objects are displayed below.
hv.extension('bokeh')
pn.extension()

ImportError: dlopen(/Users/philippjfr/miniconda3/envs/pyconde2019/lib/python3.7/site-packages/cv2.cpython-37m-darwin.so, 2): Library not loaded: @rpath/libfreetype.6.dylib
  Referenced from: /Users/philippjfr/miniconda3/envs/pyconde2019/lib/libopencv_freetype.4.1.dylib
  Reason: Incompatible library version: libopencv_freetype.4.1.dylib requires version 24.0.0 or later, but libfreetype.6.dylib provides version 23.0.0

In [None]:
class ObjectDetector(param.Parameterized):
    """
    Detects objects in a webcam video stream using an OpenCV DNN network
    loaded from a frozen TensorFlow graph.

    Parameters
    ----------
    detection_threshold : number in [0, 1]
        Only detections whose score is strictly greater than this value are
        drawn and reported.
    snapshot : action
        Triggering it asks the video stream widget to take a snapshot.
    vstream : pn.widgets.VideoStream
        Source of frames; a 480x360 widget is created when none is supplied.
    """

    detection_threshold = param.Number(default=0.5, bounds=(0, 1))

    snapshot = param.Action(lambda x: x.param.trigger('snapshot'))

    vstream = param.ClassSelector(class_=pn.widgets.VideoStream)

    def __init__(self, **params):
        if 'vstream' not in params:
            params['vstream'] = pn.widgets.VideoStream(width=480, height=360)
        super(ObjectDetector, self).__init__(**params)
        # Bounding boxes found by the most recent get_objects() call.
        self._bounds = []
        # Model files are resolved relative to the current working directory.
        self.cvNet = cv.dnn.readNetFromTensorflow('./ssd_mobilenet_v2_coco_2018_03_29/frozen_inference_graph.pb',
                                                  './ssd_mobilenet_v2_coco_2018_03_29.pbtxt')

    def get_img(self):
        """
        Decode the current ``vstream.value`` into a PIL image.

        The value is split on ',' and the part after the first comma is
        base64-decoded (presumably the value is a data URI of the form
        ``data:<mime>;base64,<payload>`` -- confirm against the widget).

        Raises whatever the decoding steps raise when no usable frame is
        available, e.g. ``AttributeError`` if ``vstream.value`` is ``None``.
        """
        # Named `payload` so the module-level `base64` import is not shadowed.
        payload = self.vstream.value.split(',')[1]
        raw = codecs.decode(bytes(payload, 'utf-8'), 'base64')
        return PIL.Image.open(BytesIO(raw))

    def detect_objects(self, img):
        """
        Run the network on ``img`` and draw a rectangle for each detection
        scoring above ``detection_threshold``.

        Parameters
        ----------
        img : PIL.Image.Image
            Frame to analyse; rectangles are drawn onto it in place.

        Returns
        -------
        (img, bounds) : tuple
            The annotated image and a list of
            ``(left, rows - bottom, right, rows - top)`` tuples, i.e. boxes
            with the y axis flipped so that it increases upwards.
        """
        cvimg = cv.cvtColor(np.array(img).astype(np.uint8), cv.COLOR_RGB2BGR)
        rows = cvimg.shape[0]
        cols = cvimg.shape[1]
        self.cvNet.setInput(cv.dnn.blobFromImage(cvimg, size=(200, 200), swapRB=True, crop=False))
        cvOut = self.cvNet.forward()

        draw = ImageDraw.Draw(img)
        bounds = []
        for detection in cvOut[0,0,:,:]:
            score = float(detection[2])
            if score > self.detection_threshold:
                # Columns 3..6 are scaled by the image size to get pixel
                # coordinates (left, top, right, bottom).
                left = detection[3] * cols
                top = detection[4] * rows
                right = detection[5] * cols
                bottom = detection[6] * rows
                bounds.append((left, rows-bottom, right, rows-top))
                # Pillow requires [x0, y0, x1, y1] with x1 >= x0 and y1 >= y0;
                # in image coordinates top <= bottom, so top must come first.
                draw.rectangle([left, top, right, bottom])
        return img, bounds

    @param.depends('snapshot', watch=True)
    def _take_snapshot(self):
        """Forward a triggered ``snapshot`` action to the video stream widget."""
        self.vstream.snapshot()

    @param.depends('vstream.value')
    def get_objects(self):
        """
        Return a pane showing the current frame with detections drawn on it.

        Best effort by design: when no frame can be decoded a 1x1 placeholder
        image is shown, and when detection fails the frame is shown without
        boxes. ``self._bounds`` is updated to the boxes found (or ``[]``).
        """
        bounds = []
        try:
            img = self.get_img()
        except Exception:
            # No frame yet, or an undecodable one: show a 1x1 black placeholder.
            img = PIL.Image.fromarray(np.array([[0]], dtype='uint8'))
        else:
            try:
                img, bounds = self.detect_objects(img)
            except Exception:
                # Detection failed; keep the decoded frame and report no boxes.
                bounds = []
        self._bounds = bounds
        return pn.panel(img, width=480, height=360)

    @param.output()
    def image(self):
        """Current frame as an ``hv.RGB`` element with bounds (0, 0, 640, 480)."""
        return hv.RGB(np.array(self.get_img()), bounds=(0, 0, 640, 480)).opts(
            frame_width=640, frame_height=480, xaxis=None, yaxis=None)

    @param.output()
    def polygons(self):
        """Most recently detected boxes as an ``hv.Path`` of ``hv.Bounds``."""
        return hv.Path([hv.Bounds(b) for b in self._bounds]).opts(
            line_color='black', line_width=3, apply_ranges=False, tools=['tap'])

    def panel(self):
        """Lay out the threshold slider, video/detection row and snapshot button, centred."""
        row = pn.Row(self.vstream, self.get_objects)
        return pn.Row(pn.layout.HSpacer(), pn.Column(self.param.detection_threshold, row, self.param.snapshot), pn.layout.HSpacer())

# Build a detector with the default video stream widget (this also loads the
# network files in ObjectDetector.__init__).
detector = ObjectDetector()

# Last expression of the cell: display the detector's layout.
detector.panel()

In [None]:
class Selector(param.Parameterized):
    """
    Overlays bounding-box paths on an image so they can be edited and
    selected, and exposes the image regions under the selected boxes.
    """

    polygons = param.ClassSelector(class_=hv.Path)

    image = param.ClassSelector(class_=hv.Image)

    def panel(self):
        """
        Attach edit/selection streams to the paths and return the centred
        image-plus-paths overlay.

        Must run before ``images()``, which reads the streams created here.
        """
        self.edit = hv.streams.PolyEdit(source=self.polygons)
        self.selection = hv.streams.Selection1D(source=self.polygons)
        overlay = self.image * self.polygons
        return pn.Row(pn.layout.HSpacer(), pn.panel(overlay), pn.layout.HSpacer())

    @param.output()
    def images(self):
        """
        Return one cropped image per selected path, cropped to the path's
        x and y ranges, in the order the paths appear in the edited element.
        """
        crops = []
        for position, path in enumerate(self.edit.element.split()):
            if position not in self.selection.index:
                continue
            x_range = path.range(0)
            y_range = path.range(1)
            crops.append(self.image.select(x=x_range, y=y_range))
        return crops

# Feed the detector's current outputs (frame and detected boxes) into a Selector.
selector = Selector(image=detector.image(), polygons=detector.polygons())

# Last expression of the cell: display the overlay; this call also creates
# the edit/selection streams that selector.images() reads.
selector.panel()

In [None]:
class VisionAPI(param.Parameterized):
    """
    Sends images to the Google Cloud Vision ``images:annotate`` endpoint with
    a LABEL_DETECTION request and shows each image next to its label table.

    ``images`` holds elements whose ``.data`` attribute is an array that
    ``PIL.Image.fromarray`` accepts.
    """

    images = param.List()

    # Read once, when the class body executes; None if the variable is unset.
    GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
    GOOGLE_CLOUD_VISION_URL = "https://vision.googleapis.com/v1/images:annotate"
    # Seconds to wait for the service before giving up instead of hanging.
    REQUEST_TIMEOUT = 30

    @classmethod
    def make_request(cls, img):
        """
        POST ``img`` (PNG-encoded, then base64) to the Vision API.

        Parameters
        ----------
        img : object with a ``.data`` array
            Image to label.

        Returns
        -------
        requests.Response
            The raw response; status checking is left to the caller.

        Raises
        ------
        requests.exceptions.RequestException
            On connection problems or when ``REQUEST_TIMEOUT`` elapses.
        """
        pimg = PIL.Image.fromarray(img.data)
        bio = BytesIO()
        pimg.save(bio, 'png')
        b64_string = base64.b64encode(bio.getvalue()).decode('utf-8')
        body = {
            "requests": [{
                "image": {"content": b64_string},
                "features": [{"type": "LABEL_DETECTION"}]
            }]
        }
        # `params` handles URL-encoding of the key; `json` serialises the body
        # and sets the application/json content type.
        return requests.post(cls.GOOGLE_CLOUD_VISION_URL,
                             params={'key': cls.GOOGLE_API_KEY},
                             json=body,
                             timeout=cls.REQUEST_TIMEOUT)

    def panel(self):
        """
        Return a centred row alternating each image with a DataFrame of its
        labels (columns: description, score, topicality).

        Raises ``requests.HTTPError`` when the service answers with an error
        status. An image for which no ``labelAnnotations`` are returned gets
        an empty table instead of raising ``KeyError``.
        """
        panes = []
        for img in self.images:
            panes.append(pn.panel(PIL.Image.fromarray(img.data), width=300))
            response = self.make_request(img)
            # Surface bad keys / quota errors as an HTTP error rather than a
            # confusing KeyError on the missing 'responses' field.
            response.raise_for_status()
            results = response.json().get('responses') or [{}]
            labels = results[0].get('labelAnnotations', [])
            panes.append(pd.DataFrame(labels,
                                      columns=['description', 'score', 'topicality']))
        return pn.Row(pn.layout.HSpacer(), pn.Row(*panes), pn.layout.HSpacer())
    
# Label the crops currently selected in `selector` and display each crop next
# to its label table (VisionAPI.panel issues one HTTP request per image).
VisionAPI(images=selector.images()).panel()

In [None]:
# Assemble the three classes above into a Panel Pipeline, in order.
pipeline = pn.pipeline.Pipeline()

# Stages are added as classes, not instances. The @param.output names of one
# stage (image/polygons, then images) match parameter names on the next stage;
# presumably the Pipeline uses that to pass values along -- confirm in the
# Panel Pipeline documentation.
pipeline.add_stage('Detector', ObjectDetector)
pipeline.add_stage('Selector', Selector)
pipeline.add_stage('Classifier', VisionAPI)

# Let the pipeline layout stretch in both directions and mark it servable.
layout = pipeline.layout
layout.sizing_mode = 'stretch_both'
layout.servable()