# Bài tập 1

In [None]:
!gdown --id 1pESuZ_SjtR-kcxokNKuBwaSufVtsQCql

Downloading...
From: https://drive.google.com/uc?id=1pESuZ_SjtR-kcxokNKuBwaSufVtsQCql
To: /content/vietnam.mp4
100% 5.47M/5.47M [00:00<00:00, 37.9MB/s]


In [None]:
!pip install ultralytics



## simple version

In [None]:
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO

In [None]:
# Load the YOLO11-large checkpoint by file name.
model = YOLO("yolo11l.pt")

# Open the input clip; `cap` is read frame by frame in the tracking loop.
video_path = "./vietnam.mp4"
cap = cv2.VideoCapture(video_path)

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11l.pt to 'yolo11l.pt'...


100%|██████████| 49.0M/49.0M [00:00<00:00, 276MB/s]


In [None]:
# Mirror the source clip's geometry so the writer accepts every annotated frame.
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes playback speed.
# Fall back to 30 fps when the container reports no rate (the property reads as 0).
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

video_name = video_path.split("/")[-1]
# Strip only the final extension so dotted names such as "clip.v2.mp4" keep their stem.
output_path = f"./{video_name.rsplit('.', 1)[0]}_tracked.mp4"
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path,fourcc,fps,(width,height))

In [None]:
# Box-centre points per tracker id; each list is capped at the 120 most recent points.
track_history = defaultdict(list)

while cap.isOpened():
  success, frame = cap.read()
  if not success:
    # End of stream (or a read error): stop processing.
    break

  # persist=True keeps the tracker state between successive calls.
  results = model.track(frame, persist=True, show=False)

  boxes = results[0].boxes.xywh.cpu()
  # `id` is None when nothing is being tracked in this frame.
  ids = results[0].boxes.id
  track_ids = ids.int().cpu().tolist() if ids is not None else []

  annotated_frame = results[0].plot()

  for box, track_id in zip(boxes, track_ids):
    x, y, w, h = box
    track = track_history[track_id]
    track.append((float(x), float(y)))

    if len(track) > 120:
      track.pop(0)

    # Draw the trail of centre points as an open polyline.
    points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
    cv2.polylines(
        annotated_frame,
        [points],
        isClosed=False,
        color=(230, 230, 230),
        thickness=4,
    )

  # Write every frame, including those without tracks, so the output keeps
  # the same length and timing as the input.
  out.write(annotated_frame)

cap.release()
out.release()
print(f"Video has been saved to {output_path}")


Video has been saved to ./vietnam_tracked.mp4


## optimized version

In [None]:
!pip install loguru



In [None]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from loguru import logger

In [None]:
def load_config():
  """Return a freshly built dict of settings for the tracking pipeline.

  Keys: ``model_path``, ``track_history_length``, ``batch_size``,
  ``line_thickness`` and ``track_color``.
  """
  settings = dict(
      model_path="yolo11x.pt",
      track_history_length=120,
      batch_size=64,
      line_thickness=4,
      track_color=(230,230,230),
  )
  return settings
def initialize_video(video_path):
  """Open ``video_path`` for reading and create a matching mp4 writer.

  The writer uses the source's frame size and frame rate and writes to
  ``./<name>_tracked.mp4`` in the current directory.

  Args:
    video_path: Path of the input video; ``/`` is treated as the separator.

  Returns:
    Tuple ``(cap, out, output_path)``: the opened ``cv2.VideoCapture``, the
    ``cv2.VideoWriter`` and the output file path.

  Raises:
    IOError: If the video cannot be opened.
  """
  cap = cv2.VideoCapture(video_path)
  if not cap.isOpened():
    # Fail loudly instead of silently producing an empty output file.
    raise IOError(f"Cannot open video file: {video_path}")

  width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
  # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes playback speed.
  # Fall back to 30 fps when the container reports no rate (the property reads as 0).
  fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

  video_name = video_path.split("/")[-1]
  # Strip only the final extension so dotted names such as "clip.v2.mp4" keep their stem.
  output_path = f"./{video_name.rsplit('.', 1)[0]}_tracked.mp4"
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
  out = cv2.VideoWriter(output_path,fourcc,fps,(width,height))
  return cap,out,output_path

