In [None]:
import cv2
import matplotlib.pyplot as plt
import time
import numpy as np
import tensorflow as tf
import paho.mqtt.client as mqtt

In [None]:
# Uncomment the next line to install the MQTT client library if it is missing.
#!pip install paho-mqtt
# Load the trained Keras classifier from disk. `model` is a module-level global
# that prob_and_label_multiple_imgs() reads when it calls model.predict().
# NOTE(review): the path is relative to the notebook's working directory.
model = tf.keras.models.load_model("cnn12-2.h5")

In [None]:
# Intersection-over-Union helper used by NMS below.
def IoU(box_predicted, box_true, verbose = True):
    """Return the Intersection-over-Union of two boxes.

    Both boxes are (x1, y1, x2, y2) corner tuples. Widths and heights are
    computed as ``x2 - x1 + 1`` / ``y2 - y1 + 1``, i.e. the corner
    coordinates are treated as inclusive.

    Args:
        box_predicted: first box as (x1, y1, x2, y2).
        box_true: second box as (x1, y1, x2, y2).
        verbose: when true, print the intermediate width, height,
            intersection area and union area.

    Returns:
        Intersection area divided by union area, as a float.
    """
    px1, py1, px2, py2 = box_predicted
    tx1, ty1, tx2, ty2 = box_true

    # Overlap extent along each axis; clamped at 0 when the boxes are disjoint.
    w = np.maximum(0, np.minimum(px2, tx2) - np.maximum(px1, tx1) + 1)
    h = np.maximum(0, np.minimum(py2, ty2) - np.maximum(py1, ty1) + 1)
    intersectionArea = w * h

    # Individual box areas, then union = A + B - (A ∩ B).
    predicted_area = (px2 - px1 + 1) * (py2 - py1 + 1)
    true_area = (tx2 - tx1 + 1) * (ty2 - ty1 + 1)
    unionArea = predicted_area + true_area - intersectionArea

    if verbose:
        print("w:", w)
        print("h:", h)
        print("Intersection area:", intersectionArea)
        print("Union area:", unionArea)

    return intersectionArea / float(unionArea)

In [None]:
def NMS(boxes, probs, threshold=0.5, verbose=False):
    """Non-maximum suppression over a set of candidate boxes.

    Repeatedly keeps the remaining box with the highest probability and
    discards every other remaining box whose IoU with it exceeds
    ``threshold``.

    Args:
        boxes: sequence of (x1, y1, x2, y2) boxes, indexable by position.
        probs: sequence of scores, one per box.
        threshold: IoU value above which a lower-scored box is suppressed.
        verbose: forwarded to IoU() to print intermediate values.

    Returns:
        List of indexes into ``boxes`` that survive, highest score first.
    """
    kept = []

    # argsort is ascending, so the best remaining candidate is always last.
    remaining = np.argsort(probs)

    while len(remaining) > 0:
        best = remaining[-1]
        kept.append(best)
        remaining = remaining[:-1]

        best_box = boxes[best]
        ious = np.asarray(
            [IoU(boxes[candidate], best_box, verbose=verbose) for candidate in remaining]
        )

        # Drop every candidate that overlaps the kept box by more than the threshold.
        remaining = remaining[~(ious > threshold)]

    return kept

In [None]:
def read_img(path):
    """Read an image file from disk and return it in RGB channel order.

    Args:
        path: path of the image file, passed to cv2.imread.

    Returns:
        The decoded image converted from BGR to RGB.

    Raises:
        OSError: if cv2.imread could not read the file (it returns None
            instead of raising, e.g. for a missing or undecodable file).
    """
    bgr = cv2.imread(path)
    if bgr is None:
        # Fail with a clear message rather than an opaque cvtColor assertion.
        raise OSError(f"Could not read image file: {path!r}")
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

