In [None]:
from ultralytics import YOLO
from concurrent.futures import ThreadPoolExecutor
import cv2
import json
import os
import random
import shutil
import torch
import yaml

In [None]:
# Repository root, derived from the current working directory.
# NOTE(review): if the working directory does not contain "dslab25", this
# silently evaluates to "<cwd>dslab25/" instead of failing -- confirm the
# notebook is always started from inside the repository.
repo_dir = os.getcwd().partition('dslab25')[0] + 'dslab25/'


def _repo_path(relative):
	"""Return `relative` joined onto the repository root."""
	return os.path.join(repo_dir, relative)


# Source data: augmented images and their annotation files.
dino_dir = _repo_path("obj_detection/dino/")
images = _repo_path("training/vacuum_pump/images/augmented/")
labels = _repo_path("training/vacuum_pump/annotation/augmented/")

# Destination layout of the generated YOLO dataset.
yolo_train_images = _repo_path("yolo_dataset/images/train")
yolo_train_labels = _repo_path("yolo_dataset/labels/train")
yolo_val_images = _repo_path("yolo_dataset/images/val")
yolo_val_labels = _repo_path("yolo_dataset/labels/val")
yaml_path = _repo_path("yolo_dataset/yolo_dataset.yaml")

In [None]:
# Share of the images in each source folder that goes to the training set;
# the remainder is used for validation.
TRAINING_EVAL_SPLIT = 0.9

# Pretrained weights from Ultralytics used as the starting point for training.
# NOTE(review): other model sizes presumably follow the same "yolo12<size>.pt"
# naming as the file below -- check the Ultralytics model list before swapping.
model_path = "yolo12m.pt"

# Create the yolo dataset folder (just copying files)

In [None]:
# Make sure every destination folder of the YOLO dataset exists
# (already-existing folders are left untouched).
for output_dir in (yolo_train_images, yolo_val_images, yolo_train_labels, yolo_val_labels):
	os.makedirs(output_dir, exist_ok=True)

# This function copies one image and its label
def copy_image_label(task):
	"""Copy one image together with its label file.

	Args:
		task: 4-tuple ``(src_image, dst_image, src_label, dst_label)`` of paths.

	The pair is copied only when both source files exist; otherwise nothing
	is copied and the function returns without raising.
	"""
	image_from, image_to, label_from, label_to = task
	if not (os.path.exists(image_from) and os.path.exists(label_from)):
		return
	shutil.copy(image_from, image_to)
	shutil.copy(label_from, label_to)

# Fixed seed so that the train/val assignment is identical on every run.
# Without it, re-running this cell reshuffles the files and, because earlier
# copies are never deleted, the same image can end up in both train/ and val/.
# NOTE: if yolo_dataset/ was already populated by an unseeded run, delete it
# once before re-running so no stale copies remain.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

tasks = []

# os.listdir() returns entries in arbitrary order, so sort before shuffling;
# otherwise the seeded shuffle would still not be reproducible.
for folder in sorted(os.listdir(images)):
	folder_path = os.path.join(images, folder)
	if not os.path.isdir(folder_path):
		continue  # skip if not a folder

	image_files = sorted(os.listdir(folder_path))
	split_rng.shuffle(image_files)

	# First TRAINING_EVAL_SPLIT share goes to train, the rest to validation.
	split_idx = int(len(image_files) * TRAINING_EVAL_SPLIT)
	train_images = image_files[:split_idx]
	val_images = image_files[split_idx:]

	print(f"{folder}: {len(train_images)} train / {len(val_images)} val")

	# NOTE(review): all folders are flattened into the same destination
	# directories, so identical file names in different folders would
	# overwrite each other -- confirm names are unique across folders.
	split_plan = (
		(train_images, yolo_train_images, yolo_train_labels),
		(val_images, yolo_val_images, yolo_val_labels),
	)
	for split_files, dst_image_dir, dst_label_dir in split_plan:
		for image in split_files:
			# Swap only the real extension for ".txt"; a plain
			# str.replace(".jpg", ...) misses ".JPG"/".png" files and would
			# also rewrite ".jpg" occurring in the middle of a name.
			label = os.path.splitext(image)[0] + ".txt"
			tasks.append((
				os.path.join(images, folder, image),
				os.path.join(dst_image_dir, image),
				os.path.join(labels, folder, label),
				os.path.join(dst_label_dir, label),
			))

# Parallel copy using threads; list() drains the iterator so that any
# exception raised inside a worker is re-raised here.
with ThreadPoolExecutor(max_workers=16) as executor:
	list(executor.map(copy_image_label, tasks))

# Describe the dataset for training: root folder, image sub-folders for the
# two splits, and the class list.
stage_names = ["stage_0", "stage_1", "stage_2", "stage_3", "stage_4", "stage_5", "stage_6", "stage_7"]

dataset_yaml = {
	"names": stage_names,  # class index i corresponds to stage_names[i]
	"nc": len(stage_names),  # number of classes, kept in sync with the names list
	"path": os.path.join(repo_dir, "yolo_dataset"),
	"train": "images/train",
	"val": "images/val",
}

with open(yaml_path, "w") as yaml_file:
	yaml.dump(dataset_yaml, yaml_file)


# Train model

In [None]:
# Start from the pretrained checkpoint and fine-tune it on the dataset
# described by yaml_path.
model = YOLO(model_path)

