In [1]:
# Connect to ESP - Use your hostname + port
import esppy
esp = esppy.ESP(hostname='localhost', port=9900)

In [2]:
%%time
import threading
import time
import websocket
import json
from random import randint
import numpy as np
import base64
import cv2
import esppy

#esp = esppy.ESP(hostname='http://localhost:9900')
esp_project = esp.create_project('test', n_threads=10)
esp_project.pubsub = 'manual'
esp_project.add_continuous_query('contquery')

# Window: Video Capture
vid_capture = esp.SourceWindow(schema=('id*:int64', 'image:blob'),
index_type='empty', insert_only=True)
vid_capture.pubsub = True
esp_project.windows['w_input'] = vid_capture

# Window: Video Resize
vid_capture_resize = esp.CalculateWindow(algorithm='ImageProcessing', 
                                         name='resized', 
                                         function='resize',
                                         height=416, 
                                         width=416, 
                                         input_map=dict(imageInput='image'), 
                                         output_map=dict(imageOutput='_image_'))
vid_capture_resize.schema_string = 'id*:int64,image:blob,_image_:blob'
esp_project.windows['w_resize'] = vid_capture_resize

# Window: Model Reader
model_reader = esp.ModelReaderWindow()
esp_project.windows['w_reader'] = model_reader

# Window: Model Request
model_request = esp.SourceWindow(schema=('req_id*:int64', 'req_key:string', 'req_val:string'),index_type='empty', insert_only=True)
esp_project.windows['w_request'] = model_request

# Window: Model Score
model_score = esp.ScoreWindow()
model_score.pubsub = True
model_score.add_offline_model(model_type='astore')
def score_window_fields(number_objects):
    _field = "id*:int64,image:blob,_image_:blob,_nObjects_:double,"
    for obj in range(0,number_objects):
        _field += "_Object" + str(obj) + "_:string,"
        _field += "_P_Object" + str(obj) + "_:double,"
        _field += "_Object" + str(obj) + "_x:double,"
        _field += "_Object" + str(obj) + "_y:double,"
        _field += "_Object" + str(obj) + "_width:double,"
        _field += "_Object" + str(obj) + "_height:double,"
    return _field[:-1]
model_score.schema_string = score_window_fields(20)
esp_project.windows['w_score'] = model_score

# Connections
vid_capture.add_target(vid_capture_resize, role='data')
vid_capture_resize.add_target(model_score, role='data')
model_request.add_target(model_reader, role='request')
model_reader.add_target(model_score, role='model')

# Load Project time delta
esp.load_project(esp_project)

# Publisher: Send Model -> Adapt the reference to your model file location
# "usegpuesp" tells ESP to use GPU
# "ndevices" tells ESP how many GPUs to use
pub = model_request.create_publisher(blocksize=1, rate=0, pause=0, dateformat='%Y%dT%H:%M:%S.%f', opcode='insert', format='csv')
pub.send('i,n,1,"usegpuesp","1"\n')
pub.send('i,n,2,"ndevices","1"\n')
pub.send('i,n,3,"action","load"\n')
pub.send('i,n,4,"type","astore"\n')
#pub.send('i,n,5,"reference","/data/models/Tiny-Yolov2.astore"\n')
pub.send('i,n,5,"reference","/data/models/Tiny-Yolov2_face.astore"\n')
pub.send('i,n,6,,\n')
pub.close()

# Publisher: Send Video
pub = vid_capture.create_publisher(blocksize=1, rate=0, pause=0, opcode='insert', format='csv')

CPU times: user 35.1 ms, sys: 3.96 ms, total: 39 ms
Wall time: 917 ms


In [3]:
# Publishes frames from your camera to ESP (using base64 encoding)
class video_pub():
    def __init__(self, pub):
        self.cap = cv2.VideoCapture(0)
        self.pub = pub
        
    def stream(self):
        while True:
            ret, frame = self.cap.read()
            frame = cv2.flip(frame, 1)
            _, buffer = cv2.imencode('.jpg', frame)
            encoded_string = base64.b64encode(buffer)
            strToSend = 'i, n, ' + str(int(time.time()*100)) + ',' + encoded_string.decode() + ',' + '\n'
            self.pub.send(strToSend)
            time.sleep(0.1)

