Imports

In [34]:
from ultralytics import YOLO
import cv2
import pandas as pd
from RealtimeTracking import *
import gc
import easyocr
import numpy as np

Load the model

In [35]:
plate_det_model_path = '/Users/banoczymartin/Library/Mobile Documents/com~apple~CloudDocs/OE/platedetector/models/YOLOv8/yolov8n_90e_cust/runs/detect/train4/weights/best.pt'
testvideo_path = '/Users/banoczymartin/Library/Mobile Documents/com~apple~CloudDocs/OE/platedetector/video_data/testvideo_short.mp4'

vehicle_det_model=YOLO('yolov8n.pt')
vehicle_ids = [2,3,5,7]
plate_det_model = YOLO(plate_det_model_path)

eocr = easyocr.Reader(['en'],gpu=False)

Using CPU. Note: This module is much faster with a GPU.


In [36]:
class VehicleDetection():
    def __init__(self,modeltype: str) -> None:
        self.type = modeltype
        if modeltype == 'YOLOv8':
            self.model = YOLO('yolov8n.pt')
            self.vehicle_ids = [2,3,5,7]
    def Detect(self, frame):
        return self.model(frame)[0]

class LicensePlateReader():
    def __init__(self,modeltype: str) -> None:
        self.type = modeltype
        if modeltype == 'easyocr':
            self.model = easyocr.Reader(['en'],gpu=False)
    def PrepAndRead(self, license_plate):
        #prep
        lp_gray = cv2.cvtColor(license_plate,cv2.COLOR_BGR2GRAY)
        retval,lp_thresholded = cv2.threshold(lp_gray,64,255,cv2.THRESH_BINARY_INV)
        #read
        detections = self.model.readtext(license_plate)
        for det in detections:
            bbox, text, score = det
            formatted_text = text.upper().replace(' ','')
            return formatted_text,score
        lp_Text='error'
        lp_Conf_score = 1
        return lp_Text, lp_Conf_score

Capture on a video

In [37]:
def PlateToVehicle(det_license_plate,det_vehicles):
    lp_x1, lp_y1, lp_x2, lp_y2, lp_score, lp_class_id = det_license_plate
    found=False
    for i in range(len(det_vehicles)):
        v_x1, v_y1, v_x2, v_y2, v_id = det_vehicles[i]
        #inside
        if lp_x1 > v_x1 and lp_y1 > v_y1 and lp_x2 < v_x2 and lp_y2 < v_y2:
            v_index=i
            found=True
            break
    if found:
        return det_vehicles[v_index]
    return -1,-1,-1,-1,-1

def lpPrep(license_plate):
    lp_gray = cv2.cvtColor(license_plate,cv2.COLOR_BGR2GRAY)
    retval,lp_threshold = cv2.threshold(lp_gray,64,255,cv2.THRESH_BINARY_INV)
    return lp_threshold

def lpRead(license_plate):
    eocr_detections = eocr.readtext(license_plate)
    for eocr_det in eocr_detections:
        bbox, text, score = eocr_det
        formatted_text = text.upper().replace(' ','')
        return formatted_text,score
    lp_Text='asd012'
    lp_Conf_score = 1
    return lp_Text, lp_Conf_score

In [38]:
gc.collect()

12488

In [39]:
# OOP
df_cols = ['fr_number','v_id', 'v_bbox','lp_bbox','lp_bbox_score','lp','lp_score']
df_rows = []
# load using Sort
video_capture = cv2.VideoCapture(testvideo_path)
vehicle_detection = VehicleDetection('YOLOv8')

lp_reader = LicensePlateReader('easyocr')

