# Lab 3.2: Traffic sign detection

Welcome to Lab 3.2. In this lab, we will learn how to use a pre-trained YOLO model for traffic sign detection.


## Instructions

Below are detailed instructions to help you program and run the traffic sign tracking.

### Libraries

In [1]:
import torch
from PIL import Image
import cv2
import numpy as np
import os
import pathlib
# Ensure proper path handling for Windows
temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath

# Handle library conflicts
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [4]:
class SignDetector():
    def __init__(self, model_path, force_reload):
        # Initialize the SignDetector object
        self.model = self.load_model(model_path, force_reload)
        # Print the list of class names
        self.classes = self.model.names
        print(self.classes)

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model.to(self.device)
        print("Using Device: ", self.device)

    def load_model(self, model_path, force_reload):
        # Load the model from the model_path or a pre-trained model
        if model_path:
            # Load a custom model from the model_path
            model = torch.hub.load(
                'ultralytics/yolov5',
                'custom',
                path=model_path,
                force_reload=force_reload
            )
        else:
            # Load the pre-trained yolov5s model
            model = torch.hub.load(
                'ultralytics/yolov5',
                'yolov5s',
                pretrained=True
            )

        return model  # Return the loaded model

    def score_frame(self, frame):
        # Convert the frame from BGR to RGB
        frame_rgb = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # Predict objects in the frame
        results = self.model(frame_rgb)
        # Extract labels and coordinates of detected objects
        labels, cord = results.xyxyn[0][:, -1], results.xyxyn[0][:, :-1]

        return labels, cord  # Return labels and coordinates

    def class_to_label(self, x):
        # Convert class number to class label
        return self.classes[int(x)]  # Return the corresponding class label

    def plot_boxes(self, results, height, width, confidence=0.3):
        # Draw bounding boxes around detected objects
        # Extract labels and coordinates from the results
        labels, cord = results
        # List to store detections
        detections = []

        for i in range(len(labels)):
            # Coordinates of the object
            row = cord[i]
            # Check the confidence of the object
            if row[4] >= confidence:
                # Calculate the bounding box coordinates
                x1, y1, x2, y2 = int(
                    row[0]*width), int(row[1]*height), int(row[2]*width), int(row[3]*height)

                # Confidence of the detection
                conf = float(row[4].item())
                # Label of the object
                class_label = self.class_to_label(
                    labels[i])
                # Add the object to the list of detections
                detections.append(
                    ([x1, y1, int(x2-x1), int(y2-y1)], conf, class_label))

        return detections  # Return the list of detections


### Implement the YOLOv5 Pre-trained Model and DeepSORT



Solution to exercise 1


In [None]:
from deep_sort_realtime.deepsort_tracker import DeepSort
ip_lap_lead = "192.168.109.105"
port_lap_lead = 7001

ip_lap = "192.168.109.105"
port_lap = 3000

# Define the path to the YOLO model weights for traffic sign detection
model_path = r'..\Other\YoloWeights\TrafficSign.pt'

# Initialize the traffic sign detector with the specified model path
traffic_signs_detector = SignDetector(model_path=model_path, force_reload=True)

# Initialize the object tracker with specified parameters
object_tracker = DeepSort(
    max_age=3, n_init=2, nms_max_overlap=1.0, max_cosine_distance=0.3)

In [5]:
import socket
BUFFER_SIZE = 65355


def create_udp_socket_listen(ip, port):
    """Create and bind a UDP socket for listening"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((ip, port))
    return sock


def create_udp_socket_send():
    """Create and configure a UDP socket for sending"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, BUFFER_SIZE)
    return sock

In [5]:
from queue import Queue

END_MARKER = b'\xff\xd9'
IMAGE_RESIZE_DIMS = (1280, 720)
queue_image = Queue()
MAX_QUEUE_SIZE = 3


def get_image_from_cam():
    """Receive images via UDP and put them in the queue."""
    global stop_event
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

    while cap.isOpened() and (not stop_event.is_set()):
        _, image = cap.read()
        queue_image.put(image)

        if queue_image.qsize() >= MAX_QUEUE_SIZE:
            queue_image.get()

In [None]:
def get_image_from_leader():
    """Receive images via UDP and put them in the queue."""
    global stop_event

    sock = create_udp_socket_listen(ip_lap, port_lap)
    print(f"Listening on IP {ip_lap}, UDP port {port_lap}")

    image_data = bytearray()
    while not stop_event.is_set():
        data, _ = sock.recvfrom(BUFFER_SIZE)
        image_data.extend(data)

        if data.endswith(END_MARKER):
            try:
                image_array = np.frombuffer(image_data, dtype=np.uint8)
                image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                queue_image.put(image)

                if queue_image.qsize() >= MAX_QUEUE_SIZE:
                    queue_image.get()
            except Exception as e:
                print(f"Failed to convert image: {str(e)}")

            image_data = bytearray()

In [None]:
import time

# Define constants for different directions and commands
STRAIGHT = 1      # Move straight
SLOW = 2          # Slow down
TURNLEFT = 5      # Turn left
TURNRIGHT = 6     # Turn right
FORWARD = 7       # Move forward
BACKWARD = 8      # Move backward
STANDBY = 9       # Standby mode
STOP = 10         # Stop


