# 투빅스 16기 9주차 과제 Transfer Learning

데이터: 7개의 클래스로 분류되어 있는 명화 이미지<br>
dog, elephant, giraffe, guitar, horse, house, person

In [35]:
# gdrive에 mount
from google.colab import drive
drive.mount('/content/Mydrive', force_remount = True)

Mounted at /content/Mydrive


In [36]:
# 경로 설정
import os
os.chdir('/content/Mydrive/MyDrive/Tobigs/정규세션_CNN심화')

# Import Modules

In [37]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Load Data
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder 
from torch.utils.data import DataLoader

# Pytorch --> CNN
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim

from torchsummary import summary 

In [38]:
import torch

if torch.cuda.is_available():
  DEVICE = torch.device('cuda')
else:
  DEVICE = torch.device('cpu')
print(DEVICE)

cuda


# Load Data

In [128]:
import shutil

original_dataset_dir = './data'                   # 기존의 데이터
classes_list = os.listdir(original_dataset_dir) 
 
base_dir = './splitted'                           # train-validation-test 나누기
os.mkdir(base_dir)
 
train_dir = os.path.join(base_dir, 'train') 
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

for cls in classes_list:     
    os.mkdir(os.path.join(train_dir, cls))
    os.mkdir(os.path.join(validation_dir, cls))
    os.mkdir(os.path.join(test_dir, cls))

In [129]:
## 데이터 분할 후 클래스별 데이터 수 확인

import math
 
for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path)
 
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)
    
    train_fnames = fnames[:train_size]
    print("Train size(",cls,"): ", len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(train_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size(",cls,"): ", len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    test_fnames = fnames[(train_size+validation_size):(validation_size + train_size +test_size)]

    print("Test size(",cls,"): ", len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)

Train size( elephant ):  123
Validation size( elephant ):  41
Test size( elephant ):  41
Train size( house ):  147
Validation size( house ):  49
Test size( house ):  49
Train size( guitar ):  80
Validation size( guitar ):  26
Test size( guitar ):  26
Train size( horse ):  90
Validation size( horse ):  30
Test size( horse ):  30
Train size( giraffe ):  141
Validation size( giraffe ):  47
Test size( giraffe ):  47
Train size( person ):  239
Validation size( person ):  79
Test size( person ):  79
Train size( dog ):  197
Validation size( dog ):  65
Test size( dog ):  65


In [39]:
BATCH_SIZE = 64
EPOCH = 30

In [52]:
 transform_base = transforms.Compose([transforms.Resize((256,256)),transforms.ToTensor()]) # AlexNet의 input size에 맞춤 (64x64는 너무 작음)
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base) 
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)

In [41]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset,batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

  cpuset_checked))


# Baseline Model


In [42]:
class Net(nn.Module): # 모델 설계
    def __init__(self, n_classes = 7):   
        ####  모델을 설계해 주세요 ####
        # AlexNet 구현
        super(Net, self).__init__()
        self.convnet = nn.Sequential(
            # Input Channel (RGB: 3)
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, padding=0, stride=4), # 227 -> 55
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 55 -> 27
            
            nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2, stride=1), # 27 -> 27
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 27 -> 13
            
            nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2), # 13 -> 6
        )

        self.fclayer = nn.Sequential(
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, n_classes),
        )

    def forward(self, x):   
        x = self.convnet(x)
        x = torch.flatten(x, 1)
        out = self.fclayer(x)
        return F.log_softmax(out) # softmax 통해 최종 output 계산


# print model summary
model_base = Net().to(DEVICE)
summary(model_base, (3, 227, 227)) 

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 96, 55, 55]          34,944
              ReLU-2           [-1, 96, 55, 55]               0
 LocalResponseNorm-3           [-1, 96, 55, 55]               0
         MaxPool2d-4           [-1, 96, 27, 27]               0
            Conv2d-5          [-1, 256, 27, 27]         614,656
              ReLU-6          [-1, 256, 27, 27]               0
 LocalResponseNorm-7          [-1, 256, 27, 27]               0
         MaxPool2d-8          [-1, 256, 13, 13]               0
            Conv2d-9          [-1, 384, 13, 13]         885,120
             ReLU-10          [-1, 384, 13, 13]               0
