In [2]:
import cv2
import numpy as np
import os
import cv2
from ultralytics import YOLO
from tqdm import tqdm

In [3]:
def save_video_frames(video_path, video_number, output_dir=None):
    """Write every frame of a video to disk as a PNG image.

    Files are named ``video_<video_number>_frame_<NNNN>.png`` where ``NNNN``
    is a zero-based, zero-padded frame counter.

    Args:
        video_path: Path of the video file to read.
        video_number: Identifier embedded in every output file name.
        output_dir: Directory that receives the images. Defaults to
            ``train/Task2/frames`` relative to the working directory.

    Returns:
        None. A summary line (or an error if the video cannot be opened)
        is printed.
    """
    if output_dir is None:
        # os.path.join keeps the default location valid on every platform,
        # not only where "\\" is the path separator.
        output_dir = os.path.join("train", "Task2", "frames")

    cap = cv2.VideoCapture(video_path)
    frame_number = 0
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return

        os.makedirs(output_dir, exist_ok=True)

        while True:
            ret, frame = cap.read()
            # `ret` is False once no further frame can be read.
            if not ret:
                break

            frame_filename = os.path.join(
                output_dir, f"video_{video_number}_frame_{frame_number:04d}.png"
            )
            cv2.imwrite(frame_filename, frame)
            frame_number += 1
    finally:
        # Always free the capture handle, even if reading or writing fails.
        cap.release()

    print(f"Saved {frame_number} frames to {output_dir}")
    
def get_video_files_from_folder(folder_path, extensions=None):
    """List the video files of a folder in a deterministic order.

    Args:
        folder_path: Directory to scan (not recursive).
        extensions: Iterable of file extensions including the leading dot.
            Matching is case-insensitive. Defaults to common video formats.

    Returns:
        Paths (``folder_path`` joined with the file name) of the matching
        entries, sorted by file name. Sorting matters because callers pair
        this list with other listings by index, and ``os.listdir`` gives no
        ordering guarantee.
    """
    if extensions is None:
        # Define default video file extensions
        extensions = ['.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv']

    # File extensions are lower-cased before comparison, so do the same for
    # the accepted ones; otherwise an entry such as ".MP4" could never match.
    wanted = {ext.lower() for ext in extensions}

    return [
        os.path.join(folder_path, name)
        for name in sorted(os.listdir(folder_path))
        if os.path.splitext(name)[1].lower() in wanted
    ]

In [4]:
def get_txt_files_from_folder(folder_path):
    """List the ``.txt`` files of a folder in a deterministic order.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        Paths (``folder_path`` joined with the file name) of every entry
        whose name ends with ``.txt``, sorted by file name. The result is
        sorted because callers pair it with other listings by index and
        ``os.listdir`` gives no ordering guarantee.
    """
    return [
        os.path.join(folder_path, name)
        for name in sorted(os.listdir(folder_path))
        if name.endswith('.txt')
    ]

def modify_lines_in_file(input_file, output_folder, i):
    """Split an annotation file into one single-line label file per row.

    The first line of ``input_file`` is skipped. Every other non-blank line
    has its first whitespace-separated field replaced by ``0`` and is written
    to ``video_<i>_frame_<NNNN>.txt`` inside ``output_folder``, where ``NNNN``
    is the zero-padded line index within ``input_file`` (so the first written
    file is numbered 0001).

    Args:
        input_file: Path of the annotation file to read.
        output_folder: Existing directory that receives the label files.
        i: Video identifier embedded in every output file name.
    """
    with open(input_file, 'r') as infile:
        for line_number, line in enumerate(infile):
            if line_number == 0:
                continue  # Skip the first line

            numbers = line.split()
            if not numbers:
                continue  # Blank line: nothing to write

            # Replace the first number with 0
            numbers[0] = '0'

            # os.path.join instead of a hard-coded "\\" so the file lands
            # inside output_folder on every platform.
            output_file = os.path.join(
                output_folder, f"video_{i}_frame_{line_number:04d}.txt"
            )
            with open(output_file, 'w') as outfile:
                outfile.write(' '.join(numbers) + '\n')

