In [None]:
import sys
import json
import logging
import time
import re
from datetime import datetime
from logging.handlers import RotatingFileHandler
import cProfile

import boto3
import cv2
import numpy as np
import panoramasdk

from uuid import uuid4
import socket
import json
import copy
from sympy import Point, Polygon

import zmq
from multiprocessing import Process


In [None]:
# Handle to this application's Panorama node. The cells below read parameters and
# camera frames from `node.inputs`, run the model through `node.call`, and write
# annotated frames to `node.outputs`.
node = panoramasdk.node()

In [None]:
# Modify your account and other information here

# AWS region used when creating the S3 resource and the IoT data client.
#REGION = "ap-southeast-1"
REGION = "us-east-1"

# IoT topic that receives one JSON message whenever the people count changes.
IOT_PUB_TOPIC = "panorama/people/event"

# Snapshot destination: frames are stored as s3://<S3_BUCKET_NAME>/<S3_PREFIX><timestamp>.png
#S3_BUCKET_NAME = "panorama-tailgating-img-tec"
S3_BUCKET_NAME = "cvsecurity-storage-99cc675e13243-staging"
S3_PREFIX = "public/"

# TCP port of the ZeroMQ PUB/SUB link between the main process and the upload subprocess.
IPC_PORT = 5555
#IPC_PORT = 5558

# Detection zone as (x, y) points in frame pixel coordinates; it is drawn onto every
# frame and also turned into a polygon. Application.__init__ unpacks exactly four points.
#DETECT_AREA = [(895, 903), (1327, 940), (1283, 1040), (842, 992)]
#DETECT_AREA = [(452, 553), (677, 562), (689, 639), (422, 640)]
DETECT_AREA = [(138, 605), (372, 579), (398,670), (183, 708)]



In [None]:
def plot_polygon(im, cordon_coordinates=None, color=(0, 0, 255), line_thickness=10):
    """Draw a closed polygon outline onto an image, in place.

    Args:
        im: Image handed straight to ``cv2.line``; it is modified in place.
        cordon_coordinates: Sequence of ``(x, y)`` vertices in drawing order.
            ``None`` or an empty sequence draws nothing.
        color: Line colour passed to ``cv2.line``.
        line_thickness: Line thickness passed to ``cv2.line``.

    Returns:
        None.
    """
    # Nothing to draw: the default argument (None) and an empty vertex list are both valid no-ops.
    if cordon_coordinates is None or len(cordon_coordinates) == 0:
        return

    vertices = list(cordon_coordinates)
    # Pair each vertex with its successor; the last vertex wraps around to the
    # first one, which closes the outline.
    for start, end in zip(vertices, vertices[1:] + vertices[:1]):
        cv2.line(im, start, end, color, line_thickness)


In [None]:

def preprocess(img, size):
    """Resize a video frame to ``size`` x ``size`` and normalize each channel.

    The resized frame is converted to float32 and divided by 255, then every
    channel plane has its mean subtracted and is divided by its standard
    deviation. The three planes are returned channel-first behind a leading
    axis of length one, i.e. as an array of shape ``(1, 3, size, size)``.
    """
    channel_mean = (0.485, 0.456, 0.406)  # RGB
    channel_std = (0.229, 0.224, 0.225)  # RGB

    # Integer pixels -> float32 scaled by 1/255
    scaled = cv2.resize(img, (size, size)).astype(np.float32) / 255.

    # Exactly three planes are expected; the unpacking fails loudly otherwise
    first, second, third = cv2.split(scaled)
    planes = [
        (plane - mu) / sigma
        for plane, mu, sigma in zip((first, second, third), channel_mean, channel_std)
    ]

    # One extra pair of brackets adds the leading axis of length one
    return np.asarray([planes])


