# Fine Tuning

In [None]:
# from google.colab import drive
# drive.mount('/content/gdrive', force_remount=True)
# !gdown --id 1MP5IVX30XcYqN5RHwBKPdWLEvRfXLH71
# !mv catanddog.zip cat-and-dog/ ; mkdir cat-and-dog ; cd cat-and-dog
# !mkdir cat-and-dog ; mv catanddog.zip cat-and-dog/
# !cd cat-and-dog; unzip catanddog.zip
# !mv cat-and-dog/test_set/test_set/* cat-and-dog/test_set
# !mv cat-and-dog/training_set/training_set/* cat-and-dog/training_set
# !rm -r cat-and-dog/test_set/test_set cat-and-dog/training_set/training_set

In [None]:
import numpy as np
import pandas as pd
import os
def make_catndog_dataframe(root_dir='cat-and-dog'):
  """Index the JPEG files found under ``root_dir`` in a DataFrame.

  The directory tree is walked recursively and one row is recorded per file
  whose extension is ``.jpg`` (case-insensitive).

  Args:
    root_dir: directory to scan. Defaults to ``'cat-and-dog'``.

  Returns:
    pandas.DataFrame with three string columns:
      path    - ``<directory>/<filename>`` as produced by the walk
      dataset - 'train' if a ``training_set`` directory lies between
                ``root_dir`` and the file, 'test' for ``test_set``, else 'N/A'
      label   - 'DOG' if a ``dogs`` directory lies between ``root_dir`` and
                the file, 'CAT' for ``cats``, else 'N/A'
    The frame always has these three columns, even when no file is found.
  """
  columns = ['path', 'dataset', 'label']
  records = []
  for dirname, _, filenames in os.walk(root_dir):
    # Classify by directory components relative to root_dir instead of by
    # substring of the full path: a root or file name that merely contains
    # "dogs"/"cats" must not decide the label, and component matching does not
    # depend on which path separator the OS uses.
    rel_dirs = os.path.relpath(dirname, root_dir).split(os.sep)

    if 'training_set' in rel_dirs:
      dataset_gubun = 'train'
    elif 'test_set' in rel_dirs:
      dataset_gubun = 'test'
    else:
      dataset_gubun = 'N/A'

    if 'dogs' in rel_dirs:
      label_gubun = 'DOG'
    elif 'cats' in rel_dirs:
      label_gubun = 'CAT'
    else:
      label_gubun = 'N/A'

    for filename in filenames:
      # Test the real extension so that e.g. "x.jpg.txt" is skipped and
      # "x.JPG" is kept.
      if not filename.lower().endswith('.jpg'):
        continue
      records.append({'path': dirname + '/' + filename,
                      'dataset': dataset_gubun,
                      'label': label_gubun})

  # Passing `columns` keeps the schema stable when `records` is empty.
  data_df = pd.DataFrame(records, columns=columns)
  return data_df

# Allow up to 200 characters per column when DataFrames are displayed,
# so the image paths shown below are readable.
pd.set_option('display.max_colwidth', 200)
# Build the index of image files (path / dataset / label) and preview it.
data_df = make_catndog_dataframe()
print('data_df shape: ', data_df.shape)
data_df.head()

data_df shape:  (10028, 3)


Unnamed: 0,path,dataset,label
0,cat-and-dog/test_set/cats/cat.4102.jpg,test,CAT
1,cat-and-dog/test_set/cats/cat.4347.jpg,test,CAT
2,cat-and-dog/test_set/cats/cat.4790.jpg,test,CAT
3,cat-and-dog/test_set/cats/cat.4511.jpg,test,CAT
4,cat-and-dog/test_set/cats/cat.4592.jpg,test,CAT


In [None]:
# Define a Dataset class that subclasses the Keras Sequence
from tensorflow.keras.utils import Sequence
import sklearn
import cv2

