# Milestone 2

In [1]:
from pathlib import Path
import os, random, math, sys
from collections import defaultdict
from typing import List, Tuple
import numpy as np
from PIL import Image

# --- Dataset locations (machine-specific absolute paths) ---
# SRC: root folder holding one sub-folder of images per identity.
SRC = r"C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\celebrity"
# DST: root folder that receives the generated YOLO dataset.
DST = r"C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\m2_yolo_local"

# File suffixes (compared in lower case) that count as images.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff", ".jfif"}

# --- Composite-image settings ---
IMG_SIZE = 640                  # side length, in pixels, of each square composite
GRID = (2, 2)                   # (rows, cols) of cells a composite is divided into
K_MIN = 2                       # fewest faces placed on one composite
K_MAX = 4                       # most faces placed on one composite
TARGET_OCCURRENCES_PER_ID = 30  # generation stops once every ID appears this often
VAL_RATIO = 0.20                # 80/20 split
SEED = 42

# Seed both generators so the notebook is repeatable.
random.seed(SEED)
np.random.seed(SEED)

In [2]:
# Create the YOLO directory skeleton: multi/{images,labels}/{train,val}.
DSTP = Path(DST)
for kind in ("images", "labels"):
    for split_name in ("train", "val"):
        (DSTP / f"multi/{kind}/{split_name}").mkdir(parents=True, exist_ok=True)


def count_images_recursive(dirpath: Path) -> int:
    """Count the image files found anywhere beneath ``dirpath``.

    A file counts as an image when its lower-cased suffix is in ``IMG_EXTS``.
    """
    total = 0
    for entry in dirpath.rglob("*"):
        if entry.is_file() and entry.suffix.lower() in IMG_EXTS:
            total += 1
    return total

def summarize_counts(src_root: Path) -> List[Tuple[str, int]]:
    """Return ``(folder_name, image_count)`` for each sub-directory of ``src_root``.

    Sub-directories are visited in sorted path order; counts are recursive.
    """
    rows = []
    for sub in sorted(src_root.iterdir()):
        if sub.is_dir():
            rows.append((sub.name, count_images_recursive(sub)))
    return rows

def save_counts_csv(rows, out_csv: Path):
    """Write per-ID image counts to ``out_csv``, sorted by (image_count, id).

    Args:
        rows: Iterable of ``(id, image_count)`` pairs.
        out_csv: Destination CSV path; the columns are ``id`` and
            ``image_count``.

    pandas is used when it can be imported. When it cannot, the standard
    ``csv`` module writes the same columns in the same order, so a missing
    optional dependency no longer means a missing file. Saving stays
    best-effort: any other failure (for example an unwritable path) is
    reported as a warning that names the real error instead of being
    mislabelled as "pandas not available".
    """
    rows = list(rows)
    try:
        try:
            import pandas as pd
        except ImportError:
            import csv
            import os

            ordered = sorted(rows, key=lambda r: (r[1], r[0]))
            # newline="" lets the csv writer control line endings itself.
            with open(out_csv, "w", newline="", encoding="utf-8") as fh:
                writer = csv.writer(fh, lineterminator=os.linesep)
                writer.writerow(["id", "image_count"])
                writer.writerows(ordered)
        else:
            df = pd.DataFrame(rows, columns=["id", "image_count"]).sort_values(["image_count", "id"])
            df.to_csv(out_csv, index=False)
    except Exception as e:
        print(f"[WARN] Could not save counts CSV to {out_csv}: {e}")
        return
    print(f"[INFO] Saved counts to: {out_csv}")

# Abort early when the source folder is missing.
SRC_P = Path(SRC)
if not SRC_P.exists():
    sys.exit(f"[ERROR] SRC does not exist: {SRC}")

# Per-ID image counts, plus overall totals for the log line below.
counts = summarize_counts(SRC_P)
total_ids = len(counts)
total_imgs = sum(row[1] for row in counts)
print(f"[INFO] Found {total_ids} ID folders; total images (recursive): {total_imgs}")

