<a href="https://colab.research.google.com/github/chminPark/ml-python/blob/master/%EC%8B%A4%EC%8A%B5_CNN_EarlyStopping.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from PIL import Image
import matplotlib.pyplot as plt

import torchvision
import torchvision.transforms as transforms
import torchvision.models as models

import copy
import os
import numpy as np
import random
import time
import os

# Reproducibility settings for cuDNN: no benchmark-based kernel selection, deterministic mode on.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# VGG 모델 생성

In [None]:
class VGG(nn.Module):
    """Small VGG-style CNN: three stages of paired 3x3 convolutions plus a linear classifier.

    The first two stages end in a 2x2 max-pool; the third ends in global average
    pooling, so the classifier always receives a 128-dimensional vector and
    produces 10 class scores.
    """

    def __init__(self):
        super().__init__()

        # Every convolution uses a 3x3 kernel with padding=1, so the spatial size
        # is unchanged by the convolutions; only the pooling layers shrink it.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

        self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        # Halves height and width after each of the first two stages.
        self.max_pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Exercise: averages each channel's feature map down to a single value.
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Exercise: fully connected layer that emits one score per class (10 classes).
        self.fc = nn.Linear(128, 10)

    def forward(self, x):
        """Return raw class scores of shape (N, 10) for a batch of 3-channel images."""
        stages = (
            (self.conv1, self.conv2, self.max_pool),
            (self.conv3, self.conv4, self.max_pool),
            # For 32x32 inputs the last stage sees 128x8x8 maps and pools them to 128x1x1.
            (self.conv5, self.conv6, self.avg_pool),
        )
        for first_conv, second_conv, pool in stages:
            x = pool(F.relu(second_conv(F.relu(first_conv(x)))))

        # Drop the trailing 1x1 spatial dimensions before the classifier.
        return self.fc(x.view(-1, self.fc.in_features))

# CIFAR-10 데이터를 이용한다.

In [None]:
# Per-channel mean / standard deviation used to normalise every image, train and test alike.
_CIFAR10_MEAN = [0.4914, 0.4822, 0.4465]
_CIFAR10_STD = [0.2023, 0.1994, 0.2010]

# Training pipeline: random 32x32 crop (after 4 px padding) and random horizontal
# flip as augmentation, then tensor conversion and normalisation.
transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=_CIFAR10_MEAN, std=_CIFAR10_STD),
])

# Test pipeline: no augmentation, only tensor conversion and the same normalisation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=_CIFAR10_MEAN, std=_CIFAR10_STD),
])

# CIFAR-10 is downloaded into ./data when it is not already present.
dataset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform
)
test_dataset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test
)


Files already downloaded and verified
Files already downloaded and verified


In [None]:
# Class names of the training set; a name's position in this list is its integer label.
dataset.classes

['airplane',
 'automobile',
 'bird',
 'cat',
 'deer',
 'dog',
 'frog',
 'horse',
 'ship',
 'truck']

In [None]:
# Count the training images per class name.
# Labels are read from `dataset.targets`, so no image has to be decoded or pushed
# through the augmentation pipeline just to be counted (iterating `dataset` itself
# would do both for every one of the 50,000 samples).
class_count = {}
for target in dataset.targets:
    label = dataset.classes[target]
    class_count[label] = class_count.get(label, 0) + 1
class_count

{'frog': 5000,
 'truck': 5000,
 'deer': 5000,
 'automobile': 5000,
 'bird': 5000,
 'horse': 5000,
 'ship': 5000,
 'cat': 5000,
 'dog': 5000,
 'airplane': 5000}

# Train과 Validation 데이터를 나눈다

In [None]:
# Split `dataset` into training (80%) and validation (20%) subsets.
# For a PyTorch Dataset this is done with random_split.
from torch.utils.data import random_split

train_len = int(len(dataset) * 0.8)
val_len = len(dataset) - train_len

# Seed the split explicitly: without a generator the membership of the two subsets
# depends on the global RNG state and changes on every kernel restart, which makes
# validation losses (and therefore the early-stopping point) irreproducible.
split_generator = torch.Generator().manual_seed(10000)
# NOTE(review): both subsets wrap `dataset`, so validation images also go through the
# random crop / flip of `transform`; consider a non-augmented view for validation.
train_dataset, val_dataset = random_split(
    dataset, [train_len, val_len], generator=split_generator
)
len(train_dataset), len(val_dataset)


(40000, 10000)

In [None]:
# Batches of 100 for every split; only the training batches are reshuffled each epoch.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=100, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=100, shuffle=False)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=100, shuffle=False)