def process_txt_files(folder_path, output_folder):
    """Convert every ``.txt`` annotation file of a folder into per-frame labels.

    Each file found by ``get_txt_files_from_folder`` is handed to
    ``modify_lines_in_file`` together with a running video number that
    starts at 1.

    Args:
        folder_path: Directory containing the annotation files.
        output_folder: Directory receiving the per-frame label files; it is
            created when missing.
    """
    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    for i, txt_file in enumerate(get_txt_files_from_folder(folder_path), start=1):
        modify_lines_in_file(txt_file, output_folder, i)
        # Report what was actually written: one file per frame, not a copy
        # of the input file.
        print(
            f"Processed {txt_file} and saved per-frame labels to "
            f"{output_folder} as video_{i}_frame_*.txt"
        )

In [5]:
def read_input_txt(file_path):
    """Read the header line and the starting box from a start-position file.

    Args:
        file_path: Path of a text file with at least two lines.

    Returns:
        A ``(header, numbers)`` pair: ``header`` is the first line exactly as
        stored (line ending included) and ``numbers`` is a tuple with every
        field of the second line except the first, converted to ``int``.
    """
    with open(file_path, 'r') as handle:
        content = handle.readlines()

    # The first field of the second line is dropped; the rest are the values.
    fields = content[1].split()
    header = content[0]
    return header, tuple(int(field) for field in fields[1:])

In [6]:
def load_images(directory):
    """Load every ``.png`` / ``.jpg`` image of a directory with OpenCV.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        List of images as returned by ``cv2.imread``, ordered by file name.
        Files are visited in sorted order because the result is paired with
        other listings by index and ``os.listdir`` gives no ordering
        guarantee. Images that OpenCV cannot decode are skipped with a
        warning, since a skipped entry shifts every later index.
    """
    images = []
    for filename in tqdm(sorted(os.listdir(directory))):
        if filename.endswith((".png", ".jpg")):
            img = cv2.imread(os.path.join(directory, filename))
            if img is not None:
                images.append(img)
            else:
                print(f"Warning: could not read image {filename}; skipped")
    return images

def load_queries(directory):
    """Load the first line of every ``.txt`` file of a directory as integers.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        List of ``(first_value, remaining_values)`` pairs, one per file and
        ordered by file name, where ``remaining_values`` is a list of ints.
        Files are visited in sorted order because the result is paired with
        other listings by index.

    Raises:
        ValueError: If the first line of a file is empty or contains a field
            that is not an integer.
    """
    queries = []
    for filename in tqdm(sorted(os.listdir(directory))):
        if not filename.endswith(".txt"):
            continue

        file_path = os.path.join(directory, filename)
        with open(file_path, 'r') as file:
            # Only the first line is used, so there is no need to read more.
            fields = file.readline().split()

        if not fields:
            raise ValueError(f"Label file {file_path} has an empty first line")

        values = [int(field) for field in fields]
        queries.append((values[0], values[1:]))
    return queries

def display_image(image):
    """Show an image in an OpenCV window titled ``Image`` until a key is pressed.

    Args:
        image: Image accepted by ``cv2.imshow``.
    """
    try:
        cv2.imshow('Image', image)
        cv2.waitKey(0)
    finally:
        # Close the window even when the wait is interrupted (for example by
        # interrupting the kernel), so no window is left behind.
        cv2.destroyAllWindows()

In [7]:
def get_rectangle_area(coords):
    """Return the area of an axis-aligned rectangle.

    Args:
        coords: Four values ``(x1, y1, x2, y2)`` giving two opposite corners,
            in either order.

    Returns:
        ``|x2 - x1| * |y2 - y1|``.
    """
    left, top, right, bottom = coords
    return abs(right - left) * abs(bottom - top)

def find_median_area(area1, area2):
    """Return the value halfway between two areas (their arithmetic mean)."""
    total = area1 + area2
    return total / 2

