## Initial Setup

In [154]:
class_id_to_name = {
    0:  ('road', [28, 42, 168]),
    1:  ('pool', [0, 50, 89]),
    2:  ('vegetation', [107, 142, 35]),
    3:  ('roof', [70, 70, 70]),
    4:  ('wall', [102, 102, 156]),
    5:  ('window', [254, 228, 12]),
    6:  ('person', [255, 22, 96]),
    7:  ('dog', [102, 51, 0]),
    8:  ('car', [9, 143, 150]),
    9:  ('bicycle', [119, 11, 32]),
    10: ('tree', [51, 51, 0]),
    11: ('truck', [160, 160, 60]),   # added truck
    12: ('bus', [200, 80, 80]),      # added bus
    13: ('vehicle', [200, 80, 80]),      # added bus
}

### Install Packages

In [155]:
# !pip install numpy
# !pip install opencv-python
# !pip install pillow
# !pip install matplotlib
# !pip install tqdm
# !pip install scikit-learn
# !pip install torch torchvision
# !pip install ultralytics



In [156]:
# !pip uninstall torch torchvision torchaudio
# !pip cache purge  # clean out pip's install cache
# !pip install torch torchvision torchaudio --force-reinstall


In [157]:
# Core packages
import os
import shutil
import json
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path

# Math and array handling
import numpy as np
from sklearn.model_selection import train_test_split

# Image and visualization
import cv2
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# Progress bar
from tqdm.auto import tqdm

# Deep Learning Frameworks
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.models.segmentation as segmentation
import torchvision.transforms as transforms

# Object Detection and Segmentation
from ultralytics import YOLO

from torch.utils.data import DataLoader
import gc

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Automatically use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

%matplotlib inline

import random
import gdown

from collections import defaultdict
from glob import glob

from PIL import Image

# save this as split_uavdt_train_val.py

import os
import shutil
from glob import glob
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm




### Download Datasets

