# Library Setup

## Library Install

In [None]:
!pip install cvzone ultralytics
!pip install lapx>=0.5.2
!pip install kornia

In [2]:
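# Uncomment on Google Colab before running the cells below: the input videos
# and the output zip files are read from / written to /content/drive.
# from google.colab import drive
# drive.mount('/content/drive')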

Mounted at /content/drive


## Library Import

In [3]:
import cv2, os, cvzone,torch
import zipfile, math, glob
from ultralytics import YOLO
from tqdm.notebook import tqdm
import numpy as np

import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F

from kornia.feature import LoFTR

In [4]:
# Choose the torch backend in order of preference:
# CUDA GPU, then Apple MPS, and finally plain CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

# Utils

In [5]:
def clear_folder(folder_path):
  """Delete everything inside ``folder_path`` and then the folder itself.

  The operation is best-effort: a failure to delete an entry or the folder is
  printed and does not raise. A path that does not exist is treated as already
  cleared, so calling this before the folder is first created is a silent
  no-op.

  Args:
    folder_path: Path of the directory to empty and remove.
  """
  import shutil  # local import: only needed here, to remove sub-directories

  if not os.path.exists(folder_path):
    return

  # os.listdir (unlike glob('*')) also returns dot-files and does not interpret
  # characters such as '[' in folder_path as pattern syntax.
  try:
    entries = [os.path.join(folder_path, name) for name in os.listdir(folder_path)]
  except OSError as e:
    print(f"Error removing folder {folder_path}: {e}")
    return

  for entry in entries:
    try:
      if os.path.isdir(entry) and not os.path.islink(entry):
        # os.remove cannot delete directories; without this the final rmdir
        # would fail on a non-empty folder.
        shutil.rmtree(entry)
      else:
        os.remove(entry)
    except OSError as e:
      print(f"Error deleting {entry}: {e}")

  # Remove the (now empty) folder itself.
  try:
    os.rmdir(folder_path)
    print(f"Folder {folder_path} removed")
  except OSError as e:
    print(f"Error removing folder {folder_path}: {e}")

In [6]:
def zip_it(folder_to_zip,zip_filename):
  """Write every file under ``folder_to_zip`` into a deflate-compressed zip.

  Files are stored under their path relative to ``folder_to_zip``. The
  directory that will hold the archive is created if it does not exist, and
  the archive itself is skipped if it happens to live inside the folder being
  zipped.

  Args:
    folder_to_zip: Directory whose files (including sub-directories) are added.
    zip_filename: Path of the zip archive to create (overwritten if present).
  """
  archive_path = os.path.abspath(zip_filename)
  # Create the destination directory up front so a missing results folder does
  # not abort the run after all frames have already been extracted.
  os.makedirs(os.path.dirname(archive_path), exist_ok=True)

  with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
    # Walk through the folder and its subdirectories
    for root, _, files in os.walk(folder_to_zip):
      for file in tqdm(files,desc="Zip files"):
        file_path = os.path.join(root, file)

        # Never add the archive that is currently being written to itself.
        if os.path.abspath(file_path) == archive_path:
          continue

        # Store the file under its path relative to the zipped folder.
        relative_path = os.path.relpath(file_path, folder_to_zip)
        zipf.write(file_path, relative_path)

In [8]:
def inRange(img_channel,thres_min,thres_max):
  """Return an integer 0/1 mask of the values inside ``[thres_min, thres_max]``.

  Both bounds are inclusive. The result comes from ``.to(int)`` on the boolean
  mask, so ``img_channel`` must be an object whose comparison result provides
  that method (a torch tensor in this notebook).
  """
  at_least_min = img_channel >= thres_min
  at_most_max = img_channel <= thres_max
  within = at_least_min & at_most_max
  return within.to(int)

def remove_background(threadholds,img):
    """Build a 0/1 mask of the pixels whose three channels all fall in range.

    Args:
        threadholds: Mapping with keys "blue", "green" and "red", each holding
            an inclusive ``(min, max)`` pair.
        img: Tensor indexed as ``img[:, :, c]`` where channel 0 is checked
            against "blue", 1 against "green" and 2 against "red".

    Returns:
        The product of the three per-channel masks, reshaped to ``(1, h, w)``.
    """
    combined = None
    # Channel index i is matched with the colour at position i.
    for channel_idx, colour in enumerate(("blue", "green", "red")):
        lower, upper = threadholds[colour]
        channel_mask = inRange(img[:, :, channel_idx], lower, upper)
        # A pixel survives only if every channel mask is 1.
        combined = channel_mask if combined is None else combined * channel_mask

    rows, cols = combined.shape
    return combined.reshape(1, rows, cols)

# Level 1

In [None]:
def search_level_1(filename,yolo_model,folder_save_frame,zip_filename):
  """Save every video frame in which YOLO detects a truck, then zip them.

  Each frame is run through the detector with a 0.5 confidence threshold.
  Boxes whose class name is "truck" are drawn on the frame, and any frame with
  at least one such box is written to ``folder_save_frame`` as
  ``frame_<n>.jpg`` (``n`` starts at 1). Finally the folder is archived with
  ``zip_it``.

  Args:
    filename: Path of the video to scan.
    yolo_model: Weights name/path passed to ``YOLO``.
    folder_save_frame: Directory receiving the annotated frames (created if
      missing).
    zip_filename: Path of the zip archive to produce.

  Raises:
    OSError: If the video cannot be opened.
  """
  model = YOLO(yolo_model).to(device)
  className = model.names

  video = cv2.VideoCapture(filename)
  if not video.isOpened():
    # Fail loudly instead of silently producing an empty archive.
    video.release()
    raise OSError(f"Could not open video: {filename}")

  # cv2.imwrite only returns False when the target directory is missing.
  os.makedirs(folder_save_frame, exist_ok=True)

  total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
  bar = tqdm(total=total_frames,desc="Frame extract")
  frame_count = 1

  try:
    while True:
      success, frame = video.read()
      if not success: break
      results = model(frame, stream=True, verbose=False, conf=0.5)
      save_frame = False

      for r in results:
        for box in r.boxes:
          cls = int(box.cls[0])
          if className[cls]!="truck": continue

          x1, y1, x2, y2 = list(map(int, box.xyxy[0]))
          w,h = x2-x1, y2-y1
          cvzone.cornerRect(frame, (x1,y1,w,h), l=9)
          # Confidence rounded up to two decimals for the label.
          conf = math.ceil((box.conf[0]*100))/100

          # Keep the label inside the image even for boxes touching the top edge.
          cvzone.putTextRect(frame,f"{className[cls]}|{conf}",(max(0,x1),max(35,y1)))
          save_frame = True

      if save_frame:
        frame_filename = os.path.join(folder_save_frame, f'frame_{frame_count}.jpg')
        if not cv2.imwrite(frame_filename, frame):
          print(f"Error writing {frame_filename}")

      bar.update(1)
      frame_count+=1
  finally:
    # Release the decoder and progress bar even if detection fails midway.
    video.release()
    bar.close()

  zip_it(folder_save_frame,zip_filename)

In [None]:
# Level 1 on Video 1: save the frames that contain a truck and zip them.
# NOTE(review): this Drive root ("ISODS - Entrance Test") differs from the one
# used by the Level 2/3 cells ("(ISODS) Entrance-Test") — confirm which exists.
filename = "/content/drive/MyDrive/ISODS - Entrance Test/Information entrance test/Task 1/Videos/Video 1.mp4"
yolo_model = 'yolov8l.pt'

# Drop the frames of a previous run, then recreate the (empty) output folder.
folder_save_frame = "./video1/truck"
clear_folder(folder_save_frame)
os.makedirs(folder_save_frame, exist_ok=True)

zip_filename = "/content/drive/MyDrive/ISODS - Entrance Test/video1_truck.zip"

search_level_1(filename,yolo_model,folder_save_frame,zip_filename)

In [None]:
# Level 1 on Video 2: save the frames that contain a truck and zip them.
# NOTE(review): this Drive root ("ISODS - Entrance Test") differs from the one
# used by the Level 2/3 cells ("(ISODS) Entrance-Test") — confirm which exists.
filename = "/content/drive/MyDrive/ISODS - Entrance Test/Information entrance test/Task 1/Videos/Video 2.mp4"
yolo_model = 'yolov8l.pt'

# Drop the frames of a previous run, then recreate the (empty) output folder.
folder_save_frame = "./video2/truck"
clear_folder(folder_save_frame)
os.makedirs(folder_save_frame, exist_ok=True)

zip_filename = "/content/drive/MyDrive/ISODS - Entrance Test/video2_truck.zip"

search_level_1(filename,yolo_model,folder_save_frame,zip_filename)

# Level 2

In [None]:
# Inclusive (min, max) bounds per colour channel. search_level_2 passes this
# mapping to remove_background to decide which pixels count as "red":
# low blue, low green and high red.
threadholds = dict(
    blue=(0, 50),
    green=(0, 50),
    red=(120, 255),
)


def search_level_2(filename,yolo_model,folder_save_frame,zip_filename):
  """Save every video frame that contains a predominantly red truck, then zip.

  For each detected "truck" the instance mask is resized to the frame size and
  applied to the pixels inside the bounding box. Pixels that pass the
  module-level ``threadholds`` (via ``remove_background``) are counted as red.
  A truck is kept when red pixels make up at least 25% of its mask. Kept
  trucks are drawn on the frame and the frame is written to
  ``folder_save_frame`` as ``frame_<n>.jpg`` (``n`` starts at 1). Finally the
  folder is archived with ``zip_it``.

  Args:
    filename: Path of the video to scan.
    yolo_model: Weights name/path passed to ``YOLO``; the results must carry
      masks, so a segmentation model is expected.
    folder_save_frame: Directory receiving the annotated frames (created if
      missing).
    zip_filename: Path of the zip archive to produce.

  Raises:
    OSError: If the video cannot be opened.
  """
  model = YOLO(yolo_model).to(device)
  className = model.names

  video = cv2.VideoCapture(filename)
  if not video.isOpened():
    # Fail loudly instead of silently producing an empty archive.
    video.release()
    raise OSError(f"Could not open video: {filename}")

  # cv2.imwrite only returns False when the target directory is missing.
  os.makedirs(folder_save_frame, exist_ok=True)

  total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
  bar = tqdm(total=total_frames,desc="Frame extract")
  frame_count = 1

  try:
    while True:
      success, frame = video.read()
      if not success: break
      results = model(frame, stream=True, verbose=False, conf=0.5)
      save_frame = False

      # Colour analysis runs on an untouched copy so that rectangles/labels
      # drawn for one truck do not leak into the pixels of the next one.
      clean_frame = frame.copy()
      frame_height, frame_width = frame.shape[:2]

      for r in results:
        boxes = r.boxes
        object_masks = r.masks
        # No masks available for this result (nothing to analyse).
        if object_masks is None: continue

        for box_index, box in enumerate(boxes):
          cls = int(box.cls[0])
          if className[cls]!="truck": continue

          x1, y1, x2, y2 = list(map(int, box.xyxy[0]))

          # Mask data arrives as (channels, height, width); drop the channel axis.
          object_mask = object_masks[box_index].data.cpu().numpy()
          _, object_mask_height, object_mask_width = object_mask.shape
          # NOTE(review): the mask is stretched straight to the frame size; if
          # the model pads its input, the mask may be slightly misaligned —
          # confirm, or request frame-sized masks from the model.
          resized_mask = cv2.resize(
              object_mask.reshape(object_mask_height, object_mask_width),
              (frame_width, frame_height), interpolation=cv2.INTER_LINEAR)

          object_pixel_count = np.count_nonzero(resized_mask)
          # An empty mask would otherwise cause a division by zero below.
          if object_pixel_count == 0: continue

          # Zero out everything outside the object, inside the bounding box only.
          roi_mask = clean_frame[y1:y2, x1:x2]*resized_mask[y1:y2, x1:x2, None]

          red_mask = remove_background(threadholds, torch.from_numpy(roi_mask).to(device)).cpu().numpy()
          red_percentage = np.count_nonzero(red_mask) / object_pixel_count
          if red_percentage<0.25: continue

          w,h = x2-x1, y2-y1
          cvzone.cornerRect(frame, (x1,y1,w,h), l=9)
          # Confidence rounded up to two decimals for the label.
          conf = math.ceil((box.conf[0]*100))/100

          cvzone.putTextRect(frame,f"red {className[cls]}|{conf}",(max(0,x1),max(35,y1)))
          save_frame = True

      if save_frame:
        frame_filename = os.path.join(folder_save_frame, f'frame_{frame_count}.jpg')
        if not cv2.imwrite(frame_filename, frame):
          print(f"Error writing {frame_filename}")

      bar.update(1)
      frame_count+=1
  finally:
    # Release the decoder and progress bar even if processing fails midway.
    video.release()
    bar.close()

  zip_it(folder_save_frame,zip_filename)

In [None]:
# Level 2 on Video 1: save the frames that contain a red truck and zip them.
filename = "/content/drive/MyDrive/(ISODS) Entrance-Test/Information entrance test/Task 1/Videos/Video 1.mp4"
# Segmentation weights: search_level_2 reads the per-object masks of the results.
yolo_model = 'yolov8n-seg.pt'

# Drop the frames of a previous run, then recreate the (empty) output folder.
folder_save_frame = "./video1/red_truck"
clear_folder(folder_save_frame)
os.makedirs(folder_save_frame, exist_ok=True)

# Remove a stale archive from an earlier run before writing the new one.
zip_filename = "/content/drive/MyDrive/(ISODS) Entrance-Test/Task 1 - Results/Video 1/Red_truck.zip"
if os.path.exists(zip_filename):
  os.remove(zip_filename)

search_level_2(filename,yolo_model,folder_save_frame,zip_filename)

In [None]:
# Level 2 on Video 2: save the frames that contain a red truck and zip them.
filename = "/content/drive/MyDrive/(ISODS) Entrance-Test/Information entrance test/Task 1/Videos/Video 2.mp4"
# Segmentation weights: search_level_2 reads the per-object masks of the results.
yolo_model = 'yolov8n-seg.pt'

# Drop the frames of a previous run, then recreate the (empty) output folder.
folder_save_frame = "./video2/red_truck"
clear_folder(folder_save_frame)
os.makedirs(folder_save_frame, exist_ok=True)

# Remove a stale archive from an earlier run before writing the new one.
zip_filename = "/content/drive/MyDrive/(ISODS) Entrance-Test/Task 1 - Results/Video 2/Red_truck.zip"
if os.path.exists(zip_filename):
  os.remove(zip_filename)

search_level_2(filename,yolo_model,folder_save_frame,zip_filename)

# Level 3

In [16]:
def search_level_3(target_object_image:str, search_video:str,
                   folder_save_frame:str, zip_filename:str, yolo_model:str):
  """Find a reference person in a video and save the frames showing them.

  A person crop is taken from ``target_object_image`` with YOLO (if several
  persons are detected the last one is used). Every tracked "person" in the
  video is then compared with that crop using LoFTR. A track is accepted once
  a comparison yields more than 85 correspondences with confidence above 0.5.
  Accepted tracks are drawn on every later frame without being re-matched,
  and a track that failed 20 comparisons is no longer tested. Frames with at
  least one accepted person are written to ``folder_save_frame`` as
  ``frame_<n>.jpg`` (``n`` starts at 0) and the folder is archived with
  ``zip_it``.

  Args:
    target_object_image: Path of the image showing the person to look for.
    search_video: Path of the video to scan.
    folder_save_frame: Directory receiving the annotated frames (created if
      missing).
    zip_filename: Path of the zip archive to produce.
    yolo_model: Weights name/path passed to ``YOLO``.

  Raises:
    FileNotFoundError: If the reference image cannot be read.
    ValueError: If no person is detected in the reference image.
    OSError: If the video cannot be opened.
  """
  model = YOLO(yolo_model).to(device)
  className = model.names
  matcher = LoFTR(pretrained="outdoor").eval().to(device)

  max_reject = 20        # failed comparisons after which a track is ignored
  min_confidence = 0.5   # per-correspondence confidence cut-off
  # NOTE(review): this threshold was presumably tuned while the score counted
  # ALL correspondences (the 0.5 cut-off was not applied) — re-validate it.
  min_matches = 85

  # Using YOLO to get target object
  reference_image = cv2.imread(target_object_image)
  if reference_image is None:
    raise FileNotFoundError(f"Could not read reference image: {target_object_image}")

  img0_raw = None
  results = model(reference_image, stream=True, verbose=False, conf=0.25)
  for r in results:
    for box in r.boxes:
      cls = int(box.cls[0])
      if className[cls]!="person": continue

      x1, y1, x2, y2 = list(map(int, box.xyxy[0]))

      # Convert color to gray as required by LoFTR
      img0_raw = cv2.cvtColor(reference_image[y1:y2, x1:x2], cv2.COLOR_BGR2GRAY)

  if img0_raw is None:
    raise ValueError(f"No person detected in reference image: {target_object_image}")

  # The reference tensor never changes, so build it once instead of per box.
  img0 = torch.from_numpy(img0_raw)[None][None].to(device) / 255.

  cap = cv2.VideoCapture(search_video)
  if not cap.isOpened():
    # Fail loudly instead of silently producing an empty archive.
    cap.release()
    raise OSError(f"Could not open video: {search_video}")

  # cv2.imwrite only returns False when the target directory is missing.
  os.makedirs(folder_save_frame, exist_ok=True)

  total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
  bar = tqdm(total=total_frames,desc="Video create")

  # track id -> -1 once accepted, otherwise the number of failed comparisons
  matched_object_ids={}
  frame_count = 0

  try:
    while True:
      ret, frame = cap.read()
      if not ret: break
      results = model.track(frame, persist=True, verbose=False)
      save_frame=False

      # Crops are taken from an untouched copy so that boxes/labels drawn for
      # one person do not end up inside the crop of an overlapping person.
      clean_frame = frame.copy()

      for r in results:
        boxes = r.boxes
        # The tracker reports no ids when nothing is being tracked in a frame.
        if boxes.id is None: continue
        track_ids = boxes.id.int().cpu().tolist()

        for box, track_id in zip(boxes, track_ids):
          cls = int(box.cls[0])
          if className[cls]!="person": continue

          # Using YOLO to get suggested object
          x1, y1, x2, y2 = list(map(int, box.xyxy[0]))

          rejections = matched_object_ids.get(track_id,0)
          if rejections != -1:
            # Not accepted yet: give up on tracks rejected too many times.
            if rejections >= max_reject: continue

            crop = clean_frame[y1:y2, x1:x2]
            # Degenerate boxes cannot be converted or matched.
            if crop.size == 0: continue

            # Convert color to gray as required by LoFTR
            img1_raw = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
            img1 = torch.from_numpy(img1_raw)[None][None].to(device) / 255.

            # Inference only: no autograd graph is needed.
            with torch.no_grad():
              correspondences_dict = matcher({'image0': img0, 'image1': img1})

            # Number of correspondences whose confidence exceeds the cut-off
            # (len() of the boolean mask would count every correspondence).
            score = int((correspondences_dict["confidence"]>min_confidence).sum())
            del img1_raw, img1, correspondences_dict

            # Check similarity
            if score <= min_matches:
              matched_object_ids[track_id] = rejections + 1
              continue
            matched_object_ids[track_id] = -1

          # Reached for freshly accepted and previously accepted tracks alike.
          _draw_target_person(frame, x1, y1, x2, y2)
          save_frame = True

      if save_frame:
        frame_filename = os.path.join(folder_save_frame, f'frame_{frame_count}.jpg')
        if not cv2.imwrite(frame_filename, frame):
          print(f"Error writing {frame_filename}")
      bar.update(1)
      frame_count+=1
  finally:
    # Release the decoder and progress bar even if processing fails midway.
    cap.release()
    bar.close()

  zip_it(folder_save_frame,zip_filename)


def _draw_target_person(frame, x1, y1, x2, y2):
  """Draw the corner rectangle and the "Target person" label on ``frame``."""
  w,h = x2-x1, y2-y1
  cvzone.cornerRect(frame, (x1,y1,w,h), l=9)
  # Keep the label inside the image even for boxes touching the top edge.
  cvzone.putTextRect(frame,"Target person",(max(0,x1),max(35,y1)),scale = 2, thickness = 2)

In [None]:
# Level 3 on Video 3: save the frames showing the person from the reference image.
target_object_image = "/content/drive/MyDrive/(ISODS) Entrance-Test/Information entrance test/Task 1/Input/Level 3.jpg"
search_video = "/content/drive/MyDrive/(ISODS) Entrance-Test/Information entrance test/Task 1/Videos/Video 3.mp4"

# Drop the frames of a previous run, then recreate the (empty) output folder.
folder_save_frame = "./video3/target_person"
clear_folder(folder_save_frame)
os.makedirs(folder_save_frame, exist_ok=True)

zip_filename = "/content/drive/MyDrive/(ISODS) Entrance-Test/Task 1 - Results/Video 3/target_person.zip"

# Note the argument order: the YOLO weights come last for search_level_3.
search_level_3(target_object_image,search_video,folder_save_frame, zip_filename,'yolov8l.pt')