# Install the needed libraries

In [None]:
pip install opencv-python torch torchvision ultralytics 

# Import the most important libraries used for the implementation through the whole code.

In [None]:
# This is the library that allows us to read and save graphics files and apply filters and effects to them, including resize, grayscale, rotation and many other variations. (Installation: pip install opencv-python)
import cv2

# A Jupyter kernel to work with Python code in Jupyter notebooks and other interactive frontends
import IPython

# Helpers for showing and clearing cell output in the notebook
from IPython.display import display, clear_output

# This is an image formatting library that deals with images as matrices and converts them from and back to images
from PIL import Image

# This library formats and prints exception stack traces
import traceback

# This is the Pytorch import (Installation: pip install torch)
import torch

# For common mathematical functions like "floor"
import math

# For measuring inference time
import time

# Imports all other pre-trained detection models needed along with their weights (COCO dataset with 80 classes); they will be automatically downloaded if they are not available (Installation: pip install torchvision)
from torchvision.models.detection import *

# For processing Pytorch detection models
from torchvision.io.image import read_image
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
import torchvision.transforms as transforms

# Imports all pre-trained YOLOv5, YOLOv8, YOLOv9 and in the future YOLOv10 models (COCO dataset with 80 classes); they will be automatically downloaded if not available (Installation: pip install ultralytics)
from ultralytics import YOLO

# Determining first priority for GPU for better performance if available; otherwise, settle for CPU. You can import the models and their weights from either Ultralytics for YOLO or from Pytorch for other models.

In [None]:
# Prefer the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Use this only when you are using any model from Pytorch, it transforms input image to tensor, there is a commented line that resizes the input image into 300x300 before processing but it is not needed here.

In [None]:
# Preprocessing pipeline applied to every input image before it is passed to a torchvision model.
transform = transforms.Compose([
    # Optional fixed-size resize, intentionally left disabled:
    # transforms.Resize((300, 300)),
    # Converts the input image into a tensor
    transforms.ToTensor(),
])

In [None]:
# Here we can limit the classes we test to vehicle related ones
target_classes = ["truck", "bus", "car", "train", "bicycle"]

# Here we can import and initialize a detection model from "torchvision.models.detection".
# The model is moved to `device` because detectAndHandle sends its input tensor there;
# leaving the model on the CPU while the input lives on the GPU fails with a device mismatch.
WEIGHTS = FasterRCNN_ResNet50_FPN_Weights
MODEL = fasterrcnn_resnet50_fpn(weights=WEIGHTS.DEFAULT).to(device)

# List of all available detection models and their weights
 # fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights
 # retinanet_resnet50_fpn, RetinaNet_ResNet50_FPN_Weights
 # retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights
 # fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
 # fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights
 # fasterrcnn_mobilenet_v3_large_fpn, FasterRCNN_MobileNet_V3_Large_FPN_Weights
 # fasterrcnn_mobilenet_v3_large_320_fpn, FasterRCNN_MobileNet_V3_Large_320_FPN_Weights
 # maskrcnn_resnet50_fpn, MaskRCNN_ResNet50_FPN_Weights
 # maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights
 # ssd300_vgg16, SSD300_VGG16_Weights
 # ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights

# For YOLO

# Here we can import and initialize a detection model from "ultralytics.YOLO"
# NOTE(review): these two assignments overwrite the torchvision WEIGHTS/MODEL assigned earlier in this cell,
# so only the YOLO variant of detectAndHandle matches them -- run just one of the two variants.
WEIGHTS = "yolov5nu.pt" # or "yolov8n.pt"
MODEL = YOLO(WEIGHTS)

In [None]:
# MODEL.eval() switches the network to evaluation (inference) mode; because it is the last expression of the cell, the notebook also prints the returned model, which lets us see all layers in the network
MODEL.eval()

# For one image input, we can simply use this code with "source" as the string path to the image.

In [None]:
source = "samples/truck.jpg"
frame = cv2.imread(source, cv2.IMREAD_COLOR)

# cv2.imread does not raise on a missing or unreadable file; it returns None instead,
# so check before handing the frame to the pipeline.
if frame is None:
    print("Error: Could not read image '{}'.".format(source))
else:
    handleFrame(frame, 1, source)

In [None]:
# The $handleFrame$ function takes care of the pre- and post-processing of the frames in case they have to be resized, grayscaled, flipped or enhanced for the detection model to deliver better results, and calls the processing method in between. This function has the following parameters:
# frame: the input image
# frameIndex: the index of this frame in the video if the source is a video, otherwise -1
# source: the source name; if camera, then it will be a positive integer like 0, or the video path as string
def handleFrame(frame, frameIndex=-1, source=None):
    """Run one frame through process_frame and show the result in the notebook output.

    frame: the input image
    frameIndex: index of the frame inside the video if the source is a video, otherwise -1
    source: the source the frame came from (camera index or path string)
    """
    # Hook for pre-processing (resize, grayscale, flip, ...) before detection

    annotated = process_frame(frame, frameIndex, source)

    # Hook for post-processing of the frame that is going to be displayed

    # Encode the annotated frame as JPEG and push it into the existing display slot,
    # so consecutive frames replace each other like a smooth video
    encoded = cv2.imencode('.jpeg', annotated)[1]
    jpeg_bytes = encoded.tobytes()
    display_handle.update(IPython.display.Image(data=jpeg_bytes))
    IPython.display.clear_output(wait=True)

    # Alternative display path (kept disabled): it allows printing text above the image
    # for debugging, but there will be blanks between frames
    # img = Image.fromarray(processed_frame, 'RGB')
    # display(img)
    # clear_output(wait=True)

In [None]:
# The process_frame function includes the detection head in the implementation and processes the frame accordingly with the same parameters as $handleFrame$. The iterations through the results objects can be obtained from the official documentation; as for YOLO models, it is in the Ultralytics Documents \cite{ultralyticsmodels} and for all other models, it is documented in Pytorch \cite{pytorchmodels}.
def process_frame(frame, frameIndex = -1, source = None):
    """Run the detection step on `frame` with a fixed confidence threshold.

    Takes the same parameters as handleFrame and returns whatever
    detectAndHandle returns for the frame.
    """
    # Minimum score a detection needs to be drawn and processed further
    confidence_threshold = 0.5

    # Delegate the model inference and drawing to the model-specific implementation
    return detectAndHandle(frame, confidence_threshold, frameIndex, source)

In [None]:
# The detectAndHandle function is a model specific code, here we can implement the detection model in the pipeline, this code works for detection models from torchvision by Pytorch. The confidence_threshold is injected through the process_frame method.
def detectAndHandle(frame, confidence_threshold, frameIndex = -1, source = None):
    """Detect target-class objects with the torchvision MODEL and draw them on `frame`.

    frame: BGR image as read by OpenCV; it is annotated in place
    confidence_threshold: detections must score strictly above this value
    frameIndex, source: accepted for a uniform signature; not used here

    Returns the annotated frame.
    """
    # Convert frame to RGB and PIL Image, then apply transformation
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)
    image = transform(pil_image).unsqueeze(0).to(device)

    # Object detection; perf_counter is a monotonic, high-resolution clock meant for
    # measuring intervals, unlike time.time() which follows the (adjustable) wall clock
    start_time = time.perf_counter()
    with torch.no_grad():
        predictions = MODEL(image)

    # Processing (inference) time
    processing_time = time.perf_counter() - start_time

    # Only one image was passed in, so only the first prediction entry is relevant
    prediction = predictions[0]

    # The label -> class-name table does not change between detections, so look it up once
    categories = WEIGHTS.DEFAULT.meta["categories"]

    # For each detection
    for score, bounding_box, label in zip(prediction['scores'], prediction['boxes'], prediction['labels']):
        name = categories[label.item()]

        # Plain Python float, matching what the YOLO variant hands to process_vehicle
        confidence = score.item()

        # Skip everything that is not a sufficiently confident target class
        if name not in target_classes or not confidence > confidence_threshold:
            continue

        # Extract bounding boxes
        x1, y1, x2, y2 = map(int, bounding_box)

        # Ensure x1 < x2 and y1 < y2
        x1, x2 = min(x1, x2), max(x1, x2)
        y1, y2 = min(y1, y2), max(y1, y2)

        # Build up the label text out of class name, confidence score and inference speed
        label_text = f'{name} {confidence*100:.2f}% ({processing_time:.3f}s)'

        # Draw bounding boxes and label on the frame
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,255), 2)
        cv2.putText(frame, label_text, (x1, y1+20), 0, 0.7, (0,255,255), 2)

        # For further processing of a detected vehicle object (This method will be used in the next chapter)
        process_vehicle(frame, bounding_box, name, confidence)

    return frame

