In [None]:
import os
import random
import glob 
import cv2
import pandas as pd
import numpy as np
import math
import pickle
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

import timm
from timm.scheduler.cosine_lr import CosineLRScheduler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

In [None]:
# Hyperparameters
class CFG:
    """Central place for the tunable settings read by the rest of the notebook."""

    # Reproducibility / data split
    seed = 1212
    fold_number = 0

    # Data loading
    batch_size = 64
    num_workers = 4

    # Optimisation
    learning_rate = 1e-3
    weight_decay = 0.0005
    epoch = 30

    # Settings intended for the CosineLRScheduler.
    # NOTE(review): the scheduler construction in this notebook passes a literal
    # t_initial=2 instead of reading scheduler_t_initial — confirm which is intended.
    scheduler_t_initial = 10
    scheduler_cycle_mul = 2
    scheduler_cycle_decay = 0.5
    scheduler_lr_min = 0.00001

In [None]:
def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED``, and switches cuDNN to deterministic mode with
    benchmarking disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # cuDNN: force deterministic kernels and disable the auto-tuner
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Apply the configured seed, then pick the compute device (GPU when available)
seed = CFG.seed
set_seed(seed)

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of dicts
            that each carry an ``'lr'`` entry.

    Returns:
        The ``'lr'`` value of the first group, or ``None`` when there are
        no parameter groups.
    """
    return next((group['lr'] for group in optimizer.param_groups), None)

In [None]:
def _load_pickle(path):
    """Read one pickled object from ``path``.

    NOTE(review): pickle can execute arbitrary code on load — only use files
    produced by this project.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Per-fold folder paths and base labels prepared ahead of time
train_folder_names_per_fold = _load_pickle('train_folder_names_per_fold.pkl')
valid_folder_names_per_fold = _load_pickle('valid_folder_names_per_fold.pkl')
train_labels_per_fold = _load_pickle('train_labels_per_fold.pkl')
valid_labels_per_fold = _load_pickle('valid_labels_per_fold.pkl')

In [None]:
# Pick the folders and base labels of the configured fold
train_folder_names, train_labels = (
    train_folder_names_per_fold[CFG.fold_number],
    train_labels_per_fold[CFG.fold_number],
)
valid_folder_names, valid_labels = (
    valid_folder_names_per_fold[CFG.fold_number],
    valid_labels_per_fold[CFG.fold_number],
)

In [None]:
# Quick look at the first ten training folders and their base labels
for preview in (train_folder_names[:10], train_labels[:10]):
    print(preview)

In [None]:
# Builds: train_image_list, valid_image_list, train_label_list, valid_label_list

def _collect_images_and_labels(folder_names, base_labels):
    """Expand per-folder base labels into per-image paths and class labels.

    For every folder, images are found with three glob patterns and each
    matched file receives ``base_label + offset``:

    * ``mask*``           -> offset 0
    * ``incorrect_mask*`` -> offset 6
    * ``normal*``         -> offset 12

    Exactly one label is emitted per file that was actually found, so image
    and label lists stay aligned even when a folder holds more or fewer
    files than usual. (Hard-coding "5 mask + 1 incorrect + 1 normal" labels
    silently shifts every later label as soon as one file is missing.)

    Args:
        folder_names: Sequence of folder paths, indexed in parallel with
            ``base_labels``.
        base_labels: Sequence of integer base labels, one per folder.

    Returns:
        Tuple ``(image_paths, labels)`` of two equally long lists.
    """
    pattern_offsets = (('mask*', 0), ('incorrect_mask*', 6), ('normal*', 12))

    image_paths = []
    labels = []
    for idx, base_label in enumerate(base_labels):
        folder_path = folder_names[idx]
        for pattern, offset in pattern_offsets:
            # glob order depends on the filesystem; sort for reproducible lists
            matched = sorted(glob.glob(os.path.join(folder_path, pattern)))
            image_paths += matched
            labels += [base_label + offset] * len(matched)
    return image_paths, labels


# train
train_image_list, train_label_list = _collect_images_and_labels(train_folder_names, train_labels)

