In [7]:
import os
import json
import cv2
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from tqdm import tqdm

# ─── 1. 경로 & 하이퍼파라미터 ────────────────────────────────
IMG_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Images\CRACK'
ANN_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Annotations\CRACK'
INPUT_SIZE = 32       # 매우 낮은 해상도
BATCH_SIZE = 32       # 배치 크게
NUM_EPOCHS = 3
NUM_CLASSES= 2        # 배경 vs 크랙
DEVICE     = torch.device('cpu')

# ─── 2. Dataset 정의 ────────────────────────────────────────
class CrackSegDataset(Dataset):
    def __init__(self, img_root, ann_root, img_tf, mask_tf):
        self.img_tf, self.mask_tf = img_tf, mask_tf
        self.items = []
        exts = ('.png','.jpg','.jpeg','.webp')
        for cls in os.listdir(img_root):
            img_dir = os.path.join(img_root, cls)
            ann_dir = os.path.join(ann_root, cls)
            if not os.path.isdir(img_dir) or not os.path.isdir(ann_dir): continue
            for fn in os.listdir(img_dir):
                if fn.lower().endswith(exts):
                    jp = os.path.join(ann_dir, os.path.splitext(fn)[0]+'.json')
                    if os.path.isfile(jp):
                        self.items.append((os.path.join(img_dir,fn), jp))
        assert self.items, "데이터가 없습니다."

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        img_path, json_path = self.items[idx]
        # 이미지 로드 (PIL로 통일)
        img = Image.open(img_path).convert('RGB')
        w,h = img.size

        # JSON → 빈 mask
        with open(json_path,'r',encoding='utf-8') as f: data = json.load(f)
        mask = np.zeros((h,w),dtype=np.uint8)
        for ann in data.get('annotations',[]):
            for poly in ann.get('polyline',[]):
                pts = np.array(poly).reshape(-1,2).astype(np.int32)
                cv2.polylines(mask,[pts],False,1,thickness=1)
        mask = Image.fromarray(mask)

        # Transform
        img  = self.img_tf(img)
        mask = self.mask_tf(mask).squeeze(0).long()
        return img, mask

# ─── 3. Transform 정의 ───────────────────────────────────────
img_tf = transforms.Compose([
        transforms.Resize((INPUT_SIZE, INPUT_SIZE)),  # <- 고정 크기
        transforms.ToTensor(),
 ])
mask_tf = transforms.Compose([
        transforms.Resize((INPUT_SIZE, INPUT_SIZE),
                      interpolation=transforms.InterpolationMode.NEAREST),
        transforms.PILToTensor(),
 ])

# ─── 4. 아주 간단한 1×1 Conv 모델 ───────────────────────────
class TinySegModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # 3ch → 16ch → ReLU → num_classes
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, num_classes, kernel_size=1),
        )
    def forward(self, x):
        return self.net(x)

def main():
    # DataLoader
    ds = CrackSegDataset(IMG_ROOT, ANN_ROOT, img_tf, mask_tf)
    n_train = int(len(ds)*0.8)
    train_ds, val_ds = random_split(ds, [n_train, len(ds)-n_train])
    loader_params = dict(batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=False)
    train_loader = DataLoader(train_ds, **loader_params)
    val_loader   = DataLoader(val_ds,    batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

    # 모델/손실/옵티마이저
    model     = TinySegModel(NUM_CLASSES).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)  # 빠른 수렴은 기대 안 함

    best_acc = 0.0
    for epoch in range(1, NUM_EPOCHS+1):
        # train
        model.train()
        for imgs, masks in tqdm(train_loader, desc=f'Epoch {epoch}/{NUM_EPOCHS} [Train]'):
            imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
            optimizer.zero_grad()
            logits = model(imgs)
            loss   = criterion(logits, masks)
            loss.backward()
            optimizer.step()

        # eval
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
                preds = model(imgs).argmax(1)
                correct += (preds==masks).sum().item()
                total   += masks.numel()
        acc = correct/total
        print(f'Epoch {epoch}/{NUM_EPOCHS}  PixelAcc: {acc:.3f}')

        if acc > best_acc:
            best_acc = acc

    print('완료. 최종 PixelAcc (낮음이 보장됨):', best_acc)

if __name__=='__main__':
    main()


Epoch 1/3 [Train]:  22%|██▏       | 246/1128 [1:07:26<4:01:48, 16.45s/it]


TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'

In [9]:
import os
import json
import cv2
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from tqdm import tqdm