# Batch size and image side length (images are resized to IMAGE_SIZE x IMAGE_SIZE)
BATCH_SIZE = 64
IMAGE_SIZE = 224

class CnD_Dataset(Sequence):
  """Keras ``Sequence`` that loads images from disk one batch at a time.

  Each batch is read with OpenCV, converted from BGR to RGB, resized to
  ``IMAGE_SIZE x IMAGE_SIZE``, optionally augmented, and optionally passed
  through a preprocessing function.
  """

  def __init__(self, image_filenames, labels, batch_size=BATCH_SIZE,
               augmentor=None, shuffle=False, pre_func=None):
    '''
    image_filenames : paths of the image files to load (sliceable sequence)
    labels : label of each image, aligned with image_filenames;
             None for an unlabeled dataset
    batch_size : number of samples returned by each __getitem__(index) call
    augmentor : callable invoked as augmentor(image=img)['image']
                (e.g. an albumentations object), or None
    shuffle : whether to reshuffle the samples at the end of every epoch
              (intended for training data)
    pre_func : function applied to each image after augmentation, or None
    '''
    # Initialise the Keras base class; newer Keras versions warn when this is skipped.
    super().__init__()
    self.image_filenames = image_filenames
    self.labels = labels
    self.batch_size = batch_size
    self.augmentor = augmentor
    self.shuffle = shuffle
    self.pre_func = pre_func

  def __len__(self):
    """Return the number of batches per epoch (the last one may be partial)."""
    # Count from the file names rather than the labels so that an unlabeled
    # dataset (labels=None) also works.
    return int(np.ceil(len(self.image_filenames) / self.batch_size))

  def __getitem__(self, index):
    """Load and return batch number ``index``.

    Returns:
      (image_batch, label_batch) when labels were given, otherwise just
      image_batch. image_batch is a float32 array of shape
      (n, IMAGE_SIZE, IMAGE_SIZE, 3), where n <= batch_size.

    Raises:
      OSError: if OpenCV cannot read one of the image files.
    """
    start = index * self.batch_size
    stop = start + self.batch_size
    image_name_batch = self.image_filenames[start:stop]
    # Previously label_batch was left unbound when labels was None, which made
    # the return statement fail.
    label_batch = None
    if self.labels is not None:
      label_batch = self.labels[start:stop]

    n_images = len(image_name_batch)
    # float32 is what the model consumes; the default float64 buffer would
    # only double the memory used per batch.
    image_batch = np.zeros((n_images, IMAGE_SIZE, IMAGE_SIZE, 3), dtype='float32')
    for image_index in range(n_images):
      image_path = image_name_batch[image_index]
      image = cv2.imread(image_path)
      if image is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        raise OSError(f'cv2.imread could not load image (missing or unreadable): {image_path}')
      # OpenCV loads images as BGR; convert to RGB.
      image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
      if self.augmentor is not None:
        image = self.augmentor(image=image)['image']
      # Pixel scaling (e.g. image/255.) is not done here; it is left to pre_func.
      if self.pre_func is not None:
        image = self.pre_func(image)
      image_batch[image_index] = image

    if label_batch is None:
      return image_batch
    return image_batch, label_batch

  def on_epoch_end(self):
    """Reshuffle the samples after each epoch when ``shuffle`` is enabled.

    Called by model.fit() each time an epoch finishes.
    """
    if not self.shuffle:
      return
    if self.labels is None:
      self.image_filenames = sklearn.utils.shuffle(self.image_filenames)
    else:
      # Shuffle file names and labels together so they stay aligned.
      self.image_filenames, self.labels = sklearn.utils.shuffle(self.image_filenames, self.labels)

