In [1]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision.models.segmentation import deeplabv3_resnet50
from sklearn.model_selection import train_test_split
import segmentation_models_pytorch as smp
from torch.utils.data import DataLoader, TensorDataset

# YOLO 데이터셋 경로
yolo_data_dir = './data/YOLO/images'

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# U-Net 모델 정의
class UNetModel(nn.Module):
    def __init__(self, num_classes):
        super(UNetModel, self).__init__()
        self.model = smp.Unet(
            encoder_name="resnet34",
            encoder_weights="imagenet",
            in_channels=3,
            classes=num_classes,
        )

    def forward(self, x):
        return self.model(x)
    

In [3]:
# YOLO 데이터 로드 및 학습/검증 분할
def load_yolo_data(data_dir):
    images = []
    labels = []
    for filename in os.listdir(data_dir):
        if filename.endswith(('.jpg', '.png', '.jpeg')):
            # 이미지 읽기 및 전처리
            img = cv2.imread(os.path.join(data_dir, filename))
            img = cv2.resize(img, (224, 224))
            # OpenCV는 BGR, 모델은 RGB 필요
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            images.append(img)
            
            # 더미 세그멘테이션 마스크 (실제 마스크로 대체 필요)
            label = np.zeros((224, 224), dtype=np.int64)
            labels.append(label)
    
    # NumPy 배열로 변환
    images = np.array(images)
    labels = np.array(labels)
    
    # 학습/검증 분할
    x_train, x_val, y_train, y_val = train_test_split(
        images, labels, test_size=0.2, random_state=42)
    
    return x_train, x_val, y_train, y_val

In [4]:
# U-Net 모델 학습 및 검증
model = UNetModel(num_classes=30)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
num_epochs = 30
best_f1_score = 0

# 하이퍼파라미터 설정
batch_size = 16  # 배치 크기 추가
num_epochs = 30
learning_rate = 0.001

In [5]:
# def calculate_iou(outputs, labels):
#     # 출력을 예측 클래스로 변환
#     pred = torch.argmax(outputs, dim=1)
    
#     # 실제 레이블과 예측 레이블 변환
#     pred = pred.cpu().numpy()

#     # labels가 이미 NumPy 배열인지 확인
#     if isinstance(labels, torch.Tensor):
#         labels = labels.cpu().numpy()
    
#     # IoU 계산
#     intersection = np.logical_and(pred, labels)
#     union = np.logical_or(pred, labels)
#     iou = np.sum(intersection) / (np.sum(union) + 1e-8)
    
#     return iou


In [6]:
def calculate_fwiou(outputs, labels):
    # 출력을 예측 클래스로 변환
    pred = torch.argmax(outputs, dim=1)
    
    # 실제 레이블과 예측 레이블 변환
    pred = pred.cpu().numpy()
    
    # labels가 텐서일 경우에만 변환
    if isinstance(labels, torch.Tensor):
        labels = labels.cpu().numpy()
    
    # 각 클래스별 IoU 계산
    class_iou = []
    num_classes = outputs.shape[1]  # 클래스 개수
    for cls in range(num_classes):
        pred_cls = (pred == cls)
        labels_cls = (labels == cls)
        
        intersection = np.logical_and(pred_cls, labels_cls)
        union = np.logical_or(pred_cls, labels_cls)
        
        class_iou.append(np.sum(intersection) / (np.sum(union) + 1e-8))
    
    # 클래스 가중치 계산 (클래스 빈도에 따라)
    flat_labels = labels.flatten() # `labels`는 평탄화(flatten)하여 1차원 배열로 변환
    class_weights = np.zeros(num_classes)  # 클래스 개수만큼 0으로 초기화
    for cls in range(num_classes):
        class_weights[cls] = np.sum(flat_labels == cls)
    
    # 클래스 빈도를 기준으로 가중치 정규화
    if np.sum(class_weights) > 0:
        class_weights = class_weights / np.sum(class_weights)
    
    # 가중 평균 IoU 계산
    fwiou = np.sum(np.array(class_iou) * class_weights)

    # # 디버깅용 출력
    # print("labels shape:", labels.shape)
    # print("class_weights shape:", class_weights.shape)

    return fwiou

