導入庫

In [None]:
# Mount Google Drive at /content/drive (Colab only).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# -*- coding: utf-8 -*-
"""
Convert the HW2 pixel labels <class,x,y,w,h> into normalized YOLO labels, write them into the data_yolo layout, and generate dataset.yaml
- Input:
    data/train/*.png + same-named .txt (pixel coordinates)
    data/test/*.png
- Output:
    data_yolo/
      images/{train,val,test}/
      labels/{train,val}/
      dataset.yaml
"""
import random, shutil
from pathlib import Path
from PIL import Image

# ==== Paths and class names ====
DATA_ROOT = Path("/content/drive/MyDrive/Colab Notebooks/cv/hw2/taica-cvpdl-2025-hw-2 (Unzipped Files)/CVPDL_hw2/CVPDL_hw2")
TRAIN_IMG = DATA_ROOT / "train"
TEST_IMG  = DATA_ROOT / "test"

# Output tree, relative to the current working directory.
YOLO_ROOT = Path("data_yolo")
IMAGES_TRAIN = YOLO_ROOT / "images" / "train"
IMAGES_VAL   = YOLO_ROOT / "images" / "val"
IMAGES_TEST  = YOLO_ROOT / "images" / "test"
LABELS_TRAIN = YOLO_ROOT / "labels" / "train"
LABELS_VAL   = YOLO_ROOT / "labels" / "val"
DATA_YAML    = YOLO_ROOT / "dataset.yaml"

# Position in NAMES is the integer class id (see NAME2ID).
NAMES = ["car","hov","person","motorcycle"]
NAME2ID = {n:i for i,n in enumerate(NAMES)}
# Fraction of the training images held out for validation.
VAL_RATIO = 0.1
# Seed for the train/val split.
SEED = 42



轉換成yolo可讀格式資料

In [None]:
def ensure_dirs():
    """Create every image/label output directory; existing ones are left as they are."""
    targets = (IMAGES_TRAIN, IMAGES_VAL, IMAGES_TEST, LABELS_TRAIN, LABELS_VAL)
    for folder in targets:
        folder.mkdir(parents=True, exist_ok=True)

def parse_per_image_txt(txt_path: Path):
    """
    Parse the per-image annotation file that sits next to an image.

    Each usable line has the form ``<class> <x> <y> <w> <h>`` with the box in
    pixels; fields may be separated by spaces, tabs or commas.  ``<class>`` is
    either an integer id or a class name looked up in ``NAME2ID``.

    Unusable lines are skipped instead of aborting the whole conversion:
    blank lines, lines with fewer than five fields, unknown class names, and
    lines whose class id or coordinates are not valid numbers.

    Args:
        txt_path: Path of the annotation file.  A missing file means
            "no annotations".

    Returns:
        A list of ``(cid, x, y, w, h)`` tuples in file order; ``cid`` is an
        int, the remaining values are floats.
    """
    items = []
    if not txt_path.exists():
        return items
    # "utf-8-sig" drops a leading BOM; with plain "utf-8" the BOM sticks to the
    # first class token, which then fails isdigit() and the line is lost.
    with open(txt_path, "r", encoding="utf-8-sig") as f:
        for ln in f:
            # split() without arguments handles tabs, repeated blanks and the newline.
            parts = ln.replace(",", " ").split()
            if len(parts) < 5:  # tolerate blank / incomplete lines
                continue
            cls_token = parts[0]
            try:
                cid = int(cls_token) if cls_token.isdigit() else NAME2ID.get(cls_token)
                x, y, w, h = map(float, parts[1:5])
            except ValueError:
                # Non-numeric field: skip this line only.
                continue
            if cid is None:
                # Class name that is not one of the known classes.
                continue
            items.append((cid, x, y, w, h))
    return items

def to_yolo_line(cid, bbox, W, H):
    """
    Format one pixel-space box as a YOLO label line.

    Args:
        cid: Class id written as the first field.
        bbox: ``(x, y, w, h)`` in pixels, ``(x, y)`` being the top-left corner.
        W: Image width in pixels.
        H: Image height in pixels.

    Returns:
        ``"<cid> <cx> <cy> <w> <h>"``: box centre and size divided by the
        image size, six decimals each, all within [0, 1].
    """
    x, y, w, h = bbox
    # Clip the box to the image before normalising: an annotation that hangs
    # over the border would otherwise produce values outside [0, 1], which is
    # not a valid normalised YOLO label.
    x1 = min(max(x, 0.0), W)
    y1 = min(max(y, 0.0), H)
    x2 = min(max(x + w, 0.0), W)
    y2 = min(max(y + h, 0.0), H)
    bw = max(x2 - x1, 0.0)
    bh = max(y2 - y1, 0.0)
    # Top-left corner -> centre, then normalise by the image size.
    cx = (x1 + bw / 2.0) / W
    cy = (y1 + bh / 2.0) / H
    return f"{cid} {cx:.6f} {cy:.6f} {bw/W:.6f} {bh/H:.6f}"