# The CSV is written next to (not inside) the DST folder.
counts_csv = Path(DST).with_name("id_image_counts_local.csv")
save_counts_csv(counts, counts_csv)

[INFO] Found 45 ID folders; total images (recursive): 1526
[INFO] Saved counts to: C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\id_image_counts_local.csv


In [3]:
# Map each identity folder name to its sorted list of image paths,
# skipping folders that contain no supported images.
id_dirs = sorted(d for d in SRC_P.iterdir() if d.is_dir())
img_pool = {}
for id_dir in id_dirs:
    found = sorted(
        f for f in id_dir.rglob("*")
        if f.is_file() and f.suffix.lower() in IMG_EXTS
    )
    if found:
        img_pool[id_dir.name] = found

if len(img_pool) == 0:
    sys.exit("[ERROR] No images found under SRC (supported exts). Check path or extensions.")

# Class index = position of the ID in the sorted list of usable IDs.
class_names = sorted(img_pool)
(DSTP / "class_names.txt").write_text("\n".join(class_names), encoding="utf-8")
id2idx = dict(zip(class_names, range(len(class_names))))
print(f"[INFO] Usable IDs (non-empty): {len(class_names)}")

[INFO] Usable IDs (non-empty): 45


In [4]:
# Import OpenCV and load the bundled frontal-face Haar cascade.
# Only an import failure is reported as "not installed"; the underlying
# error is appended so a partially broken install is distinguishable.
try:
    import cv2, cv2.data as cvdata
except ImportError as e:
    sys.exit(f"[ERROR] OpenCV not installed. Install: pip install opencv-python-headless ({e})")

HAAR = cv2.CascadeClassifier(cvdata.haarcascades + "haarcascade_frontalface_default.xml")
# The constructor does not raise when the XML cannot be read; it yields an
# empty classifier, so check explicitly rather than failing later in detection.
if HAAR.empty():
    sys.exit("[ERROR] Could not load haarcascade_frontalface_default.xml from cv2.data.haarcascades")

def detect_xywh(pil_image: Image.Image) -> Tuple[int,int,int,int]:
    """Locate the largest frontal face in ``pil_image`` as ``(x, y, w, h)`` pixels.

    When the Haar cascade finds nothing, a centred square whose side is 60%
    of the shorter image dimension is returned instead.
    """
    rgb = np.array(pil_image.convert("RGB"))
    bgr = rgb[:, :, ::-1]
    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
    detections = HAAR.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(40,40))

    if len(detections) > 0:
        # Keep the detection with the largest area.
        bx, by, bw, bh = max(detections, key=lambda box: box[2]*box[3])
        return int(bx), int(by), int(bw), int(bh)

    width, height = pil_image.size
    side = int(0.6 * min(width, height))
    return (width - side)//2, (height - side)//2, side, side

In [5]:
def cells(grid, img_size):
    """Split a square canvas into grid cells.

    ``grid`` is ``(rows, cols)``. Returns one ``(x, y, width, height)`` tuple
    per cell in row-major order, using integer division of ``img_size``.
    """
    n_rows, n_cols = grid
    cell_w = img_size // n_cols
    cell_h = img_size // n_rows
    layout = []
    for row in range(n_rows):
        for col in range(n_cols):
            layout.append((col * cell_w, row * cell_h, cell_w, cell_h))
    return layout

