<a href="https://colab.research.google.com/github/naoya1110/ai_robotics_lab_2023_hands_on/blob/main/Preprocessing_JetBot_Racing_Yolov8_2023Fall.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preprocessing JetBot Racing Video with YOLOv8

## Introduction

This notebook processes a JetBot racing video using an object detection model YOLOv8, and generates a tracking video (MP4) and a table data (CSV).

YOLOv8 https://github.com/ultralytics/ultralytics

## Upload Video to Google Drive
Take a video of your JetBot driving the racetrack for 3 laps. Then upload it to your Google Drive.

## GPU
Make sure you have access to a GPU.

## Your Team ID
Please set your team ID.

In [None]:
team_id = 0
print(f"Your team ID is {team_id}.")

## Setup YOLO v8

In [None]:
%pip install ultralytics dill
import ultralytics
ultralytics.checks()

Download the pretrained model for the JetBot detection.

In [None]:
import gdown
gdown.download('https://drive.google.com/uc?id=1dDPhi_chE4PCiyTIayPjAY07x4-jRxAv', quiet = False)

Load the pretrained model. If this cell raise an error, run it again.

In [None]:
# Load a model
from ultralytics import YOLO
model = YOLO("yolov8s_jetbot_detection_best_20231201.pt")  # load a pretrained model (recommended for training)

## Import Python Packages

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

## Prepare ArUco Markers
We use ArUco markers to calibrate the JetBot position in the racetrack.

In [None]:
aruco = cv2.aruco
dictionary = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)

def update_paper_corners(ar_ids, ar_corners, paper_corners):
    if ar_ids is not None:
        for i in range(4):
            if i in ar_ids:
                j = np.where(ar_ids.flatten()==i)[0][0]
                paper_corners[i] = tuple(ar_corners[j][0][i].astype("int"))
    return paper_corners


def update_checker_corners(ar_ids, ar_corners, checker_corners):
    if ar_ids is not None:
        # update tl and bl coordinates if #4 is detected
        if 4 in ar_ids:
            idx4 = np.where(ar_ids.flatten()==4)[0][0]
            checker_corners[0] = tuple(ar_corners[idx4][0][0].astype("int"))
            checker_corners[3] = tuple(ar_corners[idx4][0][3].astype("int"))

        # update tl and bl coordinates if #4 is detected
        if 5 in ar_ids:
            idx5 = np.where(ar_ids.flatten()==5)[0][0]
            checker_corners[1] = tuple(ar_corners[idx5][0][1].astype("int"))
            checker_corners[2] = tuple(ar_corners[idx5][0][2].astype("int"))

    return checker_corners

## Input Video


Mount your google drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Set the filepath of your input video data.

In [None]:
input_video_path = "/content/drive/MyDrive/jetbot_videos/jetbot_cls_sample_2023_1.mp4"

if os.path.exists(input_video_path):
    dirname, filename = os.path.split(input_video_path)
    filename_body, ext = os.path.splitext(filename)
    output_video_path = os.path.join(dirname, f"{team_id}_{filename_body}_out.mp4")
    output_data_path = os.path.join(dirname, f"{team_id}_{filename_body}_data.csv")
    print("Input Video Path:", input_video_path)
    print("Output Video Path:", output_video_path)
    print("Output Data Path:", output_data_path)
else:
    print(f"No such file {input_video_path}")

## Perform Tracking
Now we can perform JetBot tracking.

In [None]:
TOTAL_LAPS = 3

