#Setup Dependencies

In [None]:
!pip install -q --upgrade ultralytics gdown onnx onnxruntime onnxsim ncnn filterpy scikit-image lap lapx fastapi kaleido lida python-multipart uvicorn tflite-support

#Github Clone and Setup Dependencies

In [None]:
!git clone --depth 1 https://github.com/roguehunter7/CornerSentinal

#Preparing Fresh Database from FiftyOne and Roboflow

In [None]:
!pip install --upgrade fiftyone ultralytics roboflow --no-cache

In [None]:
import fiftyone as fo
import fiftyone.zoo as foz
from fiftyone import ViewField as F
import numpy as np
import os
from tqdm import tqdm
import fiftyone.utils.random as four
from ultralytics import YOLO

EXPORT_DIR = "/content/"

classes=["Ambulance", "Bus", "Car", "Motorcycle", "Taxi", "Truck", "Van"]

dataset = foz.load_zoo_dataset(
    "open-images-v7",
    splits=["train"],
    only_matching=True,
    classes=classes,
    label_types="detections",
    num_workers = 16,
    shuffle=True,
    max_samples = 5000,
    cleanup=True
)

dataset.untag_samples(dataset.distinct("tags"))

four.random_split(
    dataset,
    {"train": 0.9, "val": 0.1}
)

def export_yolo_data(
    samples,
    export_dir,
    classes,
    label_field = "ground_truth",
    split = None
    ):

    if type(split) == list:
        splits = split
        for split in splits:
            export_yolo_data(
                samples,
                export_dir,
                classes,
                label_field,
                split
            )
    else:
        if split is None:
            split_view = samples
            split = "val"
        else:
            split_view = samples.match_tags(split)

        split_view.export(
            export_dir=export_dir,
            dataset_type=fo.types.YOLOv5Dataset,
            label_field=label_field,
            classes=classes,
            split=split
        )

export_yolo_data(
    dataset,
    "vehicle_train",
    classes,
    split = ["train", "val"]
)

In [None]:
!pip install -q --upgrade git+https://github.com/ultralytics/ultralytics.git@main roboflow --no-cache
from roboflow import Roboflow
from google.colab import userdata
RoboflowKey = userdata.get('RoboflowKey')
rf = Roboflow(api_key=RoboflowKey)
project = rf.workspace("main-project-dih2s").project("cornersentinal")
dataset = project.version(12).download("yolov8")

In [None]:
!sed -i '$d' {dataset.location}/data.yaml
!sed -i '$d' {dataset.location}/data.yaml
!sed -i '$d' {dataset.location}/data.yaml
!sed -i '$d' {dataset.location}/data.yaml
!echo -e "test: ../test/images\ntrain: ../train/images\nval: ../valid/images" >> {dataset.location}/data.yaml

