In [14]:
from pathlib import Path
import torch, torchvision
from torchvision import transforms
from PIL import Image
import csv
import torch
import torch.nn as nn
torch.manual_seed(1220)

<torch._C.Generator at 0x1a467f9e7b0>

In [15]:
TEST_DIR      = Path(r"./test_data/column")

MODEL_PATH_CLS    = "./best_model_class.h5"   # 0,1,2 -> 18,19,20
MODEL_PATH_DMG    = "./best_model_damage.pth"     # multi‑label 0/1/2
MODEL_PATH_CRK    = "./best_model_crack.pth"    # multi‑label 7 cracks

IMG_SIZE      = 224
DEVICE        = torch.device("cuda" if torch.cuda.is_available() else "cpu")
THR           = 0.5     

CRACK_CLASSES = [
    'Diagonal',
    'Horizontal',
    'Vertical',
    'X-shape'
]

tfm = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,)*3, (0.5,)*3)
])

def load_img(path):
    return tfm(Image.open(path).convert("RGB")).unsqueeze(0).to(DEVICE)  # [1,3,H,W]

In [16]:
from torchvision.models import vit_l_16, ViT_L_16_Weights

def class_model(num_classes=3, freeze_backbone=True):
    weights = ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1
    model = vit_l_16(weights=weights)

    in_features = model.heads.head.in_features

    # 自定義分類頭
    model.heads.head = nn.Sequential(
        nn.LayerNorm(in_features),
        nn.Dropout(0.5),
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(512, 128),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(128, num_classes)
    )

    if freeze_backbone:
        # 先全部凍結
        for param in model.parameters():
            param.requires_grad = False

        # 解凍分類頭
        for param in model.heads.parameters():
            param.requires_grad = True

        # 解凍最後 3 個 encoder blocks
        encoder_blocks = model.encoder.layers  # transformer blocks
        for block in encoder_blocks[-2:]:
            for param in block.parameters():
                param.requires_grad = True

        # 解凍所有 LayerNorm 或 normalization 層
        for module in model.modules():
            if isinstance(module, (nn.LayerNorm, nn.BatchNorm2d)):
                for param in module.parameters():
                    param.requires_grad = True

    return model, weights.transforms()

# 使用方式
model_cls, _ = class_model(num_classes=3, freeze_backbone=True)          # EfficientNet-V2-S
model_cls.load_state_dict(torch.load(MODEL_PATH_CLS, map_location=DEVICE))
model_cls = model_cls.to(DEVICE).eval()   


In [17]:
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights

def damage_model(num_classes=3, freeze_backbone=True):
    weights = EfficientNet_V2_S_Weights.IMAGENET1K_V1
    model = efficientnet_v2_s(weights=weights)

    in_features = model.classifier[1].in_features

    # 替換分類頭
    model.classifier = nn.Sequential(
        nn.Dropout(p=0.5),
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(512, 128),
        nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(128, num_classes)
    )

    if freeze_backbone:
        # 凍結所有參數
        for param in model.parameters():
            param.requires_grad = False

        # 解凍分類頭
        for param in model.classifier.parameters():
            param.requires_grad = True

        # 解凍最後一層 block
        last_block = model.features[-1]
        for param in last_block.parameters():
            param.requires_grad = True

        # 解凍所有 BatchNorm2d 層
        for module in model.modules():
            if isinstance(module, nn.BatchNorm2d):
                for param in module.parameters():
                    param.requires_grad = True

    return model, weights.transforms()

model_dmg, _ = damage_model(num_classes=3)         
model_dmg.load_state_dict(torch.load(MODEL_PATH_DMG, map_location=DEVICE))
model_dmg = model_dmg.to(DEVICE).eval()

In [18]:
from torchvision.models import vit_l_16, ViT_L_16_Weights

def crack_model(num_classes=4, freeze_backbone=True):
    weights = ViT_L_16_Weights.IMAGENET1K_SWAG_LINEAR_V1
    model = vit_l_16(weights=weights)

    in_features = model.heads.head.in_features

    # 自定義分類頭
    model.heads.head = nn.Sequential(
        nn.LayerNorm(in_features),
        nn.Dropout(0.5),
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(512, 128),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(128, num_classes)
    )

    if freeze_backbone:
        # 先全部凍結
        for param in model.parameters():
            param.requires_grad = False

        # 解凍分類頭
        for param in model.heads.parameters():
            param.requires_grad = True

        # 解凍最後 3 個 encoder blocks
        encoder_blocks = model.encoder.layers  # transformer blocks
        for block in encoder_blocks[-3:]:
            for param in block.parameters():
                param.requires_grad = True

        # 解凍所有 LayerNorm 或 normalization 層
        for module in model.modules():
            if isinstance(module, (nn.LayerNorm, nn.BatchNorm2d)):
                for param in module.parameters():
                    param.requires_grad = True

    return model, weights.transforms()