def split_train_val(files, ratio=0.1, seed=42):
    """
    Split ``files`` into a training and a validation list, reproducibly.

    The input is sorted first so the result does not depend on the order in
    which the files were listed.  ``int(len(files) * ratio)`` items (capped at
    the number of files) are drawn for validation with a ``random.Random``
    seeded by ``seed``.

    Args:
        files: Iterable of sortable, hashable items (e.g. ``Path`` objects).
        ratio: Fraction of the items to put in the validation list.
        seed: Seed of the private random generator.

    Returns:
        ``(train, val)``: two lists, both in sorted order.
    """
    files = sorted(files)
    # Cap at len(files) so a ratio above 1 does not make sample() raise.
    k = min(len(files), int(len(files) * ratio))
    if k <= 0:
        return files, []
    picked = set(random.Random(seed).sample(files, k))
    train = [f for f in files if f not in picked]
    # Filter the sorted list rather than list(picked): set iteration order of
    # str/Path items changes between interpreter runs (hash randomisation),
    # which made the validation order non-reproducible despite the seed.
    val = [f for f in files if f in picked]
    return train, val

# ==== Run the conversion ====
ensure_dirs()
img_exts = {".jpg",".jpeg",".png",".bmp",".tif",".tiff",".webp"}

train_imgs = [p for p in TRAIN_IMG.iterdir() if p.suffix.lower() in img_exts]
test_imgs  = [p for p in TEST_IMG.iterdir()  if p.suffix.lower() in img_exts]
assert train_imgs, f"找不到训练图片于 {TRAIN_IMG}"

train_split, val_split = split_train_val(train_imgs, VAL_RATIO, SEED)
print(f"Train: {len(train_split)}, Val: {len(val_split)}, Test: {len(test_imgs)}")

for split_files, img_out, lbl_out in [
    (train_split, IMAGES_TRAIN, LABELS_TRAIN),
    (val_split,   IMAGES_VAL,   LABELS_VAL),
]:
    for img_p in split_files:
        # Only the size is needed. The context manager closes the file right
        # away; a bare Image.open() would keep one handle open per image.
        with Image.open(img_p) as im:
            W, H = im.size
        # Copy the image into the split directory.
        shutil.copy2(img_p, img_out / img_p.name)
        # Write the YOLO label file; it is created (empty) even when the image
        # has no annotations.
        yololines = [
            to_yolo_line(cid, (x, y, w, h), W, H)
            for (cid, x, y, w, h) in parse_per_image_txt(img_p.with_suffix(".txt"))
        ]
        with open(lbl_out / img_p.with_suffix(".txt").name, "w", encoding="utf-8") as f:
            f.write("\n".join(yololines))

# Test images are only copied; they have no labels.
for img_p in test_imgs:
    shutil.copy2(img_p, IMAGES_TEST / img_p.name)

# dataset.yaml
yaml_text = f"""\
path: {YOLO_ROOT.as_posix()}
train: images/train
val: images/val
test: images/test

nc: {len(NAMES)}
names: {NAMES}
"""
with open(DATA_YAML, "w", encoding="utf-8") as f:
    f.write(yaml_text)

print("DONE. Wrote", DATA_YAML)


In [None]:
#   1) First run the original "convert data format" cell (it produces data_yolo/images & labels + dataset.yaml)
#   2) Then run this cell, which generates:
#        - data_yolo/train_upsampled.txt  <- list of training images (with repeated lines)
#        - data_yolo/dataset_upsampled.yaml <- only `train` points to the txt above, everything else is unchanged
#   3) For training, set the data argument to data_yolo/dataset_upsampled.yaml

from pathlib import Path
from collections import defaultdict, Counter
import math
import random
import json

YOLO_ROOT = Path("data_yolo")
IMAGES_TRAIN = YOLO_ROOT / "images" / "train"
LABELS_TRAIN = YOLO_ROOT / "labels" / "train"
DATA_YAML    = YOLO_ROOT / "dataset.yaml"
UP_TXT       = YOLO_ROOT / "train_upsampled.txt"
UP_YAML      = YOLO_ROOT / "dataset_upsampled.yaml"