In [None]:
# The detectAndHandle function is a model specific code, here we can implement the detection model in the pipeline. This variant of the code works for YOLO detection models from Ultralytics.
def detectAndHandle(frame, confidence_threshold, frameIndex = -1, source = None):
    """Detect target-class objects with the Ultralytics YOLO MODEL and draw them on `frame`.

    frame: BGR image as read by OpenCV; it is annotated in place
    confidence_threshold: detections must score strictly above this value
    frameIndex, source: accepted for a uniform signature; not used here

    Returns the annotated frame.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring intervals,
    # unlike time.time() which follows the (adjustable) wall clock
    start_time = time.perf_counter()
    results = MODEL(frame, device=device)[0]
    processing_time = time.perf_counter() - start_time

    # Iterate through results
    for result in results:
        # Iterate through all detections
        for i in range(result.boxes.shape[0]):
            cls = int(result.boxes.cls[i].item())

            # Extract class name
            name = result.names[cls]

            # Extract confidence score
            confidence = float(result.boxes.conf[i].item())

            # Skip everything that is not a sufficiently confident target class
            if name not in target_classes or not confidence > confidence_threshold:
                continue

            # Extract bounding boxes
            bounding_box = result.boxes.xyxy[i].cpu().numpy()
            x1, y1, x2, y2 = [int(x) for x in bounding_box]

            # Ensure x1 < x2 and y1 < y2
            x1, x2 = min(x1, x2), max(x1, x2)
            y1, y2 = min(y1, y2), max(y1, y2)

            # Build up the label text out of class name, confidence score and inference speed
            label_text = '{} {:.2f}% ({:.3f}s)'.format(name.upper(), confidence*100, processing_time)

            # Draw bounding boxes and label on the frame
            cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,255), 2)
            cv2.putText(frame, label_text, (x1, y1+20), 0, 0.7, (0,255,255), 2)

            # For further processing of a detected vehicle object (This will be discussed in the next chapter)
            process_vehicle(frame, bounding_box, name, confidence)

    return frame

# For video or live stream inputs, we have to use the code below, starting by initializing the parameters and opening the stream.

In [None]:
# The 0 means the first available camera of this device, or "data/video/sample.MP4" for a specific video
source = "data/video/sample.MP4"

# This variable determines how many frames we skip, e.g. every = 60 means handle only every 60th frame from the source
every = 60

# This initializes the display slot that handleFrame keeps updating
display_handle = display(None, display_id=True)

# This initializes the input feed from a video or camera and opens the source
cam = cv2.VideoCapture(source)

# These parameters are only used for video processing to determine the start and end count of frames;
# an integer source is a camera, which has no frame count, so both are set to -1
start = -1 if isinstance(source, int) else 0 # Or "if source==0" in case you only have one camera input source
end = -1 if isinstance(source, int) else int(cam.get(cv2.CAP_PROP_FRAME_COUNT))

# For videos, position the reader on the start frame (named constant instead of the raw property id 1)
if not isinstance(source, int):
    cam.set(cv2.CAP_PROP_POS_FRAMES, start)

# Loop safety: allow reading up to safety_limit = 2000 empty frames in sequence before terminating the while loop as a result of an error; otherwise, the loop ends normally when the input video ends or the input stream gets interrupted
frameCount = start
while_safety = 0
safety_limit = 2000

# If the source could not be opened, an error message is printed. Otherwise the frames are handled in a loop wrapped in a try/except block, which makes sure the source is released correctly even in the case of an error; without it, the camera could be left running in the background.

In [None]:
if not cam.isOpened():
    print("Error: Could not open source.")
else:
    try:
        # Whole-number frame step; computed once because `every` does not change inside the loop.
        # A step below 1 (e.g. every = 0.5) means "no skipping" instead of a modulo-by-zero error.
        frame_step = math.floor(every)

        # If source is opened, it enters an infinite while loop for camera feed input or a loop determined by the end frame count of the video, as frameCount counts up to the end of the video
        while True if end<0 else frameCount<end:
            # Read next frame
            _, frame = cam.read()

            # If frame is empty and the source is a video, allow the safety_limit because there will be frames later; otherwise, release the camera
            if frame is None:
                if not isinstance(source, int):
                    if while_safety > safety_limit: break
                    while_safety += 1
                    continue
                else:
                    print("Error: Could not capture frame.")
                    cam.release()
                    break

            # If there is a frame skip rate then apply it and handle frame only when it is valid, otherwise handle every frame
            if frame_step > 0:
                if (frameCount+1) % frame_step == 0:
                    while_safety = 0
                    handleFrame(frame, frameCount, source)
            else:
                while_safety = 0
                # Pass index and source here as well so both branches give the pipeline the same information
                handleFrame(frame, frameCount, source)

            # Increase the current frameCount
            frameCount += 1

        # After the loop handle the source release accordingly
        handleRelease()
    except KeyboardInterrupt:
        # If stream or video read is interrupted by user, handle source release accordingly as well
        handleRelease()
    except Exception:
        # Some error occurred to reach here; release the source best-effort and print the error tree.
        # Exception (not a bare except) is caught so that interpreter-exit signals are not swallowed.
        print("Unknown error.")
        try:
            cam.release()
        except Exception:
            print("Error: Source could not be released.")
        traceback.print_exc()

In [None]:
# The $handleRelease$ function handles the program flow in case the video ends or the camera source is released on user interruption or end of video file.
def handleRelease():
    """Release the capture source `cam` and report that it was released."""
    cam.release()
    print("Source released.")

    # Extension point: anything that should be output after the video ends or the stream closes
    # (for example arrays with collected statistics) can be printed here.

# This code builds upon the codes used in Vehicle Detection.

In [None]:
def process_vehicle(frame, parent_bounding_box, parent_name, parent_confidence):
    """Detect license plates inside a detected vehicle and pass each one on for recognition.

    frame: full image; plate boxes and labels are drawn onto it in place
    parent_bounding_box: vehicle box as (x1, y1, x2, y2) in frame coordinates
    parent_name: class name of the vehicle detection
    parent_confidence: confidence of the vehicle detection
    """
    x1, y1, x2, y2 = [int(x) for x in parent_bounding_box]
    vehicle_frame = frame[y1:y2,x1:x2]

    # A degenerate box yields an empty crop; there is nothing to detect in it
    if vehicle_frame.size == 0:
        return

    # Using an ultralytics YOLO model for license plate detection
    results = plate_model(vehicle_frame, agnostic_nms=True, verbose=False)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name = result.names[cls]
            confidence = float(result.boxes.conf[i].item())
            bounding_box = result.boxes.xyxy[i].cpu().numpy()

            # Plate coordinates are relative to the vehicle crop
            # (the original read an undefined name `bounding_box2` here)
            a1, b1, a2, b2 = [int(x) for x in bounding_box]
            # Ensure a1 < a2 and b1 < b2
            a1, a2 = min(a1, a2), max(a1, a2)
            b1, b2 = min(b1, b2), max(b1, b2)

            # Draw bounding boxes and write labels; adding x1/y1 shifts crop coordinates back into frame coordinates
            cv2.rectangle(frame,(x1+a1,y1+b1),(x1+a2,y1+b2),(255,0,255),2)
            cv2.putText(frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+a1+10,y1+b1-15),0,0.9,(255,0,255),2)

            # This method tries to recognize the license plate characters
            process_license(frame, [x1+a1, y1+b1, x1+a2, y1+b2], name, confidence, parent_bounding_box, parent_name, parent_confidence)

# This code shows an example of handling a specialist detection model with all needed classes for a specific use case, e.g. vehicle recognition.

In [None]:
def get_grayscale(image):
    """Return `image` converted from BGR to grayscale."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return gray

def thresholding(image):
    """Return `image` binarized with Otsu's automatically chosen threshold."""
    # cv2.threshold returns (threshold value, thresholded image); only the image is needed
    _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary

def process_license(processed_frame, bounding_box, name, confidence, parentName='test', parentConfidence=0.0):
    """Draw a detected license plate on the frame and hand it to the recognition step.

    processed_frame: image that is annotated in place
    bounding_box: plate box as (x1, y1, x2, y2)
    name, confidence: class name and confidence of the plate detection
    parentName, parentConfidence: accepted but not used inside this function
    """
    x1, y1, x2, y2 = [int(x) for x in bounding_box]
    cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(255,0,255),2)
    cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1-15),0,0.7,(255,0,255))

    # The crop is taken after drawing, so it may contain parts of the rectangle drawn above
    license_frame = processed_frame[y1:y2,x1:x2]
    # NOTE(review): "..." is a placeholder argument (Python's Ellipsis) and process_recognition is not
    # defined in the visible code; presumably license_frame is what should be passed -- TODO confirm
    process_recognition(...)

# NN Layer Yolov8 Detection
def process_frame(frame, frameIndex=-1, source=None):
    """Detect all classes of the specialist model on `frame` and draw them.

    frame: image that is annotated in place and returned
    frameIndex, source: optional and unused; accepted so that handleFrame, which calls
        process_frame(frame, frameIndex, source), also works with this variant

    Returns the annotated frame.
    """
    processed_frame = frame
    results = specialist_model(processed_frame, agnostic_nms=True, verbose=False)[0]

    # For every detection we check class names and implement the logic based on that
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name = result.names[cls]
            confidence = float(result.boxes.conf[i].item())
            bounding_box = result.boxes.xyxy[i].cpu().numpy()
            x1, y1, x2, y2 = [int(x) for x in bounding_box]
            # Ensure x1 < x2 and y1 < y2
            x1, x2 = min(x1, x2), max(x1, x2)
            y1, y2 = min(y1, y2), max(y1, y2)
            cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(0,255,255),2)
            cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1-15),0,0.7,(0,255,255))

            # Only license plates are passed on to the recognition step
            if name=="license_plate":
                process_license(processed_frame, bounding_box, name, confidence)
    return processed_frame

# This is the implementation for recognizing text using pytesseract OCR.

In [None]:
import os

import pytesseract   # pip install pytesseract

# Module-level list collecting one record per recognized plate text
detected_plates = []

# Install this from "https://github.com/UB-Mannheim/tesseract/wiki"
# The path below is the default Windows install location. It is only applied when the executable
# actually exists there, so other systems and custom installs keep pytesseract's own setting.
TESSERACT_WINDOWS_PATH = 'C:\\Program Files\\Tesseract-OCR\\tesseract.exe'
if os.path.isfile(TESSERACT_WINDOWS_PATH):
    pytesseract.pytesseract.tesseract_cmd = TESSERACT_WINDOWS_PATH

# get grayscale image
def get_grayscale(image):
    """Return `image` converted from BGR to grayscale."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return gray

#thresholding
def thresholding(image):
    """Return `image` binarized with Otsu's automatically chosen threshold."""
    # cv2.threshold returns (threshold value, thresholded image); only the image is needed
    _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary

def process_license(frame, parent_bounding_box, parent_name, parent_confidence, grandparent_bounding_box, grandparent_name, grandparent_confidence):
    """Read the text of a detected license plate with pytesseract and draw it on the frame.

    frame: full image; the recognized text is drawn onto it in place
    parent_bounding_box: plate box as (x1, y1, x2, y2) in frame coordinates
    parent_name, parent_confidence: class name and confidence of the plate detection
    grandparent_bounding_box: box of the enclosing detection (not used here)
    grandparent_name, grandparent_confidence: class name and confidence of the enclosing detection

    Every row that becomes the current best is appended to the module-level detected_plates.
    """
    x1, y1, x2, y2 = [int(x) for x in parent_bounding_box]
    license_plate_frame = frame[y1:y2,x1:x2]

    grayscaled = get_grayscale(license_plate_frame)
    threshholded = thresholding(grayscaled)

    max_conf = 0.0
    best_text = ""

    # NN Layer License Plate Recognition "Pytesseract OCR"
    results = pytesseract.image_to_data(threshholded, config='-c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')

    for line in results.split('\n'):
        params = line.split()

        # Only rows with all 12 whitespace-separated fields carry a confidence and a text
        if len(params) != 12:
            continue

        # The header row holds the literal word "conf" in this column; it is counted as 0.0
        line_conf = float(params[10].replace('conf','0.0'))
        if line_conf >= max_conf:
            max_conf = line_conf
            # The header row holds the literal word "text"; it is recorded as "-"
            best_text = params[11] if params[11]!="text" else "-"
            if best_text:
                # NOTE(review): the original read undefined names `confidence` and `name` for the 5th and
                # 6th column (NameError). This function has no detection level below the plate, so the
                # plate values are repeated to keep the 9-column layout -- confirm the intended content.
                detected_plates.append([grandparent_confidence*100, grandparent_name.upper(), parent_confidence*100, parent_name.upper(), parent_confidence*100, parent_name.upper(), max_conf, best_text, "tesse"])
    cv2.putText(frame, '{} {:.2f}%'.format("[tesse]: "+best_text, max_conf),(x1+10,y2+25),0,0.9,(0,0,255),3)