# valid
valid_image_list, valid_label_list = _collect_images_and_labels(valid_folder_names, valid_labels)

In [None]:
# Sanity check: list sizes (images first, then labels), followed by a preview
for collection in (train_image_list, valid_image_list, train_label_list, valid_label_list):
    print(len(collection))
for preview in (train_image_list[:10], train_label_list[:10]):
    print(preview)

In [None]:
class Train_Valid_Dataset(Dataset):
    """Labelled image dataset used for both training and validation.

    Args:
        images_path: Sequence of image file paths.
        labels: Sequence of labels, indexed in parallel with ``images_path``.
        transform: Optional callable invoked as ``transform(image=img)`` that
            returns a mapping with an ``'image'`` entry; ``None`` disables it.
    """

    def __init__(self,images_path,labels,transform):
        self.images_path = images_path
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Number of samples (one per image path)."""
        return len(self.images_path)

    def __getitem__(self,idx):
        """Load sample ``idx`` and return ``(image, label)``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_path = self.images_path[idx]
        label = self.labels[idx]

        img = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if img is None:
            raise FileNotFoundError(f"Failed to read image: {img_path}")
        # OpenCV loads BGR; convert to RGB channel order
        img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        if self.transform is not None:
            img = self.transform(image=img)['image']

        return img, label