# ─── 1. 경로 & 하이퍼파라미터 ────────────────────────────────
IMG_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Images\CRACK'
ANN_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Annotations\CRACK'
INPUT_SIZE = 32       # 매우 낮은 해상도
BATCH_SIZE = 32       # 배치 크게
NUM_EPOCHS = 3
NUM_CLASSES= 2        # 배경 vs 크랙
DEVICE     = torch.device('cpu')

# ─── 2. Dataset 정의 ────────────────────────────────────────
class CrackSegDataset(Dataset):
    def __init__(self, img_root, ann_root, img_tf, mask_tf):
        self.img_tf, self.mask_tf = img_tf, mask_tf
        self.items = []
        exts = ('.png', '.jpg', '.jpeg', '.webp')
        for cls in os.listdir(img_root):
            img_dir = os.path.join(img_root, cls)
            ann_dir = os.path.join(ann_root, cls)
            if not os.path.isdir(img_dir) or not os.path.isdir(ann_dir):
                continue
            for fn in os.listdir(img_dir):
                if not fn.lower().endswith(exts):
                    continue
                jp = os.path.join(ann_dir, os.path.splitext(fn)[0] + '.json')
                if os.path.isfile(jp):
                    self.items.append((os.path.join(img_dir, fn), jp))
        assert self.items, "데이터가 없습니다."

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        img_path, json_path = self.items[idx]

        # 1) 이미지 로드: PIL로 통일 (webp도 지원 빌드라면 되지만 없으면 설치 필요)
        img = Image.open(img_path).convert('RGB')
        w, h = img.size

        # 2) JSON → 빈 mask 생성
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        mask = np.zeros((h, w), dtype=np.uint8)
        for ann in data.get('annotations', []):
            for poly in ann.get('polyline', []):
                # None 값이 섞여 있거나 길이 짝수 아니면 건너뛰기
                if poly is None or any(p is None for p in poly) or len(poly) % 2 != 0:
                    continue
                pts = np.array(poly, dtype=np.int32).reshape(-1, 2)
                cv2.polylines(mask, [pts], isClosed=False, color=1, thickness=1)

        mask = Image.fromarray(mask)

        # 3) Transform 적용 (고정 크기)
        img  = self.img_tf(img)
        mask = self.mask_tf(mask).squeeze(0).long()

        return img, mask

# ─── 3. Transform 정의 ───────────────────────────────────────
img_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE, INPUT_SIZE)),  # 가로×세로 고정
    transforms.ToTensor(),
])
mask_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE, INPUT_SIZE),
                      interpolation=transforms.InterpolationMode.NEAREST),
    transforms.PILToTensor(),
])

# ─── 4. 아주 간단한 1×1 Conv 모델 ───────────────────────────
class TinySegModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, num_classes, kernel_size=1),
        )
    def forward(self, x):
        return self.net(x)

# ─── 5. 학습/검증 루프 ───────────────────────────────────────
def main():
    # DataLoader 준비
    ds = CrackSegDataset(IMG_ROOT, ANN_ROOT, img_tf, mask_tf)
    n_train = int(len(ds) * 0.8)
    train_ds, val_ds = random_split(ds, [n_train, len(ds) - n_train])

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=0, pin_memory=False)
    val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,
                              num_workers=0, pin_memory=False)

    # 모델/손실/옵티마이저
    model     = TinySegModel(NUM_CLASSES).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    best_acc = 0.0
    for epoch in range(1, NUM_EPOCHS + 1):
        # train
        model.train()
        for imgs, masks in tqdm(train_loader, desc=f'Epoch {epoch}/{NUM_EPOCHS} [Train]'):
            imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
            optimizer.zero_grad()
            logits = model(imgs)
            loss   = criterion(logits, masks)
            loss.backward()
            optimizer.step()

        # eval
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
                preds = model(imgs).argmax(1)
                correct += (preds == masks).sum().item()
                total   += masks.numel()
        acc = correct / total
        print(f'Epoch {epoch}/{NUM_EPOCHS}  PixelAcc: {acc:.3f}')

        if acc > best_acc:
            best_acc = acc

    print('완료. 최종 PixelAcc (낮음이 보장됨):', best_acc)

if __name__ == '__main__':
    main()


Epoch 1/3 [Train]: 100%|██████████| 1128/1128 [3:40:35<00:00, 11.73s/it] 


Epoch 1/3  PixelAcc: 0.998


Epoch 2/3 [Train]: 100%|██████████| 1128/1128 [1:10:21<00:00,  3.74s/it]


Epoch 2/3  PixelAcc: 0.998


Epoch 3/3 [Train]: 100%|██████████| 1128/1128 [1:07:56<00:00,  3.61s/it]


Epoch 3/3  PixelAcc: 0.998
완료. 최종 PixelAcc (낮음이 보장됨): 0.9983680642187847


