In [None]:
# === check gpu ===
!nvidia-smi -L || true
!pip -q install torch torchvision timm gdown

In [None]:
# ==== basic libraries ====
import os, zipfile, shutil, json, math, random, time, textwrap
from pathlib import Path
from datetime import datetime

# ==== Deep Learning ====
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import datasets, transforms, models
import timm

# ==== Data Science ====
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, precision_recall_curve, average_precision_score

GPU 0: Tesla T4 (UUID: GPU-defcd148-6cbc-abdc-1a1b-39ecdb32b80b)
Mounted at /content/drive


In [None]:
# ==== mount google drive ====
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
# ==== file path setting ====

# type A: file path in drive (/content/drive/MyDrive/...)
ZIP_PATH_IN_DRIVE = "/content/drive/MyDrive/2025_project_dataset_cls/2025_project_data_CNN_1204.zip"  # ←有就填，沒有就留空字串
# type B: file url (support drive share link or general http(s) direct link)
ZIP_FILE_URL = ""       # if you have a file url, fill it in


# save results to drive (can be modified)
SAVE_TO_DRIVE_DIR = "/content/drive/MyDrive/Colab_Results/leaf_stage1_cls"


# training parameters (can be modified)
CFG = dict (
    seed=42,
    img_size=224,
    batch_size=64,           # colab T4/RAM 通常可用 64；OOM 就降 32/16
    num_workers=2,
    epochs=25,
    lr=3e-4,
    weight_decay=1e-4,
    model_name="mobilenetv3_large_100",  # timm 名稱（或用 torchvision 官方 MobileNetV3-Large）
    mixup_p=0.0,            # 分類任務可選擇關閉
    label_smoothing=0.05,
    early_stop_patience=7
)

# ==== random seed setting ====
random.seed(CFG["seed"]); np.random.seed(CFG["seed"]); torch.manual_seed(CFG["seed"])

# ==== device setting ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [None]:
# ==== download zip file and unzip ====
DATA_ROOT = Path("/content/dataset_zip")
DATA_ROOT.mkdir(parents=True, exist_ok=True)

zip_local = DATA_ROOT / "data.zip"

if ZIP_PATH_IN_DRIVE:
    # copy from drive
    src = Path(ZIP_PATH_IN_DRIVE)
    assert src.exists(), f"找不到：{src}"
    shutil.copy(str(src), str(zip_local))
    print(f"[OK] 由 Drive 複製 zip -> {zip_local}")
elif ZIP_FILE_URL:
    import gdown, re
    url = ZIP_FILE_URL.strip()
    # support various google drive share links
    m = re.search(r"/d/([-\w]{20,})", url)
    if m:
        file_id = m.group(1)
        url = f"https://drive.google.com/uc?id={file_id}"
    gdown.download(url, str(zip_local), quiet=False)
    print(f"[OK] 透過連結下載 zip -> {zip_local}")
else:
    raise ValueError("請設定 ZIP_PATH_IN_DRIVE 或 ZIP_FILE_URL 其中之一。")

assert zip_local.exists(), "zip 檔案不存在"

UNZIP_DIR = DATA_ROOT / "unzipped"
if UNZIP_DIR.exists():
    shutil.rmtree(UNZIP_DIR)
UNZIP_DIR.mkdir(parents=True, exist_ok=True)

with zipfile.ZipFile(zip_local, 'r') as zf:
    zf.extractall(UNZIP_DIR)
print("[OK] 解壓完成：", UNZIP_DIR)


[OK] 由 Drive 複製 zip -> /content/dataset_zip/data.zip
[OK] 解壓完成： /content/dataset_zip/unzipped


In [None]:
# ==== find train/ val/ folder ====
def find_train_val(root: Path):
    # some zip files have an outer shell folder; here try to find train/ val/
    candidates = [root] + [p for p in root.iterdir() if p.is_dir()]
    for base in candidates:
        t = base / "train"
        v = base / "val"
        if t.exists() and v.exists():
            return t, v
    raise RuntimeError("在解壓路徑中找不到 train/ 與 val/ 目錄")

train_dir, val_dir = find_train_val(UNZIP_DIR)
print("train_dir:", train_dir)
print("val_dir  :", val_dir)

# check class folders
print("\n[train class folders]")
print([p.name for p in train_dir.iterdir() if p.is_dir()])
print("[val class folders]")
print([p.name for p in val_dir.iterdir() if p.is_dir()])


train_dir: /content/dataset_zip/unzipped/2025_project_data_CNN_1204/train
val_dir  : /content/dataset_zip/unzipped/2025_project_data_CNN_1204/val

[train 類別資料夾]
['whole_plant', 'potato', 'tomato', 'pepper_bell', 'others']
[val 類別資料夾]
['whole_plant', 'potato', 'tomato', 'pepper_bell', 'others']