In [None]:
class TestDataset(Dataset):
    """Unlabelled image dataset for inference.

    Args:
        images_path: Sequence of image file paths.
        transform: Optional callable invoked as ``transform(image=img)`` that
            returns a mapping with an ``'image'`` entry; ``None`` disables it.
    """

    def __init__(self,images_path,transform):
        self.images_path = images_path
        self.transform = transform

    def __len__(self):
        """Number of samples (one per image path)."""
        return len(self.images_path)

    def __getitem__(self,idx):
        """Load and return the image at position ``idx``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_path = self.images_path[idx]

        img = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if img is None:
            raise FileNotFoundError(f"Failed to read image: {img_path}")
        # OpenCV loads BGR; convert to RGB channel order
        img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

        if self.transform is not None:
            img = self.transform(image=img)['image']

        return img

In [None]:
# Build the Albumentations pipelines
def _center_crop_normalize():
    """Create a fresh pipeline: 384x384 center crop -> normalize -> tensor."""
    steps = [
        A.CenterCrop(height=384, width=384, p=1.0),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255, p=1.0),
        ToTensorV2(),
    ]
    return A.Compose(steps)


# Train, valid and test currently share the same steps; each gets its own instance
train_transform = _center_crop_normalize()
valid_transform = _center_crop_normalize()
test_transform = _center_crop_normalize()

In [None]:
# Settings shared by both loaders
_loader_kwargs = dict(batch_size=CFG.batch_size, num_workers=CFG.num_workers)

# Training data is reshuffled every epoch
train_dataset = Train_Valid_Dataset(train_image_list, train_label_list, transform=train_transform)
train_dataloader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)

# Validation data keeps its order
valid_dataset = Train_Valid_Dataset(valid_image_list, valid_label_list, transform=valid_transform)
valid_dataloader = DataLoader(valid_dataset, shuffle=False, **_loader_kwargs)

## Visualize the transformed images

In [None]:

def show_images(images, labels, preds):
    """Plot normalized image tensors in a grid with label/prediction titles.

    Each image is permuted from channel-first to channel-last, de-normalized
    with the mean/std below (the same values passed to ``A.Normalize`` in this
    notebook) and clipped to [0, 1] before display.

    The grid is 5 columns wide. It keeps the original 5x5 layout for up to
    25 images and adds rows beyond that, so larger batches no longer fail
    with an out-of-range subplot index.

    Args:
        images: Iterable of channel-first image tensors.
        labels: Indexable of labels, one per image.
        preds: Indexable of predictions, one per image (entries may be None).
    """
    n_cols = 5
    n_rows = max(5, (len(images) + n_cols - 1) // n_cols)

    # Loop-invariant de-normalization constants
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # 2 inches per row keeps the original (10, 10) figure for the 5x5 case
    plt.figure(figsize=(10, 2 * n_rows))
    for i, image in enumerate(images):
        plt.subplot(n_rows, n_cols, i + 1)
        # .cpu() makes this work for tensors that already live on the GPU
        image = image.permute(1, 2, 0).cpu().numpy()
        image = std * image + mean
        image = np.clip(image, 0, 1)
        plt.imshow(image)
        plt.title(f"Label: {labels[i]} | Pred: {preds[i]}")
        plt.axis("off")
    plt.tight_layout()
    plt.show()

def visualize_dataset(dataloader, num_images=20):
    """Display the first ``num_images`` samples of one batch from ``dataloader``.

    No model is involved, so every prediction slot is filled with ``None``.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` batches.
        num_images: Maximum number of samples taken from the first batch.
    """
    batch_images, batch_labels = next(iter(dataloader))
    placeholder_preds = [None for _ in range(num_images)]
    show_images(batch_images[:num_images], batch_labels[:num_images], placeholder_preds)

In [None]:
# Preview up to 20 transformed images from the first (shuffled) training batch.
visualize_dataset(train_dataloader,20)

In [None]:
# Model, loss, optimizer and learning-rate scheduler.
# EfficientNet-B0 from timm with pretrained weights and an 18-way output layer, moved to `device`.
model = timm.create_model('efficientnet_b0', pretrained=True, num_classes=18).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=CFG.learning_rate, weight_decay = CFG.weight_decay)
# NOTE(review): t_initial is the literal 2 here, while CFG.scheduler_t_initial (= 10) is never read —
# confirm which value is intended before changing either one.
scheduler = CosineLRScheduler(optimizer, t_initial=2, cycle_limit = 100, cycle_mul=CFG.scheduler_cycle_mul, cycle_decay=CFG.scheduler_cycle_decay, lr_min=CFG.scheduler_lr_min)

In [None]:
# Train

# Best validation accuracy seen so far. This must live OUTSIDE the epoch loop:
# resetting it every epoch makes "best_model.pth" get overwritten on every
# epoch instead of only when validation accuracy improves.
best_accuracy = 0.0

for epoch in range(CFG.epoch):
    # ---- training pass ----
    model.train()
    train_loss = 0.0   # sum of per-sample losses seen so far in this epoch
    seen_samples = 0   # number of samples seen so far in this epoch
    num_batches = len(train_dataloader)

    for batch_idx, (images,labels) in enumerate(tqdm(train_dataloader)):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(images) # (B,18)
        loss = criterion(outputs, labels)
        loss.backward()

        # criterion returns the batch mean; weight by batch size to get a sum
        train_loss += loss.item() * len(images)
        seen_samples += len(images)

        if batch_idx % 10 == 0:
            # Divide by the true sample count so a smaller last batch does not skew the average
            print(f'Epoch : {epoch+1} | Iter : ({batch_idx+1}/{num_batches}) | train_loss : {train_loss / seen_samples}, now_lr = {get_lr(optimizer)}')

        optimizer.step()
        # Fractional epoch so the schedule advances within an epoch, not only between epochs
        scheduler.step(epoch + batch_idx/num_batches)

    # ---- validation pass ----
    model.eval()
    valid_loss = 0.0
    total_correct = 0

    with torch.no_grad():
        for batch_idx, (images,labels) in enumerate(tqdm(valid_dataloader)):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images) # (B,18)
            loss = criterion(outputs, labels)
            # Predicted class = index of the largest logit
            _, predicted = torch.max(outputs, dim = -1, keepdim=False)

            valid_loss += loss.item() * len(images)
            total_correct += (predicted == labels).sum().item()

    accuracy = total_correct / len(valid_dataset)
    valid_loss = valid_loss / len(valid_dataset)

    print(f'Epoch : {epoch+1} | valid_loss = {valid_loss} | valid_accuracy = {accuracy}')

    # Save a checkpoint whenever validation accuracy improves
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), "best_model.pth")
        # Report the 1-based epoch, consistent with the other log lines
        print(f'save pth {epoch+1}epoch')

    # Always keep the most recent weights as well
    torch.save(model.state_dict(), "last_model.pth")