# Early Stopping 객체를 생성한다

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience.

    Each call compares the new validation loss with the best one seen so far.
    An improvement saves the model's state_dict to `path` and resets the counter;
    anything else increments the counter, and `early_stop` becomes True once the
    counter reaches `patience`.
    """

    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # `np.Inf` was removed in NumPy 2.0; the lowercase spelling exists in every release.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        """Register one validation loss; checkpoint `model` on improvement, otherwise count up.

        Args:
            val_loss: validation loss as a number or a single-element tensor.
            model: module whose state_dict is saved whenever the loss improves.
        """
        # Store plain Python numbers: keeping a tensor would pin its device memory
        # (and its autograd graph, if it has one) for as long as this object lives.
        if torch.is_tensor(val_loss):
            val_loss = val_loss.detach().item()

        # Work with the negated loss so that "larger score" always means "better".
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# 학습함수를 선언하고 학습을 수행한다

In [None]:
def reset_seed(seed):
    """Seed the PyTorch, NumPy and Python `random` generators with the same value."""
    for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
        seed_fn(seed)

In [None]:
def train(model, data_loader, val_loader, early_stopping, criterion, optimizer, n_epoch):
  """Train `model` for up to `n_epoch` epochs, validating and checking early stopping after each.

  Args:
    model: network to optimise. Batches are moved to the device of its first parameter
      (falls back to CUDA when the model has no parameters).
    data_loader: iterable of (images, labels) training batches; must support len().
    val_loader: iterable of (images, labels) validation batches; must support len().
    early_stopping: callable invoked once per epoch as early_stopping(val_loss, model),
      where val_loss is the mean per-batch validation loss as a Python float. Training
      ends as soon as its `early_stop` attribute is true.
    criterion: loss function called as criterion(outputs, labels).
    optimizer: optimizer that updates the model's parameters.
    n_epoch: maximum number of epochs to run.
  """
  # Follow the model's device instead of hard-coding CUDA.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cuda")

  # Exercise: signal that the model is being trained.
  model.train()
  for epoch in range(n_epoch):
    running_loss = 0.0
    for i, (images, labels) in enumerate(data_loader):
      images, labels = images.to(device), labels.to(device)
      outputs = model(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      running_loss += loss.item()
      if (i + 1) % 100 == 0:
        print('iteration: [{}/{}]'.format(i + 1, len(data_loader)))

    # Validate after every epoch. no_grad + .item() keep the accumulated loss a plain
    # float, so no autograd graph is built or retained across validation batches.
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
      for images, labels in val_loader:
        images, labels = images.to(device), labels.to(device)
        val_loss += criterion(model(images), labels).item()

    # Report means over batches; max(..., 1) avoids a ZeroDivisionError on an empty loader.
    running_loss = running_loss / max(len(data_loader), 1)
    val_loss = val_loss / max(len(val_loader), 1)
    # Printed before the early-stopping check so the final epoch is logged as well.
    print(f'Epoch {epoch + 1}, train_loss = {running_loss:.3f}, val_loss = {val_loss:.3f}')

    # Check early stopping on the mean validation loss.
    early_stopping(val_loss, model)

    # Stop training once the validation loss has stopped improving (overfitting).
    if early_stopping.early_stop:
      break

    # Back to training mode for the next epoch.
    model.train()

# 학습수행

In [None]:
reset_seed(10000) # The seed determines the values the model's weights are initialised with.
criterion = nn.CrossEntropyLoss()

vgg_model = VGG().to("cuda")
optimizer = optim.Adam(params=vgg_model.parameters())

# Build the early-stopping monitor: with patience=3, training halts once the validation
# loss has failed to improve on its best value for 3 epochs in a row.
earlystop = EarlyStopping(patience=3)
train(vgg_model, train_loader, val_loader, earlystop, criterion, optimizer, n_epoch=30)

iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 1, train_loss = 1.836, val_loss = 1.571
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 2, train_loss = 1.425, val_loss = 1.348
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 3, train_loss = 1.203, val_loss = 1.117
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 4, train_loss = 1.076, val_loss = 1.023
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 5, train_loss = 0.986, val_loss = 0.930
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 6, train_loss = 0.914, val_loss = 0.905
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 7, train_loss = 0.858, val_loss = 0.875
iteration: [100/400]
iteration: [200/400]
iteration: [300/400]
iteration: [400/400]
Epoch 

# 평가 함수를 구성하고 평가 정확도를 확인한다

In [None]:
# Load the weights checkpointed by early stopping into a freshly built network.
vgg_model = VGG().to("cuda")
state_dict = torch.load('checkpoint.pt')
vgg_model.load_state_dict(state_dict)


<All keys matched successfully>

In [None]:
def eval(model, data_loader):
  """Print the top-1 accuracy (in percent) of `model` over all batches of `data_loader`.

  Args:
    model: classifier returning one score per class for each sample. Batches are moved
      to the device of its first parameter (falls back to CUDA when it has none).
    data_loader: iterable of (images, labels) batches.

  Raises:
    ZeroDivisionError: if `data_loader` yields no samples.
  """
  # Follow the model's device instead of hard-coding CUDA.
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cuda")

  # Exercise: evaluation requires switching the model to eval() mode.
  model.eval()
  total = 0
  correct = 0
  # Exercise: no gradients should be tracked during evaluation.
  with torch.no_grad():
    for images, labels in data_loader:
      images, labels = images.to(device), labels.to(device)
      # The predicted class is the index of the highest score in each row.
      predicted = model(images).argmax(dim=1)
      total += labels.size(0)
      correct += (predicted == labels).sum().item()
  accuracy = 100 * correct / total

  print('Test Accuracy: {}%'.format(accuracy))

In [None]:
# Report the accuracy of the reloaded checkpoint on the held-out test set.
eval(vgg_model, test_loader)

Test Accuracy: 82.34%