In [None]:
# ─── 전체 코드: 학습 → 체크포인트 저장 → TorchScript 벤치마크 + 시각화 ─────────────────────────────

import os
import time
import json
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from tqdm import tqdm
from torchvision.transforms import ToPILImage

# ─── 1. 설정 & 경로 ───────────────────────────────────────────────
torch.set_num_threads(os.cpu_count())  # CPU 스레드 최대화
DEVICE      = torch.device('cpu')
IMG_ROOT    = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Images\CRACK'
ANN_ROOT    = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Annotations\CRACK'
INPUT_SIZE  = 32
BATCH_SIZE  = 32
NUM_EPOCHS  = 3
NUM_CLASSES = 2
BEST_FN     = 'best_tiny_seg.pth'
SCRIPTED_FN = 'tinyseg_scripted.pt'

print(f"Using {os.cpu_count()} CPU threads")

# ─── 2. Dataset/Transform 정의 ───────────────────────────────────────
class CrackSegDataset(Dataset):
    def __init__(self, img_root, ann_root, img_tf, mask_tf):
        self.img_tf, self.mask_tf = img_tf, mask_tf
        self.items = []
        for cls in sorted(os.listdir(img_root)):
            d1, d2 = os.path.join(img_root,cls), os.path.join(ann_root,cls)
            if not os.path.isdir(d1) or not os.path.isdir(d2): continue
            for fn in sorted(os.listdir(d1)):
                if fn.lower().endswith(('.png','.jpg','.jpeg','.webp')):
                    jp = os.path.join(d2, fn.rsplit('.',1)[0] + '.json')
                    if os.path.isfile(jp):
                        self.items.append((os.path.join(d1,fn), jp))
        assert self.items, "데이터가 없습니다."
    def __len__(self): return len(self.items)
    def __getitem__(self, idx):
        img_p, json_p = self.items[idx]
        img = Image.open(img_p).convert('RGB')
        w,h = img.size

        mask = np.zeros((h,w), dtype=np.uint8)
        with open(json_p,'r',encoding='utf-8') as f:
            data = json.load(f)
        for ann in data.get('annotations',[]):
            for poly in ann.get('polyline',[]):
                if not poly or len(poly)%2!=0 or any(p is None for p in poly):
                    continue
                pts = np.array(poly, np.int32).reshape(-1,2)
                cv2.polylines(mask, [pts], False, 1, thickness=1)
        mask = Image.fromarray(mask)

        img  = self.img_tf(img)
        mask = self.mask_tf(mask).squeeze(0).long()
        return img, mask

img_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE,INPUT_SIZE)),
    transforms.ToTensor(),
])
mask_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE,INPUT_SIZE),
                      interpolation=transforms.InterpolationMode.NEAREST),
    transforms.PILToTensor(),
])

# ─── 3. 모델 정의 ───────────────────────────────────────────────────
class TinySegModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, num_classes, kernel_size=1),
        )
    def forward(self, x):
        return self.net(x)

# ─── 4. 학습/검증 루프 + 체크포인트 저장 ───────────────────────────
# DataLoader 준비
full_ds    = CrackSegDataset(IMG_ROOT, ANN_ROOT, img_tf, mask_tf)
n_train    = int(len(full_ds)*0.8)
train_ds, val_ds = random_split(full_ds, [n_train, len(full_ds)-n_train])
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=os.cpu_count(), pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=os.cpu_count(), pin_memory=True)

model     = TinySegModel(NUM_CLASSES).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

best_acc = 0.0
for epoch in range(1, NUM_EPOCHS+1):
    # Train
    model.train()
    for imgs, masks in tqdm(train_loader, desc=f'Epoch {epoch}/{NUM_EPOCHS} ▶ Train'):
        imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
        optimizer.zero_grad()
        loss = criterion(model(imgs), masks)
        loss.backward()
        optimizer.step()

    # Val
    model.eval()
    corr = total = 0
    with torch.no_grad():
        for imgs, masks in val_loader:
            imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
            preds = model(imgs).argmax(1)
            corr += (preds==masks).sum().item()
            total += masks.numel()
    acc = corr/total
    print(f'Epoch {epoch} ▶ PixelAcc: {acc:.4f}')
    if acc > best_acc:
        best_acc = acc
        torch.save(model.state_dict(), BEST_FN)
        print(f' ↳ Saved best model ({best_acc:.4f})')

print("▶ Training complete. Best PixelAcc:", best_acc)

# ─── 5. 모델 로드 & TorchScript 변환 ─────────────────────────────
model.load_state_dict(torch.load(BEST_FN, map_location=DEVICE))
model.eval()
scripted = torch.jit.script(model)
scripted.save(SCRIPTED_FN)
print(f"▶ Loaded '{BEST_FN}' and saved TorchScript to '{SCRIPTED_FN}'")