def paste_one(canvas: Image.Image, cid: str, img_path: Path, cell_xywh):
    """Crop the face from ``img_path``, paste it into one cell, and label it.

    Args:
        canvas: Composite image that is modified in place.
        cid: Identity name; looked up in ``id2idx`` for the class index.
        img_path: Source image to take the face from.
        cell_xywh: ``(x, y, width, height)`` of the target cell on the canvas.

    Returns:
        ``(class_index, (cx, cy, w, h))`` where the box is the pasted patch,
        normalised by ``IMG_SIZE`` (YOLO format).
    """
    x0, y0, cw, ch = cell_xywh
    # Open through a context manager so the file handle is released right
    # after decoding; convert() returns an independent in-memory image.
    with Image.open(img_path) as src:
        im = src.convert("RGB")

    # Expand the detected box slightly (6% / 7% per side), clamped to the image.
    x, y, w, h = detect_xywh(im)
    padw, padh = int(0.06 * w), int(0.07 * h)
    x = max(0, x - padw); y = max(0, y - padh)
    w = min(im.width - x, w + 2 * padw)
    h = min(im.height - y, h + 2 * padh)
    face = im.crop((x, y, x + w, y + h))

    # fit cell with margin + jitter
    # The crop is stretched to the target size (aspect ratio is not preserved).
    margin = 0.08
    tw, th = int(cw * (1 - 2 * margin)), int(ch * (1 - 2 * margin))
    face = face.resize((tw, th), Image.BILINEAR)
    jx, jy = int((cw - tw) * 0.2), int((ch - th) * 0.2)
    px = x0 + (cw - tw)//2 + random.randint(-jx, jx)
    py = y0 + (ch - th)//2 + random.randint(-jy, jy)
    # Keep the patch entirely inside its cell.
    px = max(x0, min(px, x0 + cw - tw))
    py = max(y0, min(py, y0 + ch - th))
    canvas.paste(face, (px, py))

    # YOLO box normalized
    cx = (px + tw/2) / IMG_SIZE
    cy = (py + th/2) / IMG_SIZE
    nw = tw / IMG_SIZE
    nh = th / IMG_SIZE
    return id2idx[cid], (cx, cy, nw, nh)

In [6]:
# Build the composite ("multi-face") dataset: each output image is a white
# IMG_SIZE x IMG_SIZE canvas with K_MIN..K_MAX faces pasted into distinct grid
# cells, plus a YOLO label file with one line per pasted face.
OUT = DSTP / "multi"
for split in ["train", "val"]:
    (OUT / f"images/{split}").mkdir(parents=True, exist_ok=True)
    (OUT / f"labels/{split}").mkdir(parents=True, exist_ok=True)
# NOTE(review): existing files in these folders are not removed, so composites
# from an earlier run with different settings can remain mixed in.

use_counts = defaultdict(int)          # how many times each ID has been pasted so far
cell_list = cells(GRID, IMG_SIZE)
val_every = int(1 / VAL_RATIO)         # every val_every-th composite (starting with #0) goes to val
made = 0

print("[INFO] Building concatenated images (this may take a bit)...")
# Keep generating until every ID has been used at least TARGET_OCCURRENCES_PER_ID times.
while not all(use_counts[c] >= TARGET_OCCURRENCES_PER_ID for c in class_names):
    k = random.randint(K_MIN, K_MAX)
    # pick k least-used IDs
    # (ties are broken by a fresh random number per ID)
    cids = sorted(class_names, key=lambda c: (use_counts[c], random.random()))[:k]

    canvas = Image.new("RGB", (IMG_SIZE, IMG_SIZE), (255, 255, 255))
    labels = []
    cells_shuf = cell_list[:]; random.shuffle(cells_shuf)

    # zip() stops at the shorter sequence: if k exceeded the number of cells,
    # the surplus IDs would be silently skipped for this composite.
    for cid, cell in zip(cids, cells_shuf):
        # NOTE(review): source images are drawn from the same pool for both
        # splits, so a val composite can contain a photo that also appears in
        # train composites; validation metrics may be optimistic.
        path = random.choice(img_pool[cid])
        cls, box = paste_one(canvas, cid, path, cell)
        labels.append((cls, *box))
        use_counts[cid] += 1

    split = "val" if (made % val_every == 0) else "train"
    stem = f"{split}_{made:06d}"
    canvas.save(OUT / f"images/{split}/{stem}.jpg", quality=95)
    # One label line per face: "<class> <cx> <cy> <w> <h>", values normalised to [0, 1].
    with open(OUT / f"labels/{split}/{stem}.txt", "w", encoding="utf-8") as f:
        for r in labels:
            f.write(f"{r[0]} {r[1]:.6f} {r[2]:.6f} {r[3]:.6f} {r[4]:.6f}\n")
    made += 1