In [None]:
# Transforms and Dataset/DataLoader
IMG_SIZE = CFG["img_size"]

train_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.15, hue=0.02),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

val_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

train_ds = datasets.ImageFolder(str(train_dir), transform=train_tfms)
val_ds   = datasets.ImageFolder(str(val_dir),   transform=val_tfms)
class_names = train_ds.classes
num_classes = len(class_names)
class_names, num_classes


(['others', 'pepper_bell', 'potato', 'tomato', 'whole_plant'], 5)

In [None]:
# ==== class imbalance handling (WeightedRandomSampler) ====
from collections import Counter

train_targets = [y for _, y in train_ds.imgs]
cnt = Counter(train_targets)
print("[train 各類別張數]：", {class_names[k]: v for k,v in cnt.items()})

# according to 1/frequency as weight, to alleviate imbalance
sample_weights = np.array([1.0 / cnt[y] for y in train_targets], dtype=np.float32)
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)

train_loader = DataLoader(
    train_ds, batch_size=CFG["batch_size"], sampler=sampler,
    num_workers=CFG["num_workers"], pin_memory=True
)
val_loader = DataLoader(
    val_ds, batch_size=CFG["batch_size"], shuffle=False,
    num_workers=CFG["num_workers"], pin_memory=True
)


[train 各類別張數]： {'others': 1492, 'pepper_bell': 1980, 'potato': 1721, 'tomato': 2000, 'whole_plant': 1594}


In [None]:
# create model 
# use timm version (faster, pretrained weights are complete)
model = timm.create_model(CFG["model_name"], pretrained=True, num_classes=num_classes)
model.to(device)

# loss and optimizer
criterion = nn.CrossEntropyLoss(label_smoothing=CFG["label_smoothing"])
optimizer = optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"])
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())


  scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())


In [None]:
# training/validation loop (with early stopping)
def evaluate(model, loader):
    model.eval()
    loss_sum, n = 0.0, 0
    all_probs, all_targets = [], []
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                logits = model(x)
                loss = criterion(logits, y)
            loss_sum += loss.item() * x.size(0)
            n += x.size(0)
            probs = logits.softmax(1).detach().cpu().numpy()
            all_probs.append(probs)
            all_targets.append(y.detach().cpu().numpy())
    avg_loss = loss_sum / n
    all_probs = np.concatenate(all_probs)
    all_targets = np.concatenate(all_targets)
    preds = all_probs.argmax(1)
    acc = (preds == all_targets).mean()
    return avg_loss, acc, all_probs, all_targets

def train_epochs(model, train_loader, val_loader, epochs, save_dir):
    best_acc, best_path = -1, None
    history = {"epoch": [], "train_loss": [], "val_loss": [], "val_acc": []}
    patience = CFG["early_stop_patience"]
    wait = 0

    for epoch in range(1, epochs+1):
        model.train()
        train_loss_sum, n = 0.0, 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad(set_to_none=True)
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                logits = model(x)
                loss = criterion(logits, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            train_loss_sum += loss.item() * x.size(0)
            n += x.size(0)

        train_loss = train_loss_sum / n
        val_loss, val_acc, _, _ = evaluate(model, val_loader)

        history["epoch"].append(epoch)
        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(float(val_acc))
        print(f"Epoch {epoch:02d}/{epochs} | train_loss {train_loss:.4f} | val_loss {val_loss:.4f} | val_acc {val_acc:.4f}")

        # save best weights + early stopping
        if val_acc > best_acc:
            best_acc = val_acc
            wait = 0
            best_path = save_dir / "best_mobilenetv3_large.pth"
            torch.save(model.state_dict(), best_path)
        else:
            wait += 1
            if wait >= patience:
                print(f"[Early Stop] patience={patience}")
                break

    # save training curves
    plt.figure(figsize=(10,4))
    plt.subplot(1,2,1); plt.plot(history["epoch"], history["train_loss"], label="train_loss"); plt.plot(history["epoch"], history["val_loss"], label="val_loss"); plt.legend(); plt.title("Loss")
    plt.subplot(1,2,2); plt.plot(history["epoch"], history["val_acc"], label="val_acc"); plt.legend(); plt.title("Val Acc")
    plt.tight_layout()
    plt.savefig(save_dir / "results_curves.png", dpi=160); plt.close()

    with open(save_dir / "history.json", "w") as f:
        json.dump(history, f, indent=2)

    return best_path, best_acc

# create output folder
RUN_DIR = Path("/content/runs/cls") / datetime.now().strftime("%Y%m%d_%H%M%S")
RUN_DIR.mkdir(parents=True, exist_ok=True)

best_path, best_acc = train_epochs(model, train_loader, val_loader, CFG["epochs"], RUN_DIR)
print("Best acc:", best_acc, " | best path:", best_path)


  with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
  with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):