In [None]:
!mv -v /content/vehicle_train/images/train/* /content/CornerSentinal-12/train/images/
!mv -v /content/vehicle_train/images/val/* /content/CornerSentinal-12/valid/images/
!mv -v /content/vehicle_train/labels/train/* /content/CornerSentinal-12/train/labels/
!mv -v /content/vehicle_train/labels/val/* /content/CornerSentinal-12/valid/labels/
%cd /content/

#Training

In [None]:
from ultralytics import YOLO
!yolo task=detect mode=train model=yolov10n data=/content/CornerSentinal-12 epochs=100 plots=True imgsz=320 batch=-1 cache=True

In [None]:
!zip -r /content/detect1.zip /content/runs/detect/
!cp -r /content/detect1.zip /content/drive/MyDrive/

In [None]:
!ls /content/runs/detect/train

In [None]:
from IPython.display import Image

In [None]:
Image(filename = '/content/runs/detect/train/confusion_matrix.png', width = 700)

In [None]:
Image(filename = r'/content/runs/detect/train/results.png', width = 700)

In [None]:
Image(filename = r'/content/runs/detect/train/val_batch2_pred.jpg', width = 700)

In [None]:
from google.colab import runtime
runtime.unassign()

#Copying Trained Model From Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!cp -r /content/drive/MyDrive/detect.zip /content/
!unzip -q /content/detect.zip
!mv -v /content/content/runs /content/

In [None]:
!yolo export model=/content/runs/detect/train/weights/best.pt format=onnx simplify=True dynamic=True int8=True

#Benchmark Model

In [None]:
!yolo benchmark model=/content/runs/detect/train/weights/best.pt data='/content/vehicle_train/dataset.yaml' imgsz=320 int8=True device=cpu

In [None]:
!yolo export model=/content/runs/detect/train/weights/best.pt format=onnx simplify=True dynamic=True int8=True

#Program Video Inference

In [None]:
%cd CornerSentinal

##leftside

In [None]:
from collections import defaultdict
import cv2
import time
from ultralytics import YOLO
import numpy as np


# Load the YOLOv8 model
model = YOLO('yolov10n.pt')

# Open the video file
video_path = "test_images/leftside.mp4"
cap = cv2.VideoCapture(video_path)

# Constants for speed calculation
VIDEO_FPS = int(cap.get(cv2.CAP_PROP_FPS))
FACTOR_KM = 3.6
LATENCY_FPS = VIDEO_FPS / 2

# Define the output video file path with MP4 format
output_video_path = "/content/CornerSentinal/test_images/leftside_out.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Use "mp4v" for H.264 compression
out = cv2.VideoWriter(output_video_path, fourcc, VIDEO_FPS, (int(cap.get(3)), int(cap.get(4))))

# Function to calculate Euclidean distance
def calculate_distance(p1, p2):
    return np.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)

# Function to calculate speed using Euclidean distance
def calculate_speed(distances, factor_km, latency_fps):
    if len(distances) <= 1:
        return 0.0
    average_speed = (np.mean(distances) * factor_km) / latency_fps * 10
    return average_speed

# Function to generate 9-bit binary code based on conditions
def generate_binary_code(class_id, speed, is_stationary, is_wrong_side):
    binary_code = ['0'] * 8

    # Stationary bit
    binary_code[0] = '1' if is_stationary else '0'

    if class_id == 0:  # Ambulance
        binary_code[2:5] = '100'
    elif class_id in [2, 6, 4]:  # Car or Van or Taxi/Auto
        binary_code[2:5] = '010'
    elif class_id in [5, 1]:  # Bus or Truck
        binary_code[2:5] = '011'
    elif class_id == 3:  # Motorcycle
        binary_code[2:5] = '001'

    # Wrong side warning bit
    binary_code[5] = '1' if is_wrong_side else '0'

    # Replace speed section
    if speed > 60:  # Overspeed Vehicle
        binary_code[6:8] = '11'
    elif 40 <= speed < 60:
        binary_code[6:8] = '10'
    elif 1.5 <= speed < 40:
        binary_code[6:8] = '01'

    return ''.join(binary_code)

# Function to display warning message on the frame
def display_warning_message(frame, binary_code):
    warning_message = f"Warning: {binary_code}"
    cv2.putText(frame, warning_message, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

# Store track history for each vehicle
track_history = defaultdict(list)
stationary_timers = defaultdict(float)

# Counter to keep track of frames
frame_counter = 0

# Placeholder for the previous frame and points for optical flow
prev_frame = None
prev_pts = None

# Placeholder for the previous binary code
prev_binary_code = None
recv_binary_code = None
prev_track_id = None

while cap.isOpened():
    ret = cap.grab()
    if ret:
        success, frame = cap.retrieve()

        # Check if YOLO inference should be performed on this frame
        if frame_counter % 2 == 0:
            results = model.track(frame, persist=True, tracker='bytetrack.yaml', imgsz=320, conf=0.25, int8=True)
            annotated_frame = results[0].plot()

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu().numpy().astype(int)
                class_id = results[0].boxes.cls.cpu().numpy().astype(int)
                track_ids = results[0].boxes.id.cpu().numpy().astype(int)

                for i, box in enumerate(boxes):
                    x, y, w, h = box
                    xmin, ymin, xmax, ymax = x, y, x + w, y + h
                    track = track_history[track_ids[i]]
                    track.append((float(x + w / 2), float(y + h / 2)))

                    if len(track) >= 2 and track[-2][1] < track[-1][1]:
                        distances = [calculate_distance(track[j], track[j + 1]) for j in range(len(track) - 1)]
                        speed = calculate_speed(distances, FACTOR_KM, LATENCY_FPS)
                        is_stationary = speed < 1.0
                        stationary_timers[track_ids[i]] = time.time() if not is_stationary else stationary_timers[
                            track_ids[i]]

                        if time.time() - stationary_timers[track_ids[i]] > 10.0:
                            is_stationary = True
                        is_wrong_side = False

                        binary_code = generate_binary_code(class_id[i], speed, is_stationary, is_wrong_side)

                        display_warning_message(annotated_frame, binary_code)
                        cv2.putText(annotated_frame, f"Speed: {speed:.2f} km/h", (int(x), int(y) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
                        roi = frame[int(y):int(y + h), int(x):int(x + w)]

                        if prev_frame is not None and prev_pts is not None:
                            prev_frame_resized = cv2.resize(prev_frame, (roi.shape[1], roi.shape[0]))
                            flow = cv2.calcOpticalFlowPyrLK(prev_frame_resized, roi, prev_pts, None,
                                                             winSize=(15, 15), maxLevel=2)
                            flow_distances = np.sqrt(np.sum((prev_pts - flow[0]) ** 2, axis=2))

                            good_pts = flow_distances > 0.5
                            for j, is_good in enumerate(good_pts):
                                if is_good:
                                    x1, y1 = prev_pts[j].astype(int).ravel()
                                    x2, y2 = (x + flow[0][j][0], y + flow[0][j][1]).astype(int)
                                    cv2.line(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                            prev_frame = roi
                            prev_pts = np.array([[(int(w / 2), int(h / 2))]], dtype=np.float32)

    # Increment frame counter
    frame_counter += 1
    # Display the annotated frame
    out.write(annotated_frame)  # Save the frame to the output video
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()

In [None]:
!rm -rf '/content/leftside.mp4'
import os
# Set up NVIDIA GPU device for FFmpeg
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Convert leftside_out.mp4 to leftside.mp4 using GPU-accelerated FFmpeg
os.system(f"ffmpeg -hwaccel cuda -i '/content/CornerSentinal/test_images/leftside_out.mp4' -vcodec hevc_nvenc '/content/leftside.mp4'")

In [None]:
from google.colab import files
files.download('/content/leftside.mp4')

##rightside

In [None]:
from collections import defaultdict
import cv2
import time
from ultralytics import YOLO
from lanedetector import *
import numpy as np
import threading
from queue import Queue

# Load the YOLOv8 model
model = YOLO('train3/weights/best.pt')

# Open the video file
video_path = "test_images/rightside.mp4"
cap = cv2.VideoCapture(video_path)

# Constants for speed calculation
VIDEO_FPS = int(cap.get(cv2.CAP_PROP_FPS))
FACTOR_KM = 3.6
LATENCY_FPS = VIDEO_FPS / 2

# Define the output video file path with MP4 format
output_video_path = "/content/CornerSentinal/test_images/leftside_out.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Use "mp4v" for H.264 compression
out = cv2.VideoWriter(output_video_path, fourcc, VIDEO_FPS, (int(cap.get(3)), int(cap.get(4))))

# Initialize lane detector
ld = LineDrawerGUI(video_path)
line_coords = ld.line_coords
option_val = ld.option_val
ld2 = LaneDetector(video_path, line_coords)
ld2.calculate_all_points()
points = ld2.points

# Function to calculate Euclidean distance
def calculate_distance(p1, p2):
    return np.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)

# Function to calculate speed using Euclidean distance
def calculate_speed(distances, factor_km, latency_fps):
    if len(distances) <= 1:
        return 0.0
    average_speed = (np.mean(distances) * factor_km) / latency_fps * 10
    return average_speed

# Function to generate 9-bit binary code based on conditions
def generate_binary_code(class_id, speed, is_stationary, is_wrong_side):
    binary_code = ['0'] * 8

    # Stationary bit
    binary_code[0] = '1' if is_stationary else '0'

    if class_id == 0:  # Ambulance
        binary_code[2:5] = '100'
    elif class_id in [2, 6, 4]:  # Car or Van or Taxi/Auto
        binary_code[2:5] = '010'
    elif class_id in [5, 1]:  # Bus or Truck
        binary_code[2:5] = '011'
    elif class_id == 3:  # Motorcycle
        binary_code[2:5] = '001'

    # Wrong side warning bit
    binary_code[5] = '1' if is_wrong_side else '0'

    # Replace speed section
    if speed > 60:  # Overspeed Vehicle
        binary_code[6:8] = '11'
    elif 40 <= speed < 60:
        binary_code[6:8] = '10'
    elif 1.5 <= speed < 40:
        binary_code[6:8] = '01'

    return ''.join(binary_code)

# Function to display warning message on the frame
def display_warning_message(frame, binary_code):
    warning_message = f"Warning: {binary_code}"
    cv2.putText(frame, warning_message, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

# Store track history for each vehicle
track_history = defaultdict(list)
stationary_timers = defaultdict(float)

# Counter to keep track of frames
frame_counter = 0

# Placeholder for the previous frame and points for optical flow
prev_frame = None
prev_pts = None

# Placeholder for the previous binary code
prev_binary_code = None
recv_binary_code = None
prev_track_id = None


while cap.isOpened():
    ret = cap.grab()
    if ret:
        success, frame = cap.retrieve()

        # Check if YOLO inference should be performed on this frame
        if frame_counter % 2 == 0:
            results = model.track(frame, persist=True, tracker='bytetrack.yaml', imgsz=320, conf=0.25, int8=True)
            annotated_frame = results[0].plot()

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu().numpy().astype(int)
                class_id = results[0].boxes.cls.cpu().numpy().astype(int)
                track_ids = results[0].boxes.id.cpu().numpy().astype(int)

                for i, box in enumerate(boxes):
                    x, y, w, h = box
                    xmin, ymin, xmax, ymax = x, y, x + w, y + h
                    track = track_history[track_ids[i]]
                    track.append((float(x + w / 2), float(y + h / 2)))

                    if len(track) >= 2 and track[-2][1] < track[-1][1]:
                        distances = [calculate_distance(track[j], track[j + 1]) for j in range(len(track) - 1)]
                        speed = calculate_speed(distances, FACTOR_KM, LATENCY_FPS)
                        is_stationary = speed < 1.0
                        stationary_timers[track_ids[i]] = time.time() if not is_stationary else stationary_timers[
                            track_ids[i]]

                        if time.time() - stationary_timers[track_ids[i]] > 10.0:
                            is_stationary = True
                        vehicle_pos = calculate_centroid(xmin, ymin, xmax, ymax)
                        correct_lane = lane_detector(points, vehicle_pos, int(option_val))
                        is_wrong_side = correct_lane != 1.0 and correct_lane != 0

                        binary_code = generate_binary_code(class_id[i], speed, is_stationary, is_wrong_side)
                        display_warning_message(annotated_frame, binary_code)
                        cv2.putText(annotated_frame, f"Speed: {speed:.2f} km/h", (int(x), int(y) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
                        roi = frame[int(y):int(y + h), int(x):int(x + w)]

                        if prev_frame is not None and prev_pts is not None:
                            prev_frame_resized = cv2.resize(prev_frame, (roi.shape[1], roi.shape[0]))
                            flow = cv2.calcOpticalFlowPyrLK(prev_frame_resized, roi, prev_pts, None,
                                                             winSize=(15, 15), maxLevel=2)
                            flow_distances = np.sqrt(np.sum((prev_pts - flow[0]) ** 2, axis=2))

                            good_pts = flow_distances > 0.5
                            for j, is_good in enumerate(good_pts):
                                if is_good:
                                    x1, y1 = prev_pts[j].astype(int).ravel()
                                    x2, y2 = (x + flow[0][j][0], y + flow[0][j][1]).astype(int)
                                    cv2.line(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                            prev_frame = roi
                            prev_pts = np.array([[(int(w / 2), int(h / 2))]], dtype=np.float32)

    # Increment frame counter
    frame_counter += 1
    # Display the annotated frame
    out.write(annotated_frame)  # Save the frame to the output video
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break


# Release resources
cap.release()
cv2.destroyAllWindows()

In [None]:
!rm -rf '/content/rightside.mp4'

import os

# Set up NVIDIA GPU device for FFmpeg
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Convert hbfootage_out.mp4 to footage.mp4 using GPU-accelerated FFmpeg
os.system(f"ffmpeg -hwaccel cuda -i '/content/CornerSentinal/test_images/rightside_out.mp4' -vcodec hevc_nvenc '/content/rightside.mp4'")

In [None]:
from google.colab import files
files.download('/content/rightside.mp4')