# 스프린트 미션 5

### 3팀 전수현

### 제출일자: 2025.06.24


## 과제 목표: Autoencoder 기반 Denoising

1. 최종 목표: 'denoising-dirty-documents' 데이터셋의 노이즈 낀 문서 이미지를 깨끗한 원본 이미지로 복원하는 딥러닝 모델을 만든다.
2. 타겟 모델: Autoencoder
3. 중점 과제:
   - albumentations를 활용하여 Data Augmentation을 적극적으로 활용하여 모델의 일반화 성능을 높인다.
   - W&B를 연동하여 모든 실험 과정을 추적하고 시각화하여 최적의 모델 성능을 찾는다.


이번 과제에서 중점 적으로 활용할 패키지는 `albumentations`와 `W&B`이다. 전통적인 DAE 문제를 푸는 과정을 최신 패키지 적용을 통해서 구현해보는 것이 개인적인 목표이다.


## 0. Import Libraries


In [1]:
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm  # 진행 상황을 보여주는 바

# PyTorch 라이브러리
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


# 1. 데이터 증강(Augmentation) 라이브러리
# torchvision도 좋지만, 이미지 변환에 더 강력하고 직관적인 albumentations를 사용
import albumentations as A
from albumentations.pytorch import ToTensorV2

# 2. 실험 추적과 하이퍼 파라미터 튜닝을 위한 W&B 라이브러리 불러오기
import wandb



## 1. Set Configuration


In [2]:
# ====================================================================
# STEP 1: 각종 Configuration 관리
# ====================================================================
# 나중에 하이퍼파라미터 튜닝을 편하게 하려면, 이렇게 설정값을 모아두는 습관이 중요함.

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

# 주요 하이퍼 파라미터
LEARNING_RATE = 1e-4
BATCH_SIZE = 16
NUM_EPOCHS = 25

# data path 설정
DATA_PATH = "./data/denoising-dirty-documents/"  # 데이터셋이 있는 경로
train_dir = os.path.join(DATA_PATH, "train/train")
train_cleaned_dir = os.path.join(DATA_PATH, "train_cleaned/train_cleaned")
test_dir = os.path.join(DATA_PATH, "test/test")

Using device: cuda


In [3]:
# 해당 데이터셋의 이미지를
def load_images_from_folder(train_dir):
    images = []
    for filename in os.listdir(train_dir):
        if filename.endswith(".png"):  # PNG 파일만 가져오기
            img_path = os.path.join(train_dir, filename)
            img = cv2.imread(img_path)  # OpenCV로 이미지를 읽음 (BGR 형식)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # BGR → RGB 변환
            images.append(img)
    return images


train_images = load_images_from_folder(train_dir)
train_cleaned_images = load_images_from_folder(train_cleaned_dir)
test_images = load_images_from_folder(test_dir)


print(f"Train: {len(train_images)}")
print(f"Train Cleaned: {len(train_cleaned_images)}")
print(f"Test: {len(test_images)}")

Train: 144
Train Cleaned: 144
Test: 72


데이터셋의 갯수는 저렇게 구성되어 있는데 문제는 data의 크기가 일정하지 않다는 것이다.


In [5]:
# 각 데이터셋의 이미지 크기 추출
train_sizes = [
    img.shape[:2] for img in train_images
]  # train 이미지의 크기 (height, width)
train_cleaned_sizes = [
    img.shape[:2] for img in train_cleaned_images
]  # train_cleaned 이미지의 크기
test_sizes = [img.shape[:2] for img in test_images]  # test 이미지의 크기


train_unique_sizes = np.unique(train_sizes, axis=0)  # train 이미지의 유니크 크기
train_cleaned_unique_sizes = np.unique(
    train_cleaned_sizes, axis=0
)  # train_cleaned 유니크 크기
test_unique_sizes = np.unique(test_sizes, axis=0)  # test 유니크 크기
# train_unique_sizes


