# 머신러닝 (Machine Learning)

## Kaggle 이미지 분류 대회 (Image Classification Competition)

### 1. 데이터 불러오기

In [None]:
# 오늘은 Kaggle에서 데이터를 다운받고 이미지를 분류함으로써 대회에 참가하는 일련의 과정을 체험해 볼 예정입니다.
# 먼저 Kaggle에서 데이터를 다운받아 불러오도록 합시다.

## 구글 드라이브 불러오기
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 우리가 기존에 사용했던 MNIST나 CIFAR10 데이터 같은 경우, Pytorch에서 지원해주는 데이터를 사용했습니다.
# 그렇기에 이미 데이터가 전처리가 되어있었고, 우리가 추가로 전처리를 할 필요가 없었습니다.
# 하지만 이번에는 Real World 데이터를 다루는만큼, 데이터를 직접 데이터셋으로 바뀌는 과정을 진행하겠습니다.

# 데이터를 데이터셋으로 변경하기 위해서는 클래스로 바꿔줄 필요가 있습니다.
# 이를 도와주는 라이브러리가 있는데요, 바로 torch.utils.data에 있는 Dataset 라이브러리입니다.
# torch.utils.data는 DataLoader와 random_split을 사용할 때 한번 보셨었는데 기억하실까요?
# 백문이 불여일견! 데이터셋으로 바꾸는 Class를 다같이 작성해봅시다.

## csv 파일 처리를 위한 pandas 라이브러리 불러오기
import pandas as pd

## Image 라이브러리 (이미지 처리) 불러오기
from PIL import Image

## Dataset 라이브러리 불러오기 (겸사겸사 DataLoader와 random_split도 같이!)
from torch.utils.data import Dataset, DataLoader, random_split

## class 작성
class CustomDataset(Dataset):
    def __init__(self, img_dir, annotation_file=None, transforms=None, iter=None):
        self.data = []
        self.target = []
        self.transforms = transforms
        self.iter = iter

        for path in img_dir:
            self.data.append(self.transforms(Image.open(path)))

        if annotation_file:
            csv_file = pd.read_csv(annotation_file)
            self.target = csv_file['label']

    def __len__(self):
        return len(self.target)

    def __getitem__(self, idx):
        if self.iter:
            data = self.iter(self.data[idx])
        return data, self.target[idx]

In [None]:
# 먼저 데이터 경로를 저장해줍시다.

train_path = ""
train_csv = ""
test_path = ""

In [None]:
# 그 다음, 우리가 어떤 전처리를 할지 정해야겠죠? 전처리는 다음과 같이 진행하겠습니다.
# 일단 전처리를 하기 위해 transforms 라이브러리를 불러옵시다.

from torchvision import transforms as T

# 전처리는 다음과 같은 것을 해주겠습니다.
## T.Resize -> 이미지의 사이즈 변경
## T.RandomCrop -> 이미지를 랜덤하게 잘라냄
## T.RamdomHorizontalFlip -> 이미지를 랜덤하게 좌우반전 (Default: 0.5)
## T.ToTensor -> 이미지를 텐서(병렬계산이 가능하도록)로 변경
## T.Normalize -> 이미지의 학습이 빠르게 되도록 정규화
T = {"train": T.Compose([
        T.Resize((224,224)),
        T.ToTensor(),
        T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]),
     "iter": T.Compose([
         T.RandomCrop(224, padding=4),
         T.RandomHorizontalFlip(),
     ])
    "test": T.Compose([
        T.Resize((224,224)),
        T.ToTensor(),
        T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])
    }

In [None]:
# 이제 데이터를 불러올 준비가 모두 끝났습니다! 우리가 만든 class와 전처리를 이용하여 데이터를 불러옵시다.

trainset = CustomDataset(train_path, train_csv, T['train'], T['iter'])
testset = CustomDataset(test_path, test_csv, T['test'])

In [None]:
# 자 이제 본격적인 학습에 앞서, torch 라이브러리를 불러옵시다.
import torch

# 오늘도 시각화 툴을 이용하여 어떻게 생겼는지 확인해봅시다.
import matplotlib.pyplot as plt