# ─── 6. 워밍업 + 벤치마크 ───────────────────────────────────────
with torch.no_grad():
    # warm-up
    next(iter(val_loader))
    for imgs, _ in val_loader:
        _ = scripted(imgs.to(DEVICE))
        break
    print("▶ Warm-up done")
    start = time.time()
    for imgs, _ in val_loader:
        _ = scripted(imgs.to(DEVICE))
    elapsed = time.time() - start

print(f"▶ Inference on {len(val_ds)} samples: {elapsed:.2f}s → {len(val_ds)/elapsed:.1f} img/s")

# ─── 7. 결과 시각화 (5개 샘플 오버레이) ─────────────────────────
to_pil = ToPILImage()
for i in range(5):
    img, gt = val_ds[i]
    pred    = scripted(img.unsqueeze(0).to(DEVICE)).argmax(1).squeeze(0).cpu()
    img_np  = np.array(to_pil(img))
    ov      = img_np.copy()
    ov[pred.numpy()==1] = [255,0,0]

    plt.figure(figsize=(9,3))
    plt.subplot(1,3,1); plt.imshow(img_np);         plt.title("Input");      plt.axis("off")
    plt.subplot(1,3,2); plt.imshow(gt.numpy(),cmap='gray'); plt.title("GT Mask"); plt.axis("off")
    plt.subplot(1,3,3); plt.imshow(ov);              plt.title("Overlay");   plt.axis("off")
    plt.show()


Using 16 CPU threads




In [None]:
import os
import json
import cv2
import numpy as np
from PIL import Image
import torch

# ── 호환성 패치: torch.version.hip 속성 없을 때 대비 ─────────────────────────────
if not hasattr(torch, 'version'):
    class _v:
        hip = False
    torch.version = _v()
# ────────────────────────────────────────────────────────────────────────────────

import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.data._utils.collate import default_collate
from torchvision import transforms
from torchvision.transforms import InterpolationMode
from tqdm import tqdm

import matplotlib.pyplot as plt
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    jaccard_score,
    r2_score
)

# ─── 1. 경로 & 하이퍼파라미터 ─────────────────────────────────────────────────
IMG_ROOT    = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Images\CRACK'
ANN_ROOT    = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Annotations\CRACK'
INPUT_SIZE  = 32
BATCH_SIZE  = 32
NUM_EPOCHS  = 3
NUM_CLASSES = 2
DEVICE      = torch.device('cpu')  # GPU 사용 시 'cuda'

# ─── 2. collate_fn: None 샘플 걸러내기 ─────────────────────────────────────────
def filter_none_collate(batch):
    batch = [b for b in batch if b is not None]
    if not batch:
        return None
    return default_collate(batch)

# ─── 3. Dataset 정의 ─────────────────────────────────────────────────────────
class CrackSegDataset(Dataset):
    def __init__(self, img_root, ann_root, img_tf, mask_tf):
        self.img_tf, self.mask_tf = img_tf, mask_tf
        self.items = []
        exts = ('.png', '.jpg', '.jpeg', '.webp')
        for cls in os.listdir(img_root):
            img_dir = os.path.join(img_root, cls)
            ann_dir = os.path.join(ann_root, cls)
            if not os.path.isdir(img_dir) or not os.path.isdir(ann_dir):
                continue
            for fn in os.listdir(img_dir):
                if not fn.lower().endswith(exts):
                    continue
                jp = os.path.join(ann_dir, os.path.splitext(fn)[0] + '.json')
                if os.path.isfile(jp):
                    self.items.append((os.path.join(img_dir, fn), jp))
        assert self.items, f"데이터가 없습니다: {img_root}"

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        img_path, json_path = self.items[idx]
        try:
            # 1) 이미지 읽기 (cv2 우선, 실패 시 PIL)
            arr = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
            if arr is None:
                pil = Image.open(img_path).convert('RGB')
                arr = np.array(pil)
            else:
                if arr.ndim == 3 and arr.shape[2] == 4:
                    arr = cv2.cvtColor(arr, cv2.COLOR_BGRA2RGB)
                else:
                    arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
            img = Image.fromarray(arr)

            # 2) JSON → mask 생성
            w, h = img.size
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            mask = np.zeros((h, w), dtype=np.uint8)
            for ann in data.get('annotations', []):
                for poly in ann.get('polyline', []):
                    if not poly or any(p is None for p in poly) or len(poly) % 2 != 0:
                        continue
                    pts = np.array(poly, dtype=np.int32).reshape(-1, 2)
                    cv2.polylines(mask, [pts], isClosed=False, color=1, thickness=1)
            mask = Image.fromarray(mask)

            # 3) Transform 적용
            img_t  = self.img_tf(img)
            mask_t = self.mask_tf(mask).squeeze(0).long()
            return img_t, mask_t

        except Exception as e:
            print(f"[Warning] 스킵: {img_path} → {e}")
            return None