In [None]:
from sklearn.model_selection import train_test_split
def get_train_valid_test(data_df):
  """Split an image index into train / validation / test paths and labels.

  Args:
    data_df: DataFrame with 'path', 'dataset' ('train' / 'test' / other) and
      'label' columns.

  Returns:
    (tr_path, tr_label, val_path, val_label, test_path, test_label) as numpy
    arrays. Labels are integer codes assigned in sorted label order, so for
    'CAT' / 'DOG' labels CAT=0 and DOG=1 in every split.
  """
  # Encode the labels once for the whole frame, in sorted order.
  # Calling pd.factorize separately on the train and test rows numbers the
  # classes by first appearance, so the same class could get code 0 in one
  # split and code 1 in the other (e.g. when the rows have been shuffled).
  label_codes = pd.factorize(data_df['label'], sort=True)[0]
  paths = data_df['path'].values

  # Boolean row masks for the 'train' and 'test' partitions.
  is_train = (data_df['dataset'] == 'train').values
  is_test = (data_df['dataset'] == 'test').values

  train_path, train_label = paths[is_train], label_codes[is_train]
  test_path, test_label = paths[is_test], label_codes[is_test]

  # Split the 'train' rows in half into training and validation sets (fixed seed).
  tr_path, val_path, tr_label, val_label = train_test_split(train_path, train_label, test_size=0.5, random_state=10)
  return tr_path, tr_label, val_path, val_label, test_path, test_label


In [None]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense, Conv2D, Dropout, Flatten, Activation, MaxPooling2D, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, LearningRateScheduler

from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications import Xception, MobileNetV2

def create_model(model_name='vgg16', verbose=False):
  """Build a binary classifier on top of an ImageNet-pretrained backbone.

  Args:
    model_name: one of 'vgg16', 'resnet50' (ResNet50V2), 'xception',
      'mobilenet' (MobileNetV2).
    verbose: if True, print the model summary.

  Returns:
    A Keras Model: backbone (include_top=False) -> GlobalAveragePooling2D ->
    Dropout(0.5) (all backbones except 'vgg16') -> Dense(50, relu) ->
    Dense(1, sigmoid).

  Raises:
    ValueError: if model_name is not one of the supported names.
  """
  backbones = {
      'vgg16': VGG16,
      'resnet50': ResNet50V2,
      'xception': Xception,
      'mobilenet': MobileNetV2,
  }
  # Fail early with a clear message; an unknown name used to fall through the
  # if/elif chain and surface as an UnboundLocalError on base_model.
  if model_name not in backbones:
    raise ValueError(
        f"Unknown model_name {model_name!r}; expected one of {sorted(backbones)}")

  input_tensor = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
  base_model = backbones[model_name](input_tensor=input_tensor, include_top=False, weights='imagenet')

  # Classification head.
  x = GlobalAveragePooling2D()(base_model.output)
  if model_name != 'vgg16':
    x = Dropout(rate=0.5)(x)
  x = Dense(50, activation='relu', name='fc1')(x)
  output = Dense(1, activation='sigmoid', name='output')(x)

  model = Model(inputs=input_tensor, outputs=output)
  if verbose:
    model.summary()
  return model

In [None]:
# Function that trains a pretrained model
# - no learning-rate scheduling or early-stopping callbacks
# - 20 epochs, initial learning rate 0.0001
def train_model(data_df, model_name, augmentor, preprocessing_func):
  """Train a pretrained-backbone classifier end to end.

  Runs 20 epochs with Adam (learning rate 0.0001) and no callbacks.

  Args:
    data_df: image index with 'path', 'dataset' and 'label' columns.
    model_name: backbone name passed to create_model().
    augmentor: augmentation applied to the training images, or None.
    preprocessing_func: per-image preprocessing applied to both the training
      and the validation images, or None.

  Returns:
    (model, history): the trained model and the History returned by fit().
  """
  tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)

  tr_ds = CnD_Dataset(tr_path, tr_label, batch_size=BATCH_SIZE, augmentor=augmentor,
                      shuffle=True, pre_func=preprocessing_func)
  # Validation images are never augmented: random augmentation would make the
  # validation metrics noisy and not comparable between epochs.
  val_ds = CnD_Dataset(val_path, val_label, batch_size=BATCH_SIZE, augmentor=None,
                       shuffle=False, pre_func=preprocessing_func)
  model = create_model(model_name=model_name)
  model.compile(optimizer=Adam(0.0001), loss='binary_crossentropy',
                metrics=['accuracy'])

  N_EPOCHS = 20
  history = model.fit(tr_ds, epochs=N_EPOCHS,
                      steps_per_epoch=int(np.ceil(tr_path.shape[0]/BATCH_SIZE)),
                      validation_data=val_ds,
                      validation_steps=int(np.ceil(val_path.shape[0]/BATCH_SIZE)),
                      verbose=1
                      )
  return model, history