# 使用方式
model_crk, _ = crack_model(num_classes=4, freeze_backbone=True)          
model_crk.load_state_dict(torch.load(MODEL_PATH_CRK, map_location=DEVICE))
model_crk = model_crk.to(DEVICE).eval()

In [19]:
def postprocess(structure_level, damage_labels, crack_names):
    final_labels = {structure_level}  # 使用 set() 初始化，放入基本結構類別標籤（18、19、20）

    # ----------------- A 類（18）-----------------
    if structure_level == 18:
        if any(label in damage_labels for label in [0, 1, 2]):
            final_labels.add(0)

        # 根據裂縫種類加上對應標籤
        for crack in crack_names:
            if crack  == 'X-shape':
                final_labels.add(3)
            elif crack == 'Diagonal':
                final_labels.add(4)
            elif crack == 'Horizontal':
                final_labels.add(8)
            elif crack == 'Vertical':
                final_labels.add(6)
        
        if final_labels == {18}:
            final_labels.add(0)
        elif final_labels == {18,3}:
            final_labels.add(4)

        if {18, 0, 3, 4}.issubset(final_labels):
            final_labels.intersection_update({18, 3, 4})
        
        elif 18 in final_labels and 0 in final_labels:
                final_labels.intersection_update({18, 0})
        
        
    # ----------------- B 類（19）-----------------
    elif structure_level == 19:
        # 不管 damage 標籤，全部忽略
        if 'Diagonal' in crack_names:
            final_labels.add(5)
        elif 'Vertical' in crack_names:
            final_labels.add(7)
        elif 'X-shape' in crack_names:
            final_labels.add(5)

    # ----------------- C 類（20）-----------------
    elif structure_level == 20:
        if 1 in damage_labels:  # 只處理 crack 類
            final_labels.add(1)

        if 'Horizontal' in crack_names:
            final_labels.add(9)
        elif len(crack_names) > 0:  # 如果還有其他裂縛（但不是水平裂縛）
            final_labels.add(10)

    # ----------------- 整理輸出 -----------------
    # 使用 sorted() 將結果排序並返回
    return sorted(final_labels)  # 轉換為排序過的 list 並返回


In [20]:
@torch.no_grad()  
def predict_one(image_path):

    image = load_img(image_path) 

    output_cls = model_cls(image)
    class_index = torch.softmax(output_cls, dim=1).argmax(dim=1).item()
    level_map = {0: 18, 1: 19, 2: 20} 
    level = level_map[class_index]  

    # 3. 第二階段：判斷是否有損傷 (多標籤：0=rebar, 1=crack, 2=spalling)
    output_dmg = model_dmg(image)
    probs_dmg = torch.sigmoid(output_dmg)[0]  
    damage_labels = [i for i, p in enumerate(probs_dmg) if p > THR] 

    # 4. 第三階段：判斷裂縫類型 (多標籤共7類)
    output_crk = model_crk(image)
    probs_crk = torch.sigmoid(output_crk)[0]
    crack_labels = [CRACK_CLASSES[i] for i, p in enumerate(probs_crk) if p > THR] 


    final = postprocess(level, damage_labels, crack_labels)

    return {
        "level": level,                       # 18, 19, or 20
        "damage_labels": damage_labels,       # 多標籤 0/1/2
        "crack_labels": crack_labels,         # 裂縫名稱清單
        "final_labels": final                 # 最終送出的標籤（會寫入 CSV）
    }

In [21]:
output_file = "./test_data/column.csv"

# 預測結果暫存在這裡
results = []

# 讀取資料夾內所有 .jpg
image_list = sorted(TEST_DIR.glob("*.jpg"))

for idx, img_path in enumerate(image_list, start=1):
    pred = predict_one(img_path)
    level = pred["level"]
    others = sorted([lab for lab in pred["final_labels"] if lab not in (18, 19, 20)])
    ordered_labels = [level] + others
    label_str = ",".join(str(x) for x in ordered_labels)

    results.append([idx, label_str])  # 暫存所有結果

# 寫入 CSV
with open(output_file, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "class"])
    writer.writerows(results)

print(f"已將所有預測結果儲存到：{output_file}")


已將所有預測結果儲存到：./test_data/column.csv
