In [126]:
import cv2
import numpy as np
from mediapipe.tasks.python import vision
import mediapipe as mp
import time
from collections import defaultdict
import matplotlib.pyplot as plt

In [83]:
objectDectector_model_filepath = "model/object_detector/efficientdet_lite0_uint8.tflite"
video_filepath = "dataset/video_cars.mp4"
embedder_model_filepath = "model/embedder/mobilenet_v3_small_075_224_embedder.tflite"

In [76]:
ObjecttDetectorOptions = mp.tasks.vision.ObjectDetectorOptions(
    base_options=mp.tasks.BaseOptions(model_asset_path=objectDectector_model_filepath),
    max_results=50,
    score_threshold=0.4,
    running_mode=mp.tasks.vision.RunningMode.VIDEO
)

    

In [77]:
cap = cv2.VideoCapture(video_filepath)
fps = int(cap.get(cv2.CAP_PROP_FPS))
print("FPS: ",fps)


FPS:  60


In [79]:
# setting up video writer for testing the object detector
# We need to set resolutions.
# so, convert them from float to integer.
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
   
size = (frame_width, frame_height)
   
# Below VideoWriter object will create
# a frame of above defined The output 
# # is stored in 'filename.avi' file.
# videoWriter = cv2.VideoWriter('objDetect_video_cars_test.avi', 
#                          cv2.VideoWriter_fourcc(*'MJPG'),
#                          30, size)

In [81]:
frame_idx = 0
all_detections=defaultdict(list)
avg_processing_time = 0
with mp.tasks.vision.ObjectDetector.create_from_options(ObjecttDetectorOptions) as detector:
    while cap.isOpened():
        ret, img = cap.read()
        if ret == True:

            frame_idx+=1
            # Calculate the timestamp of the current frame
            frame_timestamp_ms = int(1000 * frame_idx / fps)
            # print(frame_timestamp_ms)
            time_start = time.perf_counter()
            # Convert the frame received from OpenCV to a MediaPipe’s Image object.
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=img)

            # Perform object detection on the video frame.
            detection_results = detector.detect_for_video(mp_image, frame_timestamp_ms)
            # print("time taken : ",time.perf_counter()-time_start)
            # print("Detections:",len(detection_results.detections))
            # print("--------------")
            for det_res in detection_results.detections:
                x = det_res.bounding_box.origin_x
                y = det_res.bounding_box.origin_y
                width = det_res.bounding_box.width
                height = det_res.bounding_box.height
                all_detections[frame_idx].append((x,y,x+width,y+height))
                # img = cv2.rectangle(img, (x,y),(x+width,y+height),color=(0,255,0),thickness=2)
            # cv2.imwrite("box_img.png",img)
            # videoWriter.write(img)
            # break
            time_processing = time.perf_counter()-time_start
            avg_processing_time += time_processing
        else:
            print("Not able to read the frame")
            break
cap.release()
# videoWriter.release()
print("time taken : ",avg_processing_time/frame_idx)
# print("Detections:",len(detection_results.detections))
print("--------------")

Not able to read the frame
time taken :  0.09204005826148247
--------------


In [69]:
detection_results.detections[0]

Detection(bounding_box=BoundingBox(origin_x=2269, origin_y=1173, width=497, height=388), categories=[Category(index=None, score=0.62890625, display_name=None, category_name='car')], keypoints=[])

In [87]:
# embeddings:
# Create options for Image Embedder
embedder_base_options = mp.tasks.BaseOptions(model_asset_path=embedder_model_filepath)
l2_normalize = True #@param {type:"boolean"}
quantize = True #@param {type:"boolean"}
options = vision.ImageEmbedderOptions(
    base_options=embedder_base_options, l2_normalize=l2_normalize, quantize=quantize,
    running_mode= mp.tasks.vision.RunningMode.VIDEO)


In [174]:
cap = cv2.VideoCapture(video_filepath)
fps = int(cap.get(cv2.CAP_PROP_FPS))
print("FPS: ",fps)

# setting up video writer for testing the object detector
# We need to set resolutions.
# so, convert them from float to integer.
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
   
