In [1]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import transforms, datasets
from torch.utils import data
from sklearn.model_selection import StratifiedShuffleSplit
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
import torch.nn as nn
import timm
import torch

DATA_PATH = os.path.join(os.getcwd(), 'ML_Data')
LABEL_DICT = {'dog':0, 'elephant':1, 'giraffe':2,'guitar':3,'horse':4,'house':5,'person':6}
LABEL_DICT2 = {'dog':0, 'person':1, 'giraffe':2, 'guitar':3, 'house':4, 'horse':5, 'elephant':6}
TO_LABEL = {v: k for k, v in LABEL_DICT.items()}
num_classes = 7
batch_size = 32
num_epochs = 10
base_lr = 0.0001
valid_size = 0.2

#### 학습 파일 정리

In [2]:
train_dir = os.path.join(DATA_PATH, 'train')
new_train_dir = os.path.join(train_dir, 'train')

# train 파일을 만들어 train데이터를 모두 모아넣습니다.
if not os.path.exists(new_train_dir):
    import shutil
    
    label_dirs = os.listdir(train_dir)
    os.makedirs(new_train_dir)
    for label_dir in label_dirs:
        if label_dir[0] != '.':
            now_dir = os.path.join(train_dir, label_dir)
            img_list = os.listdir(now_dir)
            
            for jpg_file in img_list:
                shutil.copy(os.path.join(now_dir, jpg_file), new_train_dir)
                
                new_name = label_dir + '_' + jpg_file.split('_')[1]
                os.rename(os.path.join(new_train_dir, jpg_file), os.path.join(new_train_dir, new_name))

In [3]:
def get_label(file_name):
    return file_name.split('_')[0]

In [4]:
# 학습 데이터 전처리(normalization, random_crop)
def get_transform(random_crop=True):
    transform = []
    transform.append(transforms.Resize(256))
    if random_crop:
        transform.append(transforms.RandomResizedCrop(224))
        transform.append(transforms.RandomHorizontalFlip())
    else:
        transform.append(transforms.CenterCrop(224))
    transform.append(transforms.ToTensor())
    mean = [x / 255.0 for x in [125.3, 123.0, 113.9]]
    std = [x / 255.0 for x in [63.0, 62.1, 66.7]]
    transform.append(transforms.Normalize(mean= mean, std= std))
    return transforms.Compose(transform)

In [5]:
# 이미지를 불러오는 함수입니다
def img_loader(path):
    try:
        with open(path, 'rb') as f:
            img = Image.open(f)
            return img.convert('RGB')
    except FileNotFoundError as e:
        raise FileNotFoundError(e)

In [6]:
# 학습, 검증 데이터셋입니다
class TrainDataset(data.Dataset):
    def __init__(self, valid_split= 0.2, random_crop=True):
        self.get_mode = 'train'
        self.path = os.path.join(new_train_dir)
        self.transform = get_transform(random_crop = random_crop)

        self.data_names = os.listdir(self.path)
        self.labels = [get_label(jpg) for jpg in self.data_names]

        # 계층적 샘플링을 통해 균형있게 학습, 검증 데이터로 나눕니다.
        split = StratifiedShuffleSplit(n_splits=1, test_size = valid_split, random_state= 99)
        self.train_data, self.train_label = [], []
        self.valid_data, self.valid_label = [], []
        for train_idx, valid_idx in split.split(pd.Series(self.data_names), self.labels):
            self.train_data = list(map(lambda idx: self.data_names[idx], train_idx))
            self.train_label = list(map(lambda idx: self.labels[idx], train_idx))
            self.valid_data = list(map(lambda idx: self.data_names[idx], valid_idx))
            self.valid_label = list(map(lambda idx: self.labels[idx], valid_idx))
    
    def change_get_mode(self,mode='train'):
        self.get_mode = mode

    def __getitem__(self, index):
        # 학습, 검증 모드에 따라
        if self.get_mode == 'train':
            file_name, target = self.train_data[index], self.train_label[index]
        else:
            file_name, target = self.valid_data[index], self.valid_label[index]
        path = os.path.join(new_train_dir, file_name)
        
        img = img_loader(path) # 이미지 로딩
        sample = self.transform(img) # 이미지 전처리
        
        return sample, target
    
    def __len__(self):
        if self.get_mode == 'train':
            return len(self.train_data)
        else:
            return len(self.valid_data)