In [None]:
def create_selective_regions(img):
    """Propose candidate regions for an image with OpenCV selective search.

    Runs the selective-search segmentation in its "fast" mode on ``img``,
    prints how many rectangles were found and returns them.

    Returns:
        The rectangles produced by the selective-search ``process()`` call
        (consumed elsewhere in this file as (x, y, w, h) rows).
    """
    searcher = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    searcher.setBaseImage(img)
    searcher.switchToSelectiveSearchFast()
    proposals = searcher.process()
    print(f"Number of found rectangles: {len(proposals)}")
    return proposals

In [None]:
def create_mini_images(img, rects):
    """Crop one sub-image out of ``img`` for every rectangle.

    Args:
        img: image array indexed as [row, column, ...].
        rects: iterable of (x, y, width, height) rectangles.

    Returns:
        List of crops, in the same order as ``rects``.
    """
    return [
        img[top:top + h, left:left + w]
        for (left, top, w, h) in rects
    ]

In [None]:
def preprocess_imgs(imgs, inputsize):
    """Resize and rescale every image of ``imgs`` in place.

    Each entry is resized to ``inputsize`` with bicubic interpolation and
    then multiplied by 1/255 (maps 0..255 pixel values into 0..1).

    Args:
        imgs: mutable list of image arrays; its entries are overwritten.
        inputsize: target size handed to cv2.resize as ``dsize``.

    Returns:
        The same list object, now holding the processed images.
    """
    for position, crop in enumerate(imgs):
        # Bring the crop to the input dimensions expected by the CNN.
        resized = cv2.resize(crop, inputsize, interpolation=cv2.INTER_CUBIC)
        # Scale pixel values.
        imgs[position] = resized * 1./255
    return imgs

def prob_and_label_multiple_imgs(imgs, inputsize = (56, 56)):
    """Classify a list of image crops with the global ``model``.

    The crops are preprocessed with preprocess_imgs() (which modifies the
    list in place), stacked into one batch and passed to ``model.predict``.

    Args:
        imgs: list of image arrays (crops of the source image).
        inputsize: size every crop is resized to before prediction.

    Returns:
        Tuple ``(probs, labels)``: for every crop, the highest predicted
        score and the index of the class that achieved it. Both are empty
        arrays when ``imgs`` is empty.
    """
    if len(imgs) == 0:
        # Nothing to classify; avoid calling the model on an empty batch.
        return np.empty(0, dtype=float), np.empty(0, dtype=int)

    print("\nDetecting the class of each region:")
    imgs = preprocess_imgs(imgs, inputsize)
    # Every crop now has the same shape, so stacking yields one batch array
    # with the crops along axis 0. np.stack raises if any shape differs,
    # whereas np.resize would silently repeat/truncate/reinterpret the data
    # (and cv2's dsize is (width, height), so an (H, W) reshape was only
    # correct for square input sizes).
    batch = np.stack(imgs)
    preds = model.predict(batch)
    print("Done!\n")
    probs = np.max(preds, axis=-1)
    labels = np.argmax(preds, axis=-1)
    return (probs, labels)

In [None]:
def remove_background(probs, labels, rects):
    """Filter out every candidate classified as background (class 43).

    The surviving candidates are returned grouped by label in ascending
    label order; within one label the original order is preserved.

    Args:
        probs: per-candidate scores.
        labels: per-candidate class indexes (compared element-wise).
        rects: per-candidate rectangles; each entry must offer ``.tolist()``.

    Returns:
        Tuple ``(probs, labels, rects)`` of three parallel Python lists.
    """
    kept_probs, kept_labels, kept_rects = [], [], []

    # class 43 is the background
    foreground_classes = [cls for cls in np.unique(labels) if cls != 43]

    for cls in foreground_classes:
        for position in np.nonzero(labels == cls)[0]:
            kept_probs.append(probs[position])
            kept_rects.append(rects[position].tolist())
            kept_labels.append(cls)

    return kept_probs, kept_labels, kept_rects