mot_trckr = Sort()
#results = {}
ret = True
frame_indexer=0
while ret:
    if (video_capture.isOpened()== False):
        print("Error opening video stream or file")
    
    ret, frame = video_capture.read()
    
    if ret:
        #if frame_indexer>10:
        #    break
        #results[frame_indexer] = {}
        #det
        det_objs = vehicle_detection.Detect(frame)
        vehicles=[]
        for det_obj in det_objs.boxes.data.tolist():
            obj_x1, obj_y1, obj_x2, obj_y2, obj_score, obj_class_id = det_obj
            if int(obj_class_id) in vehicle_detection.vehicle_ids:
                vehicles.append([obj_x1, obj_y1, obj_x2, obj_y2, obj_score])
                #print(obj_class_id)
        
        track_ids = mot_trckr.update(np.asarray(vehicles))
        #print(track_ids)
        #plates=[]
        det_lps = plate_det_model(frame)[0]
        for det_lp in det_lps.boxes.data.tolist():
            lp_x1, lp_y1, lp_x2, lp_y2, lp_score, lp_class_id = det_lp
            
            #assign license plate to vehicle
            vehicle_x1,vehicle_y1,vehicle_x2,vehicle_y2,vehicle_id = PlateToVehicle(det_lp,track_ids)
            if vehicle_id is not -1:
                #crop
                license_plate = frame[int(lp_y1):int(lp_y2),int(lp_x1):int(lp_x2),:]
                #prepared = lpPrep(license_plate)
                #cv2.imshow('lp',license_plate)
                #cv2.imshow('lpprep',prepared)
                #cv2.waitKey(0)

                lp_text, lp_text_confscore = lp_reader.PrepAndRead(license_plate)
                if lp_text is not None:
                    df_rows.append({'fr_number':frame_indexer,
                                    'v_id':int(vehicle_id+1),
                                    'v_bbox':[vehicle_x1,vehicle_y1,vehicle_x2,vehicle_y2],
                                    'lp_bbox':[lp_x1, lp_y1, lp_x2, lp_y2],
                                    'lp_bbox_score':lp_score,
                                    'lp':lp_text,
                                    'lp_score':lp_text_confscore})
                
            
    
    frame_indexer+=1

Using CPU. Note: This module is much faster with a GPU.
  if vehicle_id is not -1:

0: 480x640 15 cars, 2 buss, 2 trucks, 123.9ms
Speed: 8.0ms preprocess, 123.9ms inference, 11.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 license, 112.3ms
Speed: 4.2ms preprocess, 112.3ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 17 cars, 2 buss, 1 truck, 123.5ms
Speed: 3.3ms preprocess, 123.5ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 license, 121.6ms
Speed: 3.6ms preprocess, 121.6ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 16 cars, 2 buss, 1 truck, 111.1ms
Speed: 3.6ms preprocess, 111.1ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 license, 123.7ms
Speed: 3.2ms preprocess, 123.7ms inference, 0.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 2 buss, 1 truck, 113.3ms
Speed: 3.7ms preprocess, 113.3ms inference, 1.0ms 

In [None]:
# OOP
df_cols = ['fr_number','v_id', 'v_bbox','lp_bbox','lp_bbox_score','lp','lp_score']
df_rows = []
# load using Sort
video_capture = cv2.VideoCapture(testvideo_path)
vehicle_detection = VehicleDetection('YOLOv8')

mot_trckr = Sort()
#results = {}
ret = True
frame_indexer=0
while ret:
    if (video_capture.isOpened()== False):
        print("Error opening video stream or file")
    
    ret, frame = video_capture.read()
    
    if ret:
        #if frame_indexer>10:
        #    break
        #results[frame_indexer] = {}
        #det
        det_objs = vehicle_detection.Detect(frame)
        vehicles=[]
        for det_obj in det_objs.boxes.data.tolist():
            obj_x1, obj_y1, obj_x2, obj_y2, obj_score, obj_class_id = det_obj
            if int(obj_class_id) in vehicle_detection.vehicle_ids:
                vehicles.append([obj_x1, obj_y1, obj_x2, obj_y2, obj_score])
                #print(obj_class_id)
        
        track_ids = mot_trckr.update(np.asarray(vehicles))
        #print(track_ids)
        #plates=[]
        det_lps = plate_det_model(frame)[0]
        for det_lp in det_lps.boxes.data.tolist():
            lp_x1, lp_y1, lp_x2, lp_y2, lp_score, lp_class_id = det_lp
            
            #assign license plate to vehicle
            vehicle_x1,vehicle_y1,vehicle_x2,vehicle_y2,vehicle_id = PlateToVehicle(det_lp,track_ids)
            if vehicle_id is not -1:
                #crop
                license_plate = frame[int(lp_y1):int(lp_y2),int(lp_x1):int(lp_x2),:]
                prepared = lpPrep(license_plate)
                #cv2.imshow('lp',license_plate)
                #cv2.imshow('lpprep',prepared)
                #cv2.waitKey(0)

                lp_text, lp_text_confscore = lpRead(prepared)
                if lp_text is not None:
                    df_rows.append({'fr_number':frame_indexer,
                                    'v_id':int(vehicle_id+1),
                                    'v_bbox':[vehicle_x1,vehicle_y1,vehicle_x2,vehicle_y2],
                                    'lp_bbox':[lp_x1, lp_y1, lp_x2, lp_y2],
                                    'lp_bbox_score':lp_score,
                                    'lp':lp_text,
                                    'lp_score':lp_text_confscore})
                
            
    
    frame_indexer+=1