LocalResponseNorm-11          [-1, 384, 13, 13]               0
           Conv2d-12          [-1, 384, 13, 13]       1,327,488
             ReLU-13          [-1, 384, 13, 13]               0
LocalResponseNorm-14          [-1, 384,



In [43]:
# Optimizer, Loss function 은 편하신 대로 변경하시면 됩니다 :) 

# optimizer = optim.SGD(model_base.parameters(), lr=0.001) 
optimizer = optim.Adam(model_base.parameters(), lr=0.001) 
criterion = nn.CrossEntropyLoss()

**Train** Model

In [44]:
def train(model, train_loader, optimizer):
    model.train()                         # 모델 train 상태로
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)   # data, target 값 DEVICE에 할당
        optimizer.zero_grad()                                             # optimizer gradient 값 초기화
        output = model(data)                                             # 할당된 데이터로 output 계산
        loss = criterion(output, target)                                           # Cross Entropy Loss 사용해 loss 계산
        loss.backward()                                           # 계산된 loss back propagation
        optimizer.step()                                    # parameter update

Evaluate Model

In [45]:
def evaluate(model, test_loader):
    model.eval()      # 모델 평가 상태로
    test_loss = 0     # test_loss 초기화
    correct = 0       # 맞게 예측한 0 값으로 초기화
    
    with torch.no_grad(): 
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)     # data, target DEVICE에 할당
            output = model(data)                                                 # output 계산
            test_loss += criterion(output, target).item()         # loss 계산(총 loss 에 더해주기)
            pred = output.max(1, keepdim=True)[1]                 # 계산된 벡터값 중 가장 큰 값 가지는 class 예측
            correct += pred.eq(target.view_as(pred)).sum().item() # 맞게 예측한 값 세기
   
    test_loss /= len(test_loader.dataset)                         # 평균 loss
    test_accuracy = 100. * correct / len(test_loader.dataset)     # test(validation) 데이터 정확도
    return test_loss, test_accuracy  

## Train Model

In [None]:
import time
import copy
# Batch 128, 0.001
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  # beset accuracy 초기화
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()                                     # 학습 시간 계산
        train(model, train_loader, optimizer)                   # train 데이터로 학습
        train_loss, train_acc = evaluate(model, train_loader)   # train_loss, train_acc 계산
        val_loss, val_acc = evaluate(model, val_loader)         # valid_loss, valid_acc 계산
        
        if val_acc>best_acc:  # update best accuracy
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since # 학습 시간 출력
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 

    model.load_state_dict(best_model_wts)  
    return model

base = train_baseline(model_base ,train_loader, val_loader, optimizer)  	# 모델 학습시키기
torch.save(base,'AlexNet.pt')                                             # 모델 저장

  cpuset_checked))


-------------- epoch 1 ----------------
train Loss: 0.0296, Accuracy: 23.50%
val Loss: 0.0338, Accuracy: 23.44%
Completed in 0m 13s
-------------- epoch 2 ----------------
train Loss: 0.0298, Accuracy: 19.37%
val Loss: 0.0338, Accuracy: 19.29%
Completed in 0m 13s
-------------- epoch 3 ----------------
train Loss: 0.0299, Accuracy: 20.45%
val Loss: 0.0340, Accuracy: 19.29%
Completed in 0m 13s
-------------- epoch 4 ----------------
train Loss: 0.0293, Accuracy: 24.68%
val Loss: 0.0334, Accuracy: 24.33%
Completed in 0m 13s
-------------- epoch 5 ----------------
train Loss: 0.0300, Accuracy: 23.50%
val Loss: 0.0344, Accuracy: 23.44%
Completed in 0m 14s
-------------- epoch 6 ----------------
train Loss: 0.0286, Accuracy: 26.16%
val Loss: 0.0335, Accuracy: 21.66%
Completed in 0m 13s
-------------- epoch 7 ----------------
train Loss: 0.0282, Accuracy: 27.83%
val Loss: 0.0325, Accuracy: 29.08%
Completed in 0m 13s
-------------- epoch 8 ----------------
train Loss: 0.0284, Accuracy: 25.07%