def fill_output_dict(boxes, labels):
    """Group boxes by their label.

    Args:
        boxes: sequence of boxes, indexable by position.
        labels: label of each box, parallel to ``boxes``.

    Returns:
        Dict mapping each distinct label to the list of boxes carrying it,
        in their original relative order.
    """
    return {
        cls: [boxes[position] for position in np.where(labels == cls)[0]]
        for cls in np.unique(labels)
    }

def convert_xywh_to_xxyy(rect):
    """Turn an (x, y, width, height) rectangle into (x1, y1, x2, y2) corners.

    The second corner is simply the origin shifted by width and height.
    """
    left, top, w, h = rect
    right = left + w
    bottom = top + h
    return left, top, right, bottom

def perform_nms_on_candidate_rectangles(probs, labels, rects, threshold=.5):
    """Reduce classified candidates to the final detections.

    Steps: drop background candidates, convert the remaining rectangles
    from (x, y, w, h) to corner form, run NMS on them and group the
    surviving boxes by label.

    Args:
        probs: per-candidate scores.
        labels: per-candidate class indexes.
        rects: per-candidate (x, y, w, h) rectangles.
        threshold: IoU threshold forwarded to NMS.

    Returns:
        Dict mapping label -> list of (x1, y1, x2, y2) boxes.
    """
    fg_probs, fg_labels, fg_rects = remove_background(probs, labels, rects)
    corner_boxes = list(map(convert_xywh_to_xxyy, fg_rects))
    keep = NMS(corner_boxes, fg_probs, threshold=threshold)
    return fill_output_dict(
        [corner_boxes[k] for k in keep],
        [fg_labels[k] for k in keep],
    )

In [None]:
def detect_shield(path, threshold=.5):
    """Run the full detection pipeline on one image file.

    Pipeline: read the image, propose regions, crop them, classify the
    crops, then apply NMS to the non-background candidates.

    Args:
        path: path of the image file to analyse.
        threshold: IoU threshold used during NMS.

    Returns:
        Dict mapping label -> list of (x1, y1, x2, y2) boxes.
    """
    image = read_img(path)
    proposals = create_selective_regions(image)
    crops = create_mini_images(image, proposals)
    probs, labels = prob_and_label_multiple_imgs(crops)
    return perform_nms_on_candidate_rectangles(probs, labels, proposals, threshold)

# Test

In [None]:
# Smoke test of the detection pipeline on a sample image.
path="debug_image.png"
# RATIO is not consumed by detect_shield (it has no `ratio` parameter); the
# name is kept defined only so cells that still reference it do not break.
RATIO = .1
THRESHOLD = .001
# detect_shield accepts only `path` and `threshold`; passing `ratio=` raised
# a TypeError on a fresh kernel.
output_rects = detect_shield(path=path, threshold=THRESHOLD)

output_rects

# Main Function

In [20]:
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import io
import base64

def detect_traffic_sign(image_file_name):
    """Detect signs in an image and render the annotated result.

    Runs detect_shield() on the file, draws every detected box and its label
    on top of the image, and encodes the rendered figure as a base64 PNG.

    Args:
        image_file_name: path of the image file to analyse.

    Returns:
        Tuple ``(output_rects, img_base64)`` where ``output_rects`` maps
        ``int`` label -> list of (x1, y1, x2, y2) boxes and ``img_base64``
        is the annotated image as a base64-encoded PNG string.
        ``(None, None)`` is returned if any step raises; the error is printed.
    """
    fig = None
    try:
        path = image_file_name
        THRESHOLD = 0.01

        # label -> list of (x1, y1, x2, y2) boxes
        output_rects = detect_shield(path=path, threshold=THRESHOLD)

        fig, ax = plt.subplots()
        # read_img takes only the path; the former `ratio=RATIO` argument
        # raised a TypeError that the handler below silently turned into
        # a (None, None) result.
        img = read_img(path)
        ax.imshow(img)

        for key, c_rect in output_rects.items():
            for x in c_rect:
                rect = patches.Rectangle((x[0], x[1]), x[2] - x[0], x[3] - x[1], linewidth=1, edgecolor='r', facecolor='none')
                ax.add_patch(rect)
                ax.text(x[0] + 3, x[1] - 5, str(key), color="r")

        ax.axis('off')

        # Render the plot into memory and encode it as a base64 string.
        with io.BytesIO() as buf:
            fig.savefig(buf, format='png', bbox_inches='tight', pad_inches=0)
            img_base64 = base64.b64encode(buf.getvalue()).decode('utf-8')

        # Plain int keys so the dict can be serialised with json.dumps.
        output_rects = {int(key): value for key, value in output_rects.items()}

        return output_rects, img_base64  # Return the base64 string

    except Exception as e:
        print(f"Error processing image {image_file_name}: {repr(e)}")
        return None, None  # Return None indicating error

    finally:
        # Always release the figure, including when an error occurred mid-way.
        if fig is not None:
            plt.close(fig)