In [None]:
from threading import current_thread
def update_track_history(
    track_history,
    last_seen,
    track_ids,
    frame_count,
    batch_size,
    frame_idx,
    history_length
):
  """Refresh last-seen frame numbers and evict tracks that have gone stale.

  The frame being processed is ``frame_count - (batch_size - frame_idx - 1)``,
  i.e. ``frame_count`` is the number of frames read up to the end of the batch.
  Both mappings are modified in place.

  Args:
    track_history: Mapping of track id to its list of points.
    last_seen: Mapping of track id to the frame number it was last seen in.
    track_ids: Track ids present in the current frame.
    frame_count: Number of frames read so far (end of the current batch).
    batch_size: Number of frames in the current batch.
    frame_idx: Index of the current frame within the batch.
    history_length: A track unseen for more than this many frames is removed.
  """
  # Measure staleness against the frame actually being processed; using the
  # batch-end count would evict tracks up to batch_size - 1 frames too early.
  current_frame = frame_count - (batch_size - frame_idx - 1)
  current_tracks = set(track_ids)

  # Record every visible id, including ones that have no history entry yet,
  # so a brand-new track is not treated as last seen at frame 0.
  for track_id in current_tracks:
    last_seen[track_id] = current_frame

  # Iterate over a snapshot because entries are deleted inside the loop.
  for track_id in list(track_history.keys()):
    if track_id in current_tracks:
      continue
    # .get/.pop keep this working when last_seen is a plain dict.
    if current_frame - last_seen.get(track_id, 0) > history_length:
      del track_history[track_id]
      last_seen.pop(track_id, None)


In [None]:
def draw_tracks(frame,boxes,track_ids,track_history,config):
  """Extend each track with its box centre and draw the trails on ``frame``.

  Args:
    frame: Image to draw on; it is returned after drawing.
    boxes: Per-object ``(x, y, w, h)`` values, paired positionally with ``track_ids``.
    track_ids: Track ids for the boxes; when falsy the frame is returned untouched.
    track_history: Mapping of track id to its list of ``(x, y)`` points, updated in place.
    config: Settings dict providing ``track_history_length``, ``track_color``
      and ``line_thickness``.

  Returns:
    The same ``frame`` object.
  """
  if track_ids:
    for (cx, cy, _bw, _bh), tid in zip(boxes, track_ids):
      trail = track_history[tid]
      trail.append((float(cx), float(cy)))

      # Drop the oldest point once the trail exceeds the configured length.
      if len(trail) > config['track_history_length']:
        del trail[0]

      # Integer pixel coordinates in the (N, 1, 2) layout passed to polylines.
      trail_px = np.asarray(trail).astype(np.int32).reshape(-1, 1, 2)
      cv2.polylines(
          frame,
          [trail_px],
          isClosed=False,
          color=config['track_color'],
          thickness=config['line_thickness'],
      )
  return frame

In [None]:
def process_batch(model,batch_frames,track_history,last_seen,frame_count,config):
  """Track one batch of frames and return the annotated frames in order.

  Args:
    model: Object whose ``track`` method is called on the whole batch.
    batch_frames: List of frames to process together.
    track_history: Mapping of track id to point list, updated in place.
    last_seen: Mapping of track id to last-seen frame number, updated in place.
    frame_count: Number of frames read so far (end of this batch).
    config: Settings dict; ``track_history_length`` is read here and the dict
      is passed on to ``draw_tracks``.

  Returns:
    List of annotated frames, one per tracking result.
  """
  tracked = model.track(
      batch_frames,
      persist=True,
      tracker="botsort.yaml",
      show=False,
      verbose=False,
      iou=0.5,
  )

  annotated_batch = []
  for position, detection in enumerate(tracked):
    centres = detection.boxes.xywh.cpu()

    # `id` is None when the result carries no track ids.
    raw_ids = detection.boxes.id
    if raw_ids is None:
      ids = []
    else:
      ids = raw_ids.int().tolist()

    # Prune stale tracks before this frame's points are appended.
    update_track_history(
        track_history,
        last_seen,
        ids,
        frame_count,
        len(batch_frames),
        position,
        config['track_history_length'],
    )

    canvas = detection.plot(font_size=4,line_width=2)
    annotated_batch.append(
        draw_tracks(canvas,centres,ids,track_history,config)
    )
  return annotated_batch

