## Import SAS & Open Source Packages
* SAS ESPPy for communication with SAS Event Stream Processing
* OpenCV (cv2) to display results
* Others for various other tasks

In [None]:
#Imports
import threading
import time
import websocket
import json
import numpy as np
import base64
import cv2
import esppy

## Build SAS Event Stream Processing Pipeline
This code defines a simple SAS Event Stream Processing (ESP) project that will receive images and applies a Tiny YOLOv2 model on it to detect masked and unmasked faces.<br>

In [None]:
#Connect to ESP and create ESP project
esp = esppy.ESP(hostname='http://localhost:9900')
esp_project = esp.create_project('object_detection', n_threads=10)
esp_project.pubsub = 'manual'
esp_project.add_continuous_query('contquery')

# Window: Video Capture
vid_capture = esp.SourceWindow(schema=('id*:int64', 'image:blob'),
index_type='empty', insert_only=True)
vid_capture.pubsub = True
esp_project.windows['w_input_image'] = vid_capture

# Window: Video Resize
vid_capture_resize = esp.CalculateWindow(algorithm='ImageProcessing', 
                                         name='resized', 
                                         function='resize',
                                         height=416, 
                                         width=416, 
                                         input_map=dict(imageInput='image'), 
                                         output_map=dict(imageOutput='_image_'))
vid_capture_resize.schema_string = 'id*:int64,image:blob,_image_:blob'
esp_project.windows['w_resize_image'] = vid_capture_resize

# Window: Model Reader
model_reader = esp.ModelReaderWindow()
esp_project.windows['w_read_model'] = model_reader

# Window: Model Request
model_request = esp.SourceWindow(schema=('req_id*:int64', 'req_key:string', 'req_val:string'),index_type='empty', insert_only=True)
esp_project.windows['w_request_model'] = model_request

# Window: Model Score
model_score = esp.ScoreWindow()
model_score.pubsub = True
model_score.add_offline_model(model_type='astore')
def score_window_fields(number_objects):
    _field = "id*:int64,image:blob,_image_:blob,_nObjects_:double,"
    for obj in range(0,number_objects):
        _field += "_Object" + str(obj) + "_:string,"
        _field += "_P_Object" + str(obj) + "_:double,"
        _field += "_Object" + str(obj) + "_x:double,"
        _field += "_Object" + str(obj) + "_y:double,"
        _field += "_Object" + str(obj) + "_width:double,"
        _field += "_Object" + str(obj) + "_height:double,"
    return _field[:-1]
model_score.schema_string = score_window_fields(20)
esp_project.windows['w_score_image'] = model_score

# Connections
vid_capture.add_target(vid_capture_resize, role='data')
vid_capture_resize.add_target(model_score, role='data')
model_request.add_target(model_reader, role='request')
model_reader.add_target(model_score, role='model')

# Load Project
esp.load_project(esp_project)

# Publisher: Send Model to Scoring Window
pub = model_request.create_publisher(blocksize=1, rate=0, pause=0, dateformat='%Y%dT%H:%M:%S.%f', opcode='insert', format='csv')
pub.send('i,n,1,"usegpuesp","1"\n')
pub.send('i,n,2,"ndevices","1"\n')
pub.send('i,n,3,"action","load"\n')
pub.send('i,n,4,"type","astore"\n')
pub.send('i,n,5,"reference","/data/notebooks/Face_Mask_Detection/Tiny-Yolov2.astore"\n')
pub.send('i,n,6,,\n')
pub.close()

# Publisher: Send Video
pub = vid_capture.create_publisher(blocksize=1, rate=0, pause=0, opcode='insert', format='csv')

# Display project
esppy.options.display.image_scale = 0.8
esp_project

## Publish Image frames into the SAS Event Stream Processing Project & Receive scored data
The class 'video_pub' extracts frames from a video (or camera) and publishes them into SAS Event Stream Processing.<br>
The second class 'video_sub' subscribes to the Scoring-Window to receive the coordinates of detected objects.<br>
It will display the results in a window and - optionally - save the results to a video.<br>