# ─── 4. Transform 정의 ───────────────────────────────────────────────────────
img_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE, INPUT_SIZE)),
    transforms.ToTensor(),
])
mask_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE, INPUT_SIZE), interpolation=InterpolationMode.NEAREST),
    transforms.PILToTensor(),
])

# ─── 5. 모델 정의 ───────────────────────────────────────────────────────────
class TinySegModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, num_classes, kernel_size=1),
        )
    def forward(self, x):
        return self.net(x)

# ─── 6. 시각화 함수 ─────────────────────────────────────────────────────────
def visualize_predictions(model, loader, num_samples=5, output_dir='visualizations'):
    os.makedirs(output_dir, exist_ok=True)
    model.eval()
    count = 0
    with torch.no_grad():
        for batch in loader:
            if batch is None:
                continue
            imgs, masks = batch
            imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
            preds = model(imgs).argmax(1)
            for i in range(imgs.size(0)):
                if count >= num_samples:
                    return
                fig, axes = plt.subplots(1, 3, figsize=(12, 4))
                axes[0].imshow(imgs[i].cpu().permute(1, 2, 0))
                axes[0].set_title('Input');    axes[0].axis('off')
                axes[1].imshow(masks[i].cpu(), cmap='gray')
                axes[1].set_title('Ground Truth'); axes[1].axis('off')
                axes[2].imshow(preds[i].cpu(), cmap='gray')
                axes[2].set_title('Prediction');   axes[2].axis('off')
                plt.savefig(os.path.join(output_dir, f'pred_{count}.png'))
                plt.close(fig)
                count += 1

# ─── 7. 학습·평가·저장·시각화 루프 ──────────────────────────────────────────
def main():
    # Dataset & DataLoader
    ds = CrackSegDataset(IMG_ROOT, ANN_ROOT, img_tf, mask_tf)
    n_train = int(len(ds) * 0.8)
    train_ds, val_ds = random_split(ds, [n_train, len(ds) - n_train])
    train_loader = DataLoader(
        train_ds,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=filter_none_collate
    )
    val_loader = DataLoader(
        val_ds,
        batch_size=BATCH_SIZE,
        shuffle=False,
        collate_fn=filter_none_collate
    )

    # 모델·손실·옵티마이저 설정
    model     = TinySegModel(NUM_CLASSES).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    best_iou = 0.0
    best_path = 'best_model.pt'

    for epoch in range(1, NUM_EPOCHS + 1):
        model.train()
        for batch in tqdm(train_loader, desc=f'Epoch {epoch}/{NUM_EPOCHS} [Train]'):
            if batch is None:
                continue
            imgs, masks = batch
            imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
            optimizer.zero_grad()
            loss = criterion(model(imgs), masks)
            loss.backward()
            optimizer.step()

        model.eval()
        all_preds, all_masks = [], []
        with torch.no_grad():
            for batch in val_loader:
                if batch is None:
                    continue
                imgs, masks = batch
                imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
                preds = model(imgs).argmax(1)
                all_preds.append(preds.cpu().numpy().ravel())
                all_masks.append(masks.cpu().numpy().ravel())

        y_pred = np.concatenate(all_preds)
        y_true = np.concatenate(all_masks)

        acc   = accuracy_score(y_true, y_pred)
        prec  = precision_score(y_true, y_pred, zero_division=0)
        rec   = recall_score(y_true, y_pred, zero_division=0)
        f1    = f1_score(y_true, y_pred, zero_division=0)
        iou   = jaccard_score(y_true, y_pred, zero_division=0)
        r2    = r2_score(y_true, y_pred)

        print(f'Epoch {epoch} Metrics:')
        print(f'  Accuracy : {acc:.4f}')
        print(f'  Precision: {prec:.4f}')
        print(f'  Recall   : {rec:.4f}')
        print(f'  F1-score : {f1:.4f}')
        print(f'  IoU      : {iou:.4f}')
        print(f'  R2-score : {r2:.4f}')

        if iou > best_iou:
            best_iou = iou
            torch.save(model.state_dict(), best_path)
            print(f'  -> New best model saved (IoU={iou:.4f})')

    print('Training complete.')
    print(f'Best validation IoU: {best_iou:.4f}')
    print(f'Best model saved to: {best_path}')

    # 저장된 best 모델로 시각화
    best_model = TinySegModel(NUM_CLASSES).to(DEVICE)
    best_model.load_state_dict(torch.load(best_path, map_location=DEVICE))
    visualize_predictions(best_model, val_loader, num_samples=5)