# 결과 출력
print("데이터 사이즈는 몇종류일까 from baseline code")
print("Unique sizes in train:\n", train_unique_sizes)
print("Unique sizes in train_cleaned:\n", train_cleaned_unique_sizes)
print("Unique sizes in test:\n", test_unique_sizes)

데이터 사이즈는 몇종류일까 from baseline code
Unique sizes in train:
 [[258 540]
 [420 540]]
Unique sizes in train_cleaned:
 [[258 540]
 [420 540]]
Unique sizes in test:
 [[258 540]
 [420 540]]


데이터 사이즈는 2가지 형태가 있기 때문에 추가적인 pre-processing 작업이 필요하다. 두 가지 경우의 수가 있는데,  
1) resize를 하거나
2) padding을 하거나  

그러나 이번 과제는 denoising 과제이므로 resizing의 경우 문서의 비율이 깨져서 글자의 특성을 망가뜨릴 수 있기 때문에 padding이 더 적합한 프로세싱일 것이다.

## 2. Load data

PyTorch의 Dataset 클래스를 상속받아서 우리 데이터셋에 맞는 클래스를 들고 데이터 로드까지.


In [None]:
# ====================================================================
# 커스텀 데이터셋 클래스 구현
# ====================================================================
class TrainDataset(Dataset):
    def __init__(self, dirty_dir, clean_dir, transform=None):
        self.dirty_dir = dirty_dir
        self.clean_dir = clean_dir
        self.transform = transform
        self.dirty_images = os.listdir(self.dirty_dir)

    def __len__(self):
        return len(self.dirty_images)

    def __getitem__(self, index):
        img_name = self.dirty_images[index]
        dirty_img_path = os.path.join(self.dirty_dir, img_name)
        clean_img_path = os.path.join(
            self.clean_dir, img_name
        )  # clean 이미지 파일 이름이 같다고 가정

        dirty_image = np.array(Image.open(dirty_img_path).convert("RGB"))
        clean_image = np.array(Image.open(clean_img_path).convert("RGB"))

        # Augmentation 적용
        if self.transform:
            # DAE에서는 보통 입력(dirty)에만 augmentation을 적용해.
            # 하지만 geometric 변환(회전, 뒤집기 등)은 clean 이미지에도 동일하게 적용해야 할 수도 있어.
            # 여기서는 간단하게 dirty 이미지에만 적용하는 것으로 시작해보자.
            augmented = self.transform(image=dirty_image)
            dirty_image = augmented["image"]
        # TODO: dirty_image와 clean_image를 텐서로 변환해서 반환
        return dirty_image, clean_image

In [8]:
train_data = TrainDataset(train_dir, train_cleaned_dir)
for x, y in iter(train_data):
    pass
a = x.shape
a[:2]
# cv2.imshow(x)
# np.unique(train_data, axis=0)

(420, 540)

In [None]:
# ====================================================================
# 데이터 증강 (Data Augmentation)
# ====================================================================
# TODO: Albumentations를 사용해서 train과 validation용 변환 파이프라인 정의하기
TARGET_HEIGHT = 540
TARGET_WIDTH = 420
train_transform = A.Compose(
    [
        A.ToGray(p=1.0),  # RGB → Gray
        A.LongestMaxSize(max_size=TARGET_WIDTH, interpolation=cv2.INTER_AREA),
        A.PadIfNeeded(
            min_height=TARGET_HEIGHT,
            min_width=TARGET_WIDTH,
            border_mode=cv2.BORDER_CONSTANT,
            value=0,
        ),  # zero-padding
        # A.Resize(),
        A.Rotate(limit=10, p=0.5),
        A.HorizontalFlip(p=0.5),
        # 정규화 및 텐서 변환
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ]
)

val_transform = A.Compose(
    [
        # A.Resize(),
        A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
        ToTensorV2(),
    ]
)

In [None]:
# ====================================================================
# 데이터로더 생성
# ====================================================================
# TODO: DenoiseDataset과 DataLoader를 사용해서 train_loader, val_loader 만들기
train_dataset = DenoiseDataset(...)
train_loader = DataLoader(...)

## 3. Model implementation