In [None]:
# Class to publish videos to SAS Event Stream Processing
class video_pub():
    def __init__(self, publisher, video_file, video_quality=95):
        self.cap = cv2.VideoCapture(video_file)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
        self.video_quality = video_quality
        self.pub = publisher
        threading.Thread(target=self.stream, daemon=True).start()
        print('Publisher started!')
        
    def stream(self):
        while True:
            ret, frame = self.cap.read()
            encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), self.video_quality]
            _, buffer = cv2.imencode('.jpg', frame, encode_param)
            encoded_string = base64.b64encode(buffer)
            strToSend = 'i, n, ' + str(int(time.time()*100)) + ',' + encoded_string.decode() + ',' + '\n'
            self.pub.send(strToSend)
            
# Class to subscribe to SAS Event Stream Processing to receive scored images - optionally save to video
class video_sub():
    def __init__(self, window, save_to_file=True, filename='', video_fps=25, video_size=(1920,1080)):
        self.ws = websocket.WebSocketApp(window.subscriber_url+"?format=json&mode=streaming&pagesize=1&schema=false",
                                 on_message = self.on_message,
                                 on_error = self.on_error,
                                 on_close = self.on_close)
        self.ws.on_open = self.on_open
        self.save_to_file = save_to_file
        if save_to_file == True:
            fourcc = cv2.VideoWriter_fourcc(*"H264")
            self.out = cv2.VideoWriter(filename, fourcc, video_fps, video_size)
        threading.Thread(target=self.ws.run_forever, daemon=True).start()
        print('Subscriber Started!')
        return

    def on_message(self, message):
        try:
            data = json.loads(message)
            frame = self.highlightImage(data)
            if self.save_to_file == True:
                self.out.write(frame)
            cv2.imshow('frame',frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                cv2.destroyAllWindows()
        except Exception as e:
            print(e)

    def on_error(self, error):
        print(error)

    def on_close(self):
        print("Websocket closed!")

    def on_open(self):
        print('Websocket open!')
        
    def highlightImage(self, data):
        object_list = ['mask', 'no_mask']
        color_palette = [
            (0,255,64), #green
            (0,64,255), #red
        ]
        #BGR Colorcodes
        obj_colors = {}
        i = 0
        for _object in object_list:
            obj_colors[_object] = color_palette[i]
            i += 1

        row = data['events'][0]['event']
        numberOfObjects = data['events'][0]['event']['_nObjects_']
        imageBufferBase64 = data['events'][0]['event']['image']['image']

        nparr = np.frombuffer(base64.b64decode(imageBufferBase64), dtype=np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        image_h, image_w,_ = frame.shape
        for i in range(0, int(float(numberOfObjects))):
            obj = row['_Object' + str(i) + '_'].strip()
            prob = float(row['_P_Object' + str(i) + '_'])
            if prob > 0.8:
                probability = " (" + str(round(prob * 100, 2)) + "%)"
                x = float(row['_Object' + str(i) + '_x'])
                y = float(row['_Object' + str(i) + '_y'])
                width = float(row['_Object' + str(i) + '_width'])
                height = float(row['_Object' + str(i) + '_height'])
                x1 = int(image_w * (x - width / 2))
                y1 = int(image_h * (y - height/ 2))
                x2 = int(image_w * (x + width / 2))
                y2 = int(image_h * (y + height/ 2))
                if obj in obj_colors:
                    bbox_color = obj_colors[obj]
                    border_offset = 3
                    cv2.rectangle(frame,(x1,y1),(x2,y2),bbox_color,1)
                    (label_width, label_height), baseline = cv2.getTextSize(obj + probability, cv2.FONT_HERSHEY_DUPLEX, 0.4, 1)
                    cv2.rectangle(frame,(x1,y1),(x1+label_width+10,y1-label_height-border_offset-10),bbox_color,-1)
                    cv2.putText(frame, obj + probability, (x1+5, y1-border_offset-5), cv2.FONT_HERSHEY_DUPLEX, 0.4, (0, 0, 0), 1,
                        cv2.LINE_AA)
        return frame

publisher = video_pub(publisher=pub,
                      video_file='/data/notebooks/Face_Mask_Detection/video.avi',
                      #video_file=0,
                      video_quality=95)
subscriber = video_sub(window=model_score, 
                       save_to_file=True, 
                       filename='/data/notebooks/Face_Mask_Detection/scored_video2.avi', 
                       video_fps=30, 
                       video_size=(1920,1080))