size = (frame_width, frame_height)
   
# Below VideoWriter object will create
# a frame of above defined The output 
# # is stored in 'filename.avi' file.
videoWriter = cv2.VideoWriter('objEmbed_video_cars_test.avi', 
                         cv2.VideoWriter_fourcc(*'MJPG'),
                         10, size)

FPS:  60


In [175]:
# Create Image Embedder
frame_idx = 0
prev_embeddings = [] # prev frame as 1, next frame as 2
next_embeddings = []
crop_idx = 0
frame_interval = 10 # after how many frames check similarity
avg_processing_time = 0
car_idx = 0 #car counts detected cars in the video, same car will be not counted more than 1
matching_thresh = 0.7
with vision.ImageEmbedder.create_from_options(options) as embedder:
   while cap.isOpened():
        ret, img = cap.read()
        if ret == True:

            frame_idx+=1
            if (frame_idx-1)%frame_interval==0:

                time_start = time.perf_counter()
                if len(prev_embeddings)==0:
                    # take first frame as the reference for starting tracking
                    for x1,y1,x2,y2 in all_detections[1]:
                        car_idx+=1
                        img_crop = img[y1:y2,x1:x2,:].astype(np.uint8)
                        crop_idx+=1
                        # Calculate the timestamp of the current frame
                        frame_timestamp_ms = int(1000 * crop_idx / fps)
                        # Convert the frame received from OpenCV to a MediaPipe’s Image object.
                        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=img_crop)

                        # Perform object detection on the video frame.
                        embedding_result = embedder.embed_for_video(mp_image, frame_timestamp_ms)

                        # print(int(frame_idx//frame_interval),embedding_result)
                        prev_embeddings.append((img_crop,embedding_result,car_idx,(x1,y1,x2,y2)))
                else:
                    for x1,y1,x2,y2 in all_detections[frame_idx]:
                        img_crop = img[y1:y2,x1:x2,:].astype(np.uint8)
                        crop_idx+=1
                        # Calculate the timestamp of the current frame
                        frame_timestamp_ms = int(1000 * crop_idx / fps)
                        # Convert the frame received from OpenCV to a MediaPipe’s Image object.
                        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=img_crop)

                        # Perform object detection on the video frame.
                        embedding_result = embedder.embed_for_video(mp_image, frame_timestamp_ms)
                        em1 = embedding_result.embeddings[0]
                        best_match = 0
                        new_car_idx = None
                        for im_cr_prev, embed_prev, c_idx, coords in prev_embeddings:
                            em2 = embed_prev.embeddings[0]
                            similarity = vision.ImageEmbedder.cosine_similarity(
                                em1,
                                em2)

                            if similarity>matching_thresh and similarity>best_match:
                                best_match = similarity
                                new_car_idx = c_idx
                        if new_car_idx is None:
                            car_idx+=1
                            new_car_idx = car_idx
                        # print(int(frame_idx//frame_interval),embedding_result)
                        next_embeddings.append((img_crop,embedding_result,new_car_idx,(x1,y1,x2,y2)))
                    prev_embeddings = next_embeddings.copy()
                    next_embeddings = []
     
                for im_cr_prev, embed_prev, c_idx, coords in prev_embeddings:
                    x1 = coords[0]
                    y1 = coords[1]
                    x2 = coords[2]
                    y2 = coords[3]
                    img = cv2.putText(img, f'{c_idx}', org=(x1,y1),
                                        fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                        fontScale=3,
                                        color=(255,0,0),
                                        thickness=2,
                                        lineType=cv2.LINE_AA)
                    img = cv2.rectangle(img, (x1,y1),(x2,y2),color=(0,255,0),thickness=5)
                videoWriter.write(img)
                try:    
                    avg_processing_time+=(time.perf_counter()-time_start)/((frame_idx-1)//frame_interval)
                except ZeroDivisionError as ZE:
                    avg_processing_time+=(time.perf_counter()-time_start)

        else:
            print("Not able to read the frame")
            break
cap.release()
videoWriter.release()
print("time taken : ",avg_processing_time)

Not able to read the frame
time taken :  1.5047214900066364