print("Input Video:", input_video_path)
cap = cv2.VideoCapture(input_video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_rate = int(cap.get((cv2.CAP_PROP_FPS)))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print("Frame Width:", frame_width)
print("Frame Height:", frame_height)
print("Frame Rate:", frame_rate)
print("Total Frames:", total_frames)

checker_corners = [(0, 0), (0, 1), (1, 1), (1, 0)]
paper_corners =  [(0, 0), (0, 1), (1, 1), (1, 0)]

GREEN = (0, 255, 0)
MAGENTA = (255, 0, 255)
CYAN = (255, 255, 0)

font = cv2.FONT_HERSHEY_SIMPLEX
#fourcc = cv2.VideoWriter_fourcc(*"DIVX")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(output_video_path, fourcc, frame_rate, (frame_width, frame_height))

count = 0
readable = 0
measure_time_flag = False
old_measure_time_flag = False

jetbot_states = [False]*3
new_jetbot_state = "not_in_checker"
old_jetbot_state = "not_in_checker"
current_lap = 0
lap_times = []
elapsed_time = 0
elapsed_time2 = 0
font = cv2.FONT_HERSHEY_SIMPLEX

conf = 0

output_data = []

while cap.isOpened():

    count += 1
    ret, frame1 = cap.read()
    if not ret:
        break

    else:
        timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)/1000
        results = model(frame1, save=False, device=0, verbose=False)
        detections = results[0].boxes.data

        if len(detections) == 1:
            left, top, right, bottom, conf, clasa_id = detections.to("cpu").numpy()[0]
            left, top, right, bottom = int(left), int(top), int(right), int(bottom)

        else:
            left, top, right, bottom, conf = -1, -1, -1, -1, -1

        jetbot_center = (int((right+left)/2), int((top+bottom)/2))

        frame2 = np.zeros(frame1.shape, dtype=np.uint8)
        ar_corners, ar_ids, _ = aruco.detectMarkers(frame1, dictionary)

        # update & draw paper corners
        paper_corners = update_paper_corners(ar_ids, ar_corners, paper_corners)
        cv2.polylines(frame1, pts=np.array([paper_corners]), isClosed=True, color=(51, 212, 255), thickness=2)
        # update & draw checker corners
        checker_corners = update_checker_corners(ar_ids, ar_corners, checker_corners)
        cv2.polylines(frame1, pts=np.array([checker_corners]), isClosed=True, color=GREEN, thickness=2)
        cv2.fillPoly(frame2, pts=np.array([checker_corners]), color=GREEN)

        # check if jetbot is in the checker area
        jetbot_position_color = frame2[jetbot_center[1], jetbot_center[0]]
        if np.array_equal(jetbot_position_color, np.array(GREEN)):
            is_in_checker = True
            jetbot_color = GREEN
        else:
            is_in_checker = False
            jetbot_color = MAGENTA

        # draw on jetbot
        cv2.rectangle(frame1, (left, top), (right, bottom), jetbot_color, 2)
        cv2.putText(frame1, f"{float(conf):.0%}", (left, top - 5),cv2.FONT_HERSHEY_SIMPLEX, 0.6, jetbot_color, 2)
        cv2.circle(frame1, jetbot_center, 10, jetbot_color, thickness=-1)

        jetbot_states.append(is_in_checker)
        jetbot_states.pop(0)
        if sum(jetbot_states) == 3:
            new_jetbot_state = "in_checker"
        elif sum(jetbot_states) == 0:
            new_jetbot_state = "not_in_checker"

        if old_jetbot_state == "not_in_checker" and new_jetbot_state == "in_checker":
            measure_time_flag = True
            current_lap += 1
            if current_lap <= TOTAL_LAPS+1:
                lap_times.append(np.round(elapsed_time,3))

            if current_lap > TOTAL_LAPS:
                measure_time_flag = False
                #laps = TOTAL_LAPS

        # when JetBot started
        if measure_time_flag == True:
            if old_measure_time_flag == False:
                t_start = timestamp

            elapsed_time = timestamp - t_start
            output_data.append(
                np.concatenate(
                    (np.array([elapsed_time]),
                    np.array([current_lap]),
                    np.array(jetbot_center).flatten(),
                    np.array(paper_corners).flatten(),
                    np.array(checker_corners).flatten()))
            )

        # when JetBot finished
        if (measure_time_flag == False) & (old_measure_time_flag == True):
            t_finish = timestamp
            elapsed_time = timestamp - t_start
            output_data.append(
                np.concatenate(
                    (np.array([elapsed_time]),
                    np.array([current_lap]),
                    np.array(jetbot_center).flatten(),
                    np.array(paper_corners).flatten(),
                    np.array(checker_corners).flatten()))
            )

        text = f"TIME {elapsed_time:.2f}"
        cv2.putText(frame1, text, (20, 60), font, 1.5, CYAN, 2, cv2.LINE_AA)

        if len(lap_times) > 1:
            for i in range(len(lap_times)-1):
                cv2.putText(frame1, f"LAP{i+1} {lap_times[i+1]-lap_times[i]:.2f}", (20, 60*(i+2)), font, 1.5, CYAN, 2, cv2.LINE_AA)

        old_jetbot_state = new_jetbot_state
        old_measure_time_flag = measure_time_flag

        print(f"\rProgress {count}/{total_frames} JetBot:(x, y)={jetbot_center} conf={conf:.1%}, {text}, Current Lap {current_lap}, {lap_times}", end="")
        writer.write(frame1)

writer.release()
cap.release()

# save output data
output_data = np.array(output_data)
output_data_df = pd.DataFrame(output_data)
output_data_df.columns = ["time", "lap", "jb_x", "jb_y",
                          "pa_tl_x", "pa_tl_y", "pa_tr_x", "pa_tr_y", "pa_br_x", "pa_br_y", "pa_bl_x", "pa_bl_y",
                          "ch_tl_x", "ch_tl_y", "ch_tr_x", "ch_tr_y", "ch_br_x", "ch_br_y", "ch_bl_x", "ch_bl_y"]
output_data_df.to_csv(output_data_path, index=False)
# ----------------

print("\nFinished")
print("Output Video :", output_video_path)
print("Output Data:", output_data_path)

## Submission
Download your `{team_id}_{something}_data.csv` file and submit it from here.

### Classification Model Competition
https://forms.gle/QFqYqr3h4amv5X166

### Regression Model Competition
https://forms.gle/XFfHbtqzn6FVZAABA