def calculate_third_rectangle(rect1, rect2):
    """Extrapolate a third box from two earlier boxes.

    The centre continues the displacement from ``rect1`` to ``rect2`` by one
    more step (``center2 + (center2 - center1)``). The area is the mean of
    the two input areas and the aspect ratio is taken from ``rect1``.

    Args:
        rect1: Earlier box as ``(x1, y1, x2, y2)``.
        rect2: Later box as ``(x1, y1, x2, y2)``.

    Returns:
        The extrapolated box as a tuple of four ints (truncated toward zero).
        If ``rect1`` has zero width or height, or the mean area is zero,
        the width and height of ``rect2`` are used instead, because no aspect
        ratio can be derived.
    """
    width1 = abs(rect1[2] - rect1[0])
    height1 = abs(rect1[3] - rect1[1])
    width2 = abs(rect2[2] - rect2[0])
    height2 = abs(rect2[3] - rect2[1])

    center1 = ((rect1[0] + rect1[2]) / 2, (rect1[1] + rect1[3]) / 2)
    center2 = ((rect2[0] + rect2[2]) / 2, (rect2[1] + rect2[3]) / 2)

    # Add the displacement directly. Normalising it and scaling it back by
    # its own length divides by zero when both centres coincide.
    center3 = (2 * center2[0] - center1[0], 2 * center2[1] - center1[1])

    # Mean of the two input areas.
    target_area = (width1 * height1 + width2 * height2) / 2

    if width1 > 0 and height1 > 0 and target_area > 0:
        # Keep the aspect ratio of the first rectangle:
        # width * height == target_area and width / height == aspect_ratio.
        aspect_ratio = width1 / height1
        width3 = float(np.sqrt(target_area * aspect_ratio))
        height3 = target_area / width3
    else:
        width3, height3 = float(width2), float(height2)

    rect3 = (
        int(center3[0] - width3 / 2), int(center3[1] - height3 / 2),
        int(center3[0] + width3 / 2), int(center3[1] + height3 / 2)
    )

    return rect3

In [8]:
def predict(chosen_model, img, classes=[], conf=0.5, iou=0.7):
    """Run ``chosen_model.predict`` on an image.

    Args:
        chosen_model: Object exposing ``predict(img, **options)``.
        img: Input handed to the model unchanged.
        classes: Class filter forwarded as ``classes=``; when empty the
            argument is not passed at all.
        conf: Forwarded as ``conf=``.
        iou: Forwarded as ``iou=``.

    Returns:
        Whatever ``chosen_model.predict`` returns.
    """
    options = {"conf": conf, "iou": iou}
    if classes:
        options["classes"] = classes
    return chosen_model.predict(img, **options)

def predict_and_detect(chosen_model, img, classes=[], conf=0.5, iou=0.7):
    """Run detection on an image and draw every detected box on it.

    Each box is outlined on ``img`` (which is modified in place) and labelled
    with its class name ten pixels above the top-left corner.

    Args:
        chosen_model: Model passed on to ``predict``.
        img: Image to analyse and annotate.
        classes: Class filter passed on to ``predict``.
        conf: Confidence threshold passed on to ``predict``.
        iou: IoU threshold passed on to ``predict``.

    Returns:
        ``(img, results)``: the annotated image and the raw results.
    """
    box_color = (255, 0, 0)
    results = predict(chosen_model, img, classes, conf=conf, iou= iou)

    for result in results:
        for box in result.boxes:
            corners = box.xyxy[0]
            left, top = int(corners[0]), int(corners[1])
            right, bottom = int(corners[2]), int(corners[3])
            class_name = result.names[int(box.cls[0])]

            cv2.rectangle(img, (left, top), (right, bottom), box_color, 2)
            cv2.putText(img, f"{class_name}", (left, top - 10),
                        cv2.FONT_HERSHEY_PLAIN, 1, box_color, 1)
    return img, results

In [9]:
class PredictionBox:
    """Plain record describing one detected box.

    Attributes:
        bbox: Box coordinates, stored as given.
        confidence: Detection confidence, stored as given.
        cls: Class label, stored as given.
        iou_with_ancestor: Overlap score with a previous box, stored as given.
    """

    def __init__(self, bbox, confidence, cls, iou_with_ancestor):
        # All four values are stored untouched under the same names.
        (self.bbox,
         self.confidence,
         self.cls,
         self.iou_with_ancestor) = (bbox, confidence, cls, iou_with_ancestor)