## Test Model

In [46]:
# Test Data Classification 하기
transform_base = transforms.Compose([transforms.ToTensor()]) 
test_base = ImageFolder(root='./splitted/test',transform=transform_base)  
test_loader_base = torch.utils.data.DataLoader(test_base, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

  cpuset_checked))


In [None]:
def predict_test(model, data_loader):
  preds = []
  labels = []
  with torch.no_grad():
    for data, label in data_loader:
      data = data.to(DEVICE)
      label = label.to(DEVICE)
      # 신경망에 이미지를 통과시켜 출력을 계산
      outputs = model(data)
      # 가장 높은 값(energy)를 갖는 분류(class)를 정답으로 선택
      _, pred = torch.max(outputs, 1)
      for i in pred:
        preds.append(int(i))
      for j in label:
        labels.append(int(j))
  return preds, labels

# Transfer Learning with 모델 학습

* Transfer Learning이 익숙하지 않으신 분들은 PyTorch에서 제공하는 https://9bow.github.io/PyTorch-tutorials-kr-0.3.1/beginner/transfer_learning_tutorial.html 을 참고하세요 :)


In [55]:
# Data Augmentation 
data_transforms = {
    'train': transforms.Compose([
        ### 데이터 전처리를 진행해 주세요 ###
        transforms.Resize([256,256]),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.5),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    
    'val': transforms.Compose([
        ### 데이터 전처리를 진행해 주세요 ###
        transforms.Resize([256,256]),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}

train_dataset_resnet = ImageFolder(root='./splitted/train', transform = data_transforms['train'])
val_dataset_resnet = ImageFolder(root='./splitted/val', transform = data_transforms['val'])

train_loader_resnet = torch.utils.data.DataLoader(train_dataset_resnet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader_resnet = torch.utils.data.DataLoader(val_dataset_resnet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

  cpuset_checked))


In [56]:
from torchvision import models
 
resnet = models.resnet50(pretrained=True)   # resnet50 불러오기, pretrained = True로 학습된 파라미터 값들 불러오기
num_ftrs = resnet.fc.in_features            # resnet50의 fully connected layer의 input 노드 수
resnet.fc = nn.Linear(num_ftrs, 21)         # input 노드 수를 이용해 새로운 layer 추가
resnet = resnet.to(DEVICE)                  # resnet 모델 DEVICE에 할당
 
criterion = nn.CrossEntropyLoss()                                                           # CrossEntropyLoss 로 loss 계산
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001) # optimizer -> adam

In [57]:
# Pretrained Model의 일부 layer freeze 하기
ct = 0 
for child in resnet.children():  
    ct+= 1  
    if ct < 7: 
        for param in child.parameters():
            param.requires_grad = False

Fine Tuning을 진행해 주세요!

In [58]:
def train_resnet(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    ### finetuning 함수를 만들어 주세요 ### 
    ### (위의 train_baseline 함수 참고) ###
    best_acc = 0.0  
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()  
        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)
        
        if val_acc>best_acc: 
            best_acc = val_acc 
            best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since 
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 


    model.load_state_dict(best_model_wts)  
    return model

In [59]:
import copy
import time

resnet_train = train_resnet(resnet, train_loader_resnet, val_loader_resnet, optimizer_ft)  # resnet transfer learning 진행

  cpuset_checked))