In [4]:
# Again, change according to your ESP server
class video_sub():
    def __init__(self):
        self.ws = websocket.WebSocketApp("ws://localhost:9900/SASESP/subscribers/test/contquery/w_score/?format=json&mode=streaming&pagesize=1&schema=true",
                                 on_message = self.on_message,
                                 on_error = self.on_error,
                                 on_close = self.on_close)
        self.ws.on_open = self.on_open
        self.frame = None
        return
        
    def highlightImage(self, data):
        object_list = ['Human_Face']
        color_palette = [
        (0,64,255), #red
        (0,191,255), #orange
        (0,255,255), #yellow
        (0,255,64), #green
        (255,255,0) #blue
        ]
        obj_colors = {}
        i = 0
        for _object in object_list:
            obj_colors[_object] = color_palette[i]
            i += 1

        row = data['events'][0]['event']
        numberOfObjects = data['events'][0]['event']['_nObjects_']
        imageBufferBase64 = data['events'][0]['event']['image']['image']

        nparr = np.frombuffer(base64.b64decode(imageBufferBase64), dtype=np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        image_h, image_w,_ = frame.shape
        for i in range(0, int(float(numberOfObjects))):
            obj = row['_Object' + str(i) + '_']
            prob = float(row['_P_Object' + str(i) + '_'])
            probability = " (" + str(round(prob * 100, 2)) + "%)"
            x = float(row['_Object' + str(i) + '_x'])
            y = float(row['_Object' + str(i) + '_y'])
            width = float(row['_Object' + str(i) + '_width'])
            height = float(row['_Object' + str(i) + '_height'])
            x1 = int(image_w * (x - width / 2))
            y1 = int(image_h * (y - height/ 2))
            x2 = int(image_w * (x + width / 2))
            y2 = int(image_h * (y + height/ 2))
            if obj in obj_colors:
                bbox_color = obj_colors[obj]
                border_offset = 3
                cv2.rectangle(frame,(x1,y1),(x2,y2),bbox_color,1)
                (label_width, label_height), baseline = cv2.getTextSize(obj + probability, cv2.FONT_HERSHEY_DUPLEX, 0.4, 1)
                cv2.rectangle(frame,(x1,y1),(x1+label_width+10,y1-label_height-border_offset-10),bbox_color,-1)
                cv2.putText(frame, obj.lower() + probability, (x1+5, y1-border_offset-5), cv2.FONT_HERSHEY_DUPLEX, 0.4, (0, 0, 0), 1,
                    cv2.LINE_AA)
        return frame

    def on_message(self, message):
        data = json.loads(message)
        self.frame = self.highlightImage(data)

    def on_error(self, error):
        None
        #print(error)


    def on_close(self):
        print("### closed ###")


    def on_open(self):
        print('open')

In [5]:
print('Starting Publisher Thread.')
video_pub1 = video_pub(pub)
video_pub1_t = threading.Thread(target=video_pub1.stream)
video_pub1_t.daemon = True
video_pub1_t.start()

time.sleep(3)

print('Starting Subsriber Thread.')
video_sub1 = video_sub()
video_sub1_t = threading.Thread(target=video_sub1.ws.run_forever)
video_sub1_t.daemon = True
video_sub1_t.start()

Starting Publisher Thread.
Starting Subsriber Thread.
open


ERROR:websocket:error from callback <bound method video_sub.on_message of <__main__.video_sub object at 0x7f2c51f34ad0>>: Expecting value: line 1 column 1 (char 0)


In [6]:
while(True):
    # Capture frame-by-frame
    frame = video_sub1.frame

    # Display the resulting frame
    cv2.imshow('frame',frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# When everything done, release the capture
cv2.destroyAllWindows()

ERROR:websocket:error from callback <bound method video_sub.on_message of <__main__.video_sub object at 0x7f2c51f34ad0>>: 'events'


error: OpenCV(4.1.2) /io/opencv/modules/highgui/src/window.cpp:376: error: (-215:Assertion failed) size.width>0 && size.height>0 in function 'imshow'
