In [None]:
import os

import numpy as np
import pandas as pd

# 이미지
from PIL import Image

# 시각화
import matplotlib.pyplot as plt

# PyTorch
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, AdamW

from torchvision import transforms

In [None]:
!unzip train.zip
!unzip test.zip

# **데이터셋 경로 지정**

In [None]:
# Root directory under which the dataset folders and the submission file live.
PROJECT_PATH = "/content"

# Dataset locations: the 'train' and 'test' folders directly below the project root.
TRAIN_PATH, TEST_PATH = (os.path.join(PROJECT_PATH, split) for split in ('train', 'test'))

# **Label 숫자화**

In [None]:
# Class names in label order: the position of a name in this tuple is its integer label.
_CLASS_NAMES = ('dog', 'elephant', 'giraffe', 'guitar', 'horse', 'house', 'person')

# Maps a class (directory) name to the integer label stored in the data frames.
label2idx = {name: index for index, name in enumerate(_CLASS_NAMES)}

# **Train Label 데이터 프레임 만들기**

In [None]:
# Build train_df: one row per training image with its file path and integer label.
labels = []
for base_path, _subdirs, images in os.walk(TRAIN_PATH):
    # The class name is the name of the directory that directly holds the images.
    # basename/normpath is used instead of split('/') so a trailing separator or a
    # different path separator does not yield a wrong (or empty) name.
    label_type = os.path.basename(os.path.normpath(base_path))

    # Skip the dataset root and any directory that is not a known class
    # (checkpoint or archive-metadata folders would otherwise raise KeyError).
    if label_type not in label2idx:
        continue

    idx = label2idx[label_type]
    for image in images:
        labels.append({'img_path': os.path.join(base_path, image),
                       'label': idx})

# Explicit columns keep the sort below working even when no image was found.
train_df = pd.DataFrame(labels, columns=['img_path', 'label'])
train_df = train_df.sort_values(['label', 'img_path'])
train_df = train_df.reset_index(drop=True)
train_df.head(5)

# **Test Label 데이터 프레임 만들기**

In [None]:
# Build test_df: one row per test image. The test set has no ground truth, so every
# row carries the placeholder label -1.
labels = []
for base_path, _subdirs, images in os.walk(TEST_PATH):
    for image in images:
        labels.append({'img_path': os.path.join(base_path, image),
                       'label': -1})

# The frame must be named test_df: the preview cell and the test Dataset cell further
# down both read `test_df` (the previous `test_DF` spelling left them undefined).
# Explicit columns keep the sort working even when no image was found.
test_df = pd.DataFrame(labels, columns=['img_path', 'label'])
# NOTE(review): paths are sorted as strings ('10.jpg' sorts before '2.jpg');
# confirm this matches the row order the submission is expected to have.
test_df = test_df.sort_values(['img_path'])
test_df = test_df.reset_index(drop=True)

# Backward-compatible alias for the original variable name.
test_DF = test_df
test_df.head(5)

# **이미지 잘 들어왔나 확인**

In [None]:
# Sanity check: read the first training image from disk and display it.
first_train_path = train_df.loc[0, 'img_path']
image = plt.imread(first_train_path)
plt.imshow(image)

In [None]:
# Sanity check: read the first test image from disk and display it.
first_test_path = test_df.loc[0, 'img_path']
image = plt.imread(first_test_path)
plt.imshow(image)

# **사용자 데이터셋 클래스 정의**