if __name__ == '__main__':
    main()


Epoch 1/3 [Train]:   0%|          | 0/1128 [00:00<?, ?it/s]



Epoch 1/3 [Train]:   1%|          | 11/1128 [02:18<3:42:57, 11.98s/it]



Epoch 1/3 [Train]: 100%|██████████| 1128/1128 [4:52:00<00:00, 15.53s/it] 


Epoch 1 Metrics:
  Accuracy : 0.9983
  Precision: 0.0000
  Recall   : 0.0000
  F1-score : 0.0000
  IoU      : 0.0000
  R2-score : -0.0017


Epoch 2/3 [Train]:  26%|██▌       | 293/1128 [1:03:47<3:38:19, 15.69s/it]



Epoch 2/3 [Train]:  26%|██▌       | 294/1128 [1:03:57<3:16:43, 14.15s/it]



Epoch 2/3 [Train]:  26%|██▌       | 295/1128 [1:04:03<2:39:14, 11.47s/it]



Epoch 2/3 [Train]:  26%|██▌       | 296/1128 [1:04:07<2:10:00,  9.38s/it]



Epoch 2/3 [Train]:  26%|██▋       | 298/1128 [1:04:15<1:30:55,  6.57s/it]



Epoch 2/3 [Train]:  27%|██▋       | 299/1128 [1:04:19<1:19:43,  5.77s/it]



Epoch 2/3 [Train]:  27%|██▋       | 300/1128 [1:04:23<1:11:25,  5.18s/it]



Epoch 2/3 [Train]:  27%|██▋       | 301/1128 [1:04:26<1:05:17,  4.74s/it]



Epoch 2/3 [Train]:  27%|██▋       | 302/1128 [1:04:31<1:03:02,  4.58s/it]



Epoch 2/3 [Train]:  27%|██▋       | 303/1128 [1:04:35<1:01:58,  4.51s/it]



Epoch 2/3 [Train]:  27%|██▋       | 304/1128 [1:04:39<59:51,  4.36s/it]  



Epoch 2/3 [Train]:  27%|██▋       | 305/1128 [1:04:43<58:43,  4.28s/it]



Epoch 2/3 [Train]:  27%|██▋       | 306/1128 [1:04:47<56:45,  4.14s/it]



Epoch 2/3 [Train]:  27%|██▋       | 307/1128 [1:04:51<56:03,  4.10s/it]



Epoch 2/3 [Train]:  27%|██▋       | 308/1128 [1:04:54<54:01,  3.95s/it]



Epoch 2/3 [Train]:  27%|██▋       | 309/1128 [1:04:58<53:22,  3.91s/it]



Epoch 2/3 [Train]:  27%|██▋       | 310/1128 [1:05:02<53:20,  3.91s/it]



Epoch 2/3 [Train]:  28%|██▊       | 311/1128 [1:05:06<52:44,  3.87s/it]



Epoch 2/3 [Train]:  28%|██▊       | 312/1128 [1:05:13<1:07:25,  4.96s/it]



Epoch 2/3 [Train]:  28%|██▊       | 313/1128 [1:05:17<1:03:02,  4.64s/it]



Epoch 2/3 [Train]:  28%|██▊       | 314/1128 [1:05:21<1:00:17,  4.44s/it]



Epoch 2/3 [Train]:  28%|██▊       | 315/1128 [1:05:25<57:36,  4.25s/it]  



Epoch 2/3 [Train]:  28%|██▊       | 316/1128 [1:31:05<104:50:57, 464.85s/it]



Epoch 2/3 [Train]:  28%|██▊       | 317/1128 [1:56:10<175:03:44, 777.10s/it]



Epoch 2/3 [Train]:  28%|██▊       | 318/1128 [2:21:16<223:59:30, 995.52s/it]



Epoch 2/3 [Train]:  28%|██▊       | 319/1128 [2:31:21<197:23:45, 878.40s/it]



Epoch 2/3 [Train]:  46%|████▋     | 522/1128 [3:46:02<2:55:20, 17.36s/it]   



Epoch 2/3 [Train]: 100%|██████████| 1128/1128 [7:26:40<00:00, 23.76s/it] 


Epoch 2 Metrics:
  Accuracy : 0.9983
  Precision: 0.0000
  Recall   : 0.0000
  F1-score : 0.0000
  IoU      : 0.0000
  R2-score : -0.0017