# This is the implementation for recognizing text using EasyOCR.

In [None]:
import easyocr  # pip install easyocr
# OCR reader configured with the language codes 'en' and 'de'; created once and reused by process_license
reader = easyocr.Reader(['en', 'de'])

# Module-level list that process_license appends its recognition records to
detected_plates = []

# get grayscale image
def get_grayscale(image):
    """Return `image` converted from BGR to grayscale."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return gray

#thresholding
def thresholding(image):
    """Return `image` binarized with Otsu's automatically chosen threshold."""
    # cv2.threshold returns (threshold value, thresholded image); only the image is needed
    _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary

def process_license(frame, parent_bounding_box, parent_name, parent_confidence, grandparent_bounding_box, grandparent_name, grandparent_confidence):
    """Read the text of a detected license plate with EasyOCR and draw it on the frame.

    frame: full image; the recognized text is drawn onto it in place
    parent_bounding_box: plate box as (x1, y1, x2, y2) in frame coordinates
    parent_name, parent_confidence: class name and confidence of the plate detection
    grandparent_bounding_box: box of the enclosing detection (not used here)
    grandparent_name, grandparent_confidence: class name and confidence of the enclosing detection

    Every result that becomes the current best is appended to the module-level detected_plates.
    """
    x1, y1, x2, y2 = [int(x) for x in parent_bounding_box]
    license_plate_frame = frame[y1:y2,x1:x2]

    grayscaled = get_grayscale(license_plate_frame)
    threshholded = thresholding(grayscaled)

    # Best text so far and its confidence; res[1] is the text and res[2] the confidence of a result
    text = ""
    conf = 0.0
    for res in reader.readtext(threshholded):
        if res[2] > conf:
            conf = res[2]
            text = res[1]
            if len(text) > 0:
                # NOTE(review): the original read undefined names `confidence` and `name` for the 5th and
                # 6th column (NameError). This function has no detection level below the plate, so the
                # plate values are repeated to keep the 9-column layout -- confirm the intended content.
                detected_plates.append([grandparent_confidence*100, grandparent_name.upper(), parent_confidence*100, parent_name.upper(), parent_confidence*100, parent_name.upper(), conf*100, text, "easyocr"])
    cv2.putText(frame, '{} {:.2f}%'.format("[easy]: "+str(text), conf*100),(x1+10,y2+57),0,0.9,(200,200,200),3)

# This is the implementation for recognizing text using a YOLO character detection model.

In [None]:
# Module-level list collecting the recognized plate texts
detected_plates = []

def process_license(frame, parent_bounding_box, parent_name, parent_confidence, grandparent_bounding_box, grandparent_name, grandparent_confidence):
    """Recognize the characters of a license plate with a YOLO model and store the plate text.

    frame: full image; character boxes and labels are drawn onto it in place
    parent_bounding_box: plate box as (x1, y1, x2, y2) in frame coordinates
    parent_name, parent_confidence: plate detection info (not used here)
    grandparent_bounding_box, grandparent_name, grandparent_confidence: info about the
        enclosing detection (not used here)

    The detected characters are ordered from left to right, joined into one string and
    appended to the module-level detected_plates.
    """
    x1, y1, x2, y2 = [int(x) for x in parent_bounding_box]
    license_plate_frame = frame[y1:y2,x1:x2]

    # One [left, top, class name] entry per detected character
    # (the original appended to this list without ever creating it)
    plate_parts = []

    # Using an ultralytics YOLO model for license plate recognition
    results = characters_model(license_plate_frame, agnostic_nms=True, verbose=False)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name = result.names[cls]
            confidence = float(result.boxes.conf[i].item())
            bounding_box = result.boxes.xyxy[i].cpu().numpy()

            # Character coordinates are relative to the plate crop
            # (the original read undefined names `bounding_box2` and `name2`)
            a1, b1, a2, b2 = [int(x) for x in bounding_box]
            # Ensure a1 < a2 and b1 < b2
            a1, a2 = min(a1, a2), max(a1, a2)
            b1, b2 = min(b1, b2), max(b1, b2)

            # Draw bounding boxes and write labels; adding x1/y1 shifts crop coordinates back into frame coordinates
            cv2.rectangle(frame,(x1+a1,y1+b1),(x1+a2,y1+b2),(255,0,255),2)
            cv2.putText(frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+a1+10,y1+b1-15),0,0.9,(255,0,255),2)
            plate_parts.append([a1,b1,name])

    # Order the characters by their left edge and skip the "undefined" class
    sorted_list = sorted(plate_parts,key=lambda l:l[0])
    plate_text = "".join(part[2] for part in sorted_list if part[2]!="undefined")
    detected_plates.append(plate_text)  # We could also add parent and grandparent object information here too

print(detected_plates)

In [None]:
# This code displays every tested frame with every model name, accuracy, speed, class names and bounding boxes on the displayed frame. It also outputs a code that can be used in plots.

In [None]:
import cv2
import torch

# Imports all pre-trained YOLOv5, YOLOv8, YOLOv9 and in the future YOLOv10 models (COCO dataset with 80 classes), they will be automatically downloaded if not available
from ultralytics import YOLO

# Imports all other pre-trained detection models needed (COCO dataset with 80 classes), they will be automatically downloaded if they are not available
from torchvision.models.detection import *

from torchvision.io.image import read_image
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
import torchvision.transforms as transforms
from PIL import Image
import IPython
from IPython.display import display, clear_output
import traceback
import math
import time

# Prefer the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Transforms input image to tensor, there is a commented line that resizes the input image into 300x300 before processing but it was not used in the analysis, it is kept so you can have the option to use it, but the models usually automatically resize inputs based on the trained weights resolution.
transform = transforms.Compose([
    # Optional fixed-size resize, intentionally left disabled:
    # transforms.Resize((300, 300)),
    # Converts the input image into a tensor
    transforms.ToTensor(),
])

# This class describes a detection model object in which detections can be stored and statistics (averages, minima, maxima) can be calculated. All results are kept in RAM, so the workflow is limited by the available memory. An alternative design would let each model run inference on the whole video in turn and save/read the results via files, but the in-memory design was kept so that the code also works with a camera on a live feed. Every instance downloads, loads and initializes its model on construction, i.e. before the tests begin.
class DetectionModel():
    """A detection model together with the detections it has produced.

    Stores detection objects (anything exposing `precision`, `speed` and `frame`
    attributes, e.g. DetectionResult) and computes statistics over them.
    The "inference" statistics count every distinct frame once, using the speed of
    the first detection stored for that frame; `frame` values must be hashable.
    """

    def __init__(self, concept, backbone, model, weights, reference, color=None):
        """Load and initialize the model.

        concept: model family name; "YOLO" selects the Ultralytics calling convention
        backbone: backbone name (informational)
        model: callable that builds the model
        weights: weights file name for YOLO, otherwise a weights enum with a DEFAULT member
        reference: custom display name of the model
        color: color tuple used when drawing this model's detections
        """
        self.concept = concept
        self.backbone = backbone
        self.model = model(weights) if concept=="YOLO" else model(weights=weights.DEFAULT)
        if concept!="YOLO":
            # Non-YOLO models are moved to the selected device and put into evaluation mode here
            self.model.to(device)
            self.model.eval()
        self.weights = weights if concept=="YOLO" else weights.DEFAULT
        self.reference = reference
        self.color = color
        self.detections = []

    @staticmethod
    def _asNumber(value):
        """Return a plain Python number for a tensor, any other value unchanged."""
        return value.item() if torch.is_tensor(value) else value

    def _speedPerFrame(self):
        """Return the speed of the first stored detection of every distinct frame."""
        firstSpeeds = {}
        for detection in self.detections:
            # setdefault keeps the first speed seen for a frame; the dict makes the lookup O(1)
            firstSpeeds.setdefault(detection.frame, detection.speed)
        return list(firstSpeeds.values())

    def addDetection(self, detection):
        """Store one detection."""
        self.detections.append(detection)

    def getDetections(self):
        """Return the list of stored detections."""
        return self.detections

    def clearDetections(self):
        """Drop all stored detections."""
        self.detections = []

    def detectionsCount(self):
        """Return the number of stored detections."""
        return len(self.detections)

    def inferencesCount(self):
        """Return the number of distinct frames among the stored detections."""
        return len(self._speedPerFrame())

    def getAveragePrecision(self):
        """Return the mean precision of all detections, or 0.0 without detections."""
        if not self.detections:
            return 0.0
        total = sum((detection.precision for detection in self.detections), 0.0)
        return self._asNumber(total/len(self.detections))

    def getMaxPrecision(self):
        """Return the highest precision of all detections, or 0.0 without detections."""
        return self._asNumber(max((detection.precision for detection in self.detections), default=0.0))

    def getMinPrecision(self):
        """Return the lowest precision of all detections, or 0.0 without detections."""
        if not self.detections:
            return 0.0
        return self._asNumber(min(detection.precision for detection in self.detections))

    def getAverageSpeed(self):
        """Return the mean speed over all detections, or 0.0 without detections."""
        if not self.detections:
            return 0.0
        return sum((detection.speed for detection in self.detections), 0.0)/len(self.detections)

    def getMaxSpeed(self):
        """Return the largest speed value of all detections, or 0.0 without detections."""
        return max((detection.speed for detection in self.detections), default=0.0)

    def getMinSpeed(self):
        """Return the smallest speed value of all detections, or 0.0 without detections."""
        if not self.detections:
            return 0.0
        return min(detection.speed for detection in self.detections)

    def getAverageInferenceSpeed(self):
        """Return the mean speed with every distinct frame counted once, or 0.0 without detections."""
        speeds = self._speedPerFrame()
        return sum(speeds, 0.0)/len(speeds) if speeds else 0.0

    def getMaxInferenceSpeed(self):
        """Return the largest per-frame speed value, or 0.0 without detections."""
        return max(self._speedPerFrame(), default=0.0)

    def getMinInferenceSpeed(self):
        """Return the smallest per-frame speed value, or 0.0 without detections."""
        speeds = self._speedPerFrame()
        return min(speeds) if speeds else 0.0

    def filterPrecisionsBelow(self, minPrecision):
        """Keep only the detections whose precision is at least minPrecision."""
        self.detections = [detection for detection in self.detections if detection.precision>=minPrecision]

    def printResults(self):
        """Print all statistics of this model."""
        # "Best" speed is the smallest duration and "worst" the largest, hence min/max are swapped for speeds
        print("Model:", self.reference, 
              "\nBest Precision:", self.getMaxPrecision(), 
              "%\nAverage Precision:", self.getAveragePrecision(), 
              "%\nWorst Precision:", self.getMinPrecision(), 
              "%\nBest Speed:", self.getMinSpeed(), 
              "seconds\nAverage Speed:", self.getAverageSpeed(), 
              "seconds\nWorst Speed:", self.getMaxSpeed(), 
              "seconds\nBest Inference Speed:", self.getMinInferenceSpeed(), 
              "seconds\nAverage Inference Speed:", self.getAverageInferenceSpeed(), 
              "seconds\nWorst Inference Speed:", self.getMaxInferenceSpeed(), 
              "seconds\nTotal Detections:", self.detectionsCount(), 
              "\nTotal Inferences:", self.inferencesCount(), 
              "\n\n")