Epoch 01/25 | train_loss 0.5850 | val_loss 0.4997 | val_acc 0.9437
Epoch 02/25 | train_loss 0.3122 | val_loss 0.3724 | val_acc 0.9617
Epoch 03/25 | train_loss 0.2776 | val_loss 0.3518 | val_acc 0.9686
Epoch 04/25 | train_loss 0.2586 | val_loss 0.3347 | val_acc 0.9686
Epoch 05/25 | train_loss 0.2502 | val_loss 0.3088 | val_acc 0.9772
Epoch 06/25 | train_loss 0.2452 | val_loss 0.3223 | val_acc 0.9723
Epoch 07/25 | train_loss 0.2412 | val_loss 0.3112 | val_acc 0.9715
Epoch 08/25 | train_loss 0.2375 | val_loss 0.3022 | val_acc 0.9739
Epoch 09/25 | train_loss 0.2352 | val_loss 0.3051 | val_acc 0.9739
Epoch 10/25 | train_loss 0.2338 | val_loss 0.3006 | val_acc 0.9739
Epoch 11/25 | train_loss 0.2315 | val_loss 0.2912 | val_acc 0.9755
Epoch 12/25 | train_loss 0.2317 | val_loss 0.2925 | val_acc 0.9764
[Early Stop] patience=7
Best acc: 0.9771708112515287  | best path: /content/runs/cls/20251204_144247/best_mobilenetv3_large.pth


In [None]:
# ==== use best weights for complete evaluation (confusion matrix / PR curves / report) ====
# reload best weights
model.load_state_dict(torch.load(best_path, map_location=device))
val_loss, val_acc, probs, targets = evaluate(model, val_loader)
preds = probs.argmax(1)

print(f"[VALID] loss={val_loss:.4f} acc={val_acc:.4f}")
print("\n[Classification Report]\n", classification_report(targets, preds, target_names=class_names, digits=4))

# Confusion Matrix
cm = confusion_matrix(targets, preds, labels=list(range(num_classes)))
fig, ax = plt.subplots(figsize=(7,7))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(ax=ax, cmap="Blues", colorbar=False, xticks_rotation=45)
plt.tight_layout()
plt.savefig(RUN_DIR / "confusion_matrix.png", dpi=200); plt.close()

# PR Curves (each class)
plt.figure(figsize=(6,5))
for i, name in enumerate(class_names):
    y_true = (targets == i).astype(int)
    precision, recall, _ = precision_recall_curve(y_true, probs[:, i])
    ap = average_precision_score(y_true, probs[:, i])
    plt.plot(recall, precision, label=f"{name} AP={ap:.3f}")
plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title("Precision-Recall Curves"); plt.legend()
plt.tight_layout()
plt.savefig(RUN_DIR / "pr_curves.png", dpi=200); plt.close()

# save report text
with open(RUN_DIR / "classification_report.txt", "w") as f:
    f.write(classification_report(targets, preds, target_names=class_names, digits=4))


  with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):


[VALID] loss=0.3088 acc=0.9772

[Classification Report]
               precision    recall  f1-score   support

      others     1.0000    0.8940    0.9440       500
 pepper_bell     0.9357    1.0000    0.9668       495
      potato     0.9795    0.9977    0.9885       431
      tomato     0.9842    0.9960    0.9901       500
 whole_plant     0.9906    1.0000    0.9953       527

    accuracy                         0.9772      2453
   macro avg     0.9780    0.9775    0.9769      2453
weighted avg     0.9782    0.9772    0.9768      2453



In [None]:
# ==== save results to drive ====
SAVE_DIR = Path(SAVE_TO_DRIVE_DIR) / RUN_DIR.name
SAVE_DIR.parent.mkdir(parents=True, exist_ok=True)

# copy results
if SAVE_DIR.exists():
    shutil.rmtree(SAVE_DIR)
shutil.copytree(RUN_DIR, SAVE_DIR)

print(f"[OK] 已將結果複製到：{SAVE_DIR}")

# extra: also copy the zip file and "detected train/val structure" for reference
shutil.copy(str(zip_local), str(SAVE_DIR / "source_dataset.zip"))
with open(SAVE_DIR / "dataset_paths.txt", "w") as f:
    f.write(f"train_dir: {train_dir}\nval_dir  : {val_dir}\nclasses  : {class_names}\n")
print("[DONE]")


[OK] 已將結果複製到：/content/drive/MyDrive/Colab_Results/leaf_stage1_cls/20251204_144247
[DONE]