Epoch 3/3 [Train]:   5%|▍         | 55/1128 [09:32<3:04:27, 10.31s/it]



Epoch 3/3 [Train]:  10%|█         | 118/1128 [21:33<4:56:04, 17.59s/it]



Epoch 3/3 [Train]:  17%|█▋        | 193/1128 [37:03<2:47:52, 10.77s/it]



Epoch 3/3 [Train]:  22%|██▏       | 246/1128 [48:06<4:55:35, 20.11s/it]



Epoch 3/3 [Train]:  26%|██▌       | 295/1128 [1:00:04<4:26:57, 19.23s/it]



Epoch 3/3 [Train]:  34%|███▍      | 384/1128 [1:17:44<2:41:33, 13.03s/it]



Epoch 3/3 [Train]:  42%|████▏     | 474/1128 [1:37:21<1:40:55,  9.26s/it]



Epoch 3/3 [Train]:  46%|████▌     | 516/1128 [1:47:23<3:17:00, 19.32s/it]



Epoch 3/3 [Train]:  50%|████▉     | 560/1128 [1:57:19<2:02:48, 12.97s/it]



Epoch 3/3 [Train]:  51%|█████     | 578/1128 [2:01:42<2:04:02, 13.53s/it]



Epoch 3/3 [Train]:  53%|█████▎    | 598/1128 [2:05:37<1:50:46, 12.54s/it]



Epoch 3/3 [Train]:  53%|█████▎    | 603/1128 [2:06:32<1:34:06, 10.75s/it]



Epoch 3/3 [Train]:  56%|█████▌    | 634/1128 [2:12:37<1:16:51,  9.33s/it]



Epoch 3/3 [Train]:  60%|█████▉    | 673/1128 [2:22:06<1:39:20, 13.10s/it]



Epoch 3/3 [Train]:  63%|██████▎   | 712/1128 [2:30:04<1:33:49, 13.53s/it]



Epoch 3/3 [Train]:  63%|██████▎   | 713/1128 [2:30:20<1:38:52, 14.30s/it]



Epoch 3/3 [Train]:  65%|██████▍   | 733/1128 [2:35:30<1:36:33, 14.67s/it]



Epoch 3/3 [Train]:  69%|██████▉   | 776/1128 [2:45:56<1:15:09, 12.81s/it]



Epoch 3/3 [Train]:  72%|███████▏  | 810/1128 [2:53:15<1:14:43, 14.10s/it]



Epoch 3/3 [Train]:  72%|███████▏  | 816/1128 [2:54:42<1:09:54, 13.44s/it]



Epoch 3/3 [Train]:  75%|███████▍  | 841/1128 [3:00:31<1:21:41, 17.08s/it]



Epoch 3/3 [Train]:  82%|████████▏ | 922/1128 [3:19:15<50:31, 14.72s/it]  



Epoch 3/3 [Train]:  82%|████████▏ | 925/1128 [3:19:45<39:13, 11.59s/it]



Epoch 3/3 [Train]:  85%|████████▌ | 959/1128 [3:27:05<41:20, 14.68s/it]



Epoch 3/3 [Train]:  85%|████████▌ | 963/1128 [3:27:40<27:02,  9.83s/it]



Epoch 3/3 [Train]:  88%|████████▊ | 989/1128 [3:33:09<32:14, 13.92s/it]



Epoch 3/3 [Train]:  90%|████████▉ | 1010/1128 [3:37:43<27:55, 14.20s/it]

In [1]:
# ─── 8. Pixel-wise mAP50 & R² 계산 (IPython Friendly) ──────────────────────────
# 필요한 패키지 설치:
# !pip install scikit-learn

import torch.nn as nn
import numpy as np
import torch
from sklearn.metrics import average_precision_score, r2_score

class TinySegModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(16, num_classes, kernel_size=1),
        )
    def forward(self, x):
        return self.net(x)


# 1) PyTorch 모델 다시 로드 (TorchScript 대신 원본 모델 사용)
metrics_model = TinySegModel(NUM_CLASSES).to(DEVICE)
metrics_model.load_state_dict(torch.load(BEST_FN, map_location=DEVICE))
metrics_model.eval()

# 2) 스코어 & GT 수집
all_scores = []
all_truths = []

with torch.no_grad():
    for imgs, masks in val_loader:
        imgs = imgs.to(DEVICE)
        masks = masks.to(DEVICE)

        logits = metrics_model(imgs)                  # (B, 2, H, W)
        probs  = torch.softmax(logits, dim=1)[:, 1]   # (B, H, W): crack class 확률
        all_scores.append(probs.cpu().view(-1).numpy())
        all_truths.append(masks.view(-1).cpu().numpy())