In [7]:
# 테스트 데이터셋입니다
class TestDataset(data.Dataset):
    def __init__(self):
        self.path = os.path.join(DATA_PATH, 'test', '0')
        self.transform = get_transform(random_crop = False)
        self.file_names = os.listdir(self.path)

    def __getitem__(self, index):
        path = os.path.join(self.path, self.file_names[index])
        img = img_loader(path)
        sample = self.transform(img)

        return sample
    
    def __len__(self):
        return len(self.file_names)

모델은 전처리된 efficientnet v2를 이용하여 분류기만 전이 학습을 진행합니다.  
손실 함수: cross entropy loss을 사용했습니다.  
최적화 함수: Adam을 이용하여 각 파라미터마다 다른 크기로 업데이트합니다.  
스케쥴러: StepLR를 이용하여 학습률을 관리합니다. 

In [8]:
model = timm.create_model('tf_efficientnetv2_b3', pretrained=True)

num_ftrs = model.classifier.in_features
model.classifier = nn.Linear(in_features=num_ftrs, out_features=num_classes)

loss_fn = nn.CrossEntropyLoss()
optimizer = Adam([param for param in model.parameters() if param.requires_grad], lr =base_lr, weight_decay =1e-4)
scheduler = StepLR(optimizer, step_size=3, gamma=0.1)

In [9]:
def valid_fit(loaded_dataset, model, loss_fn):
    for iter_, (data, label) in enumerate(loaded_dataset):
        print(f'\r{iter_} valid', end='')
        val_pred = model(data)
        val_loss = loss_fn(val_pred, torch.tensor(list(map(lambda x: LABEL_DICT[x], label))))
        return val_loss

In [None]:
dataset = TrainDataset(valid_split=valid_size)

loaded_dataset = data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

global_iter = 0
loss = 0
for epoch in range(num_epochs):
    model.train()
    dataset.change_get_mode('train')
    for iter_, (data, label) in enumerate(loaded_dataset):
        global_iter += iter_
        print(f'\r training... {global_iter}', end='')
        pred = model(data)
        loss = loss_fn(pred, torch.tensor(list(map(lambda x: LABEL_DICT[x], label))))
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    dataset.change_get_mode('valid')
    val_loss = valid_fit(loaded_dataset, model, loss_fn)
    """
    for iter_, (data, label) in enumerate(loaded_dataset):
        global_iter += iter_
        
        val_pred = model(data)
        val_loss = loss_fn(val_pred, torch.tensor(list(map(lambda x: LABEL_DICT[x], label))))
    """
    print(f'\r ------')
    print(f"Epoch [{epoch+1}/{num_epochs}] => loss: {loss}, val_loss:{val_loss}")
    scheduler.step()
    
    torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
            }, DATA_PATH + '/model'+str(epoch)+'.pt')

 ------ng... 903
Epoch [1/10] => loss: 1.2941467761993408, val_loss:1.2395766973495483
 ------ng... 1806
Epoch [2/10] => loss: 0.5186840891838074, val_loss:0.8862184882164001
 ------ng... 2709
Epoch [3/10] => loss: 0.6859062314033508, val_loss:0.48008179664611816
 ------ng... 3612
Epoch [4/10] => loss: 0.40599870681762695, val_loss:0.6465213894844055
 ------ng... 4515
Epoch [5/10] => loss: 0.6797652244567871, val_loss:0.5857230424880981
 ------ng... 5418
Epoch [6/10] => loss: 0.3869991600513458, val_loss:0.2531175911426544
 training... 5419

In [None]:
loaded_test_dataset = data.DataLoader(dataset=TestDataset(), batch_size=batch_size, shuffle=False)
model.eval()

In [None]:
tf = transforms.ToPILImage()

In [None]:
answers = []
for i, data in enumerate(loaded_test_dataset):
    fc0 = model(data)
    fc = fc0.squeeze().detach().cpu().numpy()
    
    for answer in fc:
        answers.append(np.argmax(answer))
    
    if i == 0:
        print(len(data))
        for i in range(3):
            t = tf(data[i])
            t.show()
            print(fc0[i])
            print(fc[i])
            mx = np.argmax(fc[i])
            print(mx)
            print(TO_LABEL[mx])
    

In [None]:
pdcsv = pd.DataFrame(pd.Series(answers), columns=['answer value'])
pdcsv.to_csv(DATA_PATH + '/test_answer.csv')