# This class represents a Detection Result, it is saved into the detections array in the Detection Model object
class DetectionResult():
    """One detection of one model, as stored in DetectionModel.detections."""

    def __init__(self, model, label, precision, speed, device=None, source=None, frame=None, boundingBox=None, details=None):
        # Tensors are unwrapped so that precision and speed are stored as plain Python numbers
        if torch.is_tensor(precision):
            precision = precision.item()
        if torch.is_tensor(speed):
            speed = speed.item()

        self.model, self.label = model, label
        self.precision, self.speed = precision, speed
        self.device, self.source = device, source
        self.frame, self.boundingBox, self.details = frame, boundingBox, details

# Only detections whose class name is in this list are evaluated
target_classes = ["truck", "bus", "car", "train", "bicycle"]

# This defines all the models we want to work with; any model imported above can be added to this array. Defining a new model takes the concept name as the first parameter, the backbone name as the second parameter, the model object as the third, the pretrained weights as the fourth, a custom given name for the model as the fifth parameter and a color tuple as the last parameter.
# Every DetectionModel(...) call below builds its model immediately, so all listed models are loaded when this statement runs.
# NOTE(review): every YOLO entry carries the backbone label "csp_darknet53", including the v8 and v9 models -- confirm the label is intended for all of them.
VEHICLE_DETECTION_MODELS = [
    DetectionModel("FCOS", "resnet50_fpn", fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights, "fcos_resnet", (255,0,0)),
    DetectionModel("RetinaNet", "resnet50_fpn", retinanet_resnet50_fpn, RetinaNet_ResNet50_FPN_Weights, "retinanet_resnet", (255,255,0)),
    DetectionModel("RetinaNet", "resnet50_fpn_v2", retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights, "retinanet_resnet_v2", (0,234,255)),
    DetectionModel("FasterRCNN", "resnet50_fpn", fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights, "faster_rcnn_resnet", (170,0,255)),
    DetectionModel("FasterRCNN", "resnet50_fpn_v2", fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights, "faster_rcnn_resnet_v2", (255,127,0)),
    DetectionModel("FasterRCNN", "mobilenet_v3_large_fpn", fasterrcnn_mobilenet_v3_large_fpn, FasterRCNN_MobileNet_V3_Large_FPN_Weights, "faster_rcnn_mobilenet_v3", (191,255,0)),
    DetectionModel("FasterRCNN", "mobilenet_v3_large_320_fpn", fasterrcnn_mobilenet_v3_large_320_fpn, FasterRCNN_MobileNet_V3_Large_320_FPN_Weights, "faster_rcnn_mobilenet_v3_320", (0,149,255)),
    DetectionModel("MaskRCNN", "resnet50_fpn", maskrcnn_resnet50_fpn, MaskRCNN_ResNet50_FPN_Weights, "mask_rcnn_resnet", (255,0,170)),
    DetectionModel("MaskRCNN", "resnet50_fpn_v2", maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights, "mask_rcnn_resnet_v2", (255,212,0)),
    DetectionModel("SSD300", "vgg16", ssd300_vgg16, SSD300_VGG16_Weights, "ssd_vgg16", (106,255,0)),
    DetectionModel("SSDLite320", "mobilenet_v3_large", ssdlite320_mobilenet_v3_large, SSDLite320_MobileNet_V3_Large_Weights, "ssd_mobilenet_v3", (0,64,255)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5nu.pt', "yolov5nu", (237,185,185)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5su.pt', "yolov5su", (185,215,237)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5mu.pt', "yolov5mu", (231,233,185)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5lu.pt', "yolov5lu", (220,185,237)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5xu.pt', "yolov5xu", (185,237,224)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5n6u.pt', "yolov5n6u", (143,35,35)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5s6u.pt', "yolov5s6u", (35,98,143)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5m6u.pt', "yolov5m6u", (143,106,35)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5l6u.pt', "yolov5l6u", (107,35,143)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov5x6u.pt', "yolov5x6u", (79,143,35)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov8n.pt', "yolov8n", (185,237,224)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov8s.pt', "yolov8s", (115,115,115)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov8m.pt', "yolov8m", (204,204,204)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov8l.pt', "yolov8l", (255,0,0)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov8x.pt', "yolov8x", (255,255,0)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov9c.pt', "yolov9c", (0,234,255)),
    DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov9e.pt', "yolov9e", (170,0,255))#,
    # YOLOv10 entries, currently disabled:
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10n.pt', "yolov10n", (185,237,224)),
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10s.pt', "yolov10s", (115,115,115)),
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10m.pt', "yolov10m", (204,204,204)),
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10b.pt', "yolov10b", (204,204,204)),
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10l.pt', "yolov10l", (255,0,0)),
    # DetectionModel("YOLO", "csp_darknet53", YOLO, 'yolov10x.pt', "yolov10x", (255,255,0))
]

def process(frame, frameIndex=-1, source=None):
    """Run every model in ``VEHICLE_DETECTION_MODELS`` on one frame and display the result.

    Each model works on its own copy of ``frame``. Detections whose class is in
    ``target_classes`` and whose confidence is above 0.5 are printed, recorded
    on the model through ``addDetection`` and drawn on that copy, which is then
    shown in the notebook output.

    Args:
        frame: Image array as returned by ``cv2.VideoCapture.read`` (OpenCV
            delivers colour frames in BGR channel order).
        frameIndex: Index of the frame within the source; stored with every
            recorded detection.
        source: Identifier of the input source; stored with every recorded
            detection.

    Returns:
        The input ``frame`` itself; annotations are only drawn on copies.
    """
    confidence_threshold = 0.5

    for currentModel in VEHICLE_DETECTION_MODELS:
        processed_frame = frame.copy()

        if currentModel.concept == "YOLO":
            # Only the model call is timed, not the post-processing below.
            start_time = time.time()
            results = currentModel.model(processed_frame, agnostic_nms=True, verbose=False, device=device)[0]
            processing_time = time.time() - start_time
            for result in results:
                detection_count = result.boxes.shape[0]
                for j in range(detection_count):
                    cls = int(result.boxes.cls[j].item())
                    name = result.names[cls]
                    confidence = float(result.boxes.conf[j].item())
                    bounding_box = result.boxes.xyxy[j].cpu().numpy()
                    if name in target_classes and confidence > confidence_threshold:
                        x1, y1, x2, y2 = [int(x) for x in bounding_box]
                        # Ensure x1 < x2 and y1 < y2
                        x1, x2 = min(x1, x2), max(x1, x2)
                        y1, y2 = min(y1, y2), max(y1, y2)
                        label = '{} ({:.2f}%) ({:.3f}s) {}'.format(name.upper(), confidence*100, processing_time, currentModel.reference)
                        print(label)
                        currentModel.addDetection(DetectionResult(currentModel.reference, name, confidence*100, processing_time, device, source, frameIndex, bounding_box, label))
                        cv2.rectangle(processed_frame, (x1, y1), (x2, y2), currentModel.color, 2)
                        cv2.putText(processed_frame, label, (x1+10, y1+25), 0, 0.8, currentModel.color, 2)
        else:
            # Convert frame to RGB and PIL Image, then apply transformation
            frame_rgb = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)
            image = transform(pil_image).unsqueeze(0).to(device)

            # Object detection (timed; gradients are not needed for inference)
            start_time = time.time()
            with torch.no_grad():
                predictions = currentModel.model(image)
            processing_time = time.time() - start_time

            # Visualization
            scores = predictions[0]['scores']
            boxes = predictions[0]['boxes']
            labels = predictions[0]['labels']

            for score, box, label in zip(scores, boxes, labels):
                # Look the category name up once instead of three times per detection.
                category = currentModel.weights.meta["categories"][label.item()]
                if category in target_classes and score > confidence_threshold:
                    x1, y1, x2, y2 = map(int, box)
                    label_text = f'{category.upper()} ({score*100:.2f}%) ({processing_time:.3f}s) {currentModel.reference}'
                    print(label_text)
                    currentModel.addDetection(DetectionResult(currentModel.reference, category, float(score.item())*100, processing_time, device, source, frameIndex, box, label_text))
                    cv2.rectangle(processed_frame, (x1, y1), (x2, y2), currentModel.color, 2)
                    cv2.putText(processed_frame, label_text, (x1, y1+20), 0, 0.8, currentModel.color, 2)

        # OpenCV frames are BGR while PIL interprets arrays as RGB; convert so
        # the displayed colours are not shown with red and blue swapped.
        img = Image.fromarray(cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB))
        display(img)
        clear_output(wait=True)
    return frame

def handleFrame(frame, frameIndex=-1, source=None):
    """Hand one captured frame to ``process``; the processed result is discarded."""
    process(frame, frameIndex, source)