In [None]:
# Train a MobileNetV2-based model (evaluated in a later cell).
# - The input data is subsampled to shorten training time.
from tensorflow.keras.applications.mobilenet import preprocess_input as mobile_preprocess_input
import tensorflow as tf
# Keep 30% of the rows (the 70% "test" part of this split is discarded); fixed seed.
input_df, _ = train_test_split(data_df, test_size=0.7, random_state=10)
# No augmentation (None); images are preprocessed with mobile_preprocess_input.
mobile_model, mobile_history = train_model(input_df, 'mobilenet', None, 
                                           mobile_preprocess_input)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
# Build the test dataset from the full (non-subsampled) index.
test_df = data_df[data_df['dataset'] == 'test']
test_path = test_df['path'].values
# sort=True assigns codes in sorted label order (CAT=0, DOG=1) instead of
# first-appearance order, so the encoding does not depend on row order.
# The training labels must use the same encoding for the metrics to be meaningful.
test_label = pd.factorize(test_df['label'], sort=True)[0]

# No augmentation and no shuffling for evaluation.
test_ds = CnD_Dataset(test_path, test_label, batch_size=BATCH_SIZE,
                      augmentor=None,
                      shuffle=False,
                      pre_func=mobile_preprocess_input)
mobile_model.evaluate(test_ds)



[10.170909881591797, 0.015323776751756668]

