# Extract CVAT ZIP and create dataset

In [1]:
import os
import shutil
import random
from pathlib import Path
import yaml
import zipfile
import json
import cv2
from tqdm import tqdm
from dataclasses import dataclass, field


@dataclass
class CocoOutput:
    info: dict = field(
        default_factory=lambda: {"description": "Converted YOLO dataset"}
    )
    licenses: list = field(default_factory=lambda: [])
    images: list = field(default_factory=lambda: [])
    annotations: list = field(default_factory=lambda: [])
    categories: list = field(default_factory=lambda: [])


def yolo_to_coco(yolo_dir, images_dir, output_json_path, class_names):
    """
    Converts YOLO format annotations to COCO format JSON.

    Args:
        yolo_dir (str): Path to the directory containing YOLO .txt files.
        images_dir (str): Path to the directory containing corresponding image files (.jpg only).
        output_json_path (str): Path to save the output COCO JSON file.
        class_names (list): A list of class names, in order corresponding to YOLO class IDs.
    """
    yolo_path = Path(yolo_dir)
    images_path = Path(images_dir)
    output_path = Path(output_json_path)

    coco_output = CocoOutput()

    # Populate categories
    for i, name in enumerate(class_names):
        coco_output.categories.append({
            "id": i + 1,  # COCO category IDs typically start from 1
            "name": name,
            "supercategory": name,  # Or a more general category if applicable
        })

    image_id_counter = 1
    annotation_id_counter = 1

    yolo_files = list(yolo_path.glob("*.txt"))
    if not yolo_files:
        print(f"Warning: No YOLO annotation files found in {yolo_path}")
        return

    print(f"Found {len(yolo_files)} annotation files. Starting conversion...")

    for yolo_file in tqdm(yolo_files):
        base_filename = yolo_file.stem
        image_path = images_path / f"{base_filename}.jpg"
        if not image_path.exists():
            image_path = images_path / f"{base_filename}.png"
            

        if not image_path.exists():
            print(
                f"Warning: Could not find corresponding image for {yolo_file.name}. Skipping."
            )
            continue

        try:
            # Read image to get dimensions
            image = cv2.imread(str(image_path))
            if image is None:
                print(f"Warning: Failed to read image {image_path}. Skipping.")
                continue
            img_height, img_width, _ = image.shape
        except Exception as e:
            print(f"Error reading image {image_path}: {e}. Skipping.")
            continue

        # Add image entry
        image_entry = {
            "id": image_id_counter,
            "file_name": str(image_path.absolute()),
            "width": img_width,
            "height": img_height,
        }
        coco_output.images.append(image_entry)

        # Process annotations for this image
        try:
            with open(yolo_file, "r") as f:
                lines = f.readlines()
        except Exception as e:
            print(
                f"Error reading annotation file {yolo_file.name}: {e}. Skipping image."
            )
            # Remove the image entry if annotation reading fails
            coco_output.images.pop()
            continue

        for line in lines:
            parts = line.strip().split()
            if len(parts) < 5:
                print(
                    f"Warning: Invalid line format in {yolo_file.name}: '{line.strip()}'. Skipping line."
                )
                continue

            try:
                class_id = int(parts[0])
                x_center_norm = float(parts[1])
                y_center_norm = float(parts[2])
                width_norm = float(parts[3])
                height_norm = float(parts[4])

                if not (0 <= class_id < len(class_names)):
                    print(
                        f"Warning: Invalid class ID {class_id} in {yolo_file.name}. Skipping line."
                    )
                    continue
                if not (
                    0 <= x_center_norm <= 1
                    and 0 <= y_center_norm <= 1
                    and 0 <= width_norm <= 1
                    and 0 <= height_norm <= 1
                ):
                    print(
                        f"Warning: Invalid coordinates/dimensions in {yolo_file.name}: {parts[1:]}. Skipping line."
                    )
                    continue

            except ValueError:
                print(
                    f"Warning: Non-numeric data found in {yolo_file.name}: '{line.strip()}'. Skipping line."
                )
                continue

            # Convert to COCO bbox format [x_min, y_min, width, height]
            abs_width = width_norm * img_width
            abs_height = height_norm * img_height
            x_center_abs = x_center_norm * img_width
            y_center_abs = y_center_norm * img_height
            x_min = max(0.0, x_center_abs - abs_width / 2)
            y_min = max(0.0, y_center_abs - abs_height / 2)
            # Ensure width and height are positive and box stays within image bounds
            abs_width = min(abs_width, img_width - x_min)
            abs_height = min(abs_height, img_height - y_min)

            if abs_width <= 0 or abs_height <= 0:
                print(
                    f"Warning: Degenerate box calculated for line '{line.strip()}' in {yolo_file.name}. Skipping."
                )
                continue

            area = abs_width * abs_height
            coco_category_id = class_id + 1  # Map YOLO 0-based ID to COCO 1-based ID

            annotation_entry = {
                "id": annotation_id_counter,
                "image_id": image_id_counter,
                "category_id": coco_category_id,
                "bbox": [x_min, y_min, abs_width, abs_height],
                "area": area,
                "iscrowd": 0,
                "segmentation": [],  # Add empty segmentation list
            }
            coco_output.annotations.append(annotation_entry)
            annotation_id_counter += 1

        image_id_counter += 1

    # Save COCO JSON file
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with open(output_path, "w") as f:
            json.dump(coco_output.__dict__, f, indent=4)
        print(f"Successfully converted dataset and saved to {output_path}")
    except Exception as e:
        print(f"Error writing JSON file {output_path}: {e}")