def handleRelease():
    """Release the capture source and print the summary of the experiment.

    Reads the module-level ``cam``, ``device``, ``source`` and
    ``VEHICLE_DETECTION_MODELS``. Prints, in this order: the device and source,
    the results of the model leading each ranking, and one list per metric
    that can be pasted into another cell.
    """
    cam.release()
    print("Source released.")

    # The output of the experiment result is processed below
    for header_line in ("", device, source, ""):
        print(header_line)

    # Rankings: (heading, getter used for the comparison, test that decides
    # whether the current holder's value is displaced by the candidate's).
    # Ties go to the later model because every test is non-strict.
    rankings = (
        ("Best Precision:", "getMaxPrecision", lambda held, new: held <= new),
        ("Best Average Precision:", "getAveragePrecision", lambda held, new: held <= new),
        ("Worst Precision:", "getMinPrecision", lambda held, new: held >= new),
        ("Best Speed:", "getMinSpeed", lambda held, new: held >= new),
        ("Best Average Speed:", "getAverageSpeed", lambda held, new: held >= new),
        ("Worst Speed:", "getMaxSpeed", lambda held, new: held <= new),
    )
    holders = [None] * len(rankings)

    # Per-model series, keyed by the label each one is printed under.
    series = {
        "models=": [],
        "best_precisions=": [],
        "avg_precisions=": [],
        "worst_precisions=": [],
        "best_speeds=": [],
        "avg_speeds=": [],
        "worst_speeds=": [],
        "best_inference_speeds=": [],
        "avg_inference_speeds=": [],
        "worst_inference_speeds=": [],
        "total_detections=": [],
        "total_inferences=": [],
    }

    for candidate in VEHICLE_DETECTION_MODELS:
        # You can uncomment this to see details of every model
        # candidate.printResults()

        series["models="].append(candidate.reference)
        series["best_precisions="].append(candidate.getMaxPrecision())
        series["avg_precisions="].append(candidate.getAveragePrecision())
        series["worst_precisions="].append(candidate.getMinPrecision())
        series["best_speeds="].append(candidate.getMinSpeed())
        series["avg_speeds="].append(candidate.getAverageSpeed())
        series["worst_speeds="].append(candidate.getMaxSpeed())
        series["best_inference_speeds="].append(candidate.getMinInferenceSpeed())
        series["avg_inference_speeds="].append(candidate.getAverageInferenceSpeed())
        series["worst_inference_speeds="].append(candidate.getMaxInferenceSpeed())
        series["total_detections="].append(candidate.detectionsCount())
        series["total_inferences="].append(candidate.inferencesCount())

        for slot, (_, getter, displaces) in enumerate(rankings):
            holder = holders[slot]
            if holder is None or displaces(getattr(holder, getter)(), getattr(candidate, getter)()):
                holders[slot] = candidate

    for (heading, _, _), holder in zip(rankings, holders):
        if holder is not None:
            print(heading)
            holder.printResults()

    # This can be copied and used to represent the data as diagrams and tables
    for caption, values in series.items():
        print(caption, values)

inputType = "video"
# A file path plays back a video; 0 opens the default camera.
source = "data/video/sample.MP4" if inputType == "video" else 0
cam = cv2.VideoCapture(source)
display_handle = display(None, display_id=True)
# Run the models on every `every`-th frame; a value <= 0 runs them on all frames.
every = 30

# For a video file the loop is bounded by the reported frame count; for the
# camera both markers are -1 and the loop runs until it is interrupted.
start = 0 if source != 0 else -1
end = int(cam.get(cv2.CAP_PROP_FRAME_COUNT)) if source != 0 else -1
if source != 0:
    cam.set(cv2.CAP_PROP_POS_FRAMES, start)

frameCount = start
while_safety = 0  # failed reads from a video file since the last good frame
saved_count = 0

if not cam.isOpened():
    print("Error: Could not open source.")
else:
    try:
        while end < 0 or frameCount < end:
            _, frame = cam.read()
            if frame is None:
                if source == 0:
                    print("Error: Could not capture frame.")
                    cam.release()
                    break
                # Video file: retry, but give up after too many failed reads
                # so a truncated file cannot spin forever.
                if while_safety > 2000:
                    break
                while_safety += 1
                continue

            while_safety = 0
            # max(1, ...) keeps a fractional `every` below 1 from causing a modulo by zero.
            if every <= 0 or (frameCount + 1) % max(1, math.floor(every)) == 0:
                handleFrame(frame, frameCount, source)
            frameCount += 1
        handleRelease()
    except KeyboardInterrupt:
        # Stopping the cell by hand still prints the experiment summary.
        handleRelease()
    except Exception:
        print("Unknown error.")
        traceback.print_exc()
        try:
            cam.release()
        except Exception:
            print("Error: Source could not be released.")

# After copying the array definitions printed by the Detection Experiment Analysis above, paste them into another cell and run it; the code below can then be used to save and display the data. I would still like the previous code to also print the primary experiment data, such as the device used, the source input path, and parameters like the frame rate, threshold and target classes.

In [None]:
# You can paste the array definitions from the output above in this cell and run it


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib

# One colour per model, sampled at evenly spaced positions of 'nipy_spectral'
num_models = len(models)
cmap = matplotlib.colormaps.get_cmap('nipy_spectral')
colors = []
for position in range(num_models):
    colors.append(cmap(position / num_models))

# Scatter plot with one point per model:
# x = average inference speed, y = average precision
plt.figure(figsize=(14, 10))
for i in range(num_models):
    model = models[i]
    plt.scatter(
        avg_inference_speeds[i],
        avg_precisions[i],
        color=colors[i],
        label=model,
        edgecolor='black',
    )

# Hand-written description of the experiment setup, printed above the diagram
for setup_line in (
    "DEVICE: CPU (Intel Core i7-14700KF)",
    "SOURCE: data/video/sample.MP4 (1080p60fps Video)",
    "SKIP EVERY 60 FRAMES",
    "TOTALLY PROCESSED FRAMES: 84",
    "COCO Dataset, Classes truck, bus, car, train, bicycle",
    "ALL >50%",
):
    print(setup_line)

# Axis labels, title and a legend placed outside the plot area on the right
plt.xlabel('Average Speed (seconds)')
plt.ylabel('Average Precision (%)')
plt.title('Model Performance: Average Inference Speed x Average Accuracy')
plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1), fontsize='small')
plt.grid(True)
plt.savefig("Analysis/Experiment1/i7_14700KF/Experiment1_Diagram.png", bbox_inches='tight')
plt.show()

# Precision table: a figure whose axes are hidden and only host the table
fig, ax = plt.subplots(figsize=(14, 12))  # Adjust size as needed
for axis_option in ('tight', 'off'):
    ax.axis(axis_option)

# One row per model: the model name followed by its three precision metrics
table_data = np.array([models, best_precisions, avg_precisions, worst_precisions]).T

# Render the table centred in the figure with a fixed font size
precision_columns = ["Model", "Best Precision (%)", "Average Precision (%)", "Worst Precision (%)"]
table = ax.table(
    cellText=table_data,
    colLabels=precision_columns,
    cellLoc='center',
    loc='center',
    colColours=["palegreen"] * 4,
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.2)  # Scale table size

plt.title("Model Precision Metrics")
plt.savefig("Analysis/Experiment1/i7_14700KF/Experiment1_Precision_Table.png", bbox_inches='tight')
plt.show()

# # Set up the figure and axis for the table
# fig, ax = plt.subplots(figsize=(14, 12))  # Adjust size as needed
# ax.axis('tight')
# ax.axis('off')

# # The table data: transpose the array to make each column a different metric
# table_data = np.transpose([models, best_speeds, avg_speeds, worst_speeds])

# # Create the table in the plot
# table = ax.table(cellText=table_data, colLabels=["Model", "Best Speed (s)", "Average Speed (s)", "Worst Speed (s)"],
#                  cellLoc='center', loc='center', colColours=["palegreen"] * 4)
# table.auto_set_font_size(False)
# table.set_fontsize(10)
# table.scale(1.2, 1.2)  # Scale table size

# plt.title("Model Speed Metrics")
# plt.show()

# Inference-speed table: a figure whose axes are hidden and only host the table
fig, ax = plt.subplots(figsize=(14, 12))  # Adjust size as needed
for axis_option in ('tight', 'off'):
    ax.axis(axis_option)

# One row per model: the model name followed by its three inference-speed metrics
table_data = np.array([models, best_inference_speeds, avg_inference_speeds, worst_inference_speeds]).T

# Render the table centred in the figure with a fixed font size
speed_columns = ["Model", "Best Inference Speed (s)", "Average Inference Speed (s)", "Worst Inference Speed (s)"]
table = ax.table(
    cellText=table_data,
    colLabels=speed_columns,
    cellLoc='center',
    loc='center',
    colColours=["palegreen"] * 4,
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.2)  # Scale table size

plt.title("Model Inference Speed Metrics")
plt.savefig("Analysis/Experiment1/i7_14700KF/Experiment1_Time_Table.png", bbox_inches='tight')
plt.show()

# This code is used to remap the data as a latex table sorted by average precision in descending order.
# Each row is [model reference, average precision (2 decimals), average inference speed (5 decimals)].
data = [
    [models[i], f'{avg_precisions[i]:.2f}', f'{avg_inference_speeds[i]:.5f}']
    for i in range(len(models))
]

# LaTeX model names mapping
latex_model_names = {
    'fcos_resnet': 'FCOS ResNet50 FPN',
    'retinanet_resnet': 'RetinaNet ResNet50 FPN',
    'retinanet_resnet_v2': 'RetinaNet ResNet50 FPN V2',
    'faster_rcnn_resnet': 'Faster R-CNN ResNet50 FPN',
    'faster_rcnn_resnet_v2': 'Faster R-CNN ResNet50 FPN V2',
    'faster_rcnn_mobilenet_v3': 'Faster R-CNN MobileNet V3 L',
    'faster_rcnn_mobilenet_v3_320': 'Faster R-CNN MobileNet V3 L 320',
    'mask_rcnn_resnet': 'Mask R-CNN ResNet50',
    'mask_rcnn_resnet_v2': 'Mask R-CNN ResNet50 FPN V2',
    'ssd_vgg16': 'SSD VGG16',
    'ssd_mobilenet_v3': 'SSDLite MobileNet V3 Large',
    'yolov5nu': 'YOLOv5nu',
    'yolov5su': 'YOLOv5su',
    'yolov5mu': 'YOLOv5mu',
    'yolov5lu': 'YOLOv5lu',
    'yolov5xu': 'YOLOv5xu',
    'yolov5n6u': 'YOLOv5n6u',
    'yolov5s6u': 'YOLOv5s6u',
    'yolov5m6u': 'YOLOv5m6u',
    'yolov5l6u': 'YOLOv5l6u',
    'yolov5x6u': 'YOLOv5x6u',
    'yolov8n': 'YOLOv8n',
    'yolov8s': 'YOLOv8s',
    'yolov8m': 'YOLOv8m',
    'yolov8l': 'YOLOv8l',
    'yolov8x': 'YOLOv8x',
    'yolov9c': 'YOLOv9c',
    'yolov9e': 'YOLOv9e'
}

# Sort data by average precision in descending order
data_sorted = sorted(data, key=lambda x: float(x[1]), reverse=True)

