# Document Type Classification 2조

### Contents
- Import Library & Loading Data
- EDA
- Data Preprocessing
- Define Functions
- Modeling
- Train Model
- Result
- Save File

## 1. Import Library

In [23]:
import os
import time
import random
import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import albumentations as A
import cv2
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam, AdamW
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
import augraphy
from sklearn.model_selection import KFold
from glob import glob
from matplotlib.gridspec import GridSpec
from timm import create_model
import wandb

In [24]:
class Config:
    model_name = "resnet50"
    exp_name = f"{model_name}-stepwise"
    img_size = 224
    lr = 1e-3
    step_size = 10  # Step size for LR scheduler
    gamma = 0.1  # LR 감소율
    epochs = 3
    batch_size = 64
    num_workers = 0
    wd = 1e-4
    alpha = 0.4
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    data_dir = "/Users/choesumin/Documents/upstage/data"
    train_images_dir = os.path.join(data_dir, "train")
    test_images_dir = os.path.join(data_dir, "test")
    train_csv = os.path.join(data_dir, "train.csv")
    meta_csv = os.path.join(data_dir, "meta.csv")
    sample_submission_csv = os.path.join(data_dir, "sample_submission.csv")
    best_model = os.path.join(data_dir, "checkpoints", f"{model_name}_best_model.pth")

config = Config()


In [25]:
def balance_classes(df, target_column="target"):
    class_counts = df[target_column].value_counts()
    class_weights = 1. / class_counts
    sample_weights = df[target_column].map(class_weights).values
    return sample_weights

In [26]:
def mixup_data(x, y, alpha=config.alpha):
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1
    batch_size = x.size(0)
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

In [27]:
# Augraphy 및 Albumentations 증강 적용
trn_transform = A.Compose([
    A.Resize(height=config.img_size, width=config.img_size),
        A.OneOf([
            A.GaussNoise(var_limit=(10.0, 800.0), p=0.75),
            A.GaussianBlur(blur_limit=(1, 7), p=0.5)
        ], p=0.75),
        A.RandomRotate90(p=0.5),
        A.HorizontalFlip(p=0.75),
        A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2, p=0.5),
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=30, p=0.25),
        A.CoarseDropout(max_holes=6, max_height=32, max_width=32, p=0.5),
        A.ElasticTransform(alpha=1, sigma=30, alpha_affine=30, p=0.5),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.Rotate(limit=30, p=0.75),
        A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8), p=0.5),
        A.MotionBlur(blur_limit=5, p=0.5),
        A.OpticalDistortion(p=0.5),
        A.Transpose(p=0.5),
        A.Normalize(mean=[0.5743355787358306, 0.583304060105453, 0.588189268004516], std=[0.18964056010820557, 0.18694252072057746, 0.1850691924647016]),
        ToTensorV2(),
])