# ===== Tunable parameters (adjust as needed) =====
MAX_REP_FACTOR = 3        # Max repetitions of a single image (including the original one); prevents over-upsampling
POWER_ALPHA    = 1.0      # Boost strength for rare classes; 1.0 means "max_count / cls_count"; 0.5 is gentler
SEED       = 42           # Random seed
PRINT_TOPN     = 10       # Print the N most-repeated images for manual inspection

random.seed(SEED)

# Read dataset.yaml to recover the class names and the class count.
import yaml
with open(DATA_YAML, "r", encoding="utf-8") as f:
    # safe_load returns None for an empty file; fall back to an empty mapping
    # so the .get() calls below cannot fail.
    cfg = yaml.safe_load(f) or {}
names = cfg.get("names") or []
if isinstance(names, dict):
    # `names` may also be written as an {id: name} mapping; order it by id.
    names = [names[k] for k in sorted(names)]
nc = int(cfg.get("nc") or len(names) or 4)
if not names or len(names) != nc:
    # Fallback: the assignment uses these four fixed classes.
    names = ["car", "hov", "person", "motorcycle"]
cls_names = {i: n for i, n in enumerate(names)}

# Scan the training labels: count boxes per class and record which classes
# appear in each image.
img_exts = {".jpg",".jpeg",".png",".bmp",".tif",".tiff",".webp"}
label_files = sorted([p for p in LABELS_TRAIN.iterdir() if p.suffix == ".txt"], key=lambda p: p.stem)

per_class_count = Counter()
per_image_classes = {}  # img_name -> set(cls_ids)
per_image_clsbox  = {}  # img_name -> Counter({cls_id: num_boxes_in_that_image})

for lbl in label_files:
    img_name = lbl.with_suffix("").name
    per_image_classes[img_name] = set()
    per_image_clsbox[img_name]  = Counter()
    try:
        lines = lbl.read_text(encoding="utf-8").splitlines()
    except (OSError, UnicodeDecodeError) as err:
        # Keep going, but say so: an unreadable label file is counted as
        # "no boxes", which changes the statistics computed from this scan.
        print(f"[warn] cannot read label file {lbl}: {err}")
        lines = []
    for ln in lines:
        parts = ln.split()
        if not parts:
            continue
        try:
            # Accept both "2" and "2.0" as a class id.
            cid = int(float(parts[0]))
        except (ValueError, OverflowError):
            # Non-numeric, NaN or infinite class field.
            continue
        if not 0 <= cid < nc:
            continue
        # YOLO label: cid cx cy w h
        per_class_count[cid] += 1
        per_image_classes[img_name].add(cid)
        per_image_clsbox[img_name][cid] += 1

# Images without a label file must be recorded too, so that no image is lost.
for img_p in sorted(IMAGES_TRAIN.iterdir(), key=lambda p: p.name):
    if img_p.suffix.lower() not in img_exts:
        continue
    img_name = img_p.with_suffix("").name
    per_image_classes.setdefault(img_name, set())
    per_image_clsbox.setdefault(img_name, Counter())

# Show the raw per-class box counts.
print("== 原始每类框数量 ==")
for class_id in range(nc):
    label = cls_names.get(class_id, str(class_id))
    print(f"  {class_id}: {label:<12}  {per_class_count.get(class_id,0)}")
max_count = max(per_class_count.values()) if per_class_count else 1

# Target ratio per class: (max_count / class_count) ** POWER_ALPHA, clamped to
# [1, MAX_REP_FACTOR].  A class with no boxes gets MAX_REP_FACTOR directly,
# which also avoids dividing by zero.
ratio_cap = float(MAX_REP_FACTOR)
desire_ratio = {}
for class_id in range(nc):
    seen = per_class_count.get(class_id, 0)
    raw_ratio = (max_count / seen) ** POWER_ALPHA if seen > 0 else MAX_REP_FACTOR
    desire_ratio[class_id] = max(1.0, min(float(raw_ratio), ratio_cap))

# Repeat count per image: the largest target ratio among the classes it
# contains, rounded up (math.ceil) and clamped to [1, MAX_REP_FACTOR].
# An image with a rare and a common class therefore follows the rare class.
# Images without any box are kept exactly once.
rep_factor_img = {}
for img_name, cls_set in per_image_classes.items():
    if cls_set:
        strongest = max([1.0] + [desire_ratio.get(c, 1.0) for c in cls_set])
        rep_factor_img[img_name] = max(1, min(int(math.ceil(strongest)), MAX_REP_FACTOR))
    else:
        rep_factor_img[img_name] = 1