def remove_narrow_predictions():
    """Remove any predictions where the base is much narrower than the height.
    This will ensure we dont train using wheels/hubs that are not at least 1/3 of the height.
    This is a heuristic to remove narrow predictions that are likely move train the model incorrectly.
    """
    jpgs = []
    deleted_fully = 0
    some_deleted = 0
    print()
    for txtfile in yolo_dataset_path.rglob("**/labels/*.txt"):
        data = Path(txtfile).read_text().strip().split("\n")

        newlines = []
        for line in data:
            if line.strip():
                values = line.strip().split()
                class_id = int(values[0])
                x_center = float(values[1])
                y_center = float(values[2])
                width_normalized = float(values[3])
                height_normalized = float(values[4])

                # Calculate normalized area
                normalized_area = width_normalized * height_normalized

                # Calculate ratio of length to width
                ratio = width_normalized / height_normalized

                if ratio > 0.335:
                    newlines.append(line.strip())

                else:
                    jpgs.append((ratio, str(txtfile)))

        if len(newlines) != len(data):
            if len(newlines) == 0:
                print(f"File: {txtfile} has no valid lines")
                txtfile.unlink()
                deleted_fully += 1
            else:
                "\n".join(newlines)
                txtfile.write_text("\n".join(newlines))
                some_deleted += 1
                print(txtfile)
                print(f"\tClass ID: {class_id}")
                print(f"\t\tNormalized Width: \t{width_normalized:.2f}")
                print(f"\t\tNormalized Height: \t{height_normalized:.2f}")
                print(f"\t\tNormalized Area: \t{normalized_area:.2f}")
                print(f"\t\tRatio (Length/Width): \t{ratio:.2f}")

    print(
        f"{deleted_fully} files fully removed\n{some_deleted} files had at least one entry removed"
    )
    # jpg = sorted(jpgs)
    # print(f"Total number of images: {len(jpg)}")


def extract_zip(zip_path, extract_path):
    """Extracts the contents of a zip file to a specified directory."""
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_path)
        print(f"Successfully extracted '{zip_path}' to '{extract_path}'")
    except FileNotFoundError:
        print(f"Error: Zip file '{zip_path}' not found.")
    except Exception as e:
        print(f"An error occurred during extraction: {e}")


