In [None]:
import os
import kaggle
import yaml
import pathlib
import random
import shutil
import torch
from ultralytics import YOLO
import cv2
import matplotlib.pyplot as plt
import numpy as np
print('Data source import complete.')


# YOLOv8
YOLOv8 is the newest state-of-the-art YOLO model that can be used for object detection, image classification, and instance segmentation tasks.<br>
YOLOv8 includes numerous architectural and developer experience changes and improvements over YOLOv5.<br>

## Why Should I Use YOLOv8?
* YOLOv8 has a high rate of accuracy measured by COCO and Roboflow 100.
* YOLOv8 comes with a lot of developer-convenience features, from an easy-to-use CLI to a well-structured Python package.

In [None]:
dataset_slug = 'nguyngiabol/colorful-fashion-dataset-for-object-detection'
output_path = './dataset' # The folder where data will be stored

if not os.path.exists(output_path):
    print(f"Dataset not found. Downloading '{dataset_slug}' from Kaggle...")
    os.makedirs(output_path, exist_ok=True)
    # The API command to download and unzip the files
    kaggle.api.dataset_download_files(dataset_slug, path=output_path, unzip=True)
    print("✅ Download and extraction complete.")
else:
    print(f"✅ Dataset already found at '{output_path}'. Skipping download.")

In [None]:
# Define key paths for the rest of the script
dataset_root = os.path.join(output_path, 'colorful_fashion_dataset_for_object_detection')
images_path = os.path.join(dataset_root, 'JPEGImages')
original_annotations_path = os.path.join(dataset_root, 'Annotations_txt')

In [None]:
# Original classes from the downloaded dataset's labels.txt
original_classes = {
    0: 'sunglass', 1: 'hat', 2: 'jacket', 3: 'shirt', 4: 'pants',
    5: 'shorts', 6: 'skirt', 7: 'dress', 8: 'bag', 9: 'shoe'
}

# PHASE 1: New coarse classes (TOP, BOTTOM, SHOES)
phase1_classes = {'TOP': 0, 'BOTTOM': 1, 'SHOES': 2}
# This maps the ORIGINAL class ID to the NEW Phase 1 class ID
phase1_map = {
    0: None,  # sunglass
    1: None,  # hat
    2: 0,     # jacket -> TOP
    3: 0,     # shirt  -> TOP
    4: 1,     # pants  -> BOTTOM
    5: 1,     # shorts -> BOTTOM
    6: 1,     # skirt  -> BOTTOM
    7: 0,     # dress  -> TOP
    8: None,  # bag
    9: 2      # shoe   -> SHOES
}

# PHASE 2: Your new fine-grained classes (without sunglass, bag, hat)
phase2_classes = {
    'jacket': 0, 'shirt': 1, 'pants': 2, 'shorts': 3,
    'skirt': 4, 'dress': 5, 'shoe': 6
}
# This maps the ORIGINAL class ID to the NEW Phase 2 class ID
phase2_map = {
    0: None,  # sunglass
    1: None,  # hat
    2: 0,     # jacket
    3: 1,     # shirt
    4: 2,     # pants
    5: 3,     # shorts
    6: 4,     # skirt
    7: 5,     # dress
    8: None,  # bag
    9: 6      # shoe
}

In [None]:
labels_phase1_dir = os.path.join(dataset_root, 'labels_phase1')
labels_phase2_dir = os.path.join(dataset_root, 'labels_phase2')

In [None]:
# --- 3. Create the new label directories ---
# Clear any old directories first and create clean ones
if os.path.exists(labels_phase1_dir): shutil.rmtree(labels_phase1_dir)
os.makedirs(labels_phase1_dir)

if os.path.exists(labels_phase2_dir): shutil.rmtree(labels_phase2_dir)
os.makedirs(labels_phase2_dir)

In [None]:
# --- 4. Process each original label file ---
processed_files = 0
for filename in os.listdir(original_annotations_path):
    if not filename.endswith('.txt'):
        continue

    new_phase1_lines = []
    new_phase2_lines = []

    with open(os.path.join(original_annotations_path, filename), 'r') as f:
        for line in f:
            parts = line.strip().split()
            if not parts:
                continue

            try:
                original_class_id = int(parts[0])
            except ValueError:
                continue

            box_coords = ' '.join(parts[1:])

            # --- Process for Phase 1 ---
            if original_class_id in phase1_map:
                new_class_id = phase1_map[original_class_id]
                if new_class_id is not None:
                    new_phase1_lines.append(f"{new_class_id} {box_coords}")

            # --- Process for Phase 2 ---
            if original_class_id in phase2_map:
                new_class_id = phase2_map[original_class_id]
                if new_class_id is not None:
                    new_phase2_lines.append(f"{new_class_id} {box_coords}")

    # Write the new, filtered label files for each phase (create empty file if nothing)
    with open(os.path.join(labels_phase1_dir, filename), 'w') as f:
        if new_phase1_lines:
            f.write('\n'.join(new_phase1_lines))

    with open(os.path.join(labels_phase2_dir, filename), 'w') as f:
        if new_phase2_lines:
            f.write('\n'.join(new_phase2_lines))

    processed_files += 1