In [None]:
# ====================================================================
# STEP 4: U-Net 모델
# ====================================================================
# U-Net을 구성하는 기본 블록 (Convolution 2번)
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DoubleConv, self).__init__()
        # TODO: Conv2d, BatchNorm2d, ReLU를 사용해서 DoubleConv 블록 구현
        pass

    def forward(self, x):
        return  # x


class UNET(nn.Module):
    def __init__(self, in_channels=3, out_channels=3, features=[64, 128, 256, 512]):
        super(UNET, self).__init__()
        # TODO: DoubleConv와 MaxPool2d를 사용해서 Encoder(Down) 부분 구현
        # self.downs = nn.ModuleList()
        # self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # TODO: DoubleConv와 ConvTranspose2d를 사용해서 Decoder(Up) 부분 구현
        # self.ups = nn.ModuleList()

        # TODO: 가장 아래쪽 Bottleneck 부분 구현
        # self.bottleneck = DoubleConv(...)

        # TODO: 최종 출력 레이어 (1x1 Conv) 구현
        # self.final_conv = nn.Conv2d(...)

    def forward(self, x):
        # TODO: Encoder -> Bottleneck -> Decoder 순서로 forward pass 구현
        # 중요한 건! Encoder에서 나온 출력을 skip_connections 리스트에 저장해뒀다가
        # Decoder에서 하나씩 꺼내서 합쳐주는(concat) 거야.
        # skip_connections = []
        # for down in self.downs:
        #     x = down(x)
        #     skip_connections.append(x)
        #     x = self.pool(x)
        # ...
        return  # x

## 4. Train a model


In [None]:
# ====================================================================
# STEP 5: 학습 루프 (Training Loop)
# ====================================================================


def train_fn(loader, model, optimizer, loss_fn):
    # TODO: 1 에포크 동안의 학습을 진행하는 함수 구현
    # model.train()
    # loop = tqdm(loader)
    # for batch_idx, (data, targets) in enumerate(loop):
    #     data = data.to(device=DEVICE)
    #     targets = targets.to(device=DEVICE)

    #     # Forward
    #     predictions = model(data)
    #     loss = loss_fn(predictions, targets)

    #     # Backward
    #     optimizer.zero_grad()
    #     loss.backward()
    #     optimizer.step()

    #     # TODO: W&B에 loss 기록 (wandb.log)
    #     loop.set_postfix(loss=loss.item())
    pass


def validate_fn(loader, model, loss_fn, device):
    # TODO: validation 데이터에 대한 성능을 평가하고, 결과 이미지 몇 개를 시각화하는 함수 구현
    # model.eval()
    # with torch.no_grad():
    # ...
    # TODO: W&B에 validation loss와 결과 이미지 기록 (wandb.log)
    # model.train()
    pass


# ====================================================================
# 메인 실행부
# ====================================================================
# TODO: 모델, Loss 함수, Optimizer 정의하기
# model = UNET(in_channels=3, out_channels=3).to(DEVICE)
# loss_fn = nn.MSELoss() # 또는 nn.L1Loss()가 이미지 복원에 더 좋다고도 해
# optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# TODO: W&B 초기화 (wandb.init)
# wandb.init(project="denoising-autoencoder", config={...})


# 학습 시작!
for epoch in range(NUM_EPOCHS):
    print(f"--- Epoch {epoch+1}/{NUM_EPOCHS} ---")
    # train_fn(...)
    # validate_fn(...)

    # TODO: 매 에포크마다 또는 validation 성능이 가장 좋을 때 모델 저장하기
    # torch.save(model.state_dict(), "best_model.pth")

In [None]:
# ====================================================================
# STEP 6: 평가 및 시각화
# ====================================================================
# TODO: 저장된 best_model.pth 불러오기
# model.load_state_dict(...)

# TODO: 테스트 데이터셋에서 몇 개의 이미지를 가져와서
# [노이즈 이미지] vs [모델이 복원한 이미지] vs [원본 깨끗한 이미지]
# 이렇게 3개를 나란히 놓고 비교하는 시각화 코드 작성 (matplotlib 사용)