def create_yolo_dataset(
    temp_dataset_path: Path,
    yolo_dataset_path: Path,
    train_ratio=0.9,
    inc_no_dets: bool = True,
):
    """Creates a YOLO dataset structure with train/test split."""
    test_ratio = 1 - train_ratio

    # Create the new YOLO dataset directory structure
    yolo_train_images_path = yolo_dataset_path / "train/images"
    yolo_train_labels_path = yolo_dataset_path / "train/labels"
    yolo_test_images_path = yolo_dataset_path / "val/images"
    yolo_test_labels_path = yolo_dataset_path / "val/labels"

    yolo_train_images_path.mkdir(parents=True, exist_ok=True)
    yolo_train_labels_path.mkdir(parents=True, exist_ok=True)
    yolo_test_images_path.mkdir(parents=True, exist_ok=True)
    yolo_test_labels_path.mkdir(parents=True, exist_ok=True)

    # Get all image and label files from the base dataset
    temp_images_path = temp_dataset_path / "images/train"
    temp_labels_path = temp_dataset_path / "labels/train"

    if not temp_images_path.is_dir() or not temp_labels_path.is_dir():
        print(
            f"Error: '{temp_images_path}' or '{temp_labels_path}' not found in the base dataset."
        )
        return

    image_files = list(sorted(temp_images_path.glob("*")))
    label_files = list(sorted(temp_labels_path.glob("*")))

    if len(image_files) != len(label_files):
        print("Warning: Number of image files and label files do not match.")

    # Create a list of (image_path, label_path) pairs
    data_pairs = []
    missing_labels = []
    for image_file in sorted(image_files):
        label_file_name = image_file.stem + ".txt"
        corresponding_label_file = temp_labels_path / label_file_name

        if not corresponding_label_file.exists():
            if inc_no_dets:
                corresponding_label_file: Path
                corresponding_label_file.write_text("")
                label_files.append(corresponding_label_file)
            else:
                continue

        data_pairs.append((image_file, corresponding_label_file))

    print(
        f"Missing labels for {len(missing_labels) / len(image_files) * 100:.1f}% images: "
    )

    random.shuffle(data_pairs)
    split_index = int(len(data_pairs) * train_ratio)
    train_data = sorted(data_pairs[:split_index])
    test_data = sorted(data_pairs[split_index:])

    train_image_list = []
    train_label_list = []
    for image_path, label_path in train_data:
        if label_path.read_text() == "":
            continue
        dest_image_path = yolo_train_images_path / image_path.name
        dest_label_path = yolo_train_labels_path / label_path.name
        shutil.copy2(image_path, dest_image_path)
        shutil.copy2(label_path, dest_label_path)
        train_image_list.append(str(dest_image_path))
        train_label_list.append(str(dest_label_path))

    test_image_list = []
    test_label_list = []
    for image_path, label_path in test_data:
        label_path: Path
        # if label_path.read_text() == "":
        #     continue
        dest_image_path = yolo_test_images_path / image_path.name
        dest_label_path = yolo_test_labels_path / label_path.name
        shutil.copy2(image_path, dest_image_path)
        shutil.copy2(label_path, dest_label_path)
        test_image_list.append(str(dest_image_path))
        test_label_list.append(str(dest_label_path))

    # Read YAML file
    with open(temp_dataset_path / "data.yaml", "r") as stream:
        data_yaml_content = yaml.safe_load(stream)

    data_yaml_content["val"] = "val.txt"

    with open(yolo_dataset_path / "data.yaml", "w") as f:
        yaml.dump(data_yaml_content, f)

    with open(yolo_dataset_path / "train.txt", "w") as f:
        f.write("\n".join(train_image_list))

    with open(yolo_dataset_path / "val.txt", "w") as f:
        f.write("\n".join(test_image_list))

    print(f"Successfully created YOLO dataset structure at: {yolo_dataset_path}")
    print(
        f"Train images and labels in: {yolo_train_images_path} and {yolo_train_labels_path}"
    )
    print(
        f"Test images and labels in: {yolo_test_images_path} and {yolo_test_labels_path}"
    )
    print(f"data.yaml created at: {yolo_dataset_path / 'data.yaml'}")
    print(f"train.txt created at: {yolo_dataset_path / 'train.txt'}")
    print(f"val.txt created at: {yolo_dataset_path / 'val.txt'}")