# Generate LaTeX table
latex_table = "\\begin{table}[H]\n\\centering\n\\begin{tabular}{lrr}\n\\toprule\n"
latex_table += "\\multicolumn{3}{c}{A100 Experiment 1a: Vehicle Detection Models Performance}\\\\ \\cmidrule{1-3}\n"
latex_table += "Model & Average Precision (\\%) & Average Inference Speed (s)\\\\\n\\midrule\n"

# After the descending sort the best average precision is always the first row.
best_prec_index = 0
# [lowest average inference speed seen so far, its row index]; ties keep the earlier row.
best_speed = [99999.99, 0]
for i, entry in enumerate(data_sorted):
    if float(entry[2]) < best_speed[0]:
        best_speed = [float(entry[2]), i]


def _latex_bold(text, emphasise):
    """Wrap ``text`` in ``\\textbf{...}`` when ``emphasise`` is true."""
    return f"\\textbf{{{text}}}" if emphasise else text


for count, entry in enumerate(data_sorted):
    # Models missing from the mapping fall back to their raw reference (with
    # underscores escaped for LaTeX) instead of aborting with a KeyError.
    model = latex_model_names.get(entry[0], entry[0].replace("_", "\\_"))
    avg_precision = float(entry[1])
    avg_speed = float(entry[2])

    is_best_precision = count == best_prec_index
    is_best_speed = count == best_speed[1]

    # The model name is bold when the row leads either column.
    cells = [
        _latex_bold(model, is_best_precision or is_best_speed),
        _latex_bold(f"{avg_precision:.2f}", is_best_precision),
        _latex_bold(f"{avg_speed:.5f}", is_best_speed),
    ]
    latex_table += " & ".join(cells) + " \\\\ \\addlinespace\n"

latex_table += "\\bottomrule\n\\end{tabular}\n\\caption{A100 Experiment 1a: Vehicle Detection Models Performance}\n\\label{table:a100_experiment_1a_vehicle_detection_performance}\n\\end{table}"

print(latex_table)

# This is the source code for OCR implementation

In [None]:
import cv2
import IPython
import numpy as np
from typing import Tuple, Union
import math
from PIL import Image
import pytesseract
import traceback
import re
import torch
import torch.backends.cudnn as cudnn
import torch.utils.data
import torch.nn.functional as F
import matplotlib.pyplot as plt

import torchvision.transforms as transforms

from deskew import determine_skew

import time

# Use the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

import easyocr
# EasyOCR reader created with the 'en' and 'de' language codes.
reader = easyocr.Reader(['en', 'de'])

from ultralytics import YOLO
# General object detector; its detections are checked against target_classes.
yolo_model = YOLO('yolov8x.pt')
# plate_model = YOLO('best_plate_model.pt')
# Detector that is run on every vehicle crop (presumably custom license-plate weights; verify).
plate_model = YOLO('best_3.pt')

# Every OCR reading is appended here as
# [vehicle confidence %, vehicle class, plate confidence %, plate class, OCR confidence, text, OCR engine].
detected_plates = []

# Install this from "https://github.com/UB-Mannheim/tesseract/wiki"
# Only point pytesseract at the default Windows install location when that file
# exists; otherwise keep pytesseract's default so a Tesseract on PATH still works.
import os
_tesseract_exe = 'C:\\Program Files\\Tesseract-OCR\\tesseract.exe'
if os.path.isfile(_tesseract_exe):
    pytesseract.pytesseract.tesseract_cmd = _tesseract_exe

# Detected class names that are treated as vehicles.
target_classes = ["truck", "bus", "car", "train", "bicycle"]

characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# You can also clone this and try it like I did too: https://github.com/clovaai/deep-text-recognition-benchmark

# get grayscale image
def get_grayscale(image):
    """Convert a BGR ``image`` to a single-channel grayscale image."""
    grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return grey

# noise removal
def remove_noise(image):
    """Smooth ``image`` with a median filter of aperture size 5."""
    aperture = 5
    return cv2.medianBlur(image, aperture)

#thresholding
def thresholding(image):
    """Binarise ``image`` to 0/255 with a threshold chosen by Otsu's method."""
    # cv2.threshold returns (threshold actually used, thresholded image); only the image is needed.
    _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary

#dilation
def dilate(image):
    """Dilate ``image`` once with a 5x5 all-ones structuring element."""
    structuring_element = np.ones((5, 5), dtype=np.uint8)
    return cv2.dilate(image, structuring_element, iterations=1)

#erosion
def erode(image):
    """Erode ``image`` once with a 5x5 all-ones structuring element."""
    structuring_element = np.ones((5, 5), dtype=np.uint8)
    return cv2.erode(image, structuring_element, iterations=1)

#opening - erosion followed by dilation
def opening(image):
    """Apply a morphological opening to ``image`` with a 5x5 all-ones structuring element."""
    structuring_element = np.ones((5, 5), dtype=np.uint8)
    opened = cv2.morphologyEx(image, cv2.MORPH_OPEN, structuring_element)
    return opened

#canny edge detection
def canny(image):
    """Return the Canny edge map of ``image`` using hysteresis thresholds 100 and 200."""
    lower_threshold, upper_threshold = 100, 200
    return cv2.Canny(image, lower_threshold, upper_threshold)

#skew correction
def deskew(image):
    """Rotate ``image`` by the angle of the minimum-area rectangle around its non-zero pixels.

    The rotation is performed about the image centre on a canvas of the
    original size; pixels outside the source are filled by replicating the border.
    """
    # (row, column) positions of every pixel greater than zero
    foreground = np.column_stack(np.where(image > 0))
    rect_angle = cv2.minAreaRect(foreground)[-1]
    # Angles below -45 are measured against the other rectangle side
    correction = -(90 + rect_angle) if rect_angle < -45 else -rect_angle

    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(pivot, correction, 1.0)
    return cv2.warpAffine(
        image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )

def compute_skew(src_img):
    """Estimate the skew of an image, in degrees, from its detected line segments.

    The image is median-blurred, edges are extracted with Canny, and line
    segments are detected with the probabilistic Hough transform. The result
    is the mean inclination of all segments tilted by at most 30 degrees.

    Args:
        src_img: 2-D (grayscale) or 3-D (colour) image array.

    Returns:
        The mean inclination in degrees, or ``0.0`` when no segment is found
        or none lies within the 30 degree limit.

    Raises:
        ValueError: If ``src_img`` is neither 2-D nor 3-D.
    """
    if len(src_img.shape) not in (2, 3):
        # Previously this only printed a message and then failed on the
        # unbound width/height below.
        raise ValueError('unsupported image type')
    h, w = src_img.shape[:2]

    img = cv2.medianBlur(src_img, 3)
    edges = cv2.Canny(img, threshold1=30, threshold2=100, apertureSize=3, L2gradient=True)
    lines = cv2.HoughLinesP(edges, 1, math.pi/180, 30, minLineLength=w / 4.0, maxLineGap=h/4.0)
    if lines is None:
        # HoughLinesP returns None when it detects nothing.
        return 0.0

    max_tilt = math.radians(30)  # atan2 yields radians, so the limit must be in radians too
    angle = 0.0
    cnt = 0
    # Current OpenCV returns an (N, 1, 4) array; flatten it so that every
    # segment is visited, not just the first one.
    for x1, y1, x2, y2 in lines.reshape(-1, 4):
        dx, dy = float(x2) - float(x1), float(y2) - float(y1)
        if dx < 0:
            # Orient the segment left-to-right so the endpoint order does not
            # turn a nearly horizontal line into a ~180 degree angle.
            dx, dy = -dx, -dy
        ang = math.atan2(dy, dx)
        if math.fabs(ang) <= max_tilt:  # excluding extreme rotations
            angle += ang
            cnt += 1
    if cnt == 0:
        return 0.0
    return (angle / cnt)*180/math.pi

def rotate_image(image, angle):
    """Rotate ``image`` by ``angle`` about its centre, keeping the original canvas size."""
    # shape[1::-1] is (width, height), the order OpenCV expects for sizes and points
    width_height = image.shape[1::-1]
    pivot = tuple(np.array(width_height) / 2)
    matrix = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    return cv2.warpAffine(image, matrix, width_height, flags=cv2.INTER_LINEAR)

def get_deskew_angle(image):
    """Return the angle of the minimum-area rectangle around the non-zero pixels of ``image``.

    Angles below -45 are returned with their sign flipped; all other angles
    are returned unchanged.
    """
    foreground = np.column_stack(np.where(image > 0))
    rect_angle = cv2.minAreaRect(foreground)[-1]
    return -rect_angle if rect_angle < -45 else rect_angle