In [None]:
def split_s3_path( s3_path ):
    """Split an ``s3://bucket/key`` URL into its bucket and key.

    Args:
        s3_path: String of the form ``s3://<bucket>/<key>``.

    Returns:
        Tuple ``(bucket, key)``. ``key`` is everything after the first slash
        that follows the bucket name; it may contain slashes or be empty.

    Raises:
        ValueError: If ``s3_path`` does not start with ``s3://<bucket>/``.
    """
    re_result = re.match( r"s3://([^/]+)/(.*)", s3_path )
    if re_result is None:
        # Fail with the offending value instead of an AttributeError on None.group
        raise ValueError( f"Not a valid S3 URL (expected s3://bucket/key): {s3_path!r}" )
    return re_result.group(1), re_result.group(2)


In [None]:
def get_logger(name=__name__,level=logging.INFO):
    """Return a logger that writes timestamped records to stdout.

    The function is safe to call repeatedly (for example when this notebook
    cell is re-run): if the logger already owns a handler bound to the current
    ``sys.stdout`` that handler is reused, so records are not printed several
    times.

    Args:
        name: Logger name handed to ``logging.getLogger``.
        level: Level applied to both the logger and its stdout handler.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
                                    datefmt='%Y-%m-%d %H:%M:%S')

    # Reuse the stdout handler from an earlier call instead of stacking a new one
    handler = next(
        (
            existing for existing in logger.handlers
            if isinstance(existing, logging.StreamHandler)
            and getattr(existing, "stream", None) is sys.stdout
        ),
        None,
    )
    if handler is None:
        #handler = RotatingFileHandler("/opt/aws/panorama/logs/app.log", maxBytes=100000000, backupCount=2)
        handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(handler)

    handler.setLevel(level)
    handler.setFormatter(formatter)

    return logger

logger = get_logger(level=logging.INFO)


In [None]:
# Smoke test: emit one INFO record so the stdout logger configured above can be checked by eye.
logger.info('stdout logger test.')

In [None]:
class Application:
    """Detection pipeline running on the Panorama ``node``.

    For every frame the application runs the object-detection model, draws a
    box around each detection of a wanted class, and overlays the running total
    and the detection zone. Whenever the per-frame count changes, an event is
    published to AWS IoT and the annotated frame is sent over ZeroMQ to a
    background process that uploads it to S3.
    """

    def __init__(self):
        """Initializes the application's attributes with parameters from the interface, and default values.

        Also starts the background upload process and binds the ZeroMQ
        publisher that feeds it. If either of those steps fails, whatever was
        already started is torn down before the exception propagates.
        """
        # Declared first so that cleanup() works on a partially built instance
        self.sub_process = None
        self.socket = None
        self.context = None

        self.MODEL_NODE = "model_node"
        self.MODEL_DIM = 512
        self.frame_num = 0
        # Confidence threshold in percent; may be overridden by the node parameter below
        self.threshold = 50.
        # Desired class
        # NOTE(review): presumably the "person" class id of the deployed model - confirm against its label map
        self.classids = [14.]

        self.s3_client = boto3.resource('s3', region_name=REGION)
        self.iot_client = boto3.client('iot-data', region_name=REGION)

        self.last_count = 0
        self.current_count = 0
        self.total_count = 0

        # Polygon settings
        p1, p2, p3, p4 = map(Point, DETECT_AREA)
        self.poly = Polygon(p1, p2, p3, p4)

        try:
            # Parameters
            logger.info('Getting parameters')
            self.threshold = node.inputs.threshold.get()
        except Exception:
            # Best effort: keep the default threshold. Unlike a bare `except`,
            # this lets KeyboardInterrupt / SystemExit through.
            logger.exception('Error during initialization.')
        finally:
            logger.info('Initialiation complete.')
            logger.info('Threshold: {}'.format(self.threshold))

        try:
            # The worker is started before this process creates its own ZeroMQ context.
            worker = Process(target=self.listener)
            worker.start()
            # Only a started process is stored: cleanup() kills and joins it
            self.sub_process = worker

            # Init zmq publisher
            # NOTE(review): "tcp://*" binds on every interface while the only
            # visible consumer connects through localhost - confirm whether
            # remote subscribers are intended.
            self.context = zmq.Context()
            self.socket = self.context.socket(zmq.PUB)
            self.socket.bind(f"tcp://*:{IPC_PORT}")
        except Exception:
            # E.g. the port is still bound by a previous instance: do not leave
            # an orphaned worker process or an open socket behind.
            self.cleanup()
            raise

    def cleanup(self):
        """Closes the publisher socket, stops the upload process and terminates the ZeroMQ context.

        Every released resource is reset to ``None``, so calling this method
        again is a no-op.
        """
        if self.socket is not None:
            print( "cleaning up socket" )
            # linger=0 drops frames that are still queued; their consumer is
            # killed right below, and it keeps context.term() from blocking.
            self.socket.close(linger=0)
            self.socket = None
            print( "cleaned up socket" )

        if self.sub_process is not None:
            print( "cleaning up sub process" )
            self.sub_process.kill()
            self.sub_process.join()
            self.sub_process.close()
            self.sub_process = None
            print( "cleaned up sub process" )

        if self.context is not None:
            self.context.term()
            self.context = None

    def upload_image(self, s3_url, copy_to_send):
        """PNG-encodes an image and stores it at the given ``s3://bucket/key`` URL."""
        print( f"Uploading image : {s3_url}" )

        raw_serial = cv2.imencode('.png', copy_to_send)[1].tobytes()

        s3_bucket, s3_path = split_s3_path(s3_url)
        self.s3_client.Object(s3_bucket, s3_path).put(Body=raw_serial, ContentType='image/PNG')

    def listener(self):
        """Upload-process entry point: receives ``[s3_url, image]`` messages and uploads them.

        Runs forever; the process is stopped from outside by cleanup().
        """
        context = zmq.Context()
        subscriber = context.socket(zmq.SUB)
        subscriber.connect(f"tcp://localhost:{IPC_PORT}")
        # Set setsockopt to receive all message
        subscriber.setsockopt(zmq.SUBSCRIBE, "".encode('utf-8'))

        while True:
            # NOTE(review): recv_pyobj unpickles whatever arrives on the socket;
            # only ever connect this subscriber to a trusted publisher.
            msg = subscriber.recv_pyobj()
            try:
                self.upload_image(*msg)
            except Exception:
                # A single failed upload must not end the loop, otherwise every
                # later snapshot would be lost silently.
                logger.exception('Failed to upload image; continuing with the next message')
            #threading.Thread(target=self.ppe_iot_handler.detect_and_report, args=(*msg, )).start()

    def publish(self, socket, args):
        """Sends ``args`` as a pickled Python object over ``socket``.

        Usage
        send_args = [[stream_ids[idx], result_boxes, self.cordon_area[idx], image_list[idx]], event_no]
        self.publish(socket, send_args)
        """
        logger.info('Publish to s3')
        socket.send_pyobj(args)

    def process_streams(self):
        """Processes one frame of video from one or more video streams."""
        self.frame_num += 1
        logger.debug( f"Frame {self.frame_num}" )

        # Loop through attached video streams
        streams = node.inputs.video_in.get()
        for stream in streams:
            self.process_media(stream)

        # Frames were annotated in place; pass them on to the output
        node.outputs.video_out.put(streams)

    def process_media(self, stream):
        """Runs inference on a frame of video."""
        image_data = preprocess(stream.image, self.MODEL_DIM)
        logger.debug(image_data.shape)

        # Run inference
        inference_results = node.call({"data":image_data}, self.MODEL_NODE)

        # Process results (object detection)
        self.process_results(inference_results, stream)

    def process_results(self, inference_results, stream):
        """Processes output tensors from a computer vision model and annotates a video frame.

        Args:
            inference_results: ``None`` or a 3-tuple ``(classes, confidences,
                boxes)``; only the first entry of each tensor is read
                (presumably the batch axis - confirm against the model).
                Boxes are ``(left, top, right, bottom)`` in model-input pixels.
            stream: Video stream whose ``image`` is annotated in place and
                whose ``stream_id`` is reported as the camera name.
        """
        if inference_results is None:
            logger.warning("Inference results are None.")
            return

        num_people = 0

        stream_image_h, stream_image_w, _ = stream.image.shape

        assert isinstance(inference_results, tuple)
        assert len(inference_results)==3

        class_data, conf_data, bbox_data = inference_results[0][0], inference_results[1][0], inference_results[2][0]

        now = datetime.now()

        for idx in range(len(conf_data)):
            # Confidences are fractions while the threshold is a percentage
            if conf_data[idx][0] * 100 > self.threshold and class_data[idx][0] in self.classids:
                # Box corners as fractions of the frame, clipped to the visible area
                (left, top, right, bottom) = np.clip(bbox_data[idx]/self.MODEL_DIM,0,1)
                #stream.add_rect(left, top, right, bottom)
                cv2.rectangle(stream.image, (int(left * stream_image_w), int(top * stream_image_h)), (int(right * stream_image_w), int(bottom * stream_image_h)), (255, 88, 9), 2)

                # NOTE(review): the detection-zone test below is disabled, so
                # num_people stays 0 and the count-change branch further down
                # never fires. Restore these lines to count people whose
                # bottom-centre point lies inside the zone.
                #   # office settings
                #   check_bottom = bottom * stream_image_h
                #   check_mid = ((left + right) / 2) * stream_image_w
                #   if self.poly.encloses_point(Point(check_mid, check_bottom)):
                #       num_people += 1

        self.current_count = num_people

        send_args = None
        if self.current_count != self.last_count :
            logger.info('# people {}'.format(str(num_people)))
            # Only increases add to the running total
            if self.current_count > self.last_count :
                self.total_count += self.current_count - self.last_count

            ts = datetime.timestamp(now)
            data = {
                "timestamp": ts,
                "last_count": self.last_count,
                "current_count": self.current_count,
                "camera_name": stream.stream_id,
                "s3_url": "s3://{}/{}{}.png".format(S3_BUCKET_NAME, S3_PREFIX, ts)
            }
            send_args = [data["s3_url"]]

            self.last_count = self.current_count
            message_json = json.dumps(data)
            self.iot_client.publish(
                topic=IOT_PUB_TOPIC,
                payload=bytes(message_json, "utf-8"),
                qos=1)

        cv2.putText(stream.image,
            text='# total people {}'.format(str(self.total_count)),
            org=(200, 100),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=1.0,
            color=(255, 255, 255),
            thickness=2,
            lineType=cv2.LINE_4)

        plot_polygon(stream.image, DETECT_AREA, line_thickness=3)

        if send_args:
            # Ship a copy of the fully annotated frame so that later in-place
            # drawing cannot touch the image queued for upload.
            send_args.append(copy.deepcopy(stream.image))
            self.publish(self.socket, send_args)


In [None]:
# Creating the application starts the background upload process and binds the
# ZeroMQ publisher on IPC_PORT (see Application.__init__). Run the cleanup cell
# at the end of the notebook before creating another instance.
app = Application()

In [None]:
def mainLoop():
    """Process frames with the global ``app`` until the kernel is interrupted.

    A KeyboardInterrupt raised anywhere in the loop ends it and is reported
    with a print; any other exception propagates to the caller.
    """
    try:
        while True:
            app.process_streams()
    except KeyboardInterrupt:
        print( "KeyboardInterrupt" )
          

#mainLoop()
# Run the loop under cProfile; the profile report is printed once mainLoop() returns.
cProfile.runctx( "mainLoop()", globals(), locals() )


In [None]:
# Close the publisher socket and stop the background upload process started by Application().
app.cleanup()