In [10]:
def intersection_over_union(rect1, rect2):
    """Compute the Intersection over Union (IoU) of two axis-aligned boxes.

    Args:
        rect1: Box as ``(x_min, y_min, x_max, y_max)``.
        rect2: Box as ``(x_min, y_min, x_max, y_max)``.

    Returns:
        Intersection area divided by union area. ``0.0`` is returned when the
        union has no area (both boxes are degenerate), where the ratio would
        otherwise be a division by zero.
    """
    x1_min, y1_min, x1_max, y1_max = rect1
    x2_min, y2_min, x2_max, y2_max = rect2

    # Overlap extent along each axis; clamped to 0 when the boxes are disjoint.
    inter_width = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
    inter_height = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
    inter_area = inter_width * inter_height

    area1 = (x1_max - x1_min) * (y1_max - y1_min)
    area2 = (x2_max - x2_min) * (y2_max - y2_min)

    union_area = area1 + area2 - inter_area
    if union_area <= 0:
        return 0.0

    return inter_area / union_area

In [11]:
def ensure_directory_exists(path):
    """Create a directory (and missing parents) unless it already exists.

    Args:
        path: Directory path. An empty string means "current directory" and
            is ignored, since there is nothing to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    if path:
        # exist_ok avoids the gap between checking and creating.
        os.makedirs(path, exist_ok=True)

def extract_results(file_path):
    """Read the box stored on every line of a results file.

    The first line is skipped. For each remaining non-blank line the last
    four whitespace-separated fields are converted to ``int``.

    Args:
        file_path: Path of the results file.

    Returns:
        List of int tuples, one per non-blank line after the first line.
        Blank lines (for example a trailing empty line) are ignored instead
        of producing an empty tuple.
    """
    with open(file_path, 'r') as file:
        next(file, None)  # Skip the first line
        return [
            tuple(int(num) for num in line.split()[-4:])
            for line in file
            if line.strip()
        ]

In [45]:
# Load the training frames and the label rows into memory.
# Later cells pair `labels` with `frames` by index.
frames = load_images("train/Task2/dataset/images/train")
labels = load_queries("train/Task2/labels")

  3%|▎         | 50/1667 [00:00<00:26, 60.67it/s]


KeyboardInterrupt: 

In [None]:
def pascal_voc_to_yolo(x1, y1, x2, y2, image_w, image_h):
    """Convert corner coordinates to a normalised centre/size box.

    Args:
        x1, y1: First corner in pixels.
        x2, y2: Opposite corner in pixels.
        image_w: Image width in pixels.
        image_h: Image height in pixels.

    Returns:
        ``[x_center, y_center, width, height]``, each divided by the matching
        image dimension.
    """
    x_center = (x2 + x1) / (2 * image_w)
    y_center = (y2 + y1) / (2 * image_h)
    box_width = (x2 - x1) / image_w
    box_height = (y2 - y1) / image_h
    return [x_center, y_center, box_width, box_height]


# Convert every loaded label box from pixel corners to the normalised
# centre/size form, using the size of the frame paired with it by index.
yolo_pos_list = []
for label,img in tqdm(zip(labels, frames)):
    pos = label[1]
    h, w, _ = img.shape
    yolo_pos = pascal_voc_to_yolo(pos[0], pos[1], pos[2], pos[3], w, h)
    yolo_pos_list.append(yolo_pos)

dir_path = "train/Task2/labels"
# Write each converted box back to the label file it was read from.
# Only `.txt` entries are considered (the files load_queries reads), in
# sorted file-name order. The previous boolean sort key reordered files
# independently of the order the labels were loaded in, so boxes could land
# in the wrong file.
# NOTE(review): assumes `labels` was loaded in that same file-name order;
# confirm on platforms where os.listdir is not alphabetical.
# WARNING: this overwrites the label files in place with float values, so
# load_queries (which parses ints) cannot read them again afterwards.
files_in_path = sorted(name for name in os.listdir(dir_path) if name.endswith(".txt"))
for pos, path in zip(yolo_pos_list, files_in_path):
    label = f"0 {pos[0]} {pos[1]} {pos[2]} {pos[3]}"
    label_path = os.path.join(dir_path, path)
    # check if current path is a file
    if os.path.isfile(label_path):
        with open(label_path, 'w') as outfile:
            outfile.write(label)

In [None]:
# Visual check of calculate_third_rectangle on two hand-picked boxes.
rect1 = (10, 10, 30, 40)  # Example coordinates of the first rectangle (x1, y1, x2, y2)
rect2 = (50, 50, 80, 90)
third_rect = calculate_third_rectangle(rect1, rect2)
print("Third rectangle coordinates:", third_rect)

# Draw on a copy so the loaded frame itself is not altered by this demo,
# and reuse the rectangle variables instead of repeating their literals.
preview = frames[0].copy()
for rect in (rect2, rect1, third_rect):
    cv2.rectangle(preview, (int(rect[0]), int(rect[1])),
                  (int(rect[2]), int(rect[3])), (255, 97, 34), 2)
display_image(preview)

In [17]:
def _choose_next_bbox(candidates, history):
    """Pick the box to record for the current frame.

    Args:
        candidates: PredictionBox objects that overlap the last recorded box,
            best overlap first. May be empty.
        history: Boxes recorded so far (never empty).

    Returns:
        The best candidate's box; otherwise an extrapolation from the last
        two recorded boxes; otherwise the last recorded box.
    """
    if candidates:
        return candidates[0].bbox
    # Extrapolate only when there is real motion: with fewer than two boxes,
    # or two identical ones, the best guess is the last box itself.
    if len(history) > 1 and tuple(history[-2]) != tuple(history[-1]):
        return calculate_third_rectangle(history[-2], history[-1])
    return history[-1]


def track_videos_from_path(path="train/Task2", model = "yolov8x.pt", verbose=0):
    """Track one object per video by chaining YOLO detections frame to frame.

    For every processed video the starting box is read from its start-position
    file. On each following frame, the detection with the highest IoU against
    the previously recorded box (at least 0.2) becomes the new box. When no
    detection qualifies, the box is extrapolated from the last two boxes, or
    repeated if only one is known. The recorded boxes are written to
    ``<cwd>/train/Task2/predited-truth/<NN>_pt.txt``: the header line of the
    start-position file followed by ``<index> x1 y1 x2 y2`` lines, with the
    index starting at 1.

    Args:
        path: Folder containing the videos and their start-position ``.txt``
            files; the two listings are paired by index.
        model: Weights name or path handed to ``YOLO``.
        verbose: When ``1``, every frame is shown with the candidate boxes
            and the chosen box; pressing ``q`` stops the current video.
    """
    model = YOLO(model)
    videos = get_video_files_from_folder(path)
    start_positions_txt = get_txt_files_from_folder(path)
    # Same location as before, built portably instead of with "\\".
    output_dir = os.path.join(os.getcwd(), "train", "Task2", "predited-truth")

    # NOTE(review): the [1:2] slices restrict the run to the second listed
    # video and start file only, while output numbering still starts at 1.
    # This looks like leftover debugging; confirm before relying on it.
    selected = zip(videos[1:2], start_positions_txt[1:2])
    for i, (video_path, st_pos) in enumerate(tqdm(selected), start=1):
        line_0, initial_bbox = read_input_txt(st_pos)
        frames_pred_list = [initial_bbox]

        capture = cv2.VideoCapture(video_path)
        try:
            # Discard the first frame because we already have its box.
            ok, frame = capture.read()
            if not ok:
                # Skip this video instead of calling exit(), which would
                # terminate the whole interpreter / kernel.
                print('Cannot read the first frame')
                continue

            # Loop through the video frames
            while capture.isOpened():
                success, frame = capture.read()
                if not success:
                    break

                # Original note: 31,32,34,35,74,75 (presumably candidate
                # class ids; unused while classes=[] disables the filter).
                results = predict(model, frame, conf=0.005, iou=0.5, classes=[])

                pb_list = []
                for r in results:
                    for box in r.boxes:
                        x1, y1, x2, y2 = box.xyxy[0]
                        bbox = int(x1), int(y1), int(x2), int(y2)
                        iou_with_ancestor = intersection_over_union(frames_pred_list[-1], bbox)
                        # Keep only detections that overlap the previous box.
                        if iou_with_ancestor >= 0.2:
                            confidence = float(box.conf[0])
                            cls = r.names[int(box.cls[0])]
                            pb_list.append(PredictionBox(bbox, confidence, cls, iou_with_ancestor))
                pb_list.sort(key=lambda pb: pb.iou_with_ancestor, reverse=True)

                # The former if/elif pair tested the same condition twice, so
                # an empty candidate list with a single known box fell through
                # to pb_list[0] and raised IndexError.
                frames_pred_list.append(_choose_next_bbox(pb_list, frames_pred_list))

                if verbose == 1:
                    for pb in pb_list:
                        cv2.rectangle(frame, (pb.bbox[0], pb.bbox[1]),
                                      (pb.bbox[2], pb.bbox[3]), (255, 0, 0), 2)
                        cv2.putText(frame, f"{pb.cls}",
                                    (pb.bbox[0], pb.bbox[1] - 10),
                                    cv2.FONT_HERSHEY_PLAIN, 1, (255, 0, 0), 1)
                    chosen = frames_pred_list[-1]
                    cv2.rectangle(frame, (chosen[0], chosen[1]),
                                  (chosen[2], chosen[3]), (35, 178, 200), 2)
                    cv2.imshow("Image", frame)

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Free the capture and any preview window even if a frame fails.
            capture.release()
            cv2.destroyAllWindows()

        os.makedirs(output_dir, exist_ok=True)
        file_path = os.path.join(output_dir, f"{i:02d}_pt.txt")

        with open(file_path, 'w') as file:
            file.write(line_0)
            for j, pos in enumerate(frames_pred_list, start=1):
                file.write(f"{j} {pos[0]} {pos[1]} {pos[2]} {pos[3]}\n")



In [18]:
# Run the tracker with its defaults: folder "train/Task2", weights "yolov8x.pt", verbose=0 (no preview window).
track_videos_from_path()

0it [00:00, ?it/s]


0: 384x640 15 persons, 1 traffic light, 2 handbags, 1 suitcase, 1 baseball glove, 1 skateboard, 12 tvs, 2 cell phones, 1883.6ms
Speed: 3.0ms preprocess, 1883.6ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 13 persons, 1 car, 1 traffic light, 4 handbags, 1 suitcase, 1 baseball glove, 1 skateboard, 14 tvs, 2 cell phones, 1874.4ms
Speed: 3.0ms preprocess, 1874.4ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 13 persons, 1 traffic light, 2 handbags, 2 baseball gloves, 3 skateboards, 10 tvs, 2 cell phones, 1 clock, 1851.9ms
Speed: 2.0ms preprocess, 1851.9ms inference, 3.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 11 persons, 1 car, 1 traffic light, 1 handbag, 2 baseball gloves, 6 skateboards, 10 tvs, 2 cell phones, 1847.5ms
Speed: 2.0ms preprocess, 1847.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 persons, 1 car, 1 traffic light, 1 backpack, 2 handbags, 6 skateboards,

1it [06:46, 406.67s/it]


In [15]:
def get_accuracy(pt, gt):
    """Fraction of ground-truth boxes matched by the prediction at the same index.

    A frame counts as matched when the IoU between the ground-truth box and
    the predicted box is at least 0.3.

    Args:
        pt: Predicted boxes, one per frame.
        gt: Ground-truth boxes, one per frame.

    Returns:
        ``matched / len(gt)``. Frames with no prediction (``pt`` shorter than
        ``gt``) count as misses, and ``0.0`` is returned when ``gt`` is empty.
    """
    if not gt:
        return 0.0

    good = sum(
        1
        for i, truth in enumerate(gt)
        if i < len(pt) and intersection_over_union(truth, pt[i]) >= 0.3
    )
    return good / len(gt)

# Collect the predicted and ground-truth boxes of every results file.
predicted = get_txt_files_from_folder("train/Task2/predited-truth")
ground = get_txt_files_from_folder("train/Task2/ground-truth")
pred_results = [extract_results(pred_path) for pred_path in predicted]
ground_results = [extract_results(truth_path) for truth_path in ground]

# Score only the first prediction / ground-truth pair (paired by index).
acc_list = []
for pt, gt in zip(pred_results[:1], ground_results[:1]):
    acc = get_accuracy(pt, gt)
    acc_list.append(acc)


In [16]:
# Display the accuracy left in `acc` by the evaluation loop (the last pair it scored).
acc

0.8520408163265306