In [3]:
import collections
import collections.abc
import csv
import json
import logging
import os
# import get_secret

# Configure the root logger at import time with the DEBUG threshold.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Load settings from the deployment path when that file exists, otherwise
# fall back to the default file in the working directory.
config_path = "config.json"
if os.path.exists("/config/camera_inference_config.json"):
    config_path = "/config/camera_inference_config.json"


class StreamConfig(dict):
    """
    Nested configuration dictionary for a camera inference stream.

    The instance starts from a full set of defaults, is deep-merged with the
    caller's overrides, and then derives a few fields (GStreamer / RTSP source
    string, GStreamer target string, upload trigger) from the merged values.
    Values can be read with dotted paths, e.g. ``cfg.get("source.fps")``.
    """

    # deployment config path
    DEFAULT_CONFIG_FILE = config_path
    DEFAULT_CLASSES_FILE = "classes.csv"
    DEFAULT_GSTREAMER_SRC = "videotestsrc is-live=TRUE do-timestamp=TRUE ! video/x-raw, framerate=30/1, width=X, height=Y ! videoconvert ! timeoverlay ! appsink wait-on-eos=false max-buffers=1 drop=True"
    DEFAULT_GSTREAMER_DST = "appsrc ! queue leaky=2 ! jpegenc ! tcpserversink host=0.0.0.0 port=X sync-method=0 recover-policy=0 sync=false"
    DETECTION_KEY = "detection"
    SUBDETECTION_KEY = "inner"

    def __init__(self, *args, **kwargs):
        """
        Build the configuration.

        Args:
            *args: optionally one mapping of overrides (only ``args[0]`` is
                used); nested mappings are merged into the defaults rather
                than replacing them.
            **kwargs: additional top-level overrides, merged the same way
                after ``args[0]``.

        Raises:
            Exception: when an RTSP URL is required but the camera credentials
                cannot be obtained or formatted.
        """
        super().__init__()

        # Default params
        self.update(
            {
                "id": "default",
                "info": {"vehiclePlazaID": "0", "vehicleLaneID": "0", "vehiclePlazaName": "0"},
                "source": {
                    "name": "/video/ambulance.mp4",
                    "type": "GSTREAMER",  # FILE, RTSP, LIVE, GSTREAMER, RANDOM or TEST
                    "restart_on_failure": True,
                    "ip": None,
                    "port": 554,
                    "username": None,
                    "password": None,
                    "width": None,
                    "height": None,
                    "fps": 30,
                    "timeout_sec": 5,
                },
                "target": {
                    "enabled": False,
                    "filename": None,
                    "gstreamer": None,
                    "port": None,
                    "width": 1280,
                    "height": 720,
                    "fps": 30,
                },
                "ml": {
                    "enabled": True,
                    "model_dir": "/model/",
                    "framework": "MXNET",
                    "network": "model",
                    "score_threshold": 0.5,
                    "border_size": 1,
                    "font_scale": 0.5,
                    "device_type": "GPU",
                    "input_shape": [1, 3, 512, 512],
                    "select_top": True,
                    "selected_classes": None,  # List of class IDs
                    "subgroups": ["logo", "carplate"],
                    "sort_by_coordinate": [
                        0,
                        False,
                    ],  # Coordinates: [topLeft.x topLeft.y bottomRight.x bottomRight.y]
                    "draw_bounding_boxes": False,
                    "lane_zone": None,  # [[100, 700], [1200, 700], [1200, 1000], [100, 1000]]
                    "zone_point": "BOTTOM",  # BOTTOM, LEFT, RIGHT, TOP, CENTER
                },
                "tracker": {
                    "enabled": True,
                    "max_age": 1,
                    "min_hits": 3,
                    "draw_bounding_box": False,
                },
                "voter": {
                    "enabled": True,
                    "override_enabled": False,
                    "send_top_classes": False,
                    "overridden_classes": [16, 17, 18, 19, 30, 31],
                    "buffer_size": 10,
                    "threshold": 1.0,
                    "draw_bounding_box": False,
                },
                "ocr": {
                    "enabled": True,
                    "model_dir": "/data/ocr/",
                    "send_carplateImage": False,
                    "network": "ocr-net",
                },
                "upload": {
                    "enabled": False,
                    "topic": None,
                    "save_carplateImage": False,
                    "save_frame": True,
                    "trigger": "push",  # Keyword to be set in metadata on trigger
                    "stream": "inference_results",
                    "max_size": 512435456,  # Default is 512 MB
                    "stream_segment_size": 64777216,  # Default is 64 MB.
                    "time_to_live_millis": None,
                    "persistence": "Memory",  # File or Memory
                    "jpeg_quality": 50,
                },
                "measure_fps": False,
                "num_frames": -1,
                "debug": False,
                "log_interval": 30,
                "debug_file": "/video/time_tracker.csv",
            }
        )

        # Set params from env variables (before the overrides, so the caller
        # can still force the value explicitly).
        self["greengrass"] = StreamConfig.IsGreengrassEnabled()

        # Overwrite all default params with the input ones
        if args:
            StreamConfig.DeepUpdate(self, args[0])
        if kwargs:
            StreamConfig.DeepUpdate(self, kwargs)

        # Automatically enable FPS measurement for num_frames > 0
        if self["num_frames"] > 0:
            self["measure_fps"] = True

        # Update some fields with default values if wrong input
        if self.get("source.type") == "GSTREAMER":
            use_default = True
            if self.isSet("source.name"):
                # A name containing "!" is taken to be a complete pipeline.
                if "!" in self.get("source.name"):
                    use_default = False
                # A configured camera IP takes precedence over the name.
                if self.isSet("source.ip"):
                    rtsp_url = self._build_rtsp_url()
                    self["source"][
                        "name"
                    ] = f"rtspsrc location={rtsp_url} latency=0 ! queue ! rtph264depay ! decodebin ! videoconvert ! appsink sync=false"
                    use_default = False
            if use_default:
                self["source"]["name"] = self._default_gstreamer_source()

        # Update source field
        if self.get("source.type") == "RTSP":
            self["source"]["name"] = self._build_rtsp_url()

        # Update target fields
        if self.isSet("target.port"):
            gst_dst = StreamConfig.DEFAULT_GSTREAMER_DST
            gst_dst = gst_dst.replace("port=X", "port=%d" % self.get("target.port"))
            self["target"]["gstreamer"] = gst_dst

        # Assume trigger on detection if trigger is None
        if self.get("upload.trigger") is None:
            self["upload"]["trigger"] = StreamConfig.DETECTION_KEY

    def _build_rtsp_url(self):
        """
        Return the ``rtsp://user:password@ip:port`` URL for the camera.

        The user name is the plaza ID followed by the lane ID; the password is
        the ``CAM_PASS`` entry of the mapping returned by ``get_secret()``.

        Raises:
            Exception: wrapping any failure while fetching the secret or
                formatting the URL (the original error is chained as cause).
        """
        try:
            credentials = get_secret()
            username = f'{self.get("info.vehiclePlazaID")}{self.get("info.vehicleLaneID")}'
            password = credentials["CAM_PASS"]
            ip = self.get("source.ip")
            port = self.get("source.port")
            return "rtsp://%s:%s@%s:%d" % (username, password, ip, port)
        except Exception as e:
            raise Exception("AWS secret manager not configured : " + repr(e)) from e

    def _default_gstreamer_source(self):
        """
        Return the test-source pipeline with the configured frame size.

        ``source.width`` / ``source.height`` default to None; an unset
        dimension is dropped from the caps instead of being formatted with
        ``%d`` (which raised TypeError and made the default config unusable).
        """
        pipeline = StreamConfig.DEFAULT_GSTREAMER_SRC
        width = self.get("source.width")
        height = self.get("source.height")
        if width is None:
            pipeline = pipeline.replace(", width=X", "")
        else:
            pipeline = pipeline.replace("width=X", "width=%d" % width)
        if height is None:
            pipeline = pipeline.replace(", height=Y", "")
        else:
            pipeline = pipeline.replace("height=Y", "height=%d" % height)
        return pipeline

    @staticmethod
    def DeepUpdate(original, update):
        """
        Recursively update a dict. Subdicts won't be overwritten but also updated.

        ``original`` is modified in place and returned. When ``original`` is
        not a mapping, ``update`` itself is returned.
        """
        # collections.Mapping was removed in Python 3.10; the ABC lives in
        # collections.abc.
        if not isinstance(original, collections.abc.Mapping):
            return update
        for key, value in update.items():
            if isinstance(value, collections.abc.Mapping):
                # Plain membership lookup: StreamConfig.get() interprets dots
                # in the key as a path, which is not wanted here.
                current = original[key] if key in original else None
                original[key] = StreamConfig.DeepUpdate(current, value)
            else:
                original[key] = value
        return original

    def isGreengrassEnabled(self):
        """Return the stored ``greengrass`` flag of this configuration."""
        return self["greengrass"]

    @staticmethod
    def IsGreengrassEnabled():
        """Return True when any of the AWS_GG_* environment variables is defined."""
        return any(
            name in os.environ
            for name in ("AWS_GG_MQTT_ENDPOINT", "AWS_GG_HTTP_ENDPOINT", "AWS_GG_MQTT_PORT")
        )

    @staticmethod
    def LoadJSON(filename):
        """Parse ``filename`` as JSON and return the decoded object."""
        with open(filename) as json_file:
            return json.load(json_file)

    @staticmethod
    def LoadCSV(filename):
        """
        Read a classes CSV and return ``(classes, groups)``.

        Both results are dicts keyed by the first column; ``classes`` holds
        the second column and ``groups`` the third. Blank rows are skipped.

        Raises:
            IndexError: when a non-blank row has fewer than three columns.
        """
        classes = {}
        groups = {}
        # newline="" is what the csv module expects for files it reads.
        with open(filename, newline="") as csvfile:
            for row in csv.reader(csvfile):
                if not row:
                    continue
                classes[row[0]] = row[1]
                groups[row[0]] = row[2]
        return classes, groups

    @staticmethod
    def LoadConfigFromFile(filename=None):
        """Load the JSON config from ``filename`` (DEFAULT_CONFIG_FILE when None)."""
        if filename is None:
            filename = StreamConfig.DEFAULT_CONFIG_FILE
        return StreamConfig.LoadJSON(filename)

    @staticmethod
    def LoadClassesFromFile(filename=None):
        """Load the classes CSV from ``filename`` (DEFAULT_CLASSES_FILE when None)."""
        if filename is None:
            filename = StreamConfig.DEFAULT_CLASSES_FILE
        return StreamConfig.LoadCSV(filename)

    @staticmethod
    def __get(obj, key):
        """
        Walk ``obj`` along ``key`` and return the value found, or None.

        ``key`` is a dotted string ("a.b.c"), a list/tuple of path parts, or a
        single non-string key. The caller's sequence is never modified. The
        walk stops with None as soon as a part is missing or the current value
        is not a mapping.
        """
        if isinstance(key, str):
            chain = key.split(".")
        elif isinstance(key, (list, tuple)):
            chain = list(key)
        else:
            chain = [key]
        for part in chain:
            if not isinstance(obj, collections.abc.Mapping) or part not in obj:
                return None
            obj = obj[part]
        return obj

    def get(self, key, default=None):
        """
        Return the value at the dotted path ``key``.

        ``default`` is returned when the path is missing or its value is None.
        """
        found = StreamConfig.__get(self, key)
        return default if found is None else found

    def isSet(self, key):
        """Return True when the dotted path ``key`` exists with a non-None value."""
        return StreamConfig.__get(self, key) is not None