In [None]:
class ArtDataset(Dataset):
    """Dataset that serves (image, label) pairs described by a DataFrame.

    Args:
        df: DataFrame with an 'img_path' column (image file path) and a 'label'
            column; rows are addressed by position.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __getitem__(self, idx):
        """Return the (image, label) pair stored at positional index ``idx``."""
        data = self.df.iloc[idx]

        # Image: always hand a 3-channel RGB image to the transform. Grayscale,
        # palette or RGBA files would otherwise produce tensors whose channel count
        # does not match a 3-channel Normalize / the network's first convolution.
        # The context manager closes the underlying file once the pixels are copied.
        img_path = data['img_path']
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        # Label
        label = data['label']

        return image, label

    def __len__(self):
        """Return the number of rows (samples) in the backing DataFrame."""
        return len(self.df)

# **Train, Valid 데이터 나누기**

In [None]:
from sklearn.model_selection import StratifiedKFold

def split_df(df, kfold_n=5, fold=-1):
    """Split ``df`` into train / validation frames using one stratified fold.

    The folds come from ``StratifiedKFold(n_splits=kfold_n)`` applied to the
    ``img_path`` values with ``label`` as the stratification target.

    Args:
        df: DataFrame with ``img_path`` and ``label`` columns.
        kfold_n: number of folds to create.
        fold: which fold's split to return. Negative values count from the end;
            the default ``-1`` selects the last fold, which is what this function
            returned before the parameter existed.

    Returns:
        ``(train_df, valid_df)``: copies of the selected rows with a fresh 0..n-1 index.

    Raises:
        ValueError: if ``fold`` is outside ``[-kfold_n, kfold_n)``.
    """
    kfold = StratifiedKFold(n_splits=kfold_n)

    if not -kfold_n <= fold < kfold_n:
        raise ValueError(
            "fold must be in [-{0}, {0}), got {1}".format(kfold_n, fold))

    # Split with the class ratio preserved in every fold.
    X = df.img_path.values
    y = df.label.values

    # Only the index arrays of every fold are materialised; the DataFrames are built
    # once, for the requested fold, instead of once per fold.
    splits = list(kfold.split(X, y))
    train_index, valid_index = splits[fold]

    train_df = df.iloc[train_index].copy().reset_index(drop=True)
    valid_df = df.iloc[valid_index].copy().reset_index(drop=True)

    return train_df, valid_df

In [None]:
# Hold out one stratified fold (1 of 5) of the labelled data for validation.
# Note: this rebinds train_df to the training portion only, so re-running the cell
# splits the already-reduced frame again.
train_df, valid_df = split_df(df=train_df, kfold_n=5)

In [None]:
from sklearn.model_selection import train_test_split

# Random 90/10 hold-out split with a fixed seed.
# The frame is named train_df in this notebook (`train_DF` is never defined).
# NOTE(review): this re-splits whatever train_df currently holds. When the stratified
# split cell above has already run, its valid_df is replaced here and those rows are
# no longer used at all; keep only one of the two split strategies.
datasets = train_test_split(train_df, test_size=0.1, random_state=42, shuffle=True)

train_df = datasets[0].reset_index(drop=True)
valid_df = datasets[1].reset_index(drop=True)

# Report the split sizes. The Dataset objects are only created in a later cell,
# so they cannot be printed here.
print(train_df.shape)
print(valid_df.shape)

# **하이퍼파라미터 설정**

In [None]:
class HyperParams:
    """Container for the training settings and the compute device.

    Attributes:
        epoch: value passed as ``epoch``.
        batch_size: value passed as ``batch_size``.
        lr: value passed as ``lr``.
        use_cuda: result of ``torch.cuda.is_available()`` at construction time.
        device: ``cuda:0`` when ``use_cuda`` is true, otherwise ``cpu``.
    """

    def __init__(self, epoch=50, batch_size=64, lr=0.005):
        self.epoch, self.batch_size, self.lr = epoch, batch_size, lr

        cuda_ok = torch.cuda.is_available()
        self.use_cuda = cuda_ok
        self.device = torch.device("cuda:0" if cuda_ok else 'cpu')

    def print_params(self):
        """Print every stored setting as one row of a small text table."""
        rows = (
            ("* EPOCH          : {}", self.epoch),
            ("* BATCH_SIZE     : {}", self.batch_size),
            ("* LEARNING RATE  : {}", self.lr),
            ("* USE_CUDA       : {}", self.use_cuda),
            ("* DEVICE         : {}", self.device),
        )

        print("------------- Parameter Table -------------")
        for template, value in rows:
            print(template.format(value))
        print("-------------------------------------------")


In [None]:
# Hyper-parameters for this run (these override the HyperParams defaults).
hyper_parameters = HyperParams(
    epoch=10,
    batch_size=16,
    lr=1e-4,
)
hyper_parameters.print_params()

# **Train, Valid 데이터 정의**

In [None]:
# Per-channel normalisation constants.
# NOTE(review): presumably the usual ImageNet mean / std; confirm.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Preprocessing shared by every split: 224x224 centre crop, tensor conversion,
# channel-wise normalisation.
transform = transforms.Compose([
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

train_dataset = ArtDataset(train_df, transform)
valid_dataset = ArtDataset(valid_df, transform)

# Both loaders share the batch size and worker count; only the training loader shuffles.
_loader_kwargs = dict(batch_size=hyper_parameters.batch_size, num_workers=1)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
valid_loader = DataLoader(valid_dataset, shuffle=False, **_loader_kwargs)

# len() of a DataLoader is its number of batches.
print("* train_loader.shape : {}".format(len(train_loader)))

# **모델 설정**



In [None]:
class ConvNet(nn.Module):
    """Small CNN classifier: two conv/batch-norm/ReLU/max-pool stages and a linear head.

    Args:
        class_n: number of output classes (size of the final linear layer).

    The linear head expects 1600 flattened features, i.e. 64 channels x 5 x 5, which
    is what the two stages produce for N x 3 x 224 x 224 input.
    """

    def __init__(self, class_n=7):
        super().__init__()

        # Spatial sizes below assume 224 x 224 input.
        layers = [
            # [N, 3, 224, 224] -> conv [N, 32, 55, 55] -> pool [N, 32, 27, 27]
            *self._stage(3, 32, kernel_size=8, stride=4),
            # [N, 32, 27, 27] -> conv [N, 64, 11, 11] -> pool [N, 64, 5, 5]
            *self._stage(32, 64, kernel_size=7, stride=2),
        ]
        self.model = nn.Sequential(*layers)
        self.fc = nn.Linear(1600, class_n)

    @staticmethod
    def _stage(in_channels, out_channels, kernel_size, stride):
        """Return the layers of one stage: Conv2d, BatchNorm2d, ReLU, 2x2 MaxPool2d."""
        return [
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=kernel_size, stride=stride),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
        ]

    def forward(self, x):
        """Return class logits of shape [N, class_n] for a batch of images."""
        features = self.model(x)
        flat = features.view(features.size(0), -1)
        return self.fc(flat)

In [None]:
# Network, placed on the configured device.
model = ConvNet().to(hyper_parameters.device)

# Adam without weight decay; gradients are cleared once before the first training step.
optimizer = Adam(params=model.parameters(),
                 lr=hyper_parameters.lr,
                 weight_decay=0.0)
optimizer.zero_grad()

# Classification loss computed from the raw logits and integer class labels.
criterion = nn.CrossEntropyLoss()

# **Train 함수 작성**

In [None]:
def train(model, hp, train_loader, train_len):
    """Train ``model`` for one pass over ``train_loader`` and return the accuracy.

    The notebook-level ``optimizer`` and ``criterion`` objects are used for the
    update step and the loss; they are not passed in as arguments.

    Args:
        model: network to train; switched to training mode here.
        hp: object exposing ``device``, the device every batch is moved to.
        train_loader: iterable yielding ``(images, labels)`` batches.
        train_len: number of samples the correct-prediction count is divided by.

    Returns:
        Number of correct predictions made during the pass divided by ``train_len``.
    """
    model.train()

    n_correct = 0
    for batch_idx, batch in enumerate(train_loader):
        inputs, targets = (tensor.to(hp.device) for tensor in batch)

        logits = model(inputs)
        batch_loss = criterion(logits, targets)

        # Back-propagate, update the weights, then clear gradients for the next batch.
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Log the loss on every 100th batch, starting with the first.
        if not batch_idx % 100:
            print(f"Training steps: {batch_idx} Loss: {str(batch_loss.item())}")

        # Predicted class = index of the largest logit.
        _, predicted = logits.max(1)
        n_correct += (predicted == targets.data).sum()

    return n_correct / train_len

# **Validation 함수 작성**

In [None]:
def validate(model, hp, valid_loader, valid_len):
    """Evaluate ``model`` on ``valid_loader`` and return the accuracy.

    Args:
        model: network to evaluate; switched to evaluation mode here (it is not
            switched back afterwards).
        hp: object exposing ``device``, the device every batch is moved to.
        valid_loader: iterable yielding ``(images, labels)`` batches.
        valid_len: number of samples the correct-prediction count is divided by.

    Returns:
        Number of correct predictions divided by ``valid_len``. The value is also
        printed.
    """
    model.eval()

    corrects = 0

    # Evaluation needs no gradients: disabling autograd avoids building a graph for
    # every forward pass, which saves memory and time.
    with torch.no_grad():
        for images, labels in valid_loader:
            images = images.to(hp.device)
            labels = labels.to(hp.device)

            outputs = model(images)

            # Predicted class = index of the largest logit.
            _, preds = torch.max(outputs, 1)
            corrects += torch.sum(preds == labels)

    acc = corrects / valid_len

    print(f'VALID ACC : {acc}\n')

    return acc

# **Train 시작**

In [None]:
# Sample counts used as accuracy denominators; they do not change between epochs.
train_len, valid_len = len(train_df), len(valid_df)

for epoch in range(hyper_parameters.epoch):
    # One full pass over the training data, then an evaluation on the validation set.
    train_acc = train(model, hyper_parameters, train_loader, train_len)
    valid_acc = validate(model, hyper_parameters, valid_loader, valid_len)

    print("[Epoch {}] Train ACC : {}, Valid ACC : {}".format(epoch, train_acc, valid_acc))

# **Test 데이터 정의**

In [None]:
# Test images go through the same preprocessing as the training data.
test_dataset = ArtDataset(df=test_df, transform=transform)

# One image per batch and no shuffling, so predictions come out in test_df row order.
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False, num_workers=1)

print(len(test_dataset))

# **Test 시작**

In [None]:
# Predict a class index for every test image, in test_loader order.
model.eval()
answers = []

# Inference only: disabling autograd avoids building a graph for every forward pass.
with torch.no_grad():
    # The labels from the test set are placeholders and are not needed for
    # prediction, so they are not moved to the device.
    for images, _labels in test_loader:
        images = images.to(hyper_parameters.device)

        outputs = model(images)

        # Predicted class = index of the largest logit.
        _, preds = torch.max(outputs, 1)

        # Store plain Python ints rather than numpy scalars.
        answers.extend(preds.cpu().tolist())

In [None]:
# Show every predicted class index, then how many predictions were made.
print(answers, len(answers), sep='\n')

# **최종 결과 파일 저장**

In [None]:
# Write the predictions to <PROJECT_PATH>/submission.csv as a single 'answer_value'
# column; the DataFrame's default integer index is written as the first CSV column.
submission_path = os.path.join(PROJECT_PATH, 'submission.csv')
submission_df = pd.DataFrame(data={'answer_value': answers})
submission_df.to_csv(path_or_buf=submission_path)