In [None]:
# Inspect the layer-by-layer structure of the pretrained model
model = create_model(model_name='mobilenet')
model.summary()

Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_4[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
____________________________________________________________________________________________

In [None]:
# Print every layer of the model (model.layers is a plain Python list)
print(type(model.layers))
print(model.layers)
model.layers

<class 'list'>
[<keras.engine.input_layer.InputLayer object at 0x7ff4cd849cd0>, <keras.layers.convolutional.Conv2D object at 0x7ff6b8077c90>, <keras.layers.normalization.batch_normalization.BatchNormalization object at 0x7ff4cd85d950>, <keras.layers.advanced_activations.ReLU object at 0x7ff4cd868350>, <keras.layers.convolutional.DepthwiseConv2D object at 0x7ff4cd868b10>, <keras.layers.normalization.batch_normalization.BatchNormalization object at 0x7ff4cd865890>, <keras.layers.advanced_activations.ReLU object at 0x7ff74b03f5d0>, <keras.layers.convolutional.Conv2D object at 0x7ff4cd868e50>, <keras.layers.normalization.batch_normalization.BatchNormalization object at 0x7ff74b056090>, <keras.layers.convolutional.Conv2D object at 0x7ff74b055ed0>, <keras.layers.normalization.batch_normalization.BatchNormalization object at 0x7ff74b04fcd0>, <keras.layers.advanced_activations.ReLU object at 0x7ff4cd86ce10>, <keras.layers.convolutional.ZeroPadding2D object at 0x7ff74b0668d0>, <keras.layers.con

[<keras.engine.input_layer.InputLayer at 0x7ff4cd849cd0>,
 <keras.layers.convolutional.Conv2D at 0x7ff6b8077c90>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7ff4cd85d950>,
 <keras.layers.advanced_activations.ReLU at 0x7ff4cd868350>,
 <keras.layers.convolutional.DepthwiseConv2D at 0x7ff4cd868b10>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7ff4cd865890>,
 <keras.layers.advanced_activations.ReLU at 0x7ff74b03f5d0>,
 <keras.layers.convolutional.Conv2D at 0x7ff4cd868e50>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7ff74b056090>,
 <keras.layers.convolutional.Conv2D at 0x7ff74b055ed0>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7ff74b04fcd0>,
 <keras.layers.advanced_activations.ReLU at 0x7ff4cd86ce10>,
 <keras.layers.convolutional.ZeroPadding2D at 0x7ff74b0668d0>,
 <keras.layers.convolutional.DepthwiseConv2D at 0x7ff74b06aa90>,
 <keras.layers.normalization.batch_normalizati

In [None]:
# Show the last four layers of the model
model.layers[-4:]

[<keras.layers.pooling.GlobalAveragePooling2D at 0x7ff74afeae90>,
 <keras.layers.core.Dropout at 0x7ff74ae25610>,
 <keras.layers.core.Dense at 0x7ff4d43787d0>,
 <keras.layers.core.Dense at 0x7ff74add3250>]

In [None]:
# Check the trainable attribute of each layer
for layer in model.layers:
  print(layer.name, 'trainable: ', layer.trainable)

input_4 trainable:  True
Conv1 trainable:  True
bn_Conv1 trainable:  True
Conv1_relu trainable:  True
expanded_conv_depthwise trainable:  True
expanded_conv_depthwise_BN trainable:  True
expanded_conv_depthwise_relu trainable:  True
expanded_conv_project trainable:  True
expanded_conv_project_BN trainable:  True
block_1_expand trainable:  True
block_1_expand_BN trainable:  True
block_1_expand_relu trainable:  True
block_1_pad trainable:  True
block_1_depthwise trainable:  True
block_1_depthwise_BN trainable:  True
block_1_depthwise_relu trainable:  True
block_1_project trainable:  True
block_1_project_BN trainable:  True
block_2_expand trainable:  True
block_2_expand_BN trainable:  True
block_2_expand_relu trainable:  True
block_2_depthwise trainable:  True
block_2_depthwise_BN trainable:  True
block_2_depthwise_relu trainable:  True
block_2_project trainable:  True
block_2_project_BN trainable:  True
block_2_add trainable:  True
block_3_expand trainable:  True
block_3_expand_BN traina

In [None]:
# Freeze every layer except the last four, printing each layer's new state.
for layer in model.layers[:-4]:
  layer.trainable = False
  print(layer.name, 'trainable: ', layer.trainable)

# The last four layers are left untouched; print their trainable state.
print('\n ### final 4 layers ###')
for layer in model.layers[-4:]:
  print(layer.name, 'trainable: ', layer.trainable)


input_4 trainable:  False
Conv1 trainable:  False
bn_Conv1 trainable:  False
Conv1_relu trainable:  False
expanded_conv_depthwise trainable:  False
expanded_conv_depthwise_BN trainable:  False
expanded_conv_depthwise_relu trainable:  False
expanded_conv_project trainable:  False
expanded_conv_project_BN trainable:  False
block_1_expand trainable:  False
block_1_expand_BN trainable:  False
block_1_expand_relu trainable:  False
block_1_pad trainable:  False
block_1_depthwise trainable:  False
block_1_depthwise_BN trainable:  False
block_1_depthwise_relu trainable:  False
block_1_project trainable:  False
block_1_project_BN trainable:  False
block_2_expand trainable:  False
block_2_expand_BN trainable:  False
block_2_expand_relu trainable:  False
block_2_depthwise trainable:  False
block_2_depthwise_BN trainable:  False
block_2_depthwise_relu trainable:  False
block_2_project trainable:  False
block_2_project_BN trainable:  False
block_2_add trainable:  False
block_3_expand trainable:  Fa

In [None]:
# Apply fine tuning to a pretrained model
# - Stage 1: 10 epochs with the feature-extractor layers set to trainable=False,
#    so they are excluded from training and only the classification layers are trained
# - Stage 2: 10 epochs with all layers set to trainable=True,
#    except BatchNormalization layers, which stay frozen;
#    the learning rate is set to 1/10 of the stage-1 value
from tensorflow.keras import layers
def train_model_fine_tune(data_df, model_name, augmentor, preprocessing_func):
  """Train a pretrained-backbone classifier in two fine-tuning stages.

  Stage 1: every layer except the last four is frozen and the model is
  trained for FIRST_EPOCHS epochs with Adam(0.0001).
  Stage 2: all layers except BatchNormalization layers are unfrozen and the
  model is trained for SECOND_EPOCHS epochs with Adam(0.00001).

  Args:
    data_df: image index with 'path', 'dataset' and 'label' columns.
    model_name: backbone name passed to create_model().
    augmentor: augmentation applied to the training images, or None.
    preprocessing_func: per-image preprocessing applied to both the training
      and the validation images, or None.

  Returns:
    (model, history): the trained model and the History of stage 2 only
    (the stage-1 History is not returned).
  """
  tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)

  tr_ds = CnD_Dataset(tr_path, tr_label, batch_size=BATCH_SIZE, augmentor=augmentor,
                      shuffle=True, pre_func=preprocessing_func)
  # Validation images are never augmented, so validation metrics stay
  # comparable between epochs and stages.
  val_ds = CnD_Dataset(val_path, val_label, batch_size=BATCH_SIZE, augmentor=None,
                       shuffle=False, pre_func=preprocessing_func)
  model = create_model(model_name=model_name)

  FIRST_EPOCHS = 10
  SECOND_EPOCHS = 10
  tr_steps = int(np.ceil(tr_path.shape[0]/BATCH_SIZE))
  val_steps = int(np.ceil(val_path.shape[0]/BATCH_SIZE))

  # Freeze the feature-extractor layers (everything but the last four layers).
  # NOTE: for 'vgg16' create_model adds no Dropout, so the head has only three
  # layers and this slice also leaves the final backbone layer unfrozen.
  for layer in model.layers[:-4]:
    layer.trainable = False
  # Compile AFTER changing `trainable`: Keras only guarantees that trainable
  # changes are taken into account once compile() is called again.
  model.compile(optimizer=Adam(0.0001), loss='binary_crossentropy',
                metrics=['accuracy'])
  # Stage 1: train the classification head only.
  history = model.fit(tr_ds, epochs=FIRST_EPOCHS,
                      steps_per_epoch=tr_steps,
                      validation_data=val_ds,
                      validation_steps=val_steps,
                      verbose=1)

  # Unfreeze all layers except BatchNormalization layers, which stay frozen.
  for layer in model.layers:
    if not isinstance(layer, layers.BatchNormalization):
      layer.trainable = True
  # Stage 2 uses a learning rate 1/10 of stage 1; recompiling also applies the
  # updated trainable flags.
  model.compile(optimizer=Adam(0.00001), loss='binary_crossentropy',
                metrics=['accuracy'])
  # Stage 2 runs for SECOND_EPOCHS (this call previously reused FIRST_EPOCHS).
  history = model.fit(tr_ds, epochs=SECOND_EPOCHS,
                      steps_per_epoch=tr_steps,
                      validation_data=val_ds,
                      validation_steps=val_steps,
                      verbose=1)
  return model, history


In [None]:
# Fine-tune a MobileNetV2-based model on the subsampled data:
# no augmentation (None), images preprocessed with mobile_preprocess_input.
mobile_model_tuned, mobile_tuned_history = train_model_fine_tune(input_df, 
                        'mobilenet',
                        None,
                        mobile_preprocess_input)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Evaluate the fine-tuned model on the test dataset built earlier (test_ds).
mobile_model_tuned.evaluate(test_ds)



[13.736985206604004, 0.011369253508746624]