print(f"✅ Automatically processed {processed_files} files.")
print(f"-> Phase 1 labels are ready in: '{labels_phase1_dir}'")
print(f"-> Phase 2 labels are ready in: '{labels_phase2_dir}'")
# ...existing code...

In [None]:
# --- 5. Create train/val image lists and YAML files for YOLOv8 ---
def resolve_image_path(img_name, images_dir):
    img_name = img_name.strip()
    if not img_name:
        return None
    # if already has extension, check it directly
    candidates = [img_name] if os.path.splitext(img_name)[1] else [img_name + ext for ext in ('.jpg', '.jpeg', '.png', '.png', '.JPG')]
    for c in candidates:
        p = os.path.join(images_dir, c)
        if os.path.exists(p):
            return os.path.abspath(p)
    # try any file that startswith basename (some datasets have suffixes)
    base = os.path.splitext(img_name)[0]
    for f in os.listdir(images_dir):
        if os.path.splitext(f)[0] == base:
            return os.path.abspath(os.path.join(images_dir, f))
    return None

def make_list_from_imagesets(images_dir, list_relpath, out_txt):
    list_path = os.path.join(dataset_root, list_relpath)
    if not os.path.exists(list_path):
        print(f"⚠️  Missing file: {list_path}")
        open(out_txt, 'w').close()
        return
    lines = []
    with open(list_path, 'r') as f:
        for l in f:
            p = resolve_image_path(l, images_dir)
            if p:
                lines.append(p)
    if not lines:
        print(f"❗ No images resolved from {list_path}. Check filenames and extensions in {images_dir}.")
    with open(out_txt, 'w') as f:
        f.write("\n".join(lines))
    print(f"✅ Wrote {out_txt} with {len(lines)} entries")

# image folder
images_dir = images_path  # ./dataset/.../JPEGImages

# create train/val txts with absolute image paths for Phase 1 / Phase 2 (they can reuse same lists)
train_list_txt = os.path.abspath(os.path.join(dataset_root, 'train_full.txt'))
val_list_txt   = os.path.abspath(os.path.join(dataset_root, 'val_full.txt'))

# original ImageSets/Main files (VOC-style)
make_list_from_imagesets(images_dir, 'ImageSets/Main/trainval.txt', train_list_txt)
make_list_from_imagesets(images_dir, 'ImageSets/Main/test.txt', val_list_txt)

# Helper to write yaml pointing to the list files
def write_phase_yaml(path_root, train_txt, val_txt, names, out_fname):
    cfg = {
        'path': os.path.abspath(path_root),
        'train': os.path.abspath(train_txt),
        'val': os.path.abspath(val_txt),
        'names': {i: n for i, n in enumerate(names)}
    }
    with open(out_fname, 'w') as f:
        yaml.dump(cfg, f, sort_keys=False)
    print(f"✅ Wrote {out_fname}")

# Phase1 names list in index order
phase1_names = [k for k,v in sorted(phase1_classes.items(), key=lambda kv: kv[1])]
# Phase2 names list
phase2_names = [k for k,v in sorted(phase2_classes.items(), key=lambda kv: kv[1])]

write_phase_yaml(dataset_root, train_list_txt, val_list_txt, phase1_names, 'phase1-data.yaml')
write_phase_yaml(dataset_root, train_list_txt, val_list_txt, phase2_names, 'phase2-data.yaml')
print("✅ YAML configuration files created: 'phase1-data.yaml', 'phase2-data.yaml'.")

In [None]:

# 1) Device detection (CUDA preferred, then MPS on Apple silicon, else CPU)
device = 'cuda' if torch.cuda.is_available() else ('mps' if getattr(torch.backends, 'mps', None) and torch.backends.mps.is_available() else 'cpu')
print("Using device:", device)