-------------- epoch 1 ----------------
train Loss: 0.8285, Accuracy: 38.74%
val Loss: 1.2713, Accuracy: 39.47%
Completed in 2m 20s
-------------- epoch 2 ----------------
train Loss: 0.0317, Accuracy: 63.52%
val Loss: 0.0414, Accuracy: 61.42%
Completed in 0m 31s
-------------- epoch 3 ----------------
train Loss: 0.0136, Accuracy: 77.48%
val Loss: 0.0226, Accuracy: 74.18%
Completed in 0m 31s
-------------- epoch 4 ----------------
train Loss: 0.0179, Accuracy: 69.32%
val Loss: 0.0312, Accuracy: 68.25%
Completed in 0m 31s
-------------- epoch 5 ----------------
train Loss: 0.0122, Accuracy: 77.29%
val Loss: 0.0182, Accuracy: 74.78%
Completed in 0m 31s
-------------- epoch 6 ----------------
train Loss: 0.0079, Accuracy: 82.40%
val Loss: 0.0190, Accuracy: 71.51%
Completed in 0m 31s
-------------- epoch 7 ----------------
train Loss: 0.0111, Accuracy: 80.14%
val Loss: 0.0222, Accuracy: 70.62%
Completed in 0m 31s
-------------- epoch 8 ----------------
train Loss: 0.0182, Accuracy: 70.89%

In [60]:
# Classification 하기
transform_res = transforms.Compose([
        ### 데이터 전처리 ###
        transforms.Resize([256,256]),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]) 
test_res = ImageFolder(root='./splitted/test',transform=transform_res)  
test_loader_res = torch.utils.data.DataLoader(test_res, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

  cpuset_checked))


In [99]:
def predict_test(model, data_loader):
  preds = []
  labels = []
  with torch.no_grad():
    for data, label in data_loader:
      data = data.to(DEVICE)
      label = label.to(DEVICE)
      # 신경망에 이미지를 통과시켜 출력을 계산
      outputs = model(data)
      # 가장 높은 값(energy)를 갖는 분류(class)를 정답으로 선택
      _, pred = torch.max(outputs, 1)
      for i in pred:
        preds.append(int(i))
      for j in label:
        labels.append(int(j))
  return preds, labels

# 모델 평가

모델 평가를 진행할 때 accuracy, precision, recall f1-score 등 다양한 평가지표를 가지고 진행해 주세요 ~!

### 베이스라인 모델

In [100]:
predictions, labels = predict_test(base, test_loader_base)

  cpuset_checked))


In [103]:
print(predictions[:10])
print(labels[:10])

[5, 6, 0, 5, 6, 6, 2, 6, 6, 5]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [None]:
from sklearn.metrics import accuracy_score
round(accuracy_score(labels, predictions)*100,1)

39.8

### Transfer Learning 모델

In [104]:
preds, labels = predict_test(resnet, test_loader_res)

  cpuset_checked))


In [105]:
print(preds[:10])
print(labels[:10])

[0, 0, 0, 0, 0, 2, 0, 0, 4, 4]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


##### Confusion Matrix

In [117]:
from sklearn.metrics import confusion_matrix
confusion_matrix(labels, preds)

array([[53,  0,  4,  2,  3,  0,  3],
       [ 6, 27,  3,  1,  1,  0,  3],
       [ 0,  0, 45,  1,  0,  0,  1],
       [ 1,  1,  0, 23,  0,  0,  1],
       [ 4,  1,  5,  0, 17,  0,  3],
       [ 1,  0,  1,  0,  0, 46,  1],
       [ 3,  3,  4,  0,  1,  2, 66]])

In [106]:
from sklearn.metrics import classification_report
target_names = ['0','1','2','3','4','5','6']
print(classification_report(labels, preds, target_names=target_names))

              precision    recall  f1-score   support

           0       0.78      0.82      0.80        65
           1       0.84      0.66      0.74        41
           2       0.73      0.96      0.83        47
           3       0.85      0.88      0.87        26
           4       0.77      0.57      0.65        30
           5       0.96      0.94      0.95        49
           6       0.85      0.84      0.84        79

    accuracy                           0.82       337
   macro avg       0.83      0.81      0.81       337
weighted avg       0.83      0.82      0.82       337



In [124]:
f1_score(labels, preds, average=None)

array([0.79699248, 0.73972603, 0.82568807, 0.86792453, 0.65384615,
       0.94845361, 0.84076433])

##### Accuracy

In [127]:
round(accuracy_score(labels, preds)*100,1)

82.2