y_score = np.concatenate(all_scores)
y_true  = np.concatenate(all_truths)

# 3) 지표 계산
ap50 = average_precision_score(y_true, y_score)  # 픽셀 단위 AP ≈ mAP@50
r2   = r2_score(y_true, y_score)

print(f"Pixel-wise AP (≈mAP50): {ap50:.4f}")
print(f"Pixel-wise R²:         {r2:.4f}")


KeyboardInterrupt: 

In [None]:
import os, json, cv2
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
from torchvision.models.segmentation import fcn_resnet18
from tqdm import tqdm

# ─── 1. 전역 설정 ──────────────────────────────────────────
IMG_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Images\CRACK'
ANN_ROOT   = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Annotations\CRACK'
MASK_ROOT  = r'E:\Hyunji4579 Dropbox\Hyunji4579의 팀 폴더\도로장애물\Masks\CRACK'  # 캐시 저장 폴더
os.makedirs(MASK_ROOT, exist_ok=True)

INPUT_SIZE = 128   # 중간 해상도
BATCH_SIZE = 32    # CPU 메모리 허용치에서 최대로
NUM_EPOCHS = 30
NUM_CLASSES= 2     # 배경 vs 크랙
DEVICE     = torch.device('cpu')

# CPU 쓰레드 수 최대화
torch.set_num_threads(os.cpu_count())

# ─── 2. Dataset w/ 마스크 캐싱 ───────────────────────────
class CrackSegDataset(Dataset):
    def __init__(self, img_root, ann_root, mask_root, img_tf, mask_tf):
        self.img_tf, self.mask_tf = img_tf, mask_tf
        self.mask_root = mask_root
        self.items = []
        exts = ('.png','.jpg','.jpeg','.webp')
        for cls in sorted(os.listdir(img_root)):
            img_dir = os.path.join(img_root, cls)
            ann_dir = os.path.join(ann_root, cls)
            mask_dir = os.path.join(mask_root, cls)
            os.makedirs(mask_dir, exist_ok=True)
            if not os.path.isdir(img_dir) or not os.path.isdir(ann_dir): continue
            for fn in sorted(os.listdir(img_dir)):
                if not fn.lower().endswith(exts): continue
                jp = os.path.join(ann_dir, os.path.splitext(fn)[0] + '.json')
                if os.path.isfile(jp):
                    self.items.append((cls, fn, jp))
        assert self.items, "데이터 없음"

    def __len__(self): return len(self.items)

    def __getitem__(self, idx):
        cls, fn, json_path = self.items[idx]
        img_path = os.path.join(IMG_ROOT, cls, fn)
        mask_path= os.path.join(self.mask_root, cls, os.path.splitext(fn)[0] + '.png')

        # 1) 이미지 로드 (PIL)
        img = Image.open(img_path).convert('RGB')

        # 2) 마스크 캐싱: 없으면 생성→저장, 있으면 바로 로드
        if not os.path.isfile(mask_path):
            # JSON → mask
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            w,h = img.size
            mask = np.zeros((h,w), dtype=np.uint8)
            for ann in data.get('annotations', []):
                for poly in ann.get('polyline', []):
                    pts = np.array(poly).reshape(-1,2).astype(np.int32)
                    cv2.polylines(mask, [pts], False, 1, thickness=3)
            Image.fromarray(mask).save(mask_path)
        mask = Image.open(mask_path)

        # 3) Transform
        img  = self.img_tf(img)
        mask = self.mask_tf(mask).squeeze(0).long()
        return img, mask

# ─── 3. Transform 정의 ────────────────────────────────────
img_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE,INPUT_SIZE)),
    transforms.ColorJitter(0.1,0.1,0.1,0.1),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],
                         [0.229,0.224,0.225]),
])
mask_tf = transforms.Compose([
    transforms.Resize((INPUT_SIZE,INPUT_SIZE), interpolation=transforms.InterpolationMode.NEAREST),
    transforms.PILToTensor(),
])

# ─── 4. 학습/검증 함수 ────────────────────────────────────
def train_one_epoch(model, loader, criterion, optimizer):
    model.train()
    total_loss = 0
    for imgs, masks in loader:
        imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
        optimizer.zero_grad()
        out = model(imgs)['out']
        loss = criterion(out, masks)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * imgs.size(0)
    return total_loss / len(loader.dataset)

@torch.no_grad()
def eval_one_epoch(model, loader, criterion):
    model.eval()
    total_loss = correct = total = 0
    for imgs, masks in loader:
        imgs, masks = imgs.to(DEVICE), masks.to(DEVICE)
        out = model(imgs)['out']
        to