def deskew_image(image, angle):
    """Rotate ``image`` by ``angle`` about its centre on a canvas of the same size.

    Uses cubic interpolation and fills uncovered pixels by replicating the border.
    """
    height, width = image.shape[:2]
    pivot = (width // 2, height // 2)
    rotation = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    return cv2.warpAffine(
        image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )

#template matching
def match_template(image, template):
    """Return the normalised correlation-coefficient match map of ``template`` over ``image``."""
    score_map = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
    return score_map

def rotate(
        image: np.ndarray, angle: float, background: Union[int, Tuple[int, int, int]]
) -> np.ndarray:
    """Rotate ``image`` by ``angle`` degrees on a canvas resized to the rotated bounding box.

    Args:
        image: Array whose first two axes are rows and columns.
        angle: Rotation angle in degrees.
        background: Passed to ``cv2.warpAffine`` as ``borderValue`` for the
            pixels not covered by the source image.

    Returns:
        The rotated image.
    """
    rows, cols = image.shape[:2]
    theta = math.radians(angle)
    sin_abs = abs(np.sin(theta))
    cos_abs = abs(np.cos(theta))

    # Size of the axis-aligned bounding box of the rotated image
    new_rows = sin_abs * cols + cos_abs * rows
    new_cols = sin_abs * rows + cos_abs * cols

    pivot = tuple(np.array(image.shape[1::-1]) / 2)
    rot_mat = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    # Shift the result so it stays centred on the resized canvas
    rot_mat[0, 2] += (new_cols - cols) / 2
    rot_mat[1, 2] += (new_rows - rows) / 2

    canvas_size = (int(round(new_cols)), int(round(new_rows)))
    return cv2.warpAffine(image, rot_mat, canvas_size, borderValue=background)

# NN Layer License Plate Recognition "OCR"
def process_license(processed_frame, bounding_box, name, confidence, parentName, parentConfidence):
    """Annotate a detected license plate and read its text with Tesseract and EasyOCR.

    The plate region is cropped, converted to grayscale and Otsu-thresholded,
    then passed to both OCR engines. Each time an engine improves on the best
    confidence seen so far in this call, the reading is appended to the
    module-level ``detected_plates`` as
    ``[parent conf %, parent class, plate conf %, plate class, OCR conf, text, engine]``.
    The best reading of each engine is drawn below the plate box.

    Args:
        processed_frame: Frame that is annotated in place.
        bounding_box: ``x1, y1, x2, y2`` of the plate in frame coordinates.
        name: Class name of the plate detection.
        confidence: Confidence of the plate detection, in the range 0..1.
        parentName: Class name of the vehicle the plate was found on.
        parentConfidence: Confidence of the vehicle detection, in the range 0..1.
    """
    x1, y1, x2, y2 = [int(x) for x in bounding_box]

    # Crop BEFORE drawing so the rectangle border does not end up in the OCR input.
    license_frame = processed_frame[y1:y2, x1:x2].copy()

    cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(255,0,255),2)
    cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1-15),0,0.9,(255,0,255),2)

    if license_frame.size == 0:
        # Degenerate box: nothing to read, and OpenCV would raise on an empty image.
        return

    grayscaled = get_grayscale(license_frame)
    threshholded = thresholding(grayscaled)

    max_conf = 0.0
    best_text = "-"  # placeholder that is displayed when Tesseract reads nothing

    # NN Layer License Plate Recognition "Pytesseract OCR"
    results = pytesseract.image_to_data(threshholded, config='-c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
    for line in results.split('\n'):
        params = line.split()
        # Rows that carry a word have 12 columns (confidence in column 11, text
        # in column 12). The header row also has 12 columns and must be skipped,
        # otherwise it is recorded as a plate reading "-" with confidence 0.
        if len(params) != 12 or params[10] == 'conf':
            continue
        word_conf = float(params[10])
        if word_conf >= max_conf:
            max_conf = word_conf
            best_text = params[11]
            detected_plates.append([parentConfidence*100, parentName.upper(), confidence*100, name.upper(), max_conf, best_text, "pytesseract"])
    cv2.putText(processed_frame, '{} {:.2f}%'.format("[tesse]: "+best_text, max_conf),(x1+10,y2+25),0,0.9,(0,0,255),3)

    # EasyOCR reports confidences in 0..1, hence the *100 when comparing with Tesseract.
    text = ""
    conf = 0.0
    for res in reader.readtext(threshholded):
        if res[2] > conf:
            conf = res[2]
            text = res[1]
            if res[2]*100 > max_conf:
                max_conf = res[2]*100
                best_text = res[1]
                if len(best_text) > 0:
                    detected_plates.append([parentConfidence*100, parentName.upper(), confidence*100, name.upper(), max_conf, best_text, "easyocr"])
    cv2.putText(processed_frame, '{} {:.2f}%'.format("[easy]: "+str(text), conf*100),(x1+10,y2+57),0,0.9,(200,200,200),3)

# NN Layer Vehicle Found -> License Detection
def process_vehicle(processed_frame, bounding_box, name, confidence):
    """Annotate a detected vehicle and look for license plates inside its box.

    ``plate_model`` is run on the vehicle crop, and every detection it returns
    is forwarded to ``process_license`` with its box translated back to frame
    coordinates.

    Args:
        processed_frame: Frame that is annotated in place.
        bounding_box: ``x1, y1, x2, y2`` of the vehicle in frame coordinates.
        name: Class name of the vehicle detection.
        confidence: Confidence of the vehicle detection, in the range 0..1.
    """
    x1, y1, x2, y2 = [int(x) for x in bounding_box]

    # Crop BEFORE drawing so the box border and the label text, which are
    # drawn inside the vehicle box, are not fed to the plate detector.
    vehicle_frame = processed_frame[y1:y2,x1:x2].copy()

    cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(0,255,255),2)
    cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1+25),0,0.8,(0,255,255),2)

    if vehicle_frame.size == 0:
        # Degenerate box: there is no image to run the plate detector on.
        return

    results = plate_model(vehicle_frame, agnostic_nms=True, verbose=False)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name2 = result.names[cls]
            confidence2 = float(result.boxes.conf[i].item())
            bounding_box2 = result.boxes.xyxy[i].cpu().numpy()
            a1, b1, a2, b2 = [int(x) for x in bounding_box2]
            # Ensure a1 < a2 and b1 < b2
            a1, a2 = min(a1, a2), max(a1, a2)
            b1, b2 = min(b1, b2), max(b1, b2)
            # if name2=="License_Plate": process_license(processed_frame, [x1+a1, y1+b1, x1+a2, y1+b2], name2, confidence2, name, confidence)
            # Plate coordinates are relative to the crop; shift them into frame coordinates.
            process_license(processed_frame, [x1+a1, y1+b1, x1+a2, y1+b2], name2, confidence2, name, confidence)

# NN Layer Other Objects Detected [Not target classes]
def process_detection(processed_frame, bounding_box, name, confidence):
    """Draw the box and a ``NAME xx.xx%`` caption of a non-target detection on ``processed_frame``."""
    left, top, right, bottom = (int(coordinate) for coordinate in bounding_box)
    colour = (255, 255, 0)
    caption = '{} {:.2f}%'.format(name.upper(), confidence*100)
    cv2.rectangle(processed_frame, (left, top), (right, bottom), colour, 2)
    cv2.putText(processed_frame, caption, (left+10, top+25), 0, 0.8, colour, 2)

# NN Layer Yolov8 Detection
def yolov8n_objects(frame):
    """Detect objects in ``frame`` with ``yolo_model`` and annotate them in place.

    Detections whose class is in ``target_classes`` are passed to
    ``process_vehicle``; all others are passed to ``process_detection``.

    Args:
        frame: Image array; it is annotated in place (no copy is made).

    Returns:
        The same ``frame`` object, now annotated.
    """
    results = yolo_model(frame, agnostic_nms=True, verbose=False)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name = result.names[cls]
            confidence = float(result.boxes.conf[i].item())
            bounding_box = result.boxes.xyxy[i].cpu().numpy()
            if name in target_classes:
                process_vehicle(frame, bounding_box, name, confidence)
            else:
                process_detection(frame, bounding_box, name, confidence)
    return frame

def process_frame(frame):
    """Return ``frame`` after it has been annotated by ``yolov8n_objects``."""
    # processed_frame = cv2.flip(processed_frame, 1)
    return yolov8n_objects(frame)

def handleFrame(frame):
    """Process one frame and show it, JPEG-encoded, through ``display_handle``."""
    annotated = process_frame(frame)
    _, jpeg_buffer = cv2.imencode('.jpeg', annotated)
    jpeg_image = IPython.display.Image(data=jpeg_buffer.tobytes())
    display_handle.update(jpeg_image)
    IPython.display.clear_output(wait=True)

def handleRelease():
    """Release the capture source and print a summary of all plate readings.

    Every entry of the module-level ``detected_plates`` has the layout
    ``[vehicle conf %, vehicle class, plate conf %, plate class, OCR conf %, text, engine]``.
    The three confidences (indices 0, 2 and 4) are used to print:

    * all readings,
    * readings with all three confidences above 50 ("correct"),
    * readings with all three confidences above 70 ("better"),
    * the reading with the highest confidence sum ("best"),
    * for every text that occurs most often, its best-scoring reading, and
    * the best-scoring reading among those most repeated ones.
    """
    cam.release()
    print("Source released.")

    def mean_score(plate):
        """Average of the vehicle, plate and OCR confidences of one reading."""
        return (plate[0] + plate[2] + plate[4]) / 3.0

    correct_plates = [p for p in detected_plates if p[0] > 50.0 and p[2] > 50.0 and p[4] > 50.0]
    better_plates = [p for p in detected_plates if p[0] > 70.0 and p[2] > 70.0 and p[4] > 70.0]

    best_plate = []
    highest_score = 0.0
    repetitions = {}  # plate text -> number of readings with that text
    for plate in detected_plates:
        total = plate[0] + plate[2] + plate[4]
        if total > highest_score:
            highest_score = total
            best_plate = plate
        repetitions[plate[5]] = repetitions.get(plate[5], 0) + 1

    most_repeated_plates = []
    best_repeated_plates = []
    if repetitions:
        count = max(repetitions.values())
        # Several texts can share the top count; keep the best reading of each.
        for itm, seen in repetitions.items():
            if seen == count:
                same_text = [plate for plate in detected_plates if plate[5] == itm]
                most_repeated_plates.append(max(same_text, key=mean_score))
        # Previously the running best was reset on every iteration, so the last
        # candidate was always reported; pick the actual best-scoring one.
        best_repeated_plates.append(max(most_repeated_plates, key=mean_score))

    print("\nDetected Plates: \n", np.matrix(detected_plates))
    print("\nCorrectly Detected Plates: \n", np.matrix(correct_plates))
    print("\nBetter Detected Plates: \n", np.matrix(better_plates))
    print("\nBest Detected Plate: \n", np.matrix(best_plate))
    print("\nMost Repeated Plates: \n", np.matrix(most_repeated_plates))
    print("\nBest Repeated Plate: \n", np.matrix(best_repeated_plates))

inputType = "video"
# cam = cv2.VideoCapture(0)
# A file path plays back a video; 0 opens the default camera.
source = "data/video/sample.MP4" if inputType == "video" else 0
cam = cv2.VideoCapture(source)
display_handle = display(None, display_id=True)
# Process every `every`-th frame; a value <= 0 processes all frames.
every = 60

# For a video file the loop is bounded by the reported frame count; for the
# camera both markers are -1 and the loop runs until it is interrupted.
start = 0 if source != 0 else -1
end = int(cam.get(cv2.CAP_PROP_FRAME_COUNT)) if source != 0 else -1
if source != 0:
    cam.set(cv2.CAP_PROP_POS_FRAMES, start)

frameCount = start
while_safety = 0  # failed reads from a video file since the last good frame
saved_count = 0

if not cam.isOpened():
    print("Error: Could not open source.")
else:
    try:
        while end < 0 or frameCount < end:
            _, frame = cam.read()
            if frame is None:
                if source == 0:
                    print("Error: Could not capture frame.")
                    cam.release()
                    break
                # Video file: retry, but give up after too many failed reads
                # so a truncated file cannot spin forever.
                if while_safety > 2000:
                    break
                while_safety += 1
                continue

            while_safety = 0
            # max(1, ...) keeps a fractional `every` below 1 from causing a modulo by zero.
            if every <= 0 or (frameCount + 1) % max(1, math.floor(every)) == 0:
                handleFrame(frame)
            frameCount += 1
        handleRelease()
    except KeyboardInterrupt:
        # Stopping the cell by hand still prints the plate summary.
        handleRelease()
    except Exception:
        print("Unknown error.")
        traceback.print_exc()
        try:
            cam.release()
        except Exception:
            print("Error: Source could not be released.")