labels_map = {
    0: "label 0",
    1: "label 1"
}
figure = plt.figure(figsize=(10, 5))
cols, rows = 5, 2
for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(trainset), size=(1,)).item()
    img, label = trainset[sample_idx]
    while label != i-1:
        sample_idx = torch.randint(len(trainset), size=(1,)).item()
        img, label = trainset[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(labels_map[label])
    plt.axis("off")
    plt.imshow(img.permute(1, 2, 0))
plt.show()

### 2. 데이터 분할 (Train/Valid)

In [None]:
# 여기까지 따라오시느라 고생했습니다! 이 아래는 이제 비슷해요
# 우리가 늘 했던 것처럼 데이터셋의 1할은 제대로 학습이 되는지 확인을 할 목적으로 나눠줍시다.
# 이를 위해서는 먼저, seed를 고정해주도록 합시다!

## seed 고정
def seed_fix(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

seed_fix(0)

In [None]:
# train 데이터의 이미지 개수를 우선 확인합니다.
train_size = len(trainset)
print(train_size)

# 분할은 늘 하던대로, 9:1로 진행합시다.
val_ratio = 0.1

## validation으로 사용할 데이터의 수 확인
split = int(train_size*val_ratio)

## 데이터 분할; random_split(데이터셋, [train 데이터의 수, validation 데이터의 수])
train_dataset, val_dataset = random_split(trainset, [train_size-split, split])

In [None]:
# 저번 시간에 배웠던대로 DataLoader를 사용해봅시다.
# DataLoader가 기억이 안나신다면 망설이지 말고 질문주세요!

## train, validation, test dataloader로 변환
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_dataloader = DataLoader(testset, batch_size=64, shuffle=False)

### 3. 모델 아키텍쳐 설계

In [None]:
# 우리가 저번 시간에 했던 것처럼 모델을 설계해봅시다.
# 오늘은 저번에 제가 잠깐 언급 드렸던 모델인 Alexnet(2014)을 설계해볼거에요

## nn 라이브러리 불러오기
import torch.nn as nn

class AlexNet(nn.Module):
    def __init__(self):
        super(AlexNet, self).__init__()
        self.Convolutional = nn.Sequential(
            nn.Conv2d(in_channels=3,
                      out_channels=96,
                      kernel_size=11,
                      stride=4,
                      padding=0),
            nn.BatchNorm2d(96),
            nn.ReLU(),
            nn.MaxPool2d(3, 2),

            nn.Conv2d(in_channels=96,
                      out_channels=256,
                      kernel_size=5,
                      stride=1,
                      padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(3, 2),

            nn.Conv2d(in_channels=256,
                      out_channels=384,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(),

            nn.Conv2d(in_channels=384,
                      out_channels=384,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU(),

            nn.Conv2d(in_channels=384,
                      out_channels=256,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(3, 2),
        )
        self.fullyconnected = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256*6*6, 4096),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Linear(4096, 11),
        )

    def forward(self, x):
        x = self.Convolutional(x)
        x = torch.flatten(x, 1)
        x = self.fullyconnected(x)
        return x

### 4. Train/Evaluation code 작성

In [None]:
def train(model, dataloader, optim, criterion, device):
    model.train()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    train_loss, train_acc = 0, 0

    for idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        pred = model(data)
        loss = criterion(pred, target)

        loss.backward()
        optim.step()
        optim.zero_grad()

        train_loss += loss.item()
        train_acc += (pred.argmax(1) == target).type(torch.float).sum().item()

    train_loss /= num_batches
    train_acc /= size

    print(f"Train: \n Accuracy: {(100*train_acc):>0.1f}%, Avg loss: {train_loss:>8f}")

def evaluation(model, dataloader, device, test=False):
    model.eval()
    size = len(dataloader.dataset)
    validation_acc = 0

    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            pred = model(data)

            validation_acc += (pred.argmax(1) == target).type(torch.float).sum().item()

    validation_acc /= size

    print(f"Validation: \n Accuracy: {(100*validation_acc):>0.1f}%\n")

    return validation_acc

### 5. 모델 훈련 및 성능 확인

#### 5-1. CUDA 연결

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


#### 5-2. 학습 진행

In [None]:
## 모델 저장을 위한 라이브러리
import copy

# 우리가 만든 모델을 불러옵시다.
model = AlexNet().to(device)

# 또, 결과가 가장 좋았던 모델을 저장하기위해 가장 좋은 결과를 저장하는 변수와 모델을 저장하는 변수를 하나씩 만듭시다.
best_model = None
best_val_acc = 0

# loss를 계산하는 방식은 CrossEntropyLoss를 사용합시다.
criterion = nn.CrossEntropyLoss()

# 또한, 가중치를 업데이트하는 방식은 Adam을 사용합시다.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 가볍게 10번 정도 학습해봅시다. epoch는 반복횟수를 의미하는 단어입니다.
epochs = 10
print(f"Training Starts ...\n{model} || {criterion} || {optimizer} || CIFAR10")
for epoch in range(epochs):
    print(f'Epoch {epoch+1} >>>')
    train(model, train_dataloader, optimizer, criterion, device)
    val_acc = evaluation(model, val_dataloader, device)
    if val_acc > best_val_acc:
        best_model = copy.deepcopy(model)

#### 5-3. Test 성능 확인

In [None]:
# 자 여기에서 우리가 지금까지 해온 부분과 Kaggle 대회의 차이점이 생깁니다.
# Kaggle의 경우 Test의 정답이 제공되지 않기 때문에, Test image를 분류한 결과를 저장하고, 이를 제출하여 정확도를 확인합니다.
# 따라서 우리도 Test용 Evaluation code를 새로 작성하여 정답을 제출해봅시다.

def evaluation_test(model, test_dataloader):
    pred = None
    model.eval()

    with torch.no_grad():
        for X, _ in test_dataloader:
            X = X.to(device)
            Y = model(X)
            if pred == None:
                pred = Y.argmax(1)
            else:
                pred = torch.cat((pred, Y.argmax(1)), dim=0)

    return pred

In [None]:
# 위에서 우리가 작성한 코드를 기반으로 Test의 정답 label을 예측해봅시다.
# 예측을 진행한 후에는, 정답을 csv로 저장하여 kaggle 사이트에 제출하면 됩니다!

## Test 결과 예측
pred = evaluation_test(best_model, test_dataloader, device)

## 정답 pandas로 변경
test_pred = pd.DataFrame([i for i in range(len(pred))], columns=["file_name"])
test_pred['label'] = test_pred.to("cpu").numpy()
test_pred.tail()

## 정답 저장
test_pred.to_csv('./Kaggle_pred.csv', index=False)

이제 여러분은 Kaggle이나 Dacon과 같은 대회에 참가할 수 있는 충분한 실력이 되었습니다!<br><br>
여기서 우리는 1. 모델의 구조를 바꾸거나, 2. epoch의 수를 늘리거나, 3. transforms를 이용한 이미지 전처리를 이용하여 정확도를 올릴 수 있습니다.<br><br>
한 학기동안 부족한 강의 들어주셔서 감사드리며, 앞으로 여러분들께 좋은 일만 있으시길 기원하겠습니다! 감사합니다