In [40]:
out = pd.DataFrame(df_rows,columns=df_cols)
out.to_csv('log.csv')

In [41]:
import csv
import numpy as np
from scipy.interpolate import interp1d


def interpolate_bounding_boxes(data):
    # Extract necessary data columns from input data
    frame_numbers = np.array([int(row['fr_number']) for row in data])
    car_ids = np.array([int(float(row['v_id'])) for row in data])
    car_bboxes = np.array([list(map(float, row['v_bbox'][1:-1].split(', '))) for row in data])
    license_plate_bboxes = np.array([list(map(float, row['lp_bbox'][1:-1].split(', '))) for row in data])

    interpolated_data = []
    unique_car_ids = np.unique(car_ids)
    for car_id in unique_car_ids:

        frame_numbers_ = [p['fr_number'] for p in data if int(float(p['v_id'])) == int(float(car_id))]
        print(frame_numbers_, car_id)

        # Filter data for a specific car ID
        car_mask = car_ids == car_id
        car_frame_numbers = frame_numbers[car_mask]
        car_bboxes_interpolated = []
        license_plate_bboxes_interpolated = []

        first_frame_number = car_frame_numbers[0]
        last_frame_number = car_frame_numbers[-1]

        for i in range(len(car_bboxes[car_mask])):
            frame_number = car_frame_numbers[i]
            car_bbox = car_bboxes[car_mask][i]
            license_plate_bbox = license_plate_bboxes[car_mask][i]

            if i > 0:
                prev_frame_number = car_frame_numbers[i-1]
                prev_car_bbox = car_bboxes_interpolated[-1]
                prev_license_plate_bbox = license_plate_bboxes_interpolated[-1]

                if frame_number - prev_frame_number > 1:
                    # Interpolate missing frames' bounding boxes
                    frames_gap = frame_number - prev_frame_number
                    x = np.array([prev_frame_number, frame_number])
                    x_new = np.linspace(prev_frame_number, frame_number, num=frames_gap, endpoint=False)
                    interp_func = interp1d(x, np.vstack((prev_car_bbox, car_bbox)), axis=0, kind='linear')
                    interpolated_car_bboxes = interp_func(x_new)
                    interp_func = interp1d(x, np.vstack((prev_license_plate_bbox, license_plate_bbox)), axis=0, kind='linear')
                    interpolated_license_plate_bboxes = interp_func(x_new)

                    car_bboxes_interpolated.extend(interpolated_car_bboxes[1:])
                    license_plate_bboxes_interpolated.extend(interpolated_license_plate_bboxes[1:])

            car_bboxes_interpolated.append(car_bbox)
            license_plate_bboxes_interpolated.append(license_plate_bbox)

        for i in range(len(car_bboxes_interpolated)):
            frame_number = first_frame_number + i
            row = {}
            row['fr_number'] = str(frame_number)
            row['v_id'] = str(car_id)
            row['v_bbox'] = ' '.join(map(str, car_bboxes_interpolated[i]))
            row['lp_bbox'] = ' '.join(map(str, license_plate_bboxes_interpolated[i]))

            if str(frame_number) not in frame_numbers_:
                # Imputed row, set the following fields to '0'
                row['lp_bbox_score'] = '0'
                row['lp'] = '0'
                row['lp_score'] = '0'
            else:
                # Original row, retrieve values from the input data if available
                original_row = [p for p in data if int(p['fr_number']) == frame_number and int(float(p['v_id'])) == int(float(car_id))][0]
                row['lp_bbox_score'] = original_row['lp_bbox_score'] if 'lp_bbox_score' in original_row else '0'
                row['lp'] = original_row['lp'] if 'lp' in original_row else '0'
                row['lp_score'] = original_row['lp_score'] if 'lp_score' in original_row else '0'

            interpolated_data.append(row)

    return interpolated_data


# Load the CSV file
with open('log.csv', 'r') as file:
    reader = csv.DictReader(file)
    data = list(reader)

# Interpolate missing data
interpolated_data = interpolate_bounding_boxes(data)

# Write updated data to a new CSV file
header = ['fr_number','v_id', 'v_bbox','lp_bbox','lp_bbox_score','lp','lp_score']
with open('log_interpolated.csv', 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=header)
    writer.writeheader()
    writer.writerows(interpolated_data)