In [4]:
# from stream_config import StreamConfig
import cv2
from pydarknet import Detector, Image
import numpy as np
import pandas as pd
import os
from time_tracker import TimeTracker


class OCR:
    """
    License-plate OCR stage.

    Crops the detected plate out of a frame, deskews and trims the crop to the
    text region, then runs a Darknet detector over it and assembles the
    detected labels into the plate string.
    """

    def __init__(self, config, time_tracker_config):
        """
        Args:
            config: mapping with "enabled" and, when enabled, "model_dir",
                "network" and "send_carplateImage".
            time_tracker_config: mapping with "time_tracker", "num_frame" and
                "time_tracker_file"; the values are stored on the instance
                (presumably for the TimeTracker decorator — confirm there).

        Raises:
            Exception: when enabled and any of the .cfg/.weights/.data model
                files is missing.
        """
        self.time_tracker = time_tracker_config["time_tracker"]
        self.num_frame = time_tracker_config["num_frame"]
        self.time_tracker_file = time_tracker_config["time_tracker_file"]
        self.enabled = config["enabled"]

        # Defined up front so a disabled instance still has every attribute.
        self.net = None
        self.lp_image = None
        self.save_carplate = False
        if not self.enabled:
            return

        model_prefix = "{}{}".format(config["model_dir"], config["network"])
        ocr_netcfg = model_prefix + ".cfg"
        ocr_weights = model_prefix + ".weights"
        ocr_dataset = model_prefix + ".data"

        if not all(os.path.exists(path) for path in (ocr_netcfg, ocr_weights, ocr_dataset)):
            raise Exception(
                "Model files not found: %s, %s, %s" % (ocr_netcfg, ocr_weights, ocr_dataset)
            )

        self.net = Detector(
            bytes(ocr_netcfg, encoding="utf-8"),
            bytes(ocr_weights, encoding="utf-8"),
            0,
            bytes(ocr_dataset, encoding="utf-8"),
        )

        self.save_carplate = config["send_carplateImage"]

    @TimeTracker
    def rotate(self, image, angle, center=None, scale=1.0):
        """
        Return ``image`` rotated by ``angle`` degrees around ``center``.

        The output keeps the input's width and height; ``center`` defaults to
        the middle of the image.
        """
        (h, w) = image.shape[:2]
        if center is None:
            center = (w / 2, h / 2)

        # Perform the rotation
        M = cv2.getRotationMatrix2D(center, angle, scale)
        return cv2.warpAffine(image, M, (w, h))

    @TimeTracker
    def carplate_rotation(self, image):
        """
        Estimate the plate's skew from its Hough lines and return the
        deskewed image.

        Only the first five detected lines are considered, and of those only
        the ones whose angle lies strictly between 90 and 110 degrees. The
        input is returned unchanged when no such line exists or when the
        line detection / rotation fails.
        """
        # Grayscale and Canny Edges extracted:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 100, 170, apertureSize=3)

        try:
            lines = cv2.HoughLines(edges, 1, (np.pi / 180) * 4, 20)
            if lines is None:
                return image

            angles = []
            for line in lines[:5]:
                for _rho, theta in line:
                    degrees = theta * 180 / np.pi
                    if 90 < degrees < 110:
                        angles.append(degrees)

            # np.mean([]) is NaN; rotating by NaN would corrupt the image.
            if not angles:
                return image
            return self.rotate(image, np.mean(angles) - 90, center=None, scale=1.0)
        except Exception:
            # Deskewing is best-effort: fall back to the unrotated plate.
            return image

    @TimeTracker
    def noise_removal(self, image):
        """Return ``image`` after a 5x5 morphological close followed by a 3x3 open."""
        close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
        open_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        mask = cv2.morphologyEx(image, cv2.MORPH_CLOSE, close_kernel)
        return cv2.morphologyEx(mask, cv2.MORPH_OPEN, open_kernel)

    def histogram_equalization(self, img_in):
        """
        Return ``img_in`` with each channel histogram-equalized independently.

        Each channel is mapped through a 256-entry lookup table built from its
        normalised cumulative histogram.
        """
        equalized = []
        for channel in cv2.split(img_in):
            hist, _ = np.histogram(channel.flatten(), 256, [0, 256])
            # Mask the zero entries of the CDF so they do not affect the
            # min/max normalisation; they map to 0 in the lookup table.
            cdf = np.ma.masked_equal(np.cumsum(hist), 0)
            cdf = (cdf - cdf.min()) * 255 / (cdf.max() - cdf.min())
            lookup = np.ma.filled(cdf, 0).astype("uint8")
            equalized.append(lookup[channel])

        # merge the images in the channels
        return cv2.merge(equalized)

    @TimeTracker
    def contour_image(self, image):
        """
        Return the text portion of a license-plate image.

        The plate is deskewed, edge-filtered and closed horizontally; contours
        that are wide enough and cover more than 4% of the image are merged
        into one box, which is grown by 10% on each side (clamped to the
        image) and cropped. The deskewed image is returned whole when no
        contour qualifies.
        """
        img = self.carplate_rotation(image)
        # shape[:2] is (rows, cols), i.e. (height, width).
        img_h, img_w = img.shape[:2]

        img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img_gray = cv2.GaussianBlur(img_gray, (3, 3), 0)

        sobel = cv2.Sobel(img_gray, -1, 1, 0)
        _, bw = cv2.threshold(sobel, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)

        bw = self.noise_removal(bw)

        # NOTE(review): the kernel width is a quarter of the image HEIGHT, as
        # in the original code (which unpacked shape as (w, h)); confirm
        # whether a quarter of the width was intended. max(1, ...) keeps the
        # kernel valid for crops under 4 px high.
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (max(1, int(img_h / 4)), 4))
        closed = cv2.morphologyEx(bw, cv2.MORPH_CLOSE, kernel)

        # [-2] selects the contour list on both OpenCV 3 (3 return values)
        # and OpenCV 4 (2 return values).
        contours = cv2.findContours(closed, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)[-2]

        chunks = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w / h > 0.4 and w * h > img_w * img_h * 0.04:
                chunks.append([x, y, x + w, y + h])

        if not chunks:
            return img

        # Union of all qualifying boxes (for a single box this is the box).
        left = min(chunk[0] for chunk in chunks)
        top = min(chunk[1] for chunk in chunks)
        right = max(chunk[2] for chunk in chunks)
        bottom = max(chunk[3] for chunk in chunks)
        margin_x = int((right - left) * 0.1)
        margin_y = int((bottom - top) * 0.1)

        x1 = max(left - margin_x, 0)
        y1 = max(top - margin_y, 0)
        x2 = min(right + margin_x, img_w)
        y2 = min(bottom + margin_y, img_h)
        return img[y1:y2, x1:x2]

    @TimeTracker
    def cropLpWithMargin(self, frame, params):
        """
        Return a copy of the plate region of ``frame`` with a 20% margin.

        The plate box is read from the carplate sub-detection's
        ``bounding_box`` as [x1, y1, x2, y2]; the enlarged box is clamped to
        the frame.
        """
        frame_h, frame_w = frame.shape[:2]
        box = params[StreamConfig.SUBDETECTION_KEY]["carplate"].bounding_box
        x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
        h = y2 - y1
        w = x2 - x1
        lpx1 = max(int(x1 - (w * 0.2)), 0)
        lpx2 = min(int(x2 + (w * 0.2)), frame_w)
        lpy1 = max(int(y1 - (h * 0.2)), 0)
        lpy2 = min(int(y2 + (h * 0.2)), frame_h)
        # Copy so the crop does not alias the caller's frame buffer.
        return frame[lpy1:lpy2, lpx1:lpx2].copy()

    def get_sequence_data(self, data):
        """
        Order detections in reading order.

        ``data`` is a DataFrame with "centerx" and "centery" columns. Rows are
        sorted by "centery" and split into text lines wherever two consecutive
        values differ by more than 8; the result is sorted by line, then by
        "centerx", with a fresh index and an added "line" column.
        """
        df = data.sort_values(by=["centery"])
        centers = list(df["centery"])

        line_ids = []
        current_line = 0
        for idx, center in enumerate(centers):
            line_ids.append(current_line)
            # A vertical gap to the NEXT detection starts a new line after this one.
            if idx + 1 < len(centers) and centers[idx + 1] - center > 8:
                current_line += 1

        df["line"] = line_ids
        df = df.sort_values(by=["line", "centerx"])
        df.reset_index(drop=True, inplace=True)
        return df

    @TimeTracker
    def Image_to_string(self, img_path):
        """
        Run the detector on a plate image and return the recognised text.

        ``img_path`` is passed straight to the Darknet ``Image`` wrapper
        (``process`` passes the cropped image array, despite the parameter
        name). Returns "-" when nothing is detected.
        """
        img_darknet = Image(img_path)

        results = self.net.detect(img_darknet, thresh=0.5, nms=0.25)
        if not len(results):
            return "-"

        # NOTE(review): if the detector reports bounds as (center_x, center_y,
        # w, h), these values are the right/bottom edges rather than centres —
        # confirm against the pydarknet documentation.
        detections = {
            "centerx": [int(res[2][0] + res[2][2] / 2) for res in results],
            "centery": [int(res[2][1] + res[2][3] / 2) for res in results],
            "text": [str(res[0].decode("utf-8")) for res in results],
        }
        ordered = self.get_sequence_data(pd.DataFrame(detections))
        return "".join(ordered["text"])

    @TimeTracker
    def process(self, frame, params):
        """
        Read the plate for one frame and record the result in ``params``.

        When the stage is disabled, ``params["LP"]`` is set to the string
        "None". Otherwise, if a carplate sub-detection is present and at
        least three characters are recognised, ``params["LP"]`` receives the
        text with "I"→"1" and "O"→"0" substituted. ``params["lp_image"]`` is
        set when plate-image sending is enabled. Returns ``params``.
        """
        if not self.enabled:
            params["LP"] = "None"
            return params

        lp = params[StreamConfig.SUBDETECTION_KEY]["carplate"]

        if lp is not None:
            original_img = self.cropLpWithMargin(frame, params)
            cropped_img = self.contour_image(original_img)
            carplate_number = self.Image_to_string(cropped_img)
            if len(carplate_number) >= 3:
                params["LP"] = carplate_number.replace("I", "1").replace("O", "0")
                self.lp_image = original_img
            else:
                self.lp_image = None
        # NOTE(review): when no plate is detected, lp_image keeps the value
        # from an earlier call — confirm that this carry-over is intended.
        if self.save_carplate:
            params["lp_image"] = self.lp_image
        return params

ModuleNotFoundError: No module named 'pydarknet'