# 제 2회 컴퓨터 비전 학습 경진 대회
- 대회 기간 : 2021.01.27 ~ 2021.03.01 17:59 
- public score : 0.86792
- private score : 0.86733
- 사용 모델 : efficient net-b7
- 대회 task : 이미지 분류

# 라이브러리 로딩

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm
import os
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import random
import time

from sklearn.model_selection import train_test_split

# 시드고정

In [None]:
def seed_everything(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True


seed_everything()

In [None]:
# numpy를 tensor로 변환하는 ToTensor 정의
class ToTensor(object):
    """numpy array를 tensor(torch)로 변환합니다."""

    def __call__(self, sample):
        image, label = sample['image'], sample['label']
        # swap color axis because
        # numpy image: H x W x C
        # torch image: C X H X W
        image = image.transpose((2, 0, 1))
        return {'image': torch.FloatTensor(image),
                'label': torch.FloatTensor(label)}


class DatasetMNIST(torch.utils.data.Dataset):
    def __init__(self,
                 dir_path,
                 meta_df,
                 augmentations=None):

        self.dir_path = dir_path  # 데이터의 이미지가 저장된 디렉터리 경로
        self.meta_df = meta_df  # 데이터의 인덱스와 정답지가 들어있는 DataFrame
        self.augmentations = augmentations  # Augmentation

    def __len__(self):
        return len(self.meta_df)

    def __getitem__(self, index):
        # 폴더 경로 + 이미지 이름 + .png => 파일의 경로
        # 참고) "12".zfill(5) => 000012
        #       "146".zfill(5) => 000145
        # cv2.IMREAD_GRAYSCALE : png파일을 채널이 1개인 GRAYSCALE로 읽음
        image = cv2.imread(self.dir_path +
                           str(self.meta_df.iloc[index, 0]).zfill(5) + '.png',
                           cv2.IMREAD_GRAYSCALE)
        # 0 ~ 255의 값을 갖고 크기가 (256,256)인 numpy array를
        # 0 ~ 1 사이의 실수를 갖고 크기가 (256,256,1)인 numpy array로 변환
        image = (image/255).astype('float32')[..., np.newaxis]

        # 정답 numpy array생성(존재하면 1 없으면 0)
        label = self.meta_df.iloc[index, 1:].values.astype('float')
        sample = {'image': image, 'label': label}

        # augmentation 적용
        if self.augmentations:
            sample['image'] = self.augmentations(sample['image'])
        # sample 반환
        return sample

In [None]:
def get_split_data(df):
    """
    train 데이터를 8:2 비율로 train과 valid 데이터로 나누는 함수
    """

    X, y = df.loc[:,'index':'y'], df['z']
    train_x, valid_x, train_y, valid_y = train_test_split(
        X, y,  test_size=0.2)

    train_x['z'] = train_y
    valid_x['z'] = valid_y

    return train_x, valid_x


def get_dataloader(dirty_mnist_answer,mode,batch_size):
    """
    데이터프레임을 dataloader형태로 반환하는 함수
    """
    # augmentations 적용 
    train_augmentations = transforms.Compose([
            transforms.ToPILImage(),        
            transforms.RandomHorizontalFlip(p=0.6),
            transforms.RandomVerticalFlip(0.6),
            transforms.RandomRotation(40),
            transforms.ToTensor(),
            ])
    valid_augmentations = transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        ])
    
    if mode == 'TRAIN':
        train,valid = get_split_data(dirty_mnist_answer)
        
        # Data Loader
        train_dataset = DatasetMNIST("dirty_mnist/", train, augmentations =train_augmentations)
        valid_dataset = DatasetMNIST("dirty_mnist/", valid, augmentations = valid_augmentations)

        train_data_loader = DataLoader(
            train_dataset,
            batch_size = batch_size,
            shuffle = True,
            num_workers = 3
        )
        valid_data_loader = DataLoader(
            valid_dataset,
            batch_size = batch_size,
            shuffle = False,
            num_workers = 3
        )
        return train_data_loader, valid_data_loader
    
    else :
        test_dataset = DatasetMNIST("test_dirty_mnist/", dirty_mnist_answer)
        test_data_loader = DataLoader(
            test_dataset,
            batch_size = batch_size,
            shuffle = False,
            num_workers = 3,
            drop_last = False
        )
        return test_data_loader
        

    

In [None]:
def train_model(model, dataloaders_dict, optimizer, num_epochs, device):
    """
    train 데이터로 모델을 학습하고 valid 데이터로 모델을 검증하는 코드

    파라미터
    ---
    model : 
        학습할 모델
    dataloaders_dict : dict
        train_dataloader과 validation_datalodaer가 들어 있는 dictonary
    optimizer : 
        최적화 함수
    num_epochs : int
        학습 횟수
    device : cuda or cpu
        모델을 학습할 때 사용할 장비

    returns 
    best_model :
        검증데이터 셋 기준으로 가장 성능이 좋은 모델
    ---
    """
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=5,
                                                   gamma=0.9)
    model.to(device)
    torch.cuda.empty_cache()
    critrion = torch.nn.BCELoss()
    # 학습이 어느정도 진행되면 gpu 가속화
    #torch.backends.cudnn.benchmark = False

    # loss가 제일 낮은 모델을 찾기위한 변수
    best_val_loss = int(1e9)
    for epoch in range(num_epochs):
        # epoch 별 학습 및 검증

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # 모델을 학습 모드로
            else:
                model.eval()   # 모델을 추론 모드로

            epoch_loss = 0.0  # epoch loss
            epoch_corrects = 0  # epoch 정확도
            for i,  batch in enumerate(dataloaders_dict[phase]):
                images, labels = batch['image'], batch['label']
                # tensor를 gpu에 올리기
                images = images.to(device)
                labels = labels.to(device)

                # 옵티마이저 초기화 초기화
                optimizer.zero_grad()

                # 순전파 계산
                with torch.set_grad_enabled(phase == 'train'):
                    probs = model(images)
                    loss = critrion(probs, labels)

                    # 학습시 역전파
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # 결과 계산
                    # loss계산
                    epoch_loss += loss.item() * len(probs)
                    # 정확도 계산
                    # train accuracy 계산
                    probs = probs.cpu().detach().numpy()
                    labels = labels.cpu().detach().numpy()
                    preds = probs > 0.5
                    batch_acc = (labels == preds).mean()
                    epoch_corrects += batch_acc
            # epoch별 loss 및 정확도

            epoch_loss = epoch_loss / len(dataloaders_dict[phase])
            epoch_acc = 100 * epoch_corrects / \
                len(dataloaders_dict[phase])

            print('Epoch {}/{} | {:^5} |  Loss: {:.4f} Acc: {:.4f}'.format(epoch+1, num_epochs,
                                                                           phase, epoch_loss, epoch_acc))

            # 검증 오차가 가장 적은 최적의 모델을 저장
            if not best_val_loss or epoch_loss < best_val_loss:
                best_val_loss = epoch_loss
                best_model = model

            lr_scheduler.step()

    return best_model

In [None]:
from model import effnet_b7
from model import regnet
from model import mobilenet_v3

#model = regnet.MultiLabelRegnet()
#model = mobilenet_v3.moblienet_v3()

# 제일 성능이 좋은 effnet 사용.
model = effnet_b7.MultiLabeleffnet()

In [None]:
dirty_mnist_answer = pd.read_csv("dirty_mnist_2nd_answer.csv")
# dirty_mnist라는 디렉터리 속에 들어있는 파일들의 이름을
# namelist라는 변수에 저장
namelist = os.listdir('./dirty_mnist/')

best_models = []
# 교차 검증을 진행할 K
K = 1
for i in range(K):

    train_loader, val_loader = get_dataloader(
        dirty_mnist_answer, mode='TRAIN', batch_size=8)
    #  dict 형식으로 data loader 정의
    dataloaders_dict = {"train": train_loader, "val": val_loader}
    num_epochs = 30
    max_grad_norm = 1
    learning_rate = 5e-5

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print('사용하는 device :', device)
    print('--------------------', i+1, '/', K,
          '- fold start--------------------')

    model = MultiLabeleffnet()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    best_model = train_model(model, dataloaders_dict,
                             optimizer, num_epochs, device)
    best_models.append(best_model)

# test데이터 예측

In [None]:
def inference(best_models, prediction_df, test_dataloader):
    """
    test데이터 셋을 inference 하는 코드

    파라미터
    ---
    best_models : 
        k개의 fold에서 가장 성능 좋은 모델들
    prediction_df : dataframe
        예측을 후 결과 값을 채우기 위해 필요한 dataframe
    test_dataloader : dataloader
        mini batch를 위한 data loader
    returns
    ---
    probs_list : list
        test 데이터의 예측값을 가지고 있는 list
    """
    probs_list = []
    # 5개의 fold마다 가장 좋은 모델을 이용하여 예측
    for model in best_models:
        # 0으로 채워진 array 생성
        probs_array = np.zeros([prediction_df.shape[0],
                                prediction_df.shape[1] - 1])

        for idx, batch in enumerate(test_dataloader):
            with torch.no_grad():
                # 추론
                model.eval()
                images = batch['image']
                images = images.to(device)
                probs = model(images)
                probs = probs.cpu().detach().numpy()

                # 예측 결과를
                # prediction_array에 입력
                batch_index = batch_size * idx
                probs_array[batch_index: batch_index + images.shape[0], :]\
                    = probs
        probs_list.append(probs_array[..., np.newaxis])
    return probs_list

In [None]:
# test Dataset 정의
sample_submission = pd.read_csv("sample_submission.csv")
batch_size = 128
test_dataloader = get_dataloader(
    sample_submission, 'TEST', batch_size=batch_size)

prediction_df = pd.read_csv("sample_submission.csv")

In [None]:
prob_list = inference(best_models, prediction_df,test_dataloader)
prob_list = np.concatenate(prob_list, axis = 2)
probs_mean = prob_list.mean(axis = 2)
probs_mean = (probs_mean > 0.5) * 1

# 정답파일 생성

In [None]:
sample_submission = pd.read_csv("sample_submission.csv")
sample_submission.iloc[:,1:] = probs_mean