['3', '4'] 1566
['15', '16'] 1567
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99', '100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '110', '111', '112', '113', '114', '115', '116', '117', '118', '119', '120', '121', '122', '123', '124', '125', '126', '127', '128', '129', '129', '130', '130', '131', '132', '133', '134', '135', '136', '137', '138', '139', '140', '141', '142', '143', '144', '145', '146', '147', '148', '149', '150', '151

In [42]:
import cv2
import pandas as pd
import ast

def draw_bounding_boxes(input_video_path, csv_file_path, output_video_path):
    # Read the CSV file into a DataFrame
    df = pd.read_csv(csv_file_path)

    # Open the input video
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise Exception(f"Failed to open video file: {input_video_path}")

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create a VideoWriter object for the output video
    fourcc = cv2.VideoWriter_fourcc(*"MJPG")  # You can change the codec as needed
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    # Process each frame and draw bounding boxes
    frame_number = 0
    ret=True
    while ret:
        ret, frame = cap.read()

        # Filter the DataFrame for the current frame number
        frame_data = df[df['fr_number'] == frame_number]        

        #print(frame_data)
        for _, row in frame_data.iterrows():
            #car
            car_bbox_vals = row['v_bbox'].split(' ')
            car_bbox_vals[:]=map(lambda x: int(float(x)),car_bbox_vals)
            car_bbox_vals = str(car_bbox_vals)
            car_bbox_vals = "[" + car_bbox_vals[1:-1] + "]"
            car_bbox = ast.literal_eval(car_bbox_vals)


            #license plate
            lp_bbox_vals = row['lp_bbox'].split(' ')
            lp_bbox_vals[:]=map(lambda x: int(float(x)),lp_bbox_vals)
            lp_bbox_vals = str(lp_bbox_vals)
            lp_bbox_vals_str = "[" + lp_bbox_vals[1:-1] + "]"
            license_plate_bbox = ast.literal_eval(lp_bbox_vals_str)

            # Draw bounding boxes on the frame
            #cv2.rectangle(frame, (int(car_bbox[0]), int(car_bbox[1]), (int(car_bbox[2]), int(car_bbox[3]))), (0, 255, 0), 2)
            cv2.rectangle(frame, (int(car_bbox[0]), int(car_bbox[1])), (int(car_bbox[2]), int(car_bbox[3])), (0, 255, 0), 1,cv2.LINE_4)
            cv2.rectangle(frame, (int(license_plate_bbox[0]), int(license_plate_bbox[1])),
                          (int(license_plate_bbox[2]), int(license_plate_bbox[3])), (0, 0, 255), 1)

            lp_bbox_vals = row['lp_bbox'].split(' ')
            lp_bbox_vals[:]=map(lambda x: int(float(x)),lp_bbox_vals)
            car_bbox_vals = row['v_bbox'].split(' ')
            car_bbox_vals[:]=map(lambda x: int(float(x)),car_bbox_vals)
            #print(lp_bbox_vals)
            #x1, y1, x2, y2
            #H, W, _ = lp_bbox_vals.shape
            H = lp_bbox_vals[3]-lp_bbox_vals[1]
            W = lp_bbox_vals[2]-lp_bbox_vals[0]
            #print(H,W)

            try:
                #frame[int(car_bbox_vals[1]) - H - 100:int(car_bbox_vals[1]) - 100,
                #      int((car_bbox_vals[2] + car_bbox_vals[0] - W) / 2):int((car_bbox_vals[2] + car_bbox_vals[0] + W) / 2), :] = license_crop

                #frame[int(car_bbox_vals[1]) - H - 400:int(car_bbox_vals[1]) - H - 100,
                #      int((car_bbox_vals[2] + car_bbox_vals[0] - W) / 2):int((car_bbox_vals[2] + car_bbox_vals[0] + W) / 2), :] = (255, 255, 255)

                license_crop = frame[lp_bbox_vals[1]+2:lp_bbox_vals[3],lp_bbox_vals[0]+2:lp_bbox_vals[2], :]
                #frame[int(car_bbox_vals[1]) - H - 100:int(car_bbox_vals[1]) - 100,
                #      int((car_bbox_vals[2] + car_bbox_vals[0] - W) / 2):int((car_bbox_vals[2] + car_bbox_vals[0] + W) / 2), :] = license_crop
                #cv2.imshow('frame', license_crop)
                #cv2.waitKey(0)

                padding = 10
                border_thickness = 1
                (text_width, text_height), _ = cv2.getTextSize(
                    row['lp'],
                    cv2.FONT_HERSHEY_SIMPLEX,
                    2,
                    5)
                text_x = int((car_bbox[0] + car_bbox[2] - text_width) / 2)
                text_y = int(car_bbox[1] - H + (text_height / 2))

                background_width = car_bbox[2] - car_bbox[0]
                cv2.rectangle(frame, (car_bbox[0], car_bbox[1]-text_height-H), (car_bbox[2], car_bbox[1]), (0, 0, 0), thickness=cv2.FILLED)
                #cv2.rectangle(frame, (text_x, text_y - text_height), (text_x + background_width, text_y), (255, 255, 255), thickness=cv2.FILLED)

                #cv2.rectangle(frame, (text_x-padding, text_y-text_height-padding), (text_x + text_width+padding, text_y+padding), (0, 0, 0), thickness=cv2.FILLED)
                cv2.rectangle(frame, (car_bbox[0], car_bbox[1]-text_height-H), (car_bbox[2], car_bbox[1]), (0, 255, 0), thickness=border_thickness)

                #frame[car_bbox_vals[1]+2:car_bbox_vals[3], car_bbox_vals[0]+2:car_bbox_vals[2], :] = license_crop
                #frame[car_bbox[1]:car_bbox[1]+(lp_bbox_vals[3]-lp_bbox_vals[1]), car_bbox[0]:car_bbox[0]+(lp_bbox_vals[2]-lp_bbox_vals[0]), :] = license_crop
                #x=100
                #y=100
                #alpha = 1.0  # Controls the transparency of the cropped image
                #beta = 1.0 - alpha
                #cropped_height = lp_bbox_vals[3] - lp_bbox_vals[1] - 2
                #cropped_width = lp_bbox_vals[2] - lp_bbox_vals[0] - 2

                #cv2.addWeighted(frame, alpha, license_crop, beta, 0, frame[y:y + cropped_height, x:x + cropped_width])

                if(row['lp'] is not '0'):
                    cv2.putText(frame,
                            row['lp'],
                            (int((car_bbox[0] + car_bbox[2] - text_width) / 2), int(car_bbox[1] - H + (text_height / 2))),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            2,
                            (255, 255, 255),
                            3)

            except:
                pass

        # Write the frame with bounding boxes to the output video
        if frame is not None:
            out.write(frame)
        frame_number += 1
        #frame = cv2.resize(frame, (1280, 720))

        #cv2.imshow('frame', frame)
        #cv2.waitKey(0)

    # Release the video capture and writer objects
    cap.release()
    out.release()
    

    