# --- Main Execution in Jupyter ---
DATASET_ROOT = Path("/home/hidara/Documents/datasets/yolo_dataset_under/")
TEMP_DATASET = Path("/home/hidara/Documents/datasets/temp_dataset/")
# 1. Extract the zip file
ZIP_FILE_PATH = "new_thermal_under.zip"  # Make sure this file is in the same directory

yolo_annotation_directory = (DATASET_ROOT / "train/labels").absolute()
image_directory = DATASET_ROOT / "train/images"
output_file = DATASET_ROOT / "annotations/instances_train.json"
if "under" in DATASET_ROOT.name:
    classes = ["brakes"]  # Replace with your actual class names
else:
    classes = ["hub", "tyre"]  # Replace with your actual class names

shutil.rmtree(
    TEMP_DATASET, ignore_errors=True
)  # Remove existing yolo_dataset if it exists
extract_zip(ZIP_FILE_PATH, TEMP_DATASET)

# 2. Create the YOLO dataset
shutil.rmtree(
    DATASET_ROOT, ignore_errors=True
)  # Remove existing yolo_dataset if it exists
create_yolo_dataset(TEMP_DATASET, DATASET_ROOT, inc_no_dets=True)
# remove_narrow_predictions()



# # --- Example Usage ---
# Define your class names in the order they appear in YOLO IDs (0, 1, 2...)

yolo_to_coco(yolo_annotation_directory, image_directory, output_file, classes)

# Repeat for validation set
yolo_annotation_directory_val = DATASET_ROOT / "val/labels"
image_directory_val = DATASET_ROOT / "val/images"
output_file_val = DATASET_ROOT / "annotations/instances_val.json"
yolo_to_coco(
    yolo_annotation_directory_val, image_directory_val, output_file_val, classes
)

Successfully extracted 'new_thermal_under.zip' to '/home/hidara/Documents/datasets/temp_dataset'
Missing labels for 0.0% images: 
Successfully created YOLO dataset structure at: /home/hidara/Documents/datasets/yolo_dataset_under
Train images and labels in: /home/hidara/Documents/datasets/yolo_dataset_under/train/images and /home/hidara/Documents/datasets/yolo_dataset_under/train/labels
Test images and labels in: /home/hidara/Documents/datasets/yolo_dataset_under/val/images and /home/hidara/Documents/datasets/yolo_dataset_under/val/labels
data.yaml created at: /home/hidara/Documents/datasets/yolo_dataset_under/data.yaml
train.txt created at: /home/hidara/Documents/datasets/yolo_dataset_under/train.txt
val.txt created at: /home/hidara/Documents/datasets/yolo_dataset_under/val.txt
Found 3568 annotation files. Starting conversion...


100%|██████████| 3568/3568 [00:03<00:00, 1002.77it/s]


Successfully converted dataset and saved to /home/hidara/Documents/datasets/yolo_dataset_under/annotations/instances_train.json
Found 804 annotation files. Starting conversion...


100%|██████████| 804/804 [00:00<00:00, 1136.54it/s]

Successfully converted dataset and saved to /home/hidara/Documents/datasets/yolo_dataset_under/annotations/instances_val.json





# YOLO

In [None]:
import os
from datetime import datetime
from ultralytics import YOLO
from tensorboard import notebook
from IPython.display import clear_output
from pathlib import Path
import gc