# Build the upsampled training list: one resolved image path per line,
# each image written as many times as its repeat factor (default 1).
lines = []
for img_p in sorted(IMAGES_TRAIN.iterdir(), key=lambda p: p.name):
    if img_p.suffix.lower() in img_exts:
        repeats = rep_factor_img.get(img_p.with_suffix("").name, 1)
        lines.extend([str(img_p.resolve())] * repeats)

# Write the list file.
UP_TXT.write_text("\n".join(lines), encoding="utf-8")

# Write dataset_upsampled.yaml: a copy of dataset.yaml whose `train` entry
# points to the list file above.
with open(DATA_YAML, "r", encoding="utf-8") as f:
    base_yaml = yaml.safe_load(f) or {}
base_yaml["train"] = str(UP_TXT.resolve())
with open(UP_YAML, "w", encoding="utf-8") as f:
    yaml.safe_dump(base_yaml, f, sort_keys=False, allow_unicode=True)

# Summary.
dup_counter = Counter(rep_factor_img.values())
print("\n== 上采样完成 ==")
print(f"写出: {UP_TXT}  (共 {len(lines)} 行; 去重前图像数={len(rep_factor_img)})")
print(f"写出: {UP_YAML}")
print("重复次数分布（图片张数）：", dict(sorted(dup_counter.items())))
print("\n前几张重复最多的图片（便于人工检查）：")
# Most-repeated first, ties broken by name.
top_imgs = sorted(rep_factor_img.items(), key=lambda kv: (-kv[1], kv[0]))[:PRINT_TOPN]
for stem, times in top_imgs:
    print(f"  {stem}: x{times}")


訓練

In [None]:
!pip install ultralytics

In [None]:
from ultralytics import YOLO

# Build the model from its YAML definition (random weights, no checkpoint).
model = YOLO("yolo11l.yaml")

# Training settings, collected in one place.
train_args = dict(
    data="data_yolo/dataset_upsampled.yaml",
    epochs=200,          # adjust to the time available
    imgsz=960,           # drop to 640/768 if GPU memory is tight
    batch=1,             # 8-16 is suggested for a 12 GB GPU
    device=0,            # or "cpu"
    pretrained=False,    # do not use pretrained weights
    optimizer="SGD",
    lr0=0.01,
    lrf=0.1,
    momentum=0.937,
    weight_decay=5e-4,
    warmup_epochs=3.0,
    cos_lr=True,
    project="runs/train",
    name="hw2",
)

# Train with the settings above.
results = model.train(**train_args)


驗證集

In [None]:
from ultralytics import YOLO
from pathlib import Path

# Checkpoint to evaluate (the best.pt of a training run).
ckpt = Path("/content/runs/train/hw210/weights/best.pt")
if not ckpt.exists():
    raise AssertionError(f"未找到模型权重：{ckpt}")

# Load the custom (just-trained) model and run validation.
model = YOLO(str(ckpt))
val_args = {"data": "data_yolo/dataset_upsampled.yaml", "imgsz": 1920}
metrics = model.val(**val_args)
metrics  # mAP and related metrics

生成submission

In [None]:
# -*- coding: utf-8 -*-
from ultralytics import YOLO
from pathlib import Path
import pandas as pd
import numpy as np
from PIL import Image

# ======= Tunable parameters =======
MAIN_CONF = 0.01      # confidence threshold wanted in the submission
PRED_CONF = 0.001     # lower threshold used at inference to keep more candidates; filtered by MAIN_CONF afterwards
IOU_THRES = 0.7
IMG_SIZE  = 960
DEVICE    = 0         # or "cpu"

# Load the trained weights.
ckpt = Path("/content/runs/train/hw210/weights/best.pt")
if not ckpt.exists():
    raise AssertionError(f"未找到模型权重：{ckpt}")
model = YOLO(str(ckpt))

# Collect the test images; Image_ID is the 1-based rank of the file name in sorted order.
IMAGES_TEST = Path("data_yolo/images/test")
img_exts = {'.jpg','.jpeg','.png','.bmp','.tif','.tiff','.webp'}
test_files = [p for p in IMAGES_TEST.iterdir() if p.suffix.lower() in img_exts]
test_files.sort(key=lambda p: p.name)
if not test_files:
    raise AssertionError(f"测试目录为空：{IMAGES_TEST}")
id_map = {p.name: rank for rank, p in enumerate(test_files, start=1)}  # 1-based

# Predict with the low threshold so a "top-1 fallback" candidate is available.
predict_args = dict(
    source=str(IMAGES_TEST),
    imgsz=IMG_SIZE,
    conf=PRED_CONF,    # low threshold, keep as many candidates as possible
    iou=IOU_THRES,
    device=DEVICE,
    stream=False,
    save=True,
    verbose=False,
)
pred_list = list(model.predict(**predict_args))