draw_bounding_boxes(testvideo_path,
                    '/Users/banoczymartin/Library/Mobile Documents/com~apple~CloudDocs/OE/platedetector/Logic/log_interpolated.csv',
                    '/Users/banoczymartin/Library/Mobile Documents/com~apple~CloudDocs/OE/platedetector/video_data/out.avi')


  if(row['lp'] is not '0'):

(<unknown>:4750): GStreamer-CRITICAL **: 23:51:16.549: gst_element_make_from_uri: assertion 'gst_uri_is_valid (uri)' failed

(<unknown>:4750): GStreamer-CRITICAL **: 23:51:16.550: gst_element_make_from_uri: assertion 'gst_uri_is_valid (uri)' failed
[ERROR:0@3599.293] global /private/var/folders/nz/j6p8yfhx1mv_0grj5xl4650h0000gp/T/abs_562_cazh1h/croots/recipe/opencv-suite_1664548333142/work/modules/videoio/src/cap.cpp (597) open VIDEOIO(CV_IMAGES): raised OpenCV exception:

OpenCV(4.6.0) /private/var/folders/nz/j6p8yfhx1mv_0grj5xl4650h0000gp/T/abs_562_cazh1h/croots/recipe/opencv-suite_1664548333142/work/modules/videoio/src/cap_images.cpp:253: error: (-5:Bad argument) CAP_IMAGES: can't find starting number (in the name of file): /Users/banoczymartin/Library/Mobile Documents/com~apple~CloudDocs/OE/platedetector/video_data/out.avi in function 'icvExtractPattern'




In [None]:
writer = cv2.VideoWriter("output.avi", cv2.VideoWriter_fourcc(*"MJPG"), 30,(640,480))

for frame in range(1000):
    writer.write(np.random.randint(0, 255, (480,640,3)).astype('uint8'))

writer.release()