tst_transform = A.Compose([
    A.Resize(config.img_size, config.img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

In [28]:
class ImageDataset(Dataset):
    def __init__(self, df, path, transform=None):
        self.df = df.values
        self.path = path
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        name, target = self.df[idx]
        img = np.array(Image.open(os.path.join(self.path, name)))
        if self.transform:
            img = self.transform(image=img)["image"]
        return img, target

In [29]:
# 모델 학습 함수
def train(loader, model, optimizer, loss_fn, device):
    model.train()
    train_loss = 0
    preds_list = []
    targets_list = []

    pbar = tqdm(loader)
    for image, targets in pbar:
        image = image.to(device)
        targets = targets.to(device)

        model.zero_grad(set_to_none=True)

        preds = model(image)
        loss = loss_fn(preds, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
        targets_list.extend(targets.detach().cpu().numpy())

        pbar.set_description(f"Train Loss: {loss.item():.4f}")

    train_loss /= len(loader)
    train_acc = accuracy_score(targets_list, preds_list)
    train_f1 = f1_score(targets_list, preds_list, average="macro")

    return train_loss, train_acc, train_f1

In [30]:
# 모델 검증 함수
def validate(loader, model, loss_fn, device):
    model.eval()
    val_loss = 0
    preds_list, targets_list = [], []
    
    wrong_images, wrong_preds, wrong_labels = [], [], []
    total_preds, total_targets = [], []

    with torch.no_grad():
        pbar = tqdm(loader)
        for image, targets in pbar:
            image = image.to(device)
            targets = targets.to(device)

            preds = model(image)
            loss = loss_fn(preds, targets)

            val_loss += loss.item()
            
            preds_np = preds.argmax(dim=1).cpu().numpy()
            targets_np = targets.cpu().numpy()
            for i in range(len(preds_np)):
                total_preds.append(preds_np[i])
                total_targets.append(targets_np[i])
                if preds_np[i] != targets_np[i]:
                    wrong_images.append(image[i].cpu().numpy().transpose(1, 2, 0))
                    wrong_preds.append(preds_np[i])
                    wrong_labels.append(targets_np[i])

            preds_list.extend(preds_np)
            targets_list.extend(targets_np)
            pbar.set_description(f"Valid Loss: {loss.item():.4f}")

    val_loss /= len(loader)
    val_acc = accuracy_score(targets_list, preds_list)
    val_f1 = f1_score(targets_list, preds_list, average="macro")

    return val_loss, val_acc, val_f1, wrong_images, wrong_preds, wrong_labels, total_preds, total_targets

In [31]:
def test(loader, model, device):
    test_preds_list = []
    model.eval()

    with torch.no_grad():
        for image, _ in tqdm(loader, desc="Testing"):
            image = image.to(device)

            preds = model(image)
            test_preds_list.extend(preds.argmax(dim=1).cpu().numpy())

            torch.cuda.empty_cache()

    test_df = pd.DataFrame(tst_dataset.df, columns=["ID", "target"])
    test_df["target"] = test_preds_list

    sample_submission_df = pd.read_csv(os.path.join(config.data_dir, "sample_submission.csv"))
    if not (sample_submission_df["ID"] == test_df["ID"]).all():
        raise ValueError("Mismatch in IDs between submission and test predictions.")

    test_df.to_csv(config.submission_output_csv, index=False)

    print("Test predictions saved successfully.")

In [37]:
SEED = 42
os.environ["PYTHONHASHSEED"] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True

In [39]:
train_df = pd.read_csv(os.path.join(config.data_dir, "train.csv"))
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=SEED, stratify=train_df["target"])

In [43]:
# 모델 및 학습 초기화
if __name__ == "__main__":
    model = timm.create_model(config.model_name, pretrained=True, num_classes=17).to(config.device)
    optimizer = torch.optim.SGD(model.parameters(), lr=config.lr, momentum=0.9, weight_decay=config.wd)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=config.step_size, gamma=config.gamma)
    loss_fn = nn.CrossEntropyLoss()

    # 데이터 불균형 해결을 위한 샘플링 가중치 계산
    train_df = pd.read_csv(config.train_csv)
    sample_weights = balance_classes(train_df)
    sampler = torch.utils.data.WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights))

    # 데이터 로더
    trn_loader = DataLoader(ImageDataset(train_df, config.train_images_dir, trn_transform),
                            batch_size=config.batch_size, sampler=sampler, num_workers=config.num_workers)
    val_df = train_test_split(train_df, test_size=0.2, stratify=train_df["target"])[1]
    val_loader = DataLoader(ImageDataset(val_df, config.train_images_dir, tst_transform),
                            batch_size=config.batch_size, shuffle=False, num_workers=config.num_workers)

    for epoch in range(3):
        train_loss, train_acc, train_f1 = train(trn_loader, model, optimizer, loss_fn, config.device)
        val_loss, val_acc, val_f1, _, _, _, _, _ = validate(val_loader, model, loss_fn, config.device)  # 필요한 값만 변수에 저장
        scheduler.step()

        print(f"Epoch {epoch+1}/{config.epochs}")
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Val F1: {val_f1:.4f}")


Train Loss: 2.8413: 100%|███████████████████████| 25/25 [07:36<00:00, 18.27s/it]
Valid Loss: 2.8222: 100%|█████████████████████████| 5/5 [00:39<00:00,  7.84s/it]


Epoch 1/3
Train Loss: 2.8332, Train Acc: 0.0720, Train F1: 0.0254
Val Loss: 2.8154, Val Acc: 0.1051, Val F1: 0.0372


Train Loss: 2.8456: 100%|███████████████████████| 25/25 [07:16<00:00, 17.47s/it]
Valid Loss: 2.8091: 100%|█████████████████████████| 5/5 [00:38<00:00,  7.75s/it]


Epoch 2/3
Train Loss: 2.8230, Train Acc: 0.0624, Train F1: 0.0327
Val Loss: 2.8030, Val Acc: 0.0924, Val F1: 0.0345


Train Loss: 2.8077: 100%|███████████████████████| 25/25 [07:31<00:00, 18.04s/it]
Valid Loss: 2.7955: 100%|█████████████████████████| 5/5 [00:39<00:00,  7.82s/it]

Epoch 3/3
Train Loss: 2.8076, Train Acc: 0.1057, Train F1: 0.0797
Val Loss: 2.7918, Val Acc: 0.1146, Val F1: 0.0609