# Train on the first CUDA device when one is available, otherwise on the CPU.
train_device = 0 if torch.cuda.is_available() else "cpu"
runs_dir = os.path.join(dino_dir, "yolo_runs")

# NOTE(review): epochs=1 looks like a smoke-test setting -- confirm before
# relying on the resulting weights.
model.train(
	data=yaml_path,
	project=runs_dir,
	name="yolov12_boundingbox",
	device=train_device,
	imgsz=640,
	batch=32,
	epochs=1,
)

# Test model on real video

In [None]:
# Evaluation inputs: the recorded video and the file with its ground-truth labels.
video_labels_path = os.path.join(repo_dir, "assets/vacuum_pump/videos/output.txt")
video_path = os.path.join(repo_dir, "assets/vacuum_pump/videos/01_run1_cam_2_1024x1024_15fps_3mbps.mp4")

# Trained weights to evaluate.
# NOTE(review): the run folder "yolov12_boundingbox11" is hard-coded, while the
# training cell writes to a run named "yolov12_boundingbox" under yolo_runs/.
# Presumably the "11" suffix comes from earlier runs on the author's machine --
# adjust it to the run you actually want to evaluate.
trained_run_dir = os.path.join(repo_dir, "obj_detection/dino/yolo_runs/yolov12_boundingbox11")
yolo_model_path = os.path.join(trained_run_dir, "weights", "best.pt")


## Load labels

In [None]:
# Build the frame -> class lookup from the label file.
# Each usable line holds three whitespace-separated integers:
#   <class> <first frame> <last frame>   (both frame bounds inclusive)
# Lines with any other number of fields are ignored; when ranges overlap,
# the line that appears later in the file wins.
print(f"Loading labels from: {video_labels_path}")

frame_to_class = {}
with open(video_labels_path, 'r') as label_file:
	for raw_line in label_file:
		fields = raw_line.strip().split()
		if len(fields) != 3:
			continue
		state_class, start_frame, end_frame = (int(field) for field in fields)
		labelled_frames = range(start_frame, end_frame + 1)
		frame_to_class.update(dict.fromkeys(labelled_frames, state_class))


## Test

In [None]:
# Open the video and fail fast when it cannot be read.
print(f"Loading video from: {video_path}")
video = cv2.VideoCapture(video_path)
if not video.isOpened():
	# A bare `return` is a SyntaxError at the top level of a cell ("'return'
	# outside function") and prevents the whole cell from running. Raising
	# stops execution here with a clear message instead.
	video.release()
	raise RuntimeError(f"Error: Could not open video file: {video_path}")

# Frame count and frame rate as reported by the container.
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
fps = video.get(cv2.CAP_PROP_FPS)
print(f"Video info: {total_frames} frames, {fps} fps")

# Load the trained weights for evaluation.
print("Loading YOLO model...")
yolo_model = YOLO(yolo_model_path)

print("\n--- Starting YOLO Evaluation ---")

# Walk through the whole video once and keep every 5th frame that has a
# ground-truth label, together with its frame index.
# NOTE(review): all selected frames are held in memory until the evaluation
# loop runs; for long videos consider evaluating while reading instead.
frames_to_process = []
frame_idx = 0
ok, frame = video.read()
while ok:
	if frame_idx in frame_to_class and frame_idx % 5 == 0:
		frames_to_process.append((frame_idx, frame))
	frame_idx += 1
	ok, frame = video.read()
video.release()
print(f"Total frames to evaluate: {len(frames_to_process)}")

# Compare the most confident YOLO detection of each selected frame with the
# ground-truth class and accumulate the accuracy.
correct_predictions = 0
total_predictions = 0
# Frames for which YOLO returned no box. They are excluded from the accuracy,
# so they are counted separately and reported in the summary; otherwise missed
# detections would disappear from the results without a trace.
skipped_frames = 0

for frame_idx, frame in frames_to_process:
	true_label = frame_to_class[frame_idx]

	# Run YOLO detection on the frame.
	yolo_results = yolo_model(frame)
	if len(yolo_results) == 0 or len(yolo_results[0].boxes) == 0:
		print(f"Frame {frame_idx}: No detection found. Skipping frame.")
		skipped_frames += 1
		continue

	# Retrieve detections and select the one with the highest confidence.
	boxes = yolo_results[0].boxes.data  # Each row: [x1, y1, x2, y2, conf, cls]
	idx = torch.argmax(boxes[:, 4])
	box = boxes[idx]

	# YOLO prediction: class is at index 5.
	predicted_label = int(box[5].item())

	is_correct = predicted_label == true_label
	if is_correct:
		correct_predictions += 1
	total_predictions += 1

	print(f"Frame {frame_idx}:")
	print(f"  True label: {true_label}")
	print(f"  YOLO Predicted: {predicted_label}")
	print(f"  Correct: {'Yes' if is_correct else 'No'}")
	print("-" * 20)

# Accuracy is computed over frames with at least one detection only.
accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
print("\nEvaluation Summary:")
print(f"  Total frames evaluated: {total_predictions}")
print(f"  Frames skipped (no detection): {skipped_frames}")
print(f"  Correct predictions: {correct_predictions}")
print(f"  Accuracy: {accuracy:.2f} ({correct_predictions}/{total_predictions})")