def handle_sign_command(sign_list):
    """
    Determine the command to send based on the detected traffic signs.

    Parameters:
    - sign_list: A list of detected traffic signs, which may include colors and directions.

    Returns:
    - cmd: The command corresponding to the detected traffic signs.
    """
    # If the traffic light is red, stop the vehicle
    if 'red' in sign_list:
        cmd = STOP
        return cmd

    # If the sign indicates to stop, set the vehicle to standby mode
    if 'stop' in sign_list:
        cmd = STANDBY
        return cmd

    # If the traffic light is yellow, slow down the vehicle
    if 'yellow' in sign_list:
        cmd = SLOW
        return cmd

    # If the sign indicates to turn left, set the turn left command
    if 'left' in sign_list:
        cmd = TURNLEFT
        return cmd

    # If the sign indicates to turn right, set the turn right command
    if 'right' in sign_list:
        cmd = TURNRIGHT
        return cmd

    # If the traffic light is green or indicates to go straight, move forward
    if 'straight' in sign_list or 'green' in sign_list:
        cmd = FORWARD
        return cmd

    # Default command if no specific sign is detected
    else:
        cmd = STRAIGHT
        return cmd

In [None]:
def detect_traffic_sign():
    """Detect traffic signs and send commands based on detection results."""
    global stop_event

    # Create a socket for sending UDP messages
    sock = create_udp_socket_send()

    # Initialize variables for FPS calculation
    prev_time = time.time()
    frame_count = 0
    fps = 0
    prev_cmd = STOP

    while not stop_event.is_set():
        sign_list = set()
        try:
            # Get an image from the queue
            image = queue_image.get()

            # Update frame count and calculate FPS
            frame_count += 1
            tmp_time = time.time() - prev_time
            if tmp_time >= 1:
                fps = frame_count / tmp_time
                prev_time = time.time()
                frame_count = 0

            # Score the frame using the traffic signs detector
            results = traffic_signs_detector.score_frame(image)

            # Get bounding boxes and class labels for detected objects
            detections = traffic_signs_detector.plot_boxes(
                results, height=image.shape[0], width=image.shape[1], confidence=0.5)

            # Update object tracks
            tracks = object_tracker.update_tracks(detections, frame=image)

            for track in tracks:
                if not track.is_confirmed():
                    continue

                # Get tracking information
                track_id = track.track_id
                bbox_x1, bbox_y1, bbox_x2, bbox_y2 = map(int, track.to_ltrb())
                class_label = track.det_class
                conf = track.det_conf if track.det_conf is not None else 0.0

                # If the object is at the edge of the frame, mark it as a detected sign
                if bbox_y2 >= image.shape[0] - 10 or bbox_x1 < 10 or bbox_x2 > image.shape[1] - 10:
                    sign_list.add(class_label)

                    # Draw bounding box with green color for detected signs
                    cv2.rectangle(image, (bbox_x1, bbox_y1),
                                  (bbox_x2, bbox_y2), (0, 255, 0), 2)
                else:
                    # Draw bounding box with red color for normal tracking
                    cv2.rectangle(image, (bbox_x1, bbox_y1),
                                  (bbox_x2, bbox_y2), (0, 0, 255), 2)

                # Annotate the image with tracking information
                cv2.putText(image, "ID: " + str(track_id), (bbox_x1, bbox_y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                cv2.putText(image, f'Class: {class_label}', (bbox_x1, bbox_y1 - 45),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                cv2.putText(image, f'Conf: {conf:.2f}', (bbox_x1, bbox_y1 - 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            # Determine the command based on detected signs
            cmd = handle_sign_command(sign_list)

        except Exception as e:
            # Handle errors in image processing
            print("Error processing image:", e)
            cmd = STOP

        # Send the command via UDP if it has changed
        if cmd != prev_cmd:
            try:
                data_cmd = f"{cmd}"
                data_bytes_cmd = data_cmd.encode()
                sock.sendto(data_bytes_cmd, (ip_lap_lead, port_lap_lead))
                print(f"Sent UDP message: '{cmd}' to {
                      ip_lap_lead}:{port_lap_lead}")
                prev_cmd = cmd
            except Exception as e:
                print(f"Error sending UDP message: {e}")

        # Display the FPS on the image
        cv2.putText(image, f'FPS: {int(fps)}', (20, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 2)

        # Show the processed image
        cv2.imshow('image', image)

        # Exit the loop if the 'Esc' key is pressed
        if cv2.waitKey(1) & 0xFF == 27:
            stop_event.set()
            break

    # Close all OpenCV windows
    cv2.destroyAllWindows()

### Test the Traffic sign detection model by using laptop camera

Solution to exercise 2

In [None]:
import threading
stop_event = threading.Event()

threads = [
    threading.Thread(target=get_image_from_leader,
                     name='GetSendImageThread'),
    threading.Thread(target=detect_traffic_sign,
                     name='ProcessImageThread'),
]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()



### Implement Traffic sign detection model to control AutoCar kit

Solution to exercise 3

In [None]:
import threading
stop_event = threading.Event()

threads = [
    threading.Thread(target=get_image_from_cam,
                     name='GetSendImageThread'),
    threading.Thread(target=detect_traffic_sign,
                     name='ProcessImageThread'),
]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()