print(f"[INFO] Made composites: {made} | IDs: {len(class_names)}")

[INFO] Building concatenated images (this may take a bit)...
[INFO] Made composites: 447 | IDs: 45


In [7]:
# Write the dataset description consumed by the trainer.
# The dataset path and the class names are free-form text (folder names), so
# they are emitted as double-quoted scalars: a JSON string literal is a valid
# YAML double-quoted scalar, which keeps names such as "123", "no", or ones
# containing ": " or "#" from being re-typed or breaking the file.
import json

yaml_lines = [
    f"path: {json.dumps(OUT.as_posix(), ensure_ascii=False)}",
    "train: images/train",
    "val: images/val",
    f"nc: {len(class_names)}",
    "names:",
    *[f"  - {json.dumps(str(n), ensure_ascii=False)}" for n in class_names],
]
(DSTP / "data.yaml").write_text("\n".join(yaml_lines), encoding="utf-8")
print(f"[INFO] Wrote {DSTP / 'data.yaml'}")

[INFO] Wrote C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\m2_yolo_local\data.yaml


In [8]:
# ---------- 8) TRAIN YOLOv8 ----------
# Training dependencies are imported lazily so a missing install gives a
# readable message.
try:
    from ultralytics import YOLO
    import torch
except Exception:
    sys.exit("[ERROR] Missing ultralytics/torch. Install packages first (e.g., pip install ultralytics torch torchvision).")

# Use the first CUDA device when one is available, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = 0
print(f"[INFO] Training device: {device}")
model = YOLO("yolov8n.pt")   # start small; later try 'yolov8s.pt'

# Adjust batch/workers to your machine
train_cfg = dict(
    data=str(DSTP / "data.yaml"),
    imgsz=640,
    epochs=150,
    batch=8,
    workers=2,
    device=device,
    patience=20,
)
results = model.train(**train_cfg)
print("[INFO] Training complete.")

[INFO] Training device: 0
New https://pypi.org/project/ultralytics/8.3.205 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.204  Python-3.12.3 torch-2.5.1+cu121 CUDA:0 (NVIDIA GeForce RTX 4060, 8188MiB)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=8, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\m2_yolo_local\data.yaml, degrees=0.0, deterministic=True, device=0, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=150, erasing=0.4, exist_ok=False, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=640, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.01, lrf=0.01, mask_ratio=4, max_det=300, mixup=0.0, mode=train,

In [9]:
# ---------- 9) EVALUATE & SAMPLE PREDICTIONS ----------
# Validate the trained model and report the box mAP figures.
metrics = model.val()
print(f"[INFO] mAP50-95: {metrics.box.map:.4f} | mAP50: {metrics.box.map50:.4f} | mAP75: {metrics.box.map75:.4f}")

# Run prediction over the whole val image folder; annotated copies are saved under runs/.
val_images = str(OUT / "images/val")
predict_cfg = dict(conf=0.25, save=True, max_det=10)
pred = model.predict(source=val_images, **predict_cfg)
print("[INFO] Saved sample predictions into the 'runs' folder.")

Ultralytics 8.3.204  Python-3.12.3 torch-2.5.1+cu121 CUDA:0 (NVIDIA GeForce RTX 4060, 8188MiB)
Model summary (fused): 72 layers, 3,014,423 parameters, 0 gradients, 8.1 GFLOPs
[34m[1mval: [0mFast image access  (ping: 0.00.0 ms, read: 715.5232.5 MB/s, size: 65.2 KB)
[K[34m[1mval: [0mScanning C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\m2_yolo_local\multi\labels\val.cache... 90 images, 0 backgrounds, 0 corrupt: 100% ━━━━━━━━━━━━ 90/90 90.0Kit/s 0.0s
[K                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100% ━━━━━━━━━━━━ 12/12 4.5it/s 2.7s<0.1s
                   all         90        274      0.931      0.822      0.923      0.922
          images_7904_          6          6          1      0.796      0.972      0.972
        images__10046_          7          7      0.584      0.429      0.643      0.643
        images__10173_          2          2      0.928          1      0.995      0.995
         images__1158_        

In [12]:
# ======================================================================
# 10) JUPYTER UPLOAD BUTTON → INSTANT ID & LOCATION (no manual paths)
# ======================================================================
from pathlib import Path
from ultralytics import YOLO
import torch
import io
import numpy as np
from PIL import Image

# Reuse the device chosen by the training cell when it exists; otherwise pick
# one now so this cell can run on its own.
if "device" not in globals():
    device = 0 if torch.cuda.is_available() else "cpu"

# ---- helper: find latest best.pt if needed
def _latest(path_glob: str):
    import glob, os
    files = glob.glob(path_glob, recursive=True)
    if not files:
        return None
    files.sort(key=lambda p: os.path.getmtime(p), reverse=True)
    return files[0]

# ---- resolve trained weights for this run (fallback: most recent run)
# First choice: best.pt written by the training run in this session.
try:
    BEST_WEIGHTS = Path(results.save_dir) / "weights" / "best.pt"  # from this training
except Exception:
    BEST_WEIGHTS = None


def _weights_ready(candidate):
    """True when ``candidate`` is set and points at an existing file."""
    return candidate is not None and Path(candidate).is_file()


# Second choice: the newest best.pt under ./runs/detect/*/weights.
if not _weights_ready(BEST_WEIGHTS):
    lb = _latest(str(Path.cwd() / "runs" / "detect" / "*" / "weights" / "best.pt"))
    if lb:
        BEST_WEIGHTS = Path(lb)

# Last resort: keep using the in-memory model from the training cell.
if _weights_ready(BEST_WEIGHTS):
    print(f"[INFO] Using trained weights for upload UI: {BEST_WEIGHTS}")
    DETECT_MODEL = YOLO(str(BEST_WEIGHTS))
else:
    print("[WARN] Could not auto-find best.pt. Upload UI will use current 'model' weights.")
    DETECT_MODEL = model

# ---- notebook UI deps (skip gracefully if not in Jupyter)
try:
    import ipywidgets as widgets
    import matplotlib.pyplot as plt
    from IPython.display import display, HTML
except Exception:
    _IN_NOTEBOOK = False
    print("[INFO] ipywidgets/matplotlib not available; skip upload UI (script mode).")
else:
    _IN_NOTEBOOK = True

# ---- support both ipywidgets 8.x (tuple of UploadedFile) and 7.x (dict) shapes
def _get_upload_bytes(value):
    """
    Return (filename, bytes) from FileUpload.value.
    Supports ipywidgets 8.x (tuple/list of UploadedFile) and 7.x (dict).
    """
    # ipywidgets 8.x: tuple/list of UploadedFile with .name/.content
    if isinstance(value, (tuple, list)) and len(value) > 0:
        uf = value[0]
        if hasattr(uf, "content"):
            return getattr(uf, "name", "uploaded_image"), uf.content
        if isinstance(uf, dict) and "content" in uf:  # rare fallback
            return uf.get("name", "uploaded_image"), uf["content"]
    # ipywidgets 7.x: dict {filename: {content: bytes, ...}}
    if isinstance(value, dict) and value:
        key = next(iter(value))
        item = value[key]
        if isinstance(item, dict) and "content" in item:
            return key, item["content"]
    return None, None

# ---- inference helper: bytes → annotated image + meta + rows
def _infer_bytes(image_bytes: bytes, model_obj, conf=0.30, iou=0.45, imgsz=640):
    """Run detection on an encoded image and return plot, metadata, and rows.

    Args:
        image_bytes: Encoded image data (anything PIL can open).
        model_obj: Detection model used for both prediction and label lookup.
        conf: Confidence threshold passed to ``predict``.
        iou: IoU threshold passed to ``predict``.
        imgsz: Inference image size passed to ``predict``.

    Returns:
        ``(annotated, meta, rows)``: ``annotated`` is the plotted result with
        channels reversed for RGB display; ``meta`` holds ``W``, ``H`` and
        ``count``; ``rows`` is a list of per-detection dicts sorted by
        descending confidence, each with a 1-based ``row_id``. In each row,
        ``celebrity_id`` is the class label string and ``name`` is the integer
        class index (keys kept as-is for the table renderer).
    """
    pil = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    W, H = pil.size

    # Hand the PIL image to predict() rather than an RGB numpy array: numpy
    # input is interpreted as BGR, which would swap red/blue for the model and
    # make the BGR->RGB flip below produce wrong colours.
    res = model_obj.predict(source=pil, conf=conf, iou=iou, imgsz=imgsz,
                            device=device, verbose=False)[0]
    annotated = res.plot()[:, :, ::-1]  # BGR→RGB

    # Labels come from the model that produced the boxes, not a global model.
    names = model_obj.names

    rows = []
    if res.boxes is not None and len(res.boxes) > 0:
        for b in res.boxes:
            cls_id = int(b.cls.item())
            name   = names.get(cls_id, str(cls_id))
            conf_f = float(b.conf.item())
            x1, y1, x2, y2 = [float(v) for v in b.xyxy.squeeze().tolist()]
            # Also express the box in normalised centre/size form.
            cx = ((x1 + x2) / 2.0) / W
            cy = ((y1 + y2) / 2.0) / H
            ww = (x2 - x1) / W
            hh = (y2 - y1) / H
            rows.append({
                "celebrity_id": name, "name": cls_id, "conf": conf_f,
                "x1": x1, "y1": y1, "x2": x2, "y2": y2,
                "cx": cx, "cy": cy, "w": ww, "h": hh
            })
    rows.sort(key=lambda d: d["conf"], reverse=True)
    for i, r in enumerate(rows, start=1):
        r["row_id"] = i
    meta = {"W": W, "H": H, "count": len(rows)}
    return annotated, meta, rows

# ---- simple HTML table (no pandas)
def _rows_to_html(rows, mode="pixels"):
    if not rows:
        return "<p><b>No detections above thresholds.</b></p>"
    if mode == "pixels":
        headers = ["ID","celebrity_id","name","conf","x1","y1","x2","y2"]
        def fmt(r): return [
            r["row_id"], r["celebrity_id"], r["name"], f"{r['conf']:.3f}",
            f"{r['x1']:.1f}", f"{r['y1']:.1f}", f"{r['x2']:.1f}", f"{r['y2']:.1f}"
        ]
    else:
        headers = ["ID","celebrity_id","name","conf","cx","cy","w","h"]
        def fmt(r): return [
            r["row_id"], r["celebrity_id"], r["name"], f"{r['conf']:.3f}",
            f"{r['cx']:.4f}", f"{r['cy']:.4f}", f"{r['w']:.4f}", f"{r['h']:.4f}"
        ]
    th = "".join(f"<th style='padding:6px 10px;text-align:left;border-bottom:1px solid #ddd'>{h}</th>" for h in headers)
    trs = []
    for r in rows:
        tds = "".join(f"<td style='padding:4px 10px;border-bottom:1px solid #f0f0f0'>{v}</td>" for v in fmt(r))
        trs.append(f"<tr>{tds}</tr>")
    return (
        "<table style='border-collapse:collapse;font-family:ui-monospace,Consolas,monospace;font-size:13px'>"
        f"<thead><tr>{th}</tr></thead><tbody>{''.join(trs)}</tbody></table>"
    )

# ---- Jupyter UI
# Build the interactive upload widget only when the notebook UI dependencies
# were importable; otherwise just print a notice.
if _IN_NOTEBOOK:
    # Controls: a single-file image uploader, detection thresholds, inference
    # size, coordinate format for the results table, and a manual run button.
    uploader    = widgets.FileUpload(accept="image/*", multiple=False, description="Upload image")
    conf_slider = widgets.FloatSlider(value=0.30, min=0.05, max=0.90, step=0.05, description="conf")
    iou_slider  = widgets.FloatSlider(value=0.45, min=0.10, max=0.90, step=0.05, description="iou")
    imgsz_dd    = widgets.Dropdown(options=[512, 640, 768, 960], value=640, description="imgsz")
    fmt_dd      = widgets.Dropdown(options=[("Pixels (x1,y1,x2,y2)","pixels"),
                                            ("YOLO normalized (cx,cy,w,h)","yolo")],
                                   value="pixels", description="format")
    run_btn     = widgets.Button(description="Run detection", button_style="primary", icon="search")

    # Separate output areas for the summary text, the annotated image, and the table.
    out_meta = widgets.Output()
    out_img  = widgets.Output()
    out_tbl  = widgets.Output()

    def _do_run(file_value):
        """Run detection on the uploaded file and refresh the three output areas.

        ``file_value`` is the uploader's current ``value``. Errors raised
        during inference or rendering are printed into the table area instead
        of propagating.
        """
        import traceback
        # Clear previous results first (True is passed positionally; presumably
        # the "wait" flag, so old output stays until new output arrives).
        out_meta.clear_output(True); out_img.clear_output(True); out_tbl.clear_output(True)

        fname, img_bytes = _get_upload_bytes(file_value)
        if not img_bytes:
            # Nothing usable uploaded: show a hint plus the raw value's type
            # and length to help diagnose widget-version differences.
            with out_tbl:
                print("Please upload an image first (no bytes found).")
                l = (len(file_value) if hasattr(file_value, "__len__") else "n/a")
                print(f"Debug: type={type(file_value)}, len={l}")
            return
        try:
            # Thresholds and size are read from the widgets at run time.
            ann, meta, rows = _infer_bytes(
                img_bytes, DETECT_MODEL,
                conf=conf_slider.value, iou=iou_slider.value, imgsz=imgsz_dd.value
            )
            with out_meta:
                print(f"File: {fname}")
                print(f"Image size: {meta['W']} x {meta['H']}  |  Detections: {meta['count']}")
            with out_img:
                plt.figure(figsize=(8, 8))
                plt.imshow(ann); plt.axis("off"); plt.title("Detections (YOLOv8)")
                plt.show()
            with out_tbl:
                display(HTML(_rows_to_html(rows, mode=fmt_dd.value)))
        except Exception as e:
            with out_tbl:
                print("Error during detection:", e)
                traceback.print_exc()

    def _on_upload(change):
        """Re-run detection whenever the uploader's ``value`` trait changes."""
        if change.get("name") == "value":
            _do_run(uploader.value)

    def _on_click(_):
        """Re-run detection on the current upload (e.g. after moving a slider)."""
        _do_run(uploader.value)

    # Detection runs automatically on upload and on demand via the button;
    # changing a slider or dropdown alone does not trigger a run.
    uploader.observe(_on_upload, names="value")
    run_btn.on_click(_on_click)

    ui = widgets.VBox([
        widgets.HBox([uploader]),
        widgets.HBox([conf_slider, iou_slider, imgsz_dd, fmt_dd, run_btn]),
        out_meta, out_img, out_tbl
    ])
    display(ui)
else:
    print("[INFO] Not in a notebook; upload UI skipped.")

[INFO] Using trained weights for upload UI: C:\Users\edwar\OneDrive\Documents\NEU\IE 7615\Project\Milestone2\code\runs\detect\train4\weights\best.pt


VBox(children=(HBox(children=(FileUpload(value=(), accept='image/*', description='Upload image'),)), HBox(chil…