# 라이브러리 임포트

In [None]:
#! pip install timm

In [4]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torchvision.transforms as T
import timm
from sklearn.metrics import accuracy_score, f1_score
from glob import glob
import os
from tqdm import tqdm

# Configuration 설정

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
! unzip /content/drive/MyDrive/yeardream_2nd/train.zip -d /content/drive/MyDrive/yeardream_2nd

In [None]:
! unzip /content/drive/MyDrive/yeardream_2nd/test.zip -d /content/drive/MyDrive/yeardream_2nd

In [None]:
class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


cfg = dotdict(
    device='cuda',#'cuda' #cpu
    batch_size=8,
    epochs=20,
    lr=1e-4,)

In [None]:
# 데이터셋 경로 설정
data_dir = '/content/drive/MyDrive/yeardream_2nd'
train_dir = data_dir + '/train'
test_dir = data_dir + '/test'

# Simple EDA

In [None]:
# 데이터 수량 체크
train_defect_images = glob(train_dir + '/defect_images/*.png')
train_normal_images = glob(train_dir + '/normal_images/*.png')

print(f'total number of train dataset : {len(train_defect_images) + len(train_normal_images)}, defect : {len(train_defect_images)}, normal : {len(train_normal_images)}')

In [None]:
# 클래스 분포 체크
plt.bar(['defect', 'normal'], [len(train_defect_images), len(train_normal_images)], color=['red', 'blue'])
plt.title('dist. of train dataset')

# 데이터 분할

In [None]:
total_dataset = train_defect_images + train_normal_images
label = [1] * len(train_defect_images) + [0] * len(train_normal_images)

# Stratified split
X_train, X_val, y_train, y_val = train_test_split(train_defect_images+train_normal_images, label, test_size=0.3, random_state=2025, stratify=label)

print(f'train dataset : {len(X_train)}, val dataset : {len(X_val)}')
print(f'train label : {len(y_train)}, val label : {len(y_val)}')

# Pytorch CustomDataset 클래스 정의

In [None]:
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        image = image / 255.0

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        return image, label

In [None]:
train_dataset = CustomDataset(X_train, y_train)
test_dataset = CustomDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, shuffle=False, num_workers=2, pin_memory=True)

In [None]:
# 데이터 샘플 체크
for batch in train_loader:
    imgs, labels = batch
    fig, axs = plt.subplots(ncols=len(imgs), squeeze=False) # 총 사진의 개수만큼 plot

    for i, img in enumerate(imgs):
        axs[0, i].imshow(img.squeeze(), cmap='gray')

    break

# CNN Model 정의 (resnet18d)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.encoder =  timm.create_model('resnet18d', pretrained=True, in_chans=1)
        # self.encoder =  timm.create_model('resnet34d', pretrained=True, in_chans=1)

        self.head = nn.Linear(1000, 1)

    def forward(self, image, mode='train'):
        x = self.encoder(image)
        output = self.head(x)
        output = torch.sigmoid(output)
        return output

# 모델, Loss, Optimizer 선언

In [None]:
model = Net()

model = model.to(cfg.device)

criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=cfg.lr)

# 학습 모델 저장 경로 설정

In [None]:
model_dir = './models'
os.makedirs(model_dir, exist_ok=True)

# 학습 및 검증

In [None]:
# Train and Valid Loop

metric_best = 0.
model_file = os.path.join(model_dir, f'best.pt')

for epoch in range(cfg.epochs):
    # Train Loop
    train_loss = 0
    train_outputs = []
    train_labels = []
    for batch in tqdm(train_loader, desc=f'train-{epoch}'):
        imgs, labels = batch

        imgs = imgs.to(cfg.device).float()
        labels = labels.to(cfg.device)

        optimizer.zero_grad()

        output = model(imgs.unsqueeze(1))
        loss = criterion(output, labels.unsqueeze(1).float())

        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_outputs.append(output.cpu().detach())
        train_labels.append(labels.cpu().detach())

    train_loss /= len(train_loader)

    # Validation Loop
    val_loss = 0
    val_outputs = []
    val_labels = []
    with torch.no_grad():
        for batch in tqdm(test_loader, desc=f'val-{epoch}'):
            imgs, labels = batch

            imgs = imgs.to(cfg.device).float()
            labels = labels.to(cfg.device)

            output = model(imgs.unsqueeze(1))
            loss = criterion(output, labels.unsqueeze(1).float())

            val_loss += loss.item()
            val_outputs.append(output.cpu().detach())
            val_labels.append(labels.cpu().detach())

    val_loss /= len(test_loader)
    train_outputs = (torch.cat(train_outputs) > 0.5).float().squeeze(-1)
    val_outputs = (torch.cat(val_outputs) > 0.5).float().squeeze(-1)
    train_labels = torch.cat(train_labels)
    val_labels = torch.cat(val_labels)

    train_acc = accuracy_score(train_labels, train_outputs)
    val_acc = accuracy_score(val_labels, val_outputs)

    train_f1 = f1_score(train_labels, train_outputs, average='macro')
    val_f1 = f1_score(val_labels, val_outputs, average='macro')

    if val_f1 > metric_best:
        print(f'metric_best ({metric_best:.6f} --> {val_f1:.6f}). Saving model ...')
        torch.save(model.state_dict(), model_file)
        metric_best = val_f1

    print(f'Epoch: {epoch}, Train Loss: {train_loss}, Val Loss: {val_loss}, Train Acc: {train_acc}, Val Acc: {val_acc}, Train F1: {train_f1}, Val F1: {val_f1}')


# Pytorch CustomTestDataset 클래스 정의

In [None]:
class CustomTestDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):

        image_path = self.image_paths[idx]
        image_name = os.path.basename(image_path)
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        image = image / 255.0

        if self.transform:
            image = self.transform(image)

        return image, image_name

In [None]:
test_images = glob(test_dir + '/images/*.png')

print(f'the number of test images : {len(test_images)}')

In [None]:
test_dataset = CustomTestDataset(test_images)
test_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, shuffle=False, num_workers=2, pin_memory=True)

# 학습된 모델 로드

In [None]:
model_dir = './models'
model_file = os.path.join(model_dir, f'best.pt')

model = Net()
model.load_state_dict(torch.load(model_file))
model = model.to(cfg.device)

# 추론 수행

In [None]:
image_names = []
test_outputs = []
with torch.no_grad():
    for i, batch in enumerate(tqdm(test_loader)):
        imgs, image_name = batch
        imgs = imgs.to(cfg.device).float()

        output = model(imgs.unsqueeze(1))
        test_outputs.append(output.cpu().detach())
        image_names.extend(image_name)

test_outputs = (torch.cat(test_outputs) > 0.5).int().squeeze(-1)

# 추론 결과 저장

In [None]:
submission = pd.DataFrame({'ImageId': image_names, 'answer': test_outputs.tolist()})
submission = submission.sort_values(by=['ImageId']).reset_index(drop=True)

In [None]:
submission.to_csv('submission.csv', index=False)

In [None]:
submission

In [None]:
submission['answer'].value_counts()