# This code uses the model detection to recognize license plate characters

In [None]:
import cv2
import IPython
import numpy as np
from typing import Tuple, Union
import math
from PIL import Image
import traceback

import torch
# Use the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

from ultralytics import YOLO
# Three detectors are loaded: `model` is applied to whole frames,
# `plate_model` to vehicle crops and `characters_model` to plate crops.
# NOTE(review): the two custom weight files are presumably trained for plates
# and plate characters respectively - confirm against the training runs.
model = YOLO('yolov8x.pt')
plate_model = YOLO('vehicle_license_best.pt')
characters_model = YOLO('plate_char/yolov8x/weights/best.pt')

# Plate strings assembled by process_license are appended here.
detected_plates = []

# Detected class names that are treated as vehicles.
target_classes = ["truck", "bus", "car", "train", "bicycle"]

# NN Layer License Plate Recognition "CNN"
def process_license(processed_frame, bounding_box, name, confidence, parentName, parentConfidence):
    """Annotate a detected plate and read its characters with ``characters_model``.

    Characters detected with a confidence above 0.5 are drawn on the frame and
    concatenated from left to right (skipping the class ``"undefined"``); the
    resulting string is appended to the module-level ``detected_plates``.

    Args:
        processed_frame: Frame that is annotated in place.
        bounding_box: ``x1, y1, x2, y2`` of the plate in frame coordinates.
        name: Class name of the plate detection.
        confidence: Confidence of the plate detection, in the range 0..1.
        parentName: Class name of the parent vehicle (not used here).
        parentConfidence: Confidence of the parent vehicle (not used here).
    """
    x1, y1, x2, y2 = [int(x) for x in bounding_box]

    # Crop BEFORE drawing so the rectangle border is not fed to the character detector.
    license_frame = processed_frame[y1:y2,x1:x2].copy()

    cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(255,0,255),2)
    cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1-15),0,0.7,(255,0,255))

    if license_frame.size == 0:
        # Degenerate box: there is no image to run the character detector on.
        return

    plate_parts = []  # [left edge, top edge, character] per accepted detection

    results = characters_model(license_frame, agnostic_nms=True, verbose=False, device=device)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            confidence2 = float(result.boxes.conf[i].item())
            if confidence2 <= 0.5:
                continue
            cls = int(result.boxes.cls[i].item())
            name2 = result.names[cls]
            bounding_box2 = result.boxes.xyxy[i].cpu().numpy()
            a1, b1, a2, b2 = [int(x) for x in bounding_box2]
            # Ensure a1 < a2 and b1 < b2
            a1, a2 = min(a1, a2), max(a1, a2)
            b1, b2 = min(b1, b2), max(b1, b2)
            # Character boxes are relative to the crop; shift them into frame coordinates.
            cv2.rectangle(processed_frame,(x1+a1,y1+b1),(x1+a2,y1+b2),(0,0,255),1)
            cv2.putText(processed_frame, '{}'.format(name2),(x1+a1,y1+b1+75),0,0.9,(0,0,255),2)
            cv2.putText(processed_frame, '{:.2f}%'.format(confidence2*100),(x1+a1,y1+b1+100+25*i),0,0.7,(0,0,255),2)
            plate_parts.append([a1,b1,name2])

    # Read the characters from left to right.
    plate_parts.sort(key=lambda part: part[0])
    plate_text = "".join(part[2] for part in plate_parts if part[2] != "undefined")
    detected_plates.append(plate_text)

# NN Layer Vehicle Found -> License Detection
def process_vehicle(processed_frame, bounding_box, name, confidence):
    """Annotate a detected vehicle and look for license plates inside its box.

    ``plate_model`` is run on the vehicle crop; every detection with a
    confidence above 0.5 is forwarded to ``process_license`` with its box
    translated back to frame coordinates.

    Args:
        processed_frame: Frame that is annotated in place.
        bounding_box: ``x1, y1, x2, y2`` of the vehicle in frame coordinates.
        name: Class name of the vehicle detection.
        confidence: Confidence of the vehicle detection, in the range 0..1.
    """
    x1, y1, x2, y2 = [int(x) for x in bounding_box]

    # Crop BEFORE drawing so the box border is not fed to the plate detector.
    vehicle_frame = processed_frame[y1:y2,x1:x2].copy()

    cv2.rectangle(processed_frame,(x1,y1),(x2,y2),(0,255,255),2)
    cv2.putText(processed_frame, '{} {:.2f}%'.format(name.upper(), confidence*100),(x1+10,y1-15),0,0.7,(0,255,255))

    if vehicle_frame.size == 0:
        # Degenerate box: there is no image to run the plate detector on.
        return

    results = plate_model(vehicle_frame, agnostic_nms=True, verbose=False, device=device)[0]
    for result in results:
        detection_count = result.boxes.shape[0]
        for i in range(detection_count):
            cls = int(result.boxes.cls[i].item())
            name2 = result.names[cls]
            confidence2 = float(result.boxes.conf[i].item())
            if confidence2 <= 0.5:
                continue

            bounding_box2 = result.boxes.xyxy[i].cpu().numpy()
            a1, b1, a2, b2 = [int(x) for x in bounding_box2]
            # Ensure a1 < a2 and b1 < b2
            a1, a2 = min(a1, a2), max(a1, a2)
            b1, b2 = min(b1, b2), max(b1, b2)
            # if name2=="License_Plate":
            # Plate coordinates are relative to the crop; shift them into frame coordinates.
            process_license(processed_frame, [x1+a1, y1+b1, x1+a2, y1+b2], name2, confidence2, name, confidence)

# NN Layer Other Objects Detected
def process_detection(processed_frame, bounding_box, name, confidence):
    """Draw the box and an upper-cased "LABEL xx.xx%" caption for a detection.

    Args:
        processed_frame: Image array that is drawn on in place.
        bounding_box: ``(x1, y1, x2, y2)`` in frame coordinates.
        name: Class label of the detection.
        confidence: Detection confidence (multiplied by 100 for display).
    """
    colour = (255, 255, 0)
    left, top, right, bottom = (int(value) for value in bounding_box)
    caption = '{} {:.2f}%'.format(name.upper(), confidence * 100)
    cv2.rectangle(processed_frame, (left, top), (right, bottom), colour, 2)
    cv2.putText(processed_frame, caption, (left + 10, top - 15), 0, 0.7, colour)

# NN Layer Yolov8 Detection
def process_frame(frame):
    """Run the first-stage detector on a frame and annotate every detection.

    Detections whose class name is in ``target_classes`` go to
    ``process_vehicle`` (which also searches for plates); all others go to
    ``process_detection`` and are only drawn.

    Args:
        frame: Image array to analyse. It is annotated in place; no copy is made.

    Returns:
        The same array object that was passed in, now carrying the drawings.
    """
    processed_frame = frame
    results = model(processed_frame, agnostic_nms=True, verbose=False, device=device)[0]
    for result in results:
        boxes = result.boxes
        for i in range(boxes.shape[0]):
            name = result.names[int(boxes.cls[i].item())]
            confidence = float(boxes.conf[i].item())
            x1, y1, x2, y2 = [int(v) for v in boxes.xyxy[i].cpu().numpy()]
            # Ensure x1 < x2 and y1 < y2, and pass the normalised box on so the
            # handlers never slice with reversed corners.
            bounding_box = [min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2)]
            if name in target_classes:
                process_vehicle(processed_frame, bounding_box, name, confidence)
            else:
                process_detection(processed_frame, bounding_box, name, confidence)
    return processed_frame

def handleFrame(frame):
    """Annotate one frame and push it to the notebook display as a JPEG.

    Args:
        frame: Image array handed to ``process_frame``.
    """
    annotated = process_frame(frame)
    # imencode returns (success flag, encoded buffer); only the buffer is used.
    jpeg_buffer = cv2.imencode('.jpeg', annotated)[1]
    jpeg_image = IPython.display.Image(data=jpeg_buffer.tobytes())
    display_handle.update(jpeg_image)
    IPython.display.clear_output(wait=True)

def handleRelease():
    """Release the capture source and print every plate string collected."""
    cam.release()
    # Message followed by one blank line.
    print("Source released.", end="\n\n")
    print(detected_plates)
    # The collected strings can be filtered for empty reads and then compared
    # by frequency, taking the most repeated one as the correct plate.

# ---- Source selection -------------------------------------------------------
inputType = "video"
# A video file when inputType is "video"; otherwise capture device index 0.
source = "data/video/sample.MP4" if inputType == "video" else 0
cam = cv2.VideoCapture(source)
display_handle = display(None, display_id=True)
# Process one frame out of every `every`; a value <= 0 processes every frame.
every = 60

# For a file source the loop runs from frame `start` up to the reported frame
# count; for device 0 both bounds are -1 and the loop runs until interrupted.
start = 0 if source != 0 else -1
end = int(cam.get(cv2.CAP_PROP_FRAME_COUNT)) if source != 0 else -1
if source != 0:
    cam.set(cv2.CAP_PROP_POS_FRAMES, start)

frameCount = start
while_safety = 0  # consecutive failed reads from a file source
saved_count = 0

# ---- Capture loop -----------------------------------------------------------
if not cam.isOpened():
    print("Error: Could not open source.")
else:
    # Whole-frame stride; never below 1 so a fractional `every` in (0, 1)
    # cannot cause a modulo-by-zero. A stride of 1 processes every frame.
    step = max(1, math.floor(every)) if every > 0 else 1
    try:
        while end < 0 or frameCount < end:
            _, frame = cam.read()
            if frame is None:
                if source == 0:
                    # Device source: a failed grab ends the session.
                    print("Error: Could not capture frame.")
                    break
                # File source: retry, but give up after too many consecutive
                # failures (e.g. the reported frame count exceeds what can
                # actually be decoded).
                if while_safety > 2000:
                    break
                while_safety += 1
                continue

            while_safety = 0
            if (frameCount + 1) % step == 0:
                handleFrame(frame)
            frameCount += 1
        handleRelease()
    except KeyboardInterrupt:
        # Interrupting the cell is the normal way to stop a live source.
        handleRelease()
    except Exception:
        print("Unknown error.")
        traceback.print_exc()
        # Best-effort cleanup; report instead of hiding a failed release.
        try:
            cam.release()
        except Exception:
            print("Error: Source could not be released.")