# --- Trainer Class ---
class YOLOTrainer:
    def __init__(self, model_path, data_path, base_save_dir="yolo_runs"):
        self.model_path = model_path
        self.data_path = data_path
        self.base_save_dir = base_save_dir
        self.model = None
        self.results = None
        self.current_run_dir = None

    def load_model(self) -> None:
        self.model = YOLO(self.model_path)

    def train(self, epochs=50, imgsz=640, **kwargs):
        if self.model is None:
            self.load_model()

        # Create a timestamp for the current run directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.current_run_dir = os.path.join(self.base_save_dir, timestamp)

        # Train the model with specified parameters and save directory
        self.results = self.model.train(
            data=self.data_path,
            epochs=epochs,
            imgsz=imgsz,
            project=self.base_save_dir,  # Base directory for runs
            name=timestamp,  # Subdirectory name based on timestamp
            **kwargs,  # Allow passing other training arguments
        )
        return self.results

    def val(self, **kwargs):
        if self.model is None:
            print(
                "Model not loaded. Please run train() first or load a model manually."
            )
            return None
        return self.model.val(**kwargs)

    def predict(self, source, **kwargs):
        if self.model is None:
            print(
                "Model not loaded. Please run train() first or load a model manually."
            )
            return None
        return self.model.predict(source=source, **kwargs)

    def export(self, format="torchscript", **kwargs):
        if self.model is None:
            print(
                "Model not loaded. Please run train() first or load a model manually."
            )
            return None
        return self.model.export(format=format, **kwargs)

    def tune(self, **kwargs):
        if self.model is None:
            print(
                "Model not loaded. Please run train() first or load a model manually."
            )
            return None
        return self.model.tune(**kwargs)

    def info(self):
        if self.model is None:
            print(
                "Model not loaded. Please run train() first or load a model manually."
            )
            return None
        return self.model.info()


# --- Usage in the Notebook ---
if __name__ == "__main__":
    # Instantiate the trainer
    # latest_models = sorted(
    #     Path("yolo_runs").rglob("best.pt"), key=lambda x: x.stat().st_ctime
    # )
    for batch_size, version in [(32, "n")]:
        latest_models = sorted(
            Path("yolo_runs").rglob(f"{version}/**/best.pt"),
            key=lambda x: x.stat().st_ctime,
        )

        # if latest_models:
        #     model_name = latest_models[-1].absolute()
        # else:
        #
        model_name = f"yolo12{version}.pt"

        trainer = YOLOTrainer(
            model_path=model_name,  # latest model
            data_path="/home/hidara/Documents/datasets/yolo_dataset_under/data.yaml",
            base_save_dir=f"yolo_runs/{version}/",
        )

        notebook.start("--logdir " + "yolo_runs")

        # --- Training with adjustable settings ---
        epochs_to_train = 250  # Example: Change the number of epochs
        image_size = 512  # Example: Change the image size
        batch_size = 32  # Example: Change the batch size

        clear_output(wait=True)
        print(f"using {model_name} for training")
        print(
            f"Starting training with epochs={epochs_to_train}, image size={image_size}, batch size={batch_size}/n/n"
        )
        training_results = trainer.train(
            epochs=epochs_to_train,
            imgsz=image_size,
            batch=batch_size,
            augment=True,
            multi_scale=True,
            flipud=0.5,
            bgr=0.5,
            mixup=0.8,
            copy_paste=0.8,
            shear=1.0,  # Adjust this value. It's often in degrees.
            perspective=0.001,  # Adjust this value. It's often a small factor.
            crop_fraction=0.9,
        )

        if training_results:
            print("Training completed.")
            print(f"Results saved in: {trainer.current_run_dir}")

            # --- Running TensorBoard ---
            print("\n--- Running TensorBoard ---")
            print(
                f"Navigate to the 'runs' directory within '{trainer.base_save_dir}' to view TensorBoard logs."
            )
            print("You can typically start TensorBoard in your terminal using:")
            print(f"`tensorboard --logdir {trainer.base_save_dir}`")
            print("Then open your browser to http://localhost:6006/")

        else:
            print("Training did not start or encountered an issue.")

        del trainer
        gc.collect()