# MQTT Loop: Run this

In [21]:
import paho.mqtt.client as mqtt
import base64
import json
import base64


def on_connect(client, userdata, flags, rc):
    """Connection callback: log the result code and subscribe to the image topic.

    Subscribing here means the subscription is issued every time this
    callback runs for a connection.
    """
    print(f"Connected with result code {rc!s}")
    image_topic = "adas/image"
    client.subscribe(image_topic, 0)

# The callback for when a PUBLISH message is received from the server.
def on_message(mosq, obj, msg):
    """Handle an incoming image message and publish the detection result.

    For topics starting with "adas/image" the payload is parsed as JSON, its
    base64 ``image`` field is decoded and written to ``debug_image.png``,
    detect_traffic_sign() is run on that file and, if an annotated image was
    produced, a JSON document with the detections (``keys``) and the
    annotated image (``image``) is published to "adas/".

    Args:
        mosq: the MQTT client that received the message; the result is
            published through this client.
        obj: user data supplied by the MQTT library (unused).
        msg: received message, read via ``msg.topic`` and ``msg.payload``.

    Any exception is caught and printed so that a bad message does not stop
    the network loop.
    """
    try:
        if not msg.topic.startswith("adas/image"):
            return

        # Parse JSON
        data = json.loads(msg.payload.decode('utf-8'))

        # The image arrives base64-encoded; decode it and store it as a PNG
        # file so the detection pipeline can read it from disk.
        image_bytes = base64.b64decode(data['image'])
        image_file_name = "debug_image.png"
        with open(image_file_name, 'wb') as image_file:
            image_file.write(image_bytes)

        # Returns the detected classes with coordinates and the annotated
        # image as base64 (both None on failure).
        output_rects, img_base64 = detect_traffic_sign(image_file_name)
        print("Class and coordinates: ", output_rects)

        if img_base64:
            json_string = json.dumps({
                "keys": output_rects,
                "image": img_base64
            })

            # Publish through the client handed to this callback instead of
            # relying on a module-level `client` variable being defined.
            mosq.publish("adas/", json_string)
            print('Image processed and sent!')
            print("-------------------------------------")

    except Exception as e:
        print("Error processing the messages::", repr(e))

# Create the MQTT client and register the two callbacks defined above.
# NOTE(review): newer paho-mqtt releases expect an explicit callback API
# version in mqtt.Client(...); moving to the newer version would also change
# the on_connect signature, so both would have to be updated together.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# Connect to the broker: host, port, and 60 as the third positional argument
# (keepalive seconds in paho-mqtt's connect()).
# NOTE(review): the broker address is hard-coded; consider a config constant.
client.connect("194.95.24.122", 1883, 60)
# Blocks this cell and dispatches callbacks until the kernel is interrupted.
client.loop_forever()



  client = mqtt.Client()


Connected with result code 0
Number of found rectangles: 624

Detecting the class of each region:
Done!

Class and coordinates:  {12: [(63, 64, 108, 113)], 40: [(148, 133, 169, 156)]}
Image processed and sent!
-------------------------------------