In [None]:
def _write_batch(model,batch_frames,out,pbar,track_history,last_seen,frame_count,config):
  """Track, annotate and write one batch; failures are logged and the batch is skipped."""
  if not batch_frames:
    return
  try:
    processed_frames = process_batch(model,batch_frames,track_history,last_seen,frame_count,config)
    for frame in processed_frames:
      out.write(frame)
      pbar.update(1)
  except Exception as e:
    logger.error(f"Error when handling frames {frame_count-len(batch_frames)+1} to {frame_count}: {str(e)}")


def main(video_path):
  """Run batched object tracking on ``video_path`` and write the annotated video.

  Frames are read into batches of ``batch_size`` (from ``load_config``), each
  batch is tracked and annotated by ``process_batch``, and the result is
  written to the path chosen by ``initialize_video``.

  Args:
    video_path: Path of the input video.
  """
  CONFIG = load_config()
  model = YOLO(CONFIG.get("model_path","yolo11x.pt"))
  cap,out,output_path = initialize_video(video_path)
  track_history = defaultdict(list)

  last_seen = defaultdict(int)
  # Only used to size the progress bar; this count may not match the number of
  # frames actually read, so it must not decide when the last batch is flushed.
  total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

  try:
    with tqdm(total=total_frames,desc="Processing frames",colour="green") as pbar:
      frame_count = 0
      batch_frames = []

      while cap.isOpened():
        success,frame=cap.read()
        if not success:
          break
        batch_frames.append(frame)
        frame_count += 1

        if len(batch_frames) == CONFIG['batch_size']:
          _write_batch(model,batch_frames,out,pbar,track_history,last_seen,frame_count,CONFIG)
          batch_frames = []

      # Flush whatever is left once the stream ends, so trailing frames are
      # kept even when the reported frame count differs from what was read.
      _write_batch(model,batch_frames,out,pbar,track_history,last_seen,frame_count,CONFIG)
  finally:
    # Always release the capture and writer, even if processing raised.
    try:
      cap.release()
      out.release()
      cv2.destroyAllWindows()
    except Exception as e:
      logger.error(f"{str(e)}")
  logger.info(f"{output_path}")
# Run the batched tracking pipeline on the downloaded clip.
main("./vietnam.mp4")
# Command-line entry point for running this code as a script (disabled here):
# if __name__ == "__main__":
#   parser = argparse.ArgumentParser()
#   parser.add_argument("--video-path",type=str,default="samples/vietnam-2.mp4")
#   args = parser.parse_args()
#   main(args.video_path)

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11x.pt to 'yolo11x.pt'...