# 2) Quick label-coverage check (counts non-empty label files for train/val lists)
def check_label_coverage(list_txt, labels_dir):
    imgs = []
    with open(list_txt, 'r') as f:
        imgs = [l.strip() for l in f if l.strip()]
    non_empty = 0
    missing = 0
    for p in imgs:
        base = os.path.splitext(os.path.basename(p))[0]
        lf = os.path.join(labels_dir, base + '.txt')
        if not os.path.exists(lf):
            missing += 1
        else:
            if os.path.getsize(lf) > 0:
                non_empty += 1
    print(f"{os.path.basename(list_txt)}: {len(imgs)} images, {non_empty} non-empty labels, {missing} missing labels")

labels_dir = os.path.join(dataset_root, 'labels')  # this is what YOLO expects
check_label_coverage(train_list_txt, labels_dir)
check_label_coverage(val_list_txt, labels_dir)

In [None]:


# 3) Train with GPU/MPS, caching and mixed precision + early stopping (lower patience)
print("\n--- 🚀 STARTING PHASE 1 TRAINING ---")
# ensure labels for phase1 are available at dataset_root/labels (you already rename earlier)
os.rename(labels_phase1_dir, labels_dir) if os.path.exists(labels_phase1_dir) and not os.path.exists(labels_dir) else None

model_p1 = YOLO('yolov8n.pt')
model_p1.train(
    data='phase1-data.yaml',
    epochs=50,            # reduce if you want faster cycles
    imgsz=512,            # smaller images = faster
    batch=16,
    workers=8,            # increase if you have CPU cores and fast I/O
    device=device,        # use GPU/CUDA or MPS if available
    cache=True,           # cache images for faster epochs (ram/disk)
    half=(device != 'cpu'),# use fp16 on GPU/MPS where supported
    patience=10,          # early stopping after 10 epochs without val improvement
    name='yolov8_phase1_coarse',
    exist_ok=True
)
print("--- ✅ PHASE 1 TRAINING COMPLETE ---")
os.rename(labels_dir, labels_phase1_dir)

# Phase 2 (transfer learning) - same flags; ensure phase2 labels are present
print("\n--- 🚀 STARTING PHASE 2 TRAINING ---")
os.rename(labels_phase2_dir, labels_dir) if os.path.exists(labels_phase2_dir) and not os.path.exists(labels_dir) else None

model_p2 = YOLO('runs/detect/yolov8_phase1_coarse/weights/best.pt')
model_p2.train(
    data='phase2-data.yaml',
    epochs=50,
    imgsz=512,
    batch=16,
    workers=8,
    device=device,
    cache=True,
    half=(device != 'cpu'),
    patience=10,
    name='yolov8_phase2_fine',
    exist_ok=True
)
print("--- ✅ PHASE 2 TRAINING COMPLETE ---")
os.rename(labels_dir, labels_phase2_dir)


In [None]:
print("\n--- 🕵️‍♂️ RUNNING HIERARCHICAL PREDICTION ---")

# --- Load both trained models ---
model_coarse = YOLO('runs/detect/yolov8_phase1_coarse/weights/best.pt')
model_fine = YOLO('runs/detect/yolov8_phase2_fine/weights/best.pt')

# --- Helper function for IoU ---
def calculate_iou(box1, box2):
    x1_inter = max(box1[0], box2[0])
    y1_inter = max(box1[1], box2[1])
    x2_inter = min(box1[2], box2[2])
    y2_inter = min(box1[3], box2[3])
    inter_area = max(0, x2_inter - x1_inter) * max(0, y2_inter - y1_inter)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

# --- Select a random image to test ---
test_image_name = random.choice(os.listdir(images_path))
test_image_path = os.path.join(images_path, test_image_name)
print(f"-> Testing on image: {test_image_name}")

img = cv2.imread(test_image_path)
results_coarse = model_coarse(img, verbose=False)[0]
results_fine = model_fine(img, verbose=False)[0]
IOU_THRESHOLD = 0.7

for fine_box in results_fine.boxes:
    fine_xyxy = fine_box.xyxy[0].cpu().numpy()
    fine_class_name = model_fine.names[int(fine_box.cls)]
    best_match_coarse_name = "Unknown"
    max_iou = 0
    
    for coarse_box in results_coarse.boxes:
        iou = calculate_iou(fine_xyxy, coarse_box.xyxy[0].cpu().numpy())
        if iou > max_iou:
            max_iou = iou
            best_match_coarse_name = model_coarse.names[int(coarse_box.cls)]
    
    label = f"{best_match_coarse_name}: {fine_class_name}" if max_iou > IOU_THRESHOLD else fine_class_name
        
    x1, y1, x2, y2 = map(int, fine_xyxy)
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
    cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

# --- Display the final result ---
plt.figure(figsize=(12, 12))
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.title("Hierarchical Prediction Result")
plt.axis('off')
plt.show()