In [158]:
def download_and_extract_from_gdrive1(gdrive_url, extract_to="extracted"):
    # Convert shared drive URL to direct download URL
    file_id = gdrive_url.split("/d/")[1].split("/")[0]
    download_url = f"https://drive.google.com/uc?id={file_id}"

    # Create output folder
    os.makedirs(extract_to, exist_ok=True)

    zip_path = os.path.join(extract_to, "downloaded.zip")

    print("[INFO] Downloading ZIP from Google Drive...")
    gdown.download(download_url, zip_path, quiet=False)

    print("[INFO] Extracting ZIP...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    # Optionally, remove the ZIP file after extraction
    os.remove(zip_path)

    print(f"[DONE] Extracted files to: {extract_to}")

def download_and_extract_from_gdrive2(gdrive_url, extract_to="extracted"):
    # Convert shared drive URL to direct download URL
    file_id = gdrive_url.split("/d/")[1].split("/")[0]
    download_url = f"https://drive.google.com/uc?id={file_id}"

    # Create output folder
    os.makedirs(extract_to, exist_ok=True)

    zip_path = os.path.join(extract_to, "downloaded.zip")

    print("[INFO] Downloading ZIP from Google Drive...")
    gdown.download(download_url, zip_path, quiet=False)

    print("[INFO] Extracting ZIP...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

    # Optionally, remove the ZIP file after extraction
    os.remove(zip_path)

    print(f"[DONE] Extracted files to: {extract_to}")



### Convert downloaded Dataset into yolo format

In [159]:
# ----------------------------
# Parse polygon and convert to YOLO bbox
# ----------------------------
# Semantic drone datasets 
def parse_yolo_style_bbox_from_xml(xml_path, class_id_to_name):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    bboxes = []
    for obj in root.findall('object'):
        class_name = obj.find('name').text
        if class_name in [value[0] for value in class_id_to_name.values()]:
            polygon = obj.find('polygon')
            if polygon is not None:
                points = polygon.findall('pt')
                coords = [(float(pt.find('x').text), float(pt.find('y').text)) for pt in points]
                x_min = min(coord[0] for coord in coords)
                y_min = min(coord[1] for coord in coords)
                x_max = max(coord[0] for coord in coords)
                y_max = max(coord[1] for coord in coords)
                bboxes.append(((x_min, y_min), (x_max, y_max), class_name))
    return bboxes


# ----------------------------
# Save YOLO-format txt
# ----------------------------
def save_yolo_format(image_id, bboxes, image_width, image_height, output_path, class_id_to_name):
    with open(output_path, 'w') as f:
        for (x_min, y_min), (x_max, y_max), class_name in bboxes:
            class_id = next(cid for cid, (name, _) in class_id_to_name.items() if name == class_name)
            x_center = (x_min + x_max) / 2 / image_width
            y_center = (y_min + y_max) / 2 / image_height
            width = (x_max - x_min) / image_width
            height = (y_max - y_min) / image_height
            f.write(f"{class_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}\n")


# ----------------------------
# Convert dataset (YOLO only)
# ----------------------------
def convert_fulldataset_yolo_only(dataset_path, output_dir, class_id_to_name):
    image_ids = [img.split('.')[0] for img in os.listdir(f"{dataset_path}/images") if img.endswith(".jpg")]

    os.makedirs(f"{output_dir}/images", exist_ok=True)
    os.makedirs(f"{output_dir}/labels", exist_ok=True)

    for image_id in tqdm(image_ids, desc="Converting to YOLO"):
        img_path = f"{dataset_path}/images/{image_id}.jpg"
        bbox_xml_path = f"{dataset_path}/gt/bounding_box/label_me_xml/{image_id}.xml"
        semantic_xml_path = f"{dataset_path}/gt/semantic/label_me_xml/{image_id}.xml"

        if not os.path.exists(img_path):
            print(f"[WARNING] Image not found: {img_path}, skipping...")
            continue

        try:
            bboxes1 = parse_yolo_style_bbox_from_xml(bbox_xml_path, class_id_to_name)
            bboxes2 = parse_yolo_style_bbox_from_xml(semantic_xml_path, class_id_to_name)
            all_bboxes = bboxes1 + bboxes2
        except Exception as e:
            print(f"[WARNING] Skipping image {image_id} due to parse error: {e}")
            continue

        try:
            image = Image.open(img_path)
            image_np = np.array(image)
        except Exception as e:
            print(f"[WARNING] Could not load image {image_id}: {e}")
            continue

        # Save image
        image.save(f"{output_dir}/images/{image_id}.jpg")

        # Save YOLO labels
        yolo_annotation_path = f"{output_dir}/labels/{image_id}.txt"
        save_yolo_format(image_id, all_bboxes, image_np.shape[1], image_np.shape[0], yolo_annotation_path, class_id_to_name)

    print("✅ YOLO-format annotation conversion complete!")

In [160]:
import os
import shutil
import cv2
from glob import glob
from collections import defaultdict
from sklearn.model_selection import train_test_split

# 🧠 Map UAVDT class to extended class_id_to_name
uavdt_to_extended = {
    0: 8,   # car
    1: 11,  # truck
    2: 12,  # bus
    3: 13
}

# === Function to Convert Single Annotation to YOLO Format ===
def convert_annotation(anno_path, label_path, image_path, stats):
    if not os.path.exists(image_path):
        stats["missing_image"] += 1
        return

    try:
        img = cv2.imread(image_path)
        height, width = img.shape[:2]
    except:
        stats["missing_image"] += 1
        return

    with open(anno_path, 'r') as fin, open(label_path, 'w') as fout:
        for line in fin:
            parts = line.strip().split(',')
            if len(parts) < 8:
                stats["malformed"] += 1
                continue

            try:
                x, y, w, h = map(float, parts[0:4])
                original_cls = int(parts[5])

                # 🔁 Convert original class to extended class
                if original_cls not in uavdt_to_extended:
                    stats["skipped"][original_cls] += 1
                    continue

                cls = uavdt_to_extended[original_cls]

                x_center = (x + w / 2) / width
                y_center = (y + h / 2) / height
                w /= width
                h /= height

                if not (0 <= x_center <= 1 and 0 <= y_center <= 1 and w > 0 and h > 0):
                    stats["skipped"][cls] += 1
                    continue

                fout.write(f"{cls} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}\n")
                stats["converted"] += 1
            except Exception:
                stats["malformed"] += 1
                continue

            stats["total"] += 1

# === Step 1: Convert UAVDT annotations to YOLO format ===
def convert_dataset(root_dir):
    annotation_paths = glob(os.path.join(root_dir, "M*/annotations/*.txt"))
    total_files = len(annotation_paths)

    stats = {
        "total": 0,
        "converted": 0,
        "malformed": 0,
        "missing_image": 0,
        "skipped": defaultdict(int)
    }

    print(f"🔄 Converting {total_files} annotation files to YOLO format...")

    for anno_path in tqdm(annotation_paths, desc="Converting", unit="file"):
        sequence_dir = os.path.dirname(os.path.dirname(anno_path))  # Mxxxx
        file_name = os.path.basename(anno_path)

        label_dir = os.path.join(sequence_dir, "labels")
        os.makedirs(label_dir, exist_ok=True)

        label_path = os.path.join(label_dir, file_name)

        # Construct image path
        image_name = file_name.replace(".txt", ".jpg")
        image_path = os.path.join(sequence_dir, "images", image_name)

        convert_annotation(anno_path, label_path, image_path, stats)

    print("\n✅ Conversion complete.")
    print(f"📊 Total boxes:     {stats['total']}")
    print(f"✅ Converted boxes: {stats['converted']}")
    print(f"❌ Skipped boxes:   {sum(stats['skipped'].values())}")
    for cls, count in sorted(stats["skipped"].items()):
        print(f"   - Skipped class {cls}: {count}")
    print(f"⚠️ Malformed lines: {stats['malformed']}")
    print(f"🖼️  Missing images: {stats['missing_image']}")

# === Step 2: Copy to train/val structure ===
def copy_split_sequences(src_root, dst_root, train_ratio=0.8):
    all_sequences = sorted(glob(os.path.join(src_root, "M*")))
    train_seqs, val_seqs = train_test_split(all_sequences, train_size=train_ratio, random_state=42)

    for split_name, split_list in zip(['train', 'val'], [train_seqs, val_seqs]):
        for seq_path in tqdm(split_list, desc=f"Copying {split_name}"):
            images_src = os.path.join(seq_path, "images")
            labels_src = os.path.join(seq_path, "labels")

            images_dst = os.path.join(dst_root, split_name, "images")
            labels_dst = os.path.join(dst_root, split_name, "labels")

            os.makedirs(images_dst, exist_ok=True)
            os.makedirs(labels_dst, exist_ok=True)

            for img_file in glob(os.path.join(images_src, "*.jpg")):
                shutil.copy(img_file, os.path.join(images_dst, os.path.basename(img_file)))

            for label_file in glob(os.path.join(labels_src, "*.txt")):
                shutil.copy(label_file, os.path.join(labels_dst, os.path.basename(label_file)))

    print("\n✅ Dataset split into 'train/' and 'val/' with images and YOLO labels.")


### Visualizing images with bounding boxes

In [161]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import matplotlib.patches as mpatches

# Load YOLO annotations
def load_yolo_annotations(anno_file):
    with open(anno_file, 'r') as f:
        lines = f.readlines()
    boxes = []
    for line in lines:
        class_id, x_center, y_center, width, height = map(float, line.split())
        boxes.append((class_id, x_center, y_center, width, height))
    return boxes

# Visualize image + boxes with dynamic font/thickness
def visualize_data(image_id, class_id_to_name, image_dir, annotations_dir):
    img_path = os.path.join(image_dir, f"{image_id}.jpg")
    annotation_path = os.path.join(annotations_dir, f"{image_id}.txt")

    if not os.path.exists(img_path) or not os.path.exists(annotation_path):
        print(f"[WARNING] Missing files for {image_id}, skipping...")
        return

    # Load image
    image = Image.open(img_path)
    image = np.array(image)

    height, width = image.shape[:2]

    # Dynamic scaling factors
    scale = max(width, height) / 1000.0  # adjust divisor for tuning
    rectangle_thickness = int(2 * scale)
    font_scale = 0.7 * scale
    font_thickness = int(2 * scale)
    text_color = (255, 0, 0)

    # Load annotations
    boxes = load_yolo_annotations(annotation_path)

    # Draw boxes
    image_with_boxes = image.copy()
    for box in boxes:
        class_id, x_center, y_center, w, h = box
        class_name, color = class_id_to_name[int(class_id)]

        x_min = int((x_center - w / 2) * width)
        y_min = int((y_center - h / 2) * height)
        x_max = int((x_center + w / 2) * width)
        y_max = int((y_center + h / 2) * height)

        cv2.rectangle(image_with_boxes, (x_min, y_min), (x_max, y_max), color, rectangle_thickness)
        cv2.putText(
            image_with_boxes,
            class_name,
            (x_min, max(y_min - 10, 0)),
            cv2.FONT_HERSHEY_SIMPLEX,
            font_scale,
            text_color,
            font_thickness,
            lineType=cv2.LINE_AA
        )

    # Plot with class legend
    plt.figure(figsize=(12, 10))
    plt.imshow(image_with_boxes)
    plt.title(f"Image ID: {image_id}")
    legend_handles = [
        mpatches.Patch(color=np.array(rgb) / 255.0, label=f"{name} ({cid})")
        for cid, (name, rgb) in class_id_to_name.items()
    ]
    plt.legend(handles=legend_handles, loc='upper right', fontsize=10)
    plt.axis('off')
    plt.tight_layout()
    plt.show()


### Convert into train set and val set 

In [162]:
# Semantic Drone Datasets
def move_files(file_list, 
               source_image_dir, 
               source_annotation_dir,
               target_image_dir, 
               target_annotation_dir):
    
    os.makedirs(target_image_dir, exist_ok=True)
    os.makedirs(target_annotation_dir, exist_ok=True)

    for image_id in tqdm(file_list, desc=f"Moving to {os.path.basename(os.path.dirname(target_image_dir))}"):
        image_path = os.path.join(source_image_dir, f"{image_id}.jpg")
        annotation_path = os.path.join(source_annotation_dir, f"{image_id}.txt")

        target_image_path = os.path.join(target_image_dir, f"{image_id}.jpg")
        target_annotation_path = os.path.join(target_annotation_dir, f"{image_id}.txt")

        if os.path.exists(image_path):
            shutil.copy(image_path, target_image_path)
        else:
            print(f"[Warning] Missing image: {image_path}")

        if os.path.exists(annotation_path):
            shutil.copy(annotation_path, target_annotation_path)
        else:
            print(f"[Warning] Missing annotation: {annotation_path}")

def split_and_move_dataset(source_base_dir="./datasets/new_dataset_yolo",
                           target_base_dir="./datasets/new_dataset_yolo_split",
                           split_ratio=0.8,
                           seed=42):
    random.seed(seed)

    image_dir = os.path.join(source_base_dir, "images")
    label_dir = os.path.join(source_base_dir, "labels")

    image_ids = [os.path.splitext(f)[0] for f in os.listdir(image_dir) if f.endswith(".jpg")]
    random.shuffle(image_ids)

    split_idx = int(len(image_ids) * split_ratio)
    train_ids = image_ids[:split_idx]
    val_ids = image_ids[split_idx:]

    # Train
    move_files(train_ids,
               source_image_dir=image_dir,
               source_annotation_dir=label_dir,
               target_image_dir=os.path.join(target_base_dir, "train/images"),
               target_annotation_dir=os.path.join(target_base_dir, "train/labels"))

    # Val
    move_files(val_ids,
               source_image_dir=image_dir,
               source_annotation_dir=label_dir,
               target_image_dir=os.path.join(target_base_dir, "val/images"),
               target_annotation_dir=os.path.join(target_base_dir, "val/labels"))

    print(f"\n[✓] Dataset split completed: {len(train_ids)} train / {len(val_ids)} val samples")


### Normalize labels

In [163]:
import os
from PIL import Image

def normalize_label_file(label_file, img_width, img_height):
    """
    Normalize the label coordinates in a label file to ensure they are within [0, 1] range.
    """
    with open(label_file, 'r') as f:
        lines = f.readlines()
    
    with open(label_file, 'w') as f:
        for line in lines:
            parts = line.strip().split()
            class_id = int(parts[0])
            x_center, y_center, width, height = map(float, parts[1:])
            
            # Normalize coordinates to ensure they are within the range [0, 1]
            x_center = min(1.0, max(0.0, x_center))
            y_center = min(1.0, max(0.0, y_center))
            width = min(1.0, max(0.0, width))
            height = min(1.0, max(0.0, height))

            # Write normalized values back to file
            f.write(f"{class_id} {x_center} {y_center} {width} {height}\n")


def get_image_size(img_path):
    """
    Get the width and height of the image to normalize the coordinates properly.
    """
    with Image.open(img_path) as img:
        return img.size  # returns (width, height)


def normalize_all_labels(labels_dir, img_dir):
    """
    Normalize all label files in the specified directory.
    """
    for label_file in tqdm(os.listdir(labels_dir)):
       
        if label_file.endswith('.txt'):  # Process only label files
            label_path = os.path.join(labels_dir, label_file)
            img_path = os.path.join(img_dir, label_file.replace('.txt', '.jpg'))  # Assuming JPG images
            if os.path.exists(img_path):
                # Get image dimensions to normalize the labels
                img_width, img_height = get_image_size(img_path)
                # print(f"Normalizing {label_file}...")
                normalize_label_file(label_path, img_width, img_height)
            else:
                print(f"Warning: Image for label {label_file} not found!")
    print("Normalize Complete")



### Training 

In [164]:
def train_yolo(data_yaml="uavdt_yolo.yaml", epochs=40, imgsz=720, batch=8, name="yolov8-uavdt"):
    model = YOLO("yolov8s.pt")

    model.train(
        data=data_yaml,
        epochs=epochs,
        imgsz=imgsz,
        batch=batch,
        name=name,
        project="runs_yolo/train",
        patience=20,  # Early stopping
        augment=True,  # Apply augmentations
        degrees=10,  # Image rotation
        scale=0.5,  # Scale range
        flipud=0.2,  # Vertical flip
        fliplr=0.5,  # Horizontal flip
        hsv_h=0.015,  # Hue augmentation
        hsv_s=0.7,  # Saturation augmentation
        hsv_v=0.4,  # Value augmentation
        mosaic=1.0,  # Mosaic augmentation
        mixup=0.2,  # Mixup augmentation
        lr0=0.01,  # Initial learning rate (you can tune this)
        lrf=0.01,  # Learning rate final factor (for cosine annealing)
        verbose=True  # Print progress
    )

    metrics = model.val()
    print(metrics)


### Finding best model path

In [165]:
def find_best_pt(base_dir='runs_yolo/'):
    best_paths = list(Path(base_dir).rglob('best.pt'))
    if not best_paths:
        raise FileNotFoundError("No 'best.pt' file found in the 'runs/' directory.")
    
    # Optionally, sort by latest modified time
    best_paths.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    
    print(f"✅ Found best.pt at: {best_paths[0]}")
    return str(best_paths[0])


### Prediciton on val images

In [166]:
import os
import random
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
from ultralytics import YOLO

# -------- Load YOLO Model -------- #
def load_yolo_model(weight_path: str):
    return YOLO(weight_path)

# -------- Run YOLO Prediction -------- #
def run_yolo(model, image: Image.Image):
    temp_path = "temp_yolo.jpg"
    image.save(temp_path)
    result = model.predict(source=temp_path, conf=0.5, save=False, verbose=False)[0]
    os.remove(temp_path)
    return result

# -------- Load Scalable Font -------- #
def get_large_font(size=100):
    try:
        return ImageFont.truetype("arial.ttf", size=size)
    except:
        try:
            return ImageFont.truetype("DejaVuSans.ttf", size=size)
        except:
            return ImageFont.load_default()

# -------- Draw Bounding Boxes -------- #
def draw_yolo_boxes(draw: ImageDraw.ImageDraw, result, font):
    boxes = result.boxes
    for i in range(len(boxes)):
        box = boxes.xyxy[i].cpu().numpy()
        cls_id = int(boxes.cls[i])
        conf = float(boxes.conf[i])
        name, color = class_id_to_name.get(cls_id, (f"id_{cls_id}", [255, 255, 255]))
        label = f"{name} {conf:.2f}"

        draw.rectangle(box.tolist(), outline=tuple(color), width=10)  # Thicker box
        draw.text((box[0], box[1] - 120), label, fill=tuple(color), font=font)  # Adjust for bigger font

# -------- Draw Embedded Legend -------- #
def draw_legend_on_image(image: Image.Image, result, x_offset=20, y_offset=20, font=None):
    draw = ImageDraw.Draw(image)
    class_ids = result.boxes.cls.cpu().numpy().astype(int)
    unique_ids = np.unique(class_ids)
    spacing = 120  # Increased spacing
    box_size = 80  # Larger color box

    for idx, cls_id in enumerate(unique_ids):
        name, color = class_id_to_name.get(cls_id, (f"id_{cls_id}", [255, 255, 255]))
        y = y_offset + idx * spacing
        draw.rectangle([x_offset, y, x_offset + box_size, y + box_size], fill=tuple(color))
        draw.text((x_offset + box_size + 30, y), f"{cls_id}: {name}", fill=(255, 255, 255), font=font)

# -------- Visualize Random Images -------- #
def visualize_yolo_on_random_images(val_image_dir, yolo_model, num_images=5):
    image_files = [f for f in os.listdir(val_image_dir) if f.lower().endswith((".jpg", ".jpeg", ".png"))]
    random.shuffle(image_files)
    selected_images = image_files[:num_images]

    font = get_large_font(size=100)

    for image_file in selected_images:
        image_path = os.path.join(val_image_dir, image_file)
        original_image = Image.open(image_path).convert("RGB")

        # Run YOLO
        yolo_result = run_yolo(yolo_model, original_image)

        # Draw on image
        yolo_image = original_image.copy()
        draw = ImageDraw.Draw(yolo_image)
        draw_yolo_boxes(draw, yolo_result, font=font)
        draw_legend_on_image(yolo_image, yolo_result, font=font)

        # Show
        plt.figure(figsize=(10, 8))
        plt.imshow(yolo_image)
        plt.title(f"YOLO Prediction: {image_file}", fontsize=32)
        plt.axis("off")
        plt.show()


### Predcitions on videos

In [167]:

# ========== FRAME PROCESSING ==========
def process_frame(frame, yolo_model, w, h, class_id_to_name):
    annotated = frame.copy()
    results = yolo_model(annotated, verbose=False)[0]
    boxes = results.boxes.xyxy.cpu().numpy()
    class_ids = results.boxes.cls.cpu().numpy()

    for box, cls_id in zip(boxes, class_ids):
        x1, y1, x2, y2 = map(int, box)
        class_name, color = class_id_to_name[int(cls_id)]
        cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
        cv2.putText(annotated, class_name, (x1, max(y1 - 10, 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2)

    return annotated, boxes, class_ids

# ========== VIDEO CAPTURE ==========
def setup_video_capture(video_path):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    return cap, total_frames, fps, w, h

# ========== MAIN FUNCTION ==========
def process_all_videos(yolo_weights_path, class_id_to_name, video_dir='videos', output_base='opt', max_frames=None):
    yolo_model = YOLO(yolo_weights_path)

    image_out_dir = os.path.join(output_base, 'images')
    label_out_dir = os.path.join(output_base, 'labels')
    output_video_dir = os.path.join(output_base, 'output')

    os.makedirs(image_out_dir, exist_ok=True)
    os.makedirs(label_out_dir, exist_ok=True)
    os.makedirs(output_video_dir, exist_ok=True)

    for video_file in tqdm(sorted(os.listdir(video_dir))):
        if not video_file.lower().endswith(".mp4"):
            continue

        video_id = os.path.splitext(video_file)[0]
        video_path = os.path.join(video_dir, video_file)
        output_video_path = os.path.join(output_video_dir, f"{video_id}.mp4")

        print(f"\n========== STARTED: {video_id} ==========")
        cap, total_frames, fps, w, h = setup_video_capture(video_path)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h))

        frame_count = 0
        pbar = tqdm(total=max_frames if max_frames else total_frames, desc=video_id)

        while True:
            ret, frame = cap.read()
            if not ret or (max_frames and frame_count >= max_frames):
                break

            annotated_bgr, boxes, class_ids = process_frame(frame, yolo_model, w, h, class_id_to_name)

            # ✅ Save original image
            img_filename = f'{video_id}_{frame_count:04d}.jpg'
            img_path = os.path.join(image_out_dir, img_filename)
            cv2.imwrite(img_path, frame)

            # ✅ Save YOLO-format label
            label_filename = f'{video_id}_{frame_count:04d}.txt'
            label_path = os.path.join(label_out_dir, label_filename)
            with open(label_path, 'w') as f:
                for box, cls_id in zip(boxes, class_ids):
                    x1, y1, x2, y2 = box
                    w_box = x2 - x1
                    h_box = y2 - y1
                    cx = x1 + w_box / 2
                    cy = y1 + h_box / 2
                    f.write(f"{int(cls_id)} {cx/w:.6f} {cy/h:.6f} {w_box/w:.6f} {h_box/h:.6f}\n")

            writer.write(annotated_bgr)
            frame_count += 1
            pbar.update(1)

        cap.release()
        writer.release()
        pbar.close()
        print(f"✔ DONE: {video_id} — Processed {frame_count} frames")


### Print Metrics

In [168]:
import os
import pandas as pd

def find_results_csv(directory):
    """Find the results.csv file in the specified directory."""
    for root, dirs, files in os.walk(directory):
        if 'results.csv' in files:
            return os.path.join(root, 'results.csv')
    return None

def load_results_csv(results_csv_path):
    """Load the results CSV into a pandas DataFrame."""
    return pd.read_csv(results_csv_path)

def calculate_total_epochs(df):
    """Calculate the total number of epochs from the DataFrame."""
    return df['epoch'].max()

def calculate_training_loss(epoch_data):
    """Calculate the total training loss from the given epoch data."""
    train_box_loss = epoch_data['train/box_loss']
    train_cls_loss = epoch_data['train/cls_loss']
    train_dfl_loss = epoch_data['train/dfl_loss']
    return train_box_loss + train_cls_loss + train_dfl_loss

def calculate_validation_loss(epoch_data):
    """Calculate the total validation loss from the given epoch data."""
    val_box_loss = epoch_data['val/box_loss']
    val_cls_loss = epoch_data['val/cls_loss']
    val_dfl_loss = epoch_data['val/dfl_loss']
    return val_box_loss + val_cls_loss + val_dfl_loss

def print_final_metrics(df):
    """Print the final metrics for the last epoch."""
    final_epoch_data = df.iloc[-1]

    # Calculate total training and validation loss
    train_loss = calculate_training_loss(final_epoch_data)
    val_loss = calculate_validation_loss(final_epoch_data)

    # Print results
    print("\n========== Final Training Metrics ==========")
    print(f"Training Loss: {train_loss:.6f}")
    print(f"Precision: {final_epoch_data['metrics/precision(B)']:.6f}")
    print(f"Recall: {final_epoch_data['metrics/recall(B)']:.6f}")
    print(f"mAP@0.5: {final_epoch_data['metrics/mAP50(B)']:.6f}")
    print(f"mAP@0.5:0.95: {final_epoch_data['metrics/mAP50-95(B)']:.6f}")

    print("\n========== Final Validation Metrics ==========")
    print(f"Validation Loss: {val_loss:.6f}")
    print(f"Validation Precision: {final_epoch_data['metrics/precision(B)']:.6f}")  # Validation precision (use corresponding val column)
    print(f"Validation Recall: {final_epoch_data['metrics/recall(B)']:.6f}")  # Validation recall (use corresponding val column)
    print(f"Validation mAP@0.5: {final_epoch_data['metrics/mAP50(B)']:.6f}")  # Validation mAP@0.5 (use corresponding val column)
    print(f"Validation mAP@0.5:0.95: {final_epoch_data['metrics/mAP50-95(B)']:.6f}")  # Validation mAP@0.5:0.95 (use corresponding val column)

def main(directory):
    """Main function to process and print final metrics."""
    # Find the results.csv file
    results_csv_path = find_results_csv(directory)
    
    if not results_csv_path:
        print("Error: 'results.csv' file not found in the specified directory.")
        return

    print(f"Found results.csv at: {results_csv_path}")

    # Load results CSV
    df = load_results_csv(results_csv_path)
    # Get the total number of epochs
    total_epochs = calculate_total_epochs(df)
    print(f"Total number of epochs: {total_epochs}")

    # Print columns in the CSV
    # print("\n========== Columns in CSV ==========")
    # print(df.columns)

    # Print final metrics
    print_final_metrics(df)


### Retrain and Quantization

## Full Pipeline

### Downaload and Convert Dataset

In [169]:

# gdrive_url = "https://drive.google.com/file/d/1UppumYqYOi-kto6BWPfFxwJK2Eph46oY/view?usp=sharing"
# download_and_extract_from_gdrive1(gdrive_url, extract_to="datasets")

# --------- Usage ----------
# gdrive_url = "https://drive.google.com/file/d/12cbrTaBAMIsuU-mwAA7IgDk9wSLC9cC-/view?usp=sharing"
# download_and_extract_from_gdrive2(gdrive_url, extract_to="datasets")

In [170]:


# # Path to the dataset
# dataset_path = "./datasets/semantic_drone_dataset/training_set"
# output_dir = "./datasets/new_dataset_yolo"

# convert_fulldataset_yolo_only(dataset_path, output_dir, class_id_to_name)



In [171]:
# #UAVDT-2024

# source_root = "./datasets/UAVDT-2024"
# output_root = "./datasets/new_dataset_yolo_split"

# convert_dataset(source_root)
# copy_split_sequences(source_root, output_root, train_ratio=0.8)


# # Semantic dorne datasets
# split_and_move_dataset()


In [172]:
# # Set your paths
# dataset_path = "./datasets/new_dataset_yolo_split/train"
# image_dir = os.path.join(dataset_path, "images")
# annotations_dir = os.path.join(dataset_path, "labels")

# normalize_all_labels(annotations_dir, image_dir)

# dataset_path = "./datasets/new_dataset_yolo_split/val"
# image_dir = os.path.join(dataset_path, "images")
# annotations_dir = os.path.join(dataset_path, "labels")

# normalize_all_labels(annotations_dir, image_dir)

In [173]:
# dataset_path = "./datasets/new_dataset_yolo_split/train"
# image_dir = os.path.join(dataset_path, "images")
# annotations_dir = os.path.join(dataset_path, "labels")


# # Visualize 10 random images
# image_ids = [f.split('.')[0] for f in os.listdir(image_dir) if f.endswith('.jpg')]
# random_image_ids = random.sample(image_ids, min(30, len(image_ids)))

# for image_id in random_image_ids:
#     visualize_data(image_id, class_id_to_name, image_dir, annotations_dir)



### Training the datatsets 

In [174]:
# import shutil
# import os

# # List of folders to delete
# folders_to_delete = ['./datasets/new', './datasets/new_dataset_yolo', './datasets/uavdt-processed', './runs_yolo']

# for folder_path in folders_to_delete:
#     if os.path.exists(folder_path):
#         shutil.rmtree(folder_path)
#         print(f"✅ Deleted folder: {folder_path}")
#     else:
#         print(f"⚠️ Folder does not exist: {folder_path}")


In [175]:
# print("[+] Training Start")
# gc.collect()
# torch.cuda.empty_cache()
# # # Train YOLOv8
# train_yolo(data_yaml="uavdt_yolo.yaml",  epochs=200, imgsz=720, batch=8, name="yolov8-uavdt")


### Prediction on val images

In [176]:
# # Load the model
# best_pt_path = find_best_pt()

# val_image_dir = "./datasets/new_dataset_yolo_split/val/images"  # <<-- make sure this path exists
    
# yolo_model = load_yolo_model(best_pt_path)
# visualize_yolo_on_random_images(val_image_dir, yolo_model, num_images=10)  # Show only 10 images

### Prediciton on videos

In [177]:
# Path to your results directory (where 'results.csv' should be located)
directory = './runs_yolo'
main(directory)

Found results.csv at: ./runs_yolo\train\yolov8-uavdt\results.csv
Total number of epochs: 97

Training Loss: 1.896580
Precision: 0.482240
Recall: 0.319210
mAP@0.5: 0.353800
mAP@0.5:0.95: 0.229580

Validation Loss: 5.205170
Validation Precision: 0.482240
Validation Recall: 0.319210
Validation mAP@0.5: 0.353800
Validation mAP@0.5:0.95: 0.229580


In [None]:
best_pt_path = find_best_pt()
process_all_videos(yolo_weights_path=best_pt_path, class_id_to_name=class_id_to_name, video_dir='./videos', output_base='./opt', max_frames=None)

✅ Found best.pt at: runs_yolo\train\yolov8-uavdt\weights\best.pt


  0%|          | 0/12 [00:00<?, ?it/s]




v1:   0%|          | 0/897 [00:00<?, ?it/s]

✔ DONE: v1 — Processed 897 frames



v10:   0%|          | 0/415 [00:00<?, ?it/s]

✔ DONE: v10 — Processed 415 frames



v11:   0%|          | 0/1242 [00:00<?, ?it/s]

✔ DONE: v11 — Processed 1242 frames



v12:   0%|          | 0/1122 [00:00<?, ?it/s]

✔ DONE: v12 — Processed 1122 frames



v2:   0%|          | 0/778 [00:00<?, ?it/s]

In [None]:
import torch

# Print the CUDA version PyTorch is built with
print("Built CUDA Version:", torch.version.cuda)

# Print the CUDA version runtime (if CUDA is available)
if torch.cuda.is_available():
    print("CUDA Runtime Version:", torch._C._cuda_getCompiledVersion())
    print("GPU Name:", torch.cuda.get_device_name(0))
else:
    print("CUDA is not available.")


Built CUDA Version: None
CUDA is not available.