100%|██████████| 109M/109M [00:00<00:00, 213MB/s]
Processing frames: 100%|[32m█████████▉[0m| 777/780 [01:40<00:00, 15.37it/s][32m2025-01-27 05:14:18.460[0m | [1mINFO    [0m | [36m__main__[0m:[36mmain[0m:[36m37[0m - [1m./vietnam_tracked.mp4[0m
Processing frames: 100%|[32m██████████[0m| 780/780 [01:40<00:00,  7.76it/s]


In [None]:
!python main.py --video-path "samples/vietnam-2.mp4"

python3: can't open file '/content/main.py': [Errno 2] No such file or directory


# Bài tập 2

In [None]:
!gdown --id 1a4vuMZJOC8MGpy_tcRDsmt8-yPtAIW9q

In [None]:
from ultralytics import solutions
import cv2

In [None]:
# Open the input clip and fail immediately if it cannot be read.
cap = cv2.VideoCapture("./highway.mp4")
assert cap.isOpened(), "Error reading video file"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes playback speed.
# Fall back to 30 fps when the container reports no rate (the property reads as 0).
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0


In [None]:
# Corner points of the counting region, passed to ObjectCounter as `region`.
# The four points form an axis-aligned rectangle (x from 430 to 1600, y from 700 to 1080).
# NOTE(review): presumably pixel coordinates for a 1920x1080 frame — confirm against the clip.
region_points = [
    (430,700),
    (1600,700),
    (1600,1080),
    (430,1080)
]

In [None]:
# mp4v writer for the annotated output, using the fps and (w, h) read from `cap`.
video_writer = cv2.VideoWriter(
    "./highway_counted.mp4", cv2.VideoWriter_fourcc(*"mp4v"),fps,(w,h)
)

In [None]:
# Object counter from ultralytics.solutions, configured with the region above
# and the yolo11x weights.
# NOTE(review): show=False presumably disables the preview window — confirm in the library docs.
counter = solutions.ObjectCounter(
    show = False,
    region = region_points,
    model="yolo11x.pt"
)

In [None]:
# Feed every frame through the counter and write its return value to the output video.
# NOTE(review): newer ultralytics releases may expose this call under a different
# name than `count` — confirm against the installed version.
while cap.isOpened():
  ok, frame = cap.read()
  if ok:
    video_writer.write(counter.count(frame))
    continue
  print("Video frame is empty or video processing has been successfully completed")
  break

# Release the capture and writer, then close any OpenCV windows.
cap.release()
video_writer.release()
cv2.destroyAllWindows()

# Bài tập 3

In [None]:
!gdown --id 11qqiZsW6nqB_VHG42mwCdgA_ASrSu3nr

Downloading...
From: https://drive.google.com/uc?id=11qqiZsW6nqB_VHG42mwCdgA_ASrSu3nr
To: /content/bus.jpg
100% 137k/137k [00:00<00:00, 107MB/s]


In [None]:
!pip install ultralytics



In [None]:
!pip install loguru

Collecting loguru
  Downloading loguru-0.7.3-py3-none-any.whl.metadata (22 kB)
Downloading loguru-0.7.3-py3-none-any.whl (61 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.6/61.6 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.7.3


In [None]:
from ultralytics import YOLOWorld
from ultralytics.engine.results import Boxes
import sys
import uuid
from pathlib import Path
import cv2
from loguru import logger



In [None]:
def save_detection_results(results):
  """Save an annotated image for every result that has at least one box.

  Each image is written to ``./img_<uuid4>.jpg`` in the current directory.

  Args:
    results: Iterable of result objects exposing ``boxes`` and ``plot()``.

  Returns:
    List of absolute paths (as strings) of the images actually written.
  """
  saved_paths = []

  for result in results:
    boxes = result.boxes
    # Nothing to draw: `boxes` is missing or empty.
    if boxes is None or len(boxes) == 0:
      continue

    annotated_image = result.plot()
    output_path = f"./img_{uuid.uuid4()}.jpg"
    saved_path = Path(output_path).resolve()

    # cv2.imwrite reports failure by returning False instead of raising, so
    # check it before claiming the file exists.
    if not cv2.imwrite(output_path,annotated_image):
      print(f"Failed to save image to {saved_path}")
      continue

    print(f"Image saved to {saved_path}")
    saved_paths.append(str(saved_path))
  return saved_paths

In [None]:
# Load the YOLO-World small checkpoint and set its class list to just "bus".
model = YOLOWorld("yolov8s-world.pt")
model.set_classes(['bus'])
# The return value is iterated per image by save_detection_results, so it is
# annotated as a list rather than a single Boxes object.
results: list = model.predict("bus.jpg")


image 1/1 /content/bus.jpg: 640x480 1 bus, 21.3ms
Speed: 8.8ms preprocess, 21.3ms inference, 1.9ms postprocess per image at shape (1, 3, 640, 480)


In [None]:
save_detection_results(results)

Image saved to /content/img_82f58d30-974b-4b94-84b5-322803e78eb5.jpg


['/content/img_82f58d30-974b-4b94-84b5-322803e78eb5.jpg']