In [7]:
def calculate_f1(outputs, labels):
    # 출력을 예측 클래스로 변환
    pred = torch.argmax(outputs, dim=1)
    
    # 실제 레이블과 예측 레이블 변환
    pred = pred.cpu().numpy()

    # labels가 텐서일 경우에만 변환
    if isinstance(labels, torch.Tensor):
        labels = labels.cpu().numpy()
    
    # 이진 분류로 가정 (첫 번째 클래스에 대해)
    pred_binary = (pred == 1)
    labels_binary = (labels == 1)
    
    # 정밀도, 재현율, F1 스코어 계산
    true_positive = np.sum((pred_binary == 1) & (labels_binary == 1))
    false_positive = np.sum((pred_binary == 1) & (labels_binary == 0))
    false_negative = np.sum((pred_binary == 0) & (labels_binary == 1))
    
    precision = true_positive / (true_positive + false_positive + 1e-8)
    recall = true_positive / (true_positive + false_negative + 1e-8)
    
    f1 = 2 * (precision * recall) / (precision + recall + 1e-8)
    return f1

In [8]:
x_train, x_val, y_train, y_val = load_yolo_data(yolo_data_dir)

# 텐서 데이터셋 및 데이터로더 생성
x_train_tensor = torch.from_numpy(x_train).permute(0, 3, 1, 2).float()
y_train_tensor = torch.from_numpy(y_train).to(torch.long)
x_val_tensor = torch.from_numpy(x_val).permute(0, 3, 1, 2).float()
y_val_tensor = torch.from_numpy(y_val).to(torch.long)

train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
val_dataset = TensorDataset(x_val_tensor, y_val_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [9]:
# 모델 학습
best_f1_score = 0

for epoch in range(num_epochs):
    
    # 학습 모드
    model.train()
    train_loss = 0
    train_fwiou = 0
    train_f1 = 0
    
    for batch_x, batch_y in train_loader:
        # 모델 입력 및 손실 계산
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        # print("outputs shape:", outputs.shape)

        # 학습 진행
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
        # 배치별 성능 지표 계산
        train_fwiou += calculate_fwiou(outputs, batch_y)
        train_f1 += calculate_f1(outputs, batch_y)

    # 평균 계산
    train_loss /= len(train_loader)
    train_fwiou /= len(train_loader)
    train_f1 /= len(train_loader)
    
    # 검증 모드
    model.eval()
    val_loss = 0
    val_fwiou = 0
    val_f1 = 0
    
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            # 모델 입력 및 손실 계산
            val_outputs = model(batch_x)
            loss = criterion(val_outputs, batch_y)
            
            val_loss += loss.item()
            
            # 배치별 성능 지표 계산    
            val_fwiou += calculate_fwiou(val_outputs, batch_y.numpy()) 
            val_f1 += calculate_f1(val_outputs, batch_y.numpy())
        
        # 평균 계산
        val_loss /= len(val_loader)
        val_fwiou /= len(val_loader)
        val_f1 /= len(val_loader)
    
    # 결과 출력
    print(f'Epoch [{epoch+1}/{num_epochs}] ')
    print(f'Train Loss: {train_loss:.4f}, Train fwIoU: {train_fwiou:.4f}, Train F1: {train_f1:.4f}')
    print('-     -       -       -       -       -       -       -')
    print(f'Val Loss: {val_loss:.4f}, Val fwIoU: {val_fwiou:.4f}, Val F1: {val_f1:.4f}')
    print('---------------------------------------------------------')

    # 가장 높은 F1 스코어를 가진 모델 저장
    if val_f1 > best_f1_score:
        best_f1_score = val_f1
        torch.save(model.state_dict(), './MODEL/best_model.pth')

Epoch [1/30] 
Train Loss: 0.8971, Train fwIoU: 0.8804, Train F1: 0.0000
-     -       -       -       -       -       -       -
Val Loss: 0.0370, Val fwIoU: 1.0000, Val F1: 0.0000
---------------------------------------------------------
Epoch [2/30] 
Train Loss: 0.0173, Train fwIoU: 1.0000, Train F1: 0.0000
-     -       -       -       -       -       -       -
Val Loss: 0.0109, Val fwIoU: 1.0000, Val F1: 0.0000
---------------------------------------------------------
Epoch [3/30] 
Train Loss: 0.0078, Train fwIoU: 1.0000, Train F1: 0.0000
-     -       -       -       -       -       -       -
Val Loss: 0.0059, Val fwIoU: 1.0000, Val F1: 0.0000
---------------------------------------------------------
Epoch [4/30] 
Train Loss: 0.0046, Train fwIoU: 1.0000, Train F1: 0.0000
-     -       -       -       -       -       -       -
Val Loss: 0.0037, Val fwIoU: 1.0000, Val F1: 0.0000
---------------------------------------------------------
Epoch [5/30] 
Train Loss: 0.0031, Train fwIoU: 1

KeyboardInterrupt: 