# INFER

In [None]:
from pathlib import Path
from multiprocessing import Pool
from tools.inference.onnx_inf_super_io import (
    main as onnx_main,
)  # Renamed to avoid confusion with the main script


def process_video(video_path):
    """
    Processes a single video using ONNX inference within a multiprocessing pool.

    Args:
        video_path (Path): Path to the video file.
    """
    # **Crucially, the ONNX inference initialization happens within this function**
    # This ensures each process initializes its own CUDA context if needed.
    return onnx_main(
        input_path=video_path,  # Ensure path is a string for potential compatibility
        onnx_path=Path("best_sides.onnx"),
        labels_dict={0: "Hub", 1: "Wheel"}, #{0: "Brakes"},  # 
        output_dir=Path("outputs"),
        batch_size=16,
        conf_threshold=0.79,
        debug=False,
        cuda_device_id=0,  # You might want to manage device IDs per process if using multiple GPUs
    )

if __name__ == "__main__":
    videos = sorted(Path("videos").glob("*c330e_TC0[12]*.mp4"))

    with Pool(8) as p:
        p.map(process_video, videos)
    # for video_path in videos:
    #     process_video(video_path)

In [None]:
from pathlib import Path
from multiprocessing import Pool
from tools.inference.onnx_inf_super_io import (
    main as onnx_main,
)  # Renamed to avoid confusion with the main script


def process_video(video_path):
    """
    Processes a single video using ONNX inference within a multiprocessing pool.

    Args:
        video_path (Path): Path to the video file.
    """
    # **Crucially, the ONNX inference initialization happens within this function**
    # This ensures each process initializes its own CUDA context if needed.
    return onnx_main(
        input_path=video_path,  # Ensure path is a string for potential compatibility
        onnx_path=Path("deim_outputs/sides/20250416_204621/best_stg2.onnx"),
        labels_dict={0: "Hub", 1: "Wheel", 2: "Wheel", 3: "Wheel"},
        output_dir=Path("outputs"),
        batch_size=16,
        conf_threshold=0.3,
        debug=False,
        cuda_device_id=0,  # You might want to manage device IDs per process if using multiple GPUs
        is_tracking=True
    )


if __name__ == "__main__":
    videos = sorted(Path("videos").glob("*TC0*.mp4"))

    with Pool(8) as p:
        p.map(process_video, videos)
    # for video_path in videos:
    #     process_video(video_path)

In [None]:
from pathlib import Path
from multiprocessing import Pool
from tools.inference.onnx_inf_super_io import (
    main as onnx_main,
)  # Renamed to avoid confusion with the main script


def process_video(video_path):
    """
    Processes a single video using ONNX inference within a multiprocessing pool.

    Args:
        video_path (Path): Path to the video file.
    """
    # **Crucially, the ONNX inference initialization happens within this function**
    # This ensures each process initializes its own CUDA context if needed.
    return onnx_main(
        input_path=video_path,  # Ensure path is a string for potential compatibility
        onnx_path=Path("deim_outputs/under/20250419_124347/best_stg2.onnx"),
        labels_dict={0: "Brakes"}, # {0: "Wheel", 1: "Test"},
        output_dir=Path("outputs"),
        batch_size=64,
        conf_threshold=0.8,
        debug=False,
        cuda_device_id=0,  # You might want to manage device IDs per process if using multiple GPUs
        is_tracking=True
    )


if __name__ == "__main__":
    videos = sorted(Path("videos").glob("*TC03*.mp4"))

    process_video("videos/67a2d700554dd5faf24c592b_TC03_cupy.mp4")

In [None]:
from ultralytics import YOLO

model = YOLO("yolo_runs/n/20250416_215957/weights/best.pt")


model.predict("videos/66d8c56ea502fd4f902c330e_TC03_cupy.mp4", save=True, conf=0.75)