# Index: file name -> prediction result.
name_to_pred = {}
for res in pred_list:
    name_to_pred[Path(res.path).name] = res

rows = []
for p in test_files:
    img_name = p.name
    image_id = id_map[img_name]

    # Check that the image can be opened; an unreadable image gets an empty
    # prediction string so the CSV never contains NaN/null for it.
    # The context manager closes the file handle immediately.
    try:
        with Image.open(p):
            pass
    except (OSError, ValueError, Image.DecompressionBombError):
        rows.append({"Image_ID": int(image_id), "PredictionString": ""})
        continue

    r = name_to_pred.get(img_name, None)

    # No result object or no boxes at all: empty string.
    if r is None or getattr(r, "boxes", None) is None or len(r.boxes) == 0:
        rows.append({"Image_ID": int(image_id), "PredictionString": ""})
        continue

    # NOTE(review): boxes are used exactly as returned by the model; presumably
    # xyxy is already in pixels of the original image - confirm.
    xyxy_all = r.boxes.xyxy.detach().cpu().numpy()
    conf_all = r.boxes.conf.detach().cpu().numpy()
    cls_all  = r.boxes.cls.detach().cpu().numpy().astype(int)

    # Keep the boxes that reach MAIN_CONF.
    keep = conf_all >= MAIN_CONF
    xyxy = xyxy_all[keep]
    conf = conf_all[keep]
    cls  = cls_all[keep]

    # Fallback: if nothing reaches MAIN_CONF, keep the single most confident
    # box of the image (even though it is below MAIN_CONF).
    if xyxy.shape[0] == 0 and len(conf_all) > 0:
        top_idx = int(np.argmax(conf_all))
        xyxy = xyxy_all[top_idx:top_idx+1]
        conf = conf_all[top_idx:top_idx+1]
        cls  = cls_all[top_idx:top_idx+1]

    parts = []
    for box, score, label in zip(xyxy, conf, cls):
        x1, y1, x2, y2 = map(float, box)
        s = float(score)
        c = int(label)

        # Drop boxes with NaN/inf coordinates or score.
        if not np.isfinite([x1, y1, x2, y2, s]).all():
            continue

        w = max(0.0, x2 - x1)
        h = max(0.0, y2 - y1)

        # Output format: <conf> <bb_left> <bb_top> <bb_width> <bb_height> <class>
        parts.extend([
            f"{s:.6f}",
            f"{x1:.2f}", f"{y1:.2f}",
            f"{w:.2f}",  f"{h:.2f}",
            str(c)
        ])

    pred_str = " ".join(parts) if parts else ""   # parts can be empty if every box was non-finite
    rows.append({"Image_ID": int(image_id), "PredictionString": pred_str})

# Build the DataFrame and make sure it holds no NaN/null.
sub = pd.DataFrame(rows, columns=["Image_ID", "PredictionString"]).sort_values("Image_ID").reset_index(drop=True)
sub["Image_ID"] = sub["Image_ID"].astype(int)
# fillna must run before astype(str): astype(str) turns a missing value into
# the literal text "nan", after which fillna has nothing left to replace.
sub["PredictionString"] = sub["PredictionString"].fillna("").astype(str)

# Safety net: add an empty row for any Image_ID that is unexpectedly missing.
expected = set(range(1, len(test_files)+1))
actual = set(sub["Image_ID"].tolist())
missing = expected - actual
if missing:
    sub = pd.concat([
        sub,
        pd.DataFrame({"Image_ID": sorted(missing), "PredictionString": [""] * len(missing)})
    ], ignore_index=True).sort_values("Image_ID").reset_index(drop=True)

out_path = "submission29.csv"
sub.to_csv(out_path, index=False)
print(f"写出 {out_path} 完成！行数={len(sub)}，是否存在空值：", sub.isna().any().to_dict())
sub.head()

結果可視化

In [None]:
from pathlib import Path
from PIL import Image
from IPython.display import display

# Directory holding the rendered test predictions; adjust "predict6" to the
# run directory that the prediction cell actually wrote to.
vis_dir = Path("/content/runs") / "detect"
img_dir = vis_dir / "predict6"
if not img_dir.exists():
    raise AssertionError(f"Directory does not exist: {img_dir}")

# First five images of that directory, ordered by file stem.
exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}
files = [p for p in img_dir.iterdir() if p.suffix.lower() in exts]
files.sort(key=lambda p: p.stem)
files = files[:5]
if not files:
    raise AssertionError(f"No images found in {img_dir}")

print("Using directory:", img_dir)
for shot in files:
    print(shot.name)
    display(Image.open(shot))