In [1]:
import os
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize

import glob
from tqdm.notebook import tqdm

In [2]:
# 테스트 데이터셋 폴더 경로를 지정해주세요.
test_dir = '/opt/ml/input/data/eval'
test_dir_glob = glob.glob(test_dir + '/images/*')
## png, jpg 파일 동시에 존재
train_dir = '/opt/ml/input/data/train'
train_dir_glob = glob.glob(train_dir + '/images/**/*')

test_csv = '/opt/ml/input/data/eval/info.csv'
train_csv = '/opt/ml/input/data/train/train.csv'

count = 0
for files in test_dir_glob:
    if count == 10:
        break
    if files[-3:] == 'jpg':
        print(files)
        count += 1

/opt/ml/input/data/eval/images/9d066f76ec0c1ff3f0ce2a46dd3eb94a9161fa31.jpg
/opt/ml/input/data/eval/images/bc979e75fc0353d092603a9e36a22eb4b7051926.jpg
/opt/ml/input/data/eval/images/8a2c7353245b9f03710f690fed6b6ec7408f5d38.jpg
/opt/ml/input/data/eval/images/b9bbec2822d0af3ae164e52428aec22307146a5e.jpg
/opt/ml/input/data/eval/images/e89be8780c409ffc39dec14a270ee0ecfd169e33.jpg
/opt/ml/input/data/eval/images/2b9d36fb3f069133a1e649240bdd3c3f20a5a0aa.jpg
/opt/ml/input/data/eval/images/a0f9e8cfe98e12840f2d0495db4910028ba8eafe.jpg
/opt/ml/input/data/eval/images/ecf3f3bece9e406827766b1b1fa1b5c01b030e9a.jpg
/opt/ml/input/data/eval/images/6a30fc71d27a0a33377478cf6cb0b391eb18b66c.jpg
/opt/ml/input/data/eval/images/8f958971742a44a71ad3ffba827e7fb6d498aca9.jpg


In [3]:
train_pd = pd.read_csv(train_csv)
test_pd = pd.read_csv(test_csv)

In [4]:
"""
mask
    wear: 0
    incorrect: 1
    not wear: 2
gender
    male: 0
    female: 1
age
    <30: 0
    >=30 and <60: 1
    >=60: 2
"""
class FileNameError(Exception):
    def __init__(self, msg):
        self.msg = msg
    def __str__(self):
        return '** FileNameErrorself, ' + msg

mask = [0, 1, 2]
gender = [0, 1]
age = [0, 1, 2]

from itertools import product
label_number = list(product(mask, gender, age))
print(label_number)

def mask_feature(file_name):
    if file_name[:4] == 'mask':
        return 0
    elif file_name[:14] == 'incorrect_mask':
        return 1
    elif file_name[:6] == 'normal':
        return 2
    else:
        raise FileNameError('Mask naming error')

def gender_feature(person):
    gender = person.split('_')[1]
    if gender == 'male':
        return 0
    elif gender == 'female':
        return 1
    else:
        raise FileNameError('Gender naming error')

def age_feature(age):
    if age < 30:
        return 0
    elif 30 <= age < 60:
        return 1
    elif age >= 60:
        return 2
    else:
        raise FileNameError('Age naming error')

def get_label(path):
    # print(path.split()[-2:])
    person, file_name = path.split('/')[-2:]
    
    try:
        mask = mask_feature(file_name)
        gender = gender_feature(person)
        age = age_feature(int(person[-2:]))

        return label_number.index((mask, gender, age))
    except FileNameError as e:
        print(e)
        exit()

get_label('/opt/ml/input/data/train/images/006163_female_Asian_18/mask1.png')

    
# train_dir_glob

[(0, 0, 0), (0, 0, 1), (0, 0, 2), (0, 1, 0), (0, 1, 1), (0, 1, 2), (1, 0, 0), (1, 0, 1), (1, 0, 2), (1, 1, 0), (1, 1, 1), (1, 1, 2), (2, 0, 0), (2, 0, 1), (2, 0, 2), (2, 1, 0), (2, 1, 1), (2, 1, 2)]


3

In [5]:
def get_test_label(path):
    file_name = path.split('/')[-1]
    return test_pd[test_pd['ImageID'] == file_name]['ans'].item()

print(get_test_label('/opt/ml/input/data/eval/images/8f958971742a44a71ad3ffba827e7fb6d498aca9.jpg'))

0


In [6]:
class MaskDataset(Dataset):
    def __init__(self, images, transforms, train=True):
        self.images = images
        self.classes = range(18)
        self.transforms = transforms
        self.train = train
    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        path = self.images[idx]
        img = Image.open(path)

        if self.train:
            label = get_label(path)
        else:
            label = get_test_label(path)

        if self.transforms:
            img = self.transforms(img)
            
        return img, label

In [7]:
transformation = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(224)
    ])
train_dataset = MaskDataset(train_dir_glob, transformation)
test_dataset = MaskDataset(test_dir_glob, transformation, train=False)

In [8]:
train_dataloader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)
img, labels = next(iter(train_dataloader))
img, labels = next(iter(test_dataloader))
# print(img)
# print(labels)

In [9]:
# from efficientnet_pytorch import EfficientNet
# model = EfficientNet.from_pretrained('efficientnet-b0')
import torchvision
# imagenet_resnet18 = torchvision.models.resnet50(pretrained=True)
model = torchvision.models.resnet18(pretrained=True)
print("네트워크 필요 입력 채널 개수", model.conv1.weight.shape[1])
print("네트워크 출력 채널 개수 (예측 class type 개수)", model.fc.weight.shape[0])
print(model)

네트워크 필요 입력 채널 개수 3
네트워크 출력 채널 개수 (예측 class type 개수) 1000
ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, trac

In [10]:
MASK_CLASS_NUM = 18
import math
model.fc = torch.nn.Linear(in_features=512, out_features=MASK_CLASS_NUM, bias=True)
torch.nn.init.xavier_uniform_(model.fc.weight)
stdv = 1. / math.sqrt(model.fc.weight.size(1))
model.fc.bias.data.uniform_(-stdv, stdv)

tensor([ 0.0073, -0.0133,  0.0128,  0.0014,  0.0222,  0.0377,  0.0407, -0.0382,
        -0.0007,  0.0414, -0.0204, -0.0183,  0.0437,  0.0033,  0.0055, -0.0004,
        -0.0257, -0.0131])

In [11]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 학습 때 GPU 사용여부 결정. Colab에서는 "런타임"->"런타임 유형 변경"에서 "GPU"를 선택할 수 있음

print(f"{device} is using!")
model.to(device) # Resnent 18 네트워크의 Tensor들을 GPU에 올릴지 Memory에 올릴지 결정함

LEARNING_RATE = 0.0001 # 학습 때 사용하는 optimizer의 학습률 옵션 설정
NUM_EPOCH = 5 # 학습 때 mnist train 데이터 셋을 얼마나 많이 학습할지 결정하는 옵션

loss_fn = torch.nn.CrossEntropyLoss() # 분류 학습 때 많이 사용되는 Cross entropy loss를 objective function으로 사용 - https://en.wikipedia.org/wiki/Cross_entropy
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE) # weight 업데이트를 위한 optimizer를 Adam으로 사용함

dataloaders = {
    "train" : train_dataloader,
    "test" : test_dataloader
}

cuda:0 is using!


In [12]:
### 학습 코드 시작
best_test_accuracy = 0.
best_test_loss = 9999.

for epoch in range(NUM_EPOCH):
  for phase in ["train", "test"]:
    running_loss = 0.
    running_acc = 0.
    if phase == "train":
      model.train() # 네트워크 모델을 train 모드로 두어 gradient을 계산하고, 여러 sub module (배치 정규화, 드롭아웃 등)이 train mode로 작동할 수 있도록 함
    elif phase == "test":
      model.eval() # 네트워크 모델을 eval 모드 두어 여러 sub module들이 eval mode로 작동할 수 있게 함

    with tqdm(dataloaders[phase], unit="batch") as tepoch:
        for ind, (images, labels) in enumerate(tepoch):
            tepoch.set_description(f"Epoch {epoch}")
            # (참고.해보기) 현재 tqdm으로 출력되는 것이 단순히 진행 상황 뿐인데 현재 epoch, running_loss와 running_acc을 출력하려면 어떻게 할 수 있는지 tqdm 문서를 보고 해봅시다!
            # hint - with, pbar
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad() # parameter gradient를 업데이트 전 초기화함

            with torch.set_grad_enabled(phase == "train"): # train 모드일 시에는 gradient를 계산하고, 아닐 때는 gradient를 계산하지 않아 연산량 최소화
                logits = model(images)
                _, preds = torch.max(logits, 1) # 모델에서 linear 값으로 나오는 예측 값 ([0.9,1.2, 3.2,0.1,-0.1,...])을 최대 output index를 찾아 예측 레이블([2])로 변경함  
                loss = loss_fn(logits, labels)

                if phase == "train":
                  loss.backward() # 모델의 예측 값과 실제 값의 CrossEntropy 차이를 통해 gradient 계산
                  optimizer.step() # 계산된 gradient를 가지고 모델 업데이트

            running_loss += loss.item() * images.size(0) # 한 Batch에서의 loss 값 저장
            iter_correct = torch.sum(preds == labels.data)
            running_acc += iter_correct # 한 Batch에서의 Accuracy 값 저장

            tepoch.set_postfix(loss=loss.item(), accuracy=iter_correct.item() / batch_size)

    # 한 epoch이 모두 종료되었을 때,
    epoch_loss = running_loss / len(dataloaders[phase].dataset)
    epoch_acc = running_acc / len(dataloaders[phase].dataset)

    print(f"현재 epoch-{epoch}의 {phase}-데이터 셋에서 평균 Loss : {epoch_loss:.3f}, 평균 Accuracy : {epoch_acc:.3f}")
    if phase == "test" and best_test_accuracy < epoch_acc: # phase가 test일 때, best accuracy 계산
      best_test_accuracy = epoch_acc
    if phase == "test" and best_test_loss > epoch_loss: # phase가 test일 때, best loss 계산
      best_test_loss = epoch_loss
print("학습 종료!")
print(f"최고 accuracy : {best_test_accuracy}, 최고 낮은 loss : {best_test_loss}")

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1182.0), HTML(value='')))


현재 epoch-0의 train-데이터 셋에서 평균 Loss : 0.383, 평균 Accuracy : 0.879


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=788.0), HTML(value='')))


현재 epoch-0의 test-데이터 셋에서 평균 Loss : 8.195, 평균 Accuracy : 0.181


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=1182.0), HTML(value='')))