### **Fine Tuning, 미세 조정**
- ImageNet으로 학습된 Pretrained Model을 다른 목적 또는 다른 용도로 활용할 때 Feature Extractor의 Weight를 제어하기 위한 기법이다.
- 특정 Layer(층)들을 Freeze시켜서 학습에서 제외시키고 Learning Rate를 점차 감소시켜 적용한다.
- ImageNet과 유사한 데이터 세트이거나 개별 클래스 별로 데이터 건수가 작을 경우 사용한다.
- Fine Tuning이 언제나 모델이 좋은 성능을 가져오는 것이 아니기 때문에 적절히 사용할 수 있어야 한다.
- 먼저 Classification Layers에만 학습을 시킨 뒤 전체에 학습을 시키는 순서로 진행하게 되며, 이를 위해 fit을 최소 2번 사용한다.
- 층별로 Freeze 혹은 UnFreeze 결정을 위해 미세 조정을 진행 시, 학습률이 높으면 이전 지식을 잃을 수 있기 때문에 작은 학습률을 사용한다.

<div style="display: flex;">
    <div>
        <img src="./images/transfer_learning03.png" width="600">
    </div>
    <div>
        <img src="./images/transfer_learning04.png" width="500" style="margin-left: -80px">
    </div>
</div>

In [1]:
# 이미 훈련된 데이터와 내가 할 데이터 두개를 비교했을 때,
# 유사도
# 개수
# 가 높고 낮음에 따라 4가지 경우가 나온다. 이에(상황) 따라 각각 다르게 해야한다.
# 1사분면 : 사전훈련모델의 일부만 훈련시킨다. 레이블을 구성할때 비활성화해서 일부만.
# 2사분면 : 전체 다 훈련 
# 3사분면 : 1처럼 일부부분만 훈련시키되, (훈련은 아래 층으로 갈수록 섬세한 패턴을 찾아낸다) 마지막 층만 훈련시키는 느낌
# 4사분면 : 다 날리고 마지막 부분(classifier) 만 훈련시킨다. 
# 1, 3이 비슷하여 총 전략이 3가지가 있다고 본다.

In [2]:
# Feature Extractor : CNN 부분
# 학습시간을 단축시키기 위해 하는것 -> 미세 조정
# 정확도 높이려고 쓰는게 아님

In [3]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# 데이터 디렉토리 경로 설정
root = './datasets/animals/original/'

# ImageDataGenerator 객체를 생성하여 이미지 정규화(rescale)
idg = ImageDataGenerator(rescale=1./255)

# 이미지를 읽어오기
generator = idg.flow_from_directory(root, target_size=(64, 64), batch_size=32, class_mode='categorical')
# 각 클래스 인덱스 출력
print(generator.class_indices)

Found 26179 images belonging to 10 classes.
{'butterfly': 0, 'cat': 1, 'chicken': 2, 'cow': 3, 'dog': 4, 'elephant': 5, 'horse': 6, 'sheep': 7, 'spider': 8, 'squirrel': 9}


In [4]:
# {인덱스 번호 : 클래스 이름} 형태로 변환하여 딕셔너리로 만들어 target_name에 저장
target_name = {v: k for k, v in generator.class_indices.items()}
target_name

{0: 'butterfly',
 1: 'cat',
 2: 'chicken',
 3: 'cow',
 4: 'dog',
 5: 'elephant',
 6: 'horse',
 7: 'sheep',
 8: 'spider',
 9: 'squirrel'}

In [5]:
# 리스트 생성
target_names = []
# target_names에 인덱스에 해당하는 클래스이름을 담아주는 반복문
for target in generator.classes:
    target_names.append(target_name[target])

In [6]:
import pandas as pd

# 데이터프레임 생성
animal_df = pd.DataFrame({'file_paths': generator.filepaths, 'target_names': target_names, 'targets': generator.classes})
# 파일 경로에서 \\를 /로 변환
animal_df.file_paths = animal_df.file_paths.apply(lambda file_path: file_path.replace('\\', '/'))
# 데이터프레임 출력
animal_df

Unnamed: 0,file_paths,target_names,targets
0,./datasets/animals/original/butterfly/butterfl...,butterfly,0
1,./datasets/animals/original/butterfly/butterfl...,butterfly,0
2,./datasets/animals/original/butterfly/butterfl...,butterfly,0
3,./datasets/animals/original/butterfly/butterfl...,butterfly,0
4,./datasets/animals/original/butterfly/butterfl...,butterfly,0
...,...,...,...
26174,./datasets/animals/original/squirrel/squirrel9...,squirrel,9
26175,./datasets/animals/original/squirrel/squirrel9...,squirrel,9
26176,./datasets/animals/original/squirrel/squirrel9...,squirrel,9
26177,./datasets/animals/original/squirrel/squirrel9...,squirrel,9


In [7]:
from sklearn.model_selection import train_test_split

# 데이터를 train과 test로 나누기
train_images, test_images, train_targets, test_targets = \
train_test_split(animal_df.file_paths, 
                 animal_df.targets, 
                  # stratify 매개변수에 animal_df.targets를 지정하여, 분할된 데이터셋에서도 클래스 비율을 동일하게 유지
                 stratify=animal_df.targets, 
                 test_size=0.2, random_state=124)

# 출력하기
print(train_targets.value_counts())
print(test_targets.value_counts())

targets
4    3890
8    3857
2    2478
6    2098
0    1690
3    1493
9    1490
7    1456
1    1334
5    1157
Name: count, dtype: int64
targets
4    973
8    964
2    620
6    525
0    422
3    373
9    372
7    364
1    334
5    289
Name: count, dtype: int64


In [8]:
from sklearn.model_selection import train_test_split
# train 데이터를 train과 validation 나누기
train_images, validation_images, train_targets, validation_targets = \
train_test_split(train_images, 
                 train_targets, 
                 stratify=train_targets, 
                 test_size=0.2, random_state=124)

# 출력하기
print(train_targets.value_counts())
print(validation_targets.value_counts())
print(test_targets.value_counts())

targets
4    3112
8    3086
2    1982
6    1678
0    1352
3    1194
9    1192
7    1165
1    1067
5     926
Name: count, dtype: int64
targets
4    778
8    771
2    496
6    420
0    338
3    299
9    298
7    291
1    267
5    231
Name: count, dtype: int64
targets
4    973
8    964
2    620
6    525
0    422
3    373
9    372
7    364
1    334
5    289
Name: count, dtype: int64


In [9]:
# 원래의 데이터프레임에서 train_images.index에 해당하는 행을 선택하여 데이터프레임을 생성하고 인덱스 초기화
train_df = animal_df.iloc[train_images.index].reset_index(drop=True)
validation_df = animal_df.iloc[validation_images.index].reset_index(drop=True)
test_df = animal_df.iloc[test_images.index].reset_index(drop=True)

# 출력하기 
print(train_df.shape)
print(validation_df.shape)
print(test_df.shape)

(16754, 3)
(4189, 3)
(5236, 3)


In [10]:
import numpy as np
from tensorflow.keras.utils import Sequence
from sklearn.utils import shuffle
import cv2

# 이미지와 배치사이즈 선언
IMAGE_SIZE = 224
BATCH_SIZE = 64

# Dataset class를 만들어서 Sequence를 상속받아서 만들어서 사용한다
class Dataset(Sequence):
    
    # 생성자
    # 훈련할 경로로, 타겟, 배치사이즈, aug, preprocess를 전달하는데 augmentation은 객체를 전달해야한다.
    # Dataset을 객체화 할 때 aug에 객체를 전달하면 cv2로 가져온 이미지를 넣어주면서 적용되는 원리이다.
    def __init__(self, file_paths, targets, batch_size=BATCH_SIZE, aug=None, preprocess=None, shuffle=False):
        self.file_paths = file_paths
        self.targets = targets
        self.batch_size = batch_size
        self.aug = aug
        # 전처리 함수 (preprocess)
        self.preprocess = preprocess
        self.shuffle = shuffle

        # 1 에포크당 데이터셋 객체가 자동으로 객체화 되며, 1 에포크당 1번의 새로운 데이터가 만들어진다.
        # 에포크 종료시, 객체 생성 및 데이터 섞기 (매 에포크당 새로운 객체를 쓸 수 있게 만들어준다)
        if self.shuffle:
            self.on_epoch_end()
            
    # len 재정의 함수 (하나의 배치사이즈에 필요한 개수를 리턴해주기 위해, 데이터의 길이를 리턴) 
    def __len__(self):
        # (반올림(전체 개수 / 전달받은 배치사이즈 개수)) 전체가 float 이기때문에 int로 형변환 
        return int(np.ceil(len(self.targets) / self.batch_size))

    # len 의 개수에 맞춰 getitem으로 해당하는 데이터를 가져오며 그걸 fit 하는 원리이다.
    # 해당 인덱스에 해당하는 batch_size 단위로 이미지 배열과 타켓 데이터들을 가져온 뒤 변환한 값을 리턴한다.
    def __getitem__(self, index):
        
        # index 번호에 맞는 데이터들을 가져온다.
        file_paths_batch = self.file_paths[index * self.batch_size: (index + 1) * self.batch_size]
        targets_batch = self.targets[index * self.batch_size: (index + 1) * self.batch_size]

        # file_path_batch 행의 개수(데이터 수)만큼 4차원으로 nd array를 만들어야한다. (배치사이즈, 길이, 높이, 깊이)
        results_batch = np.zeros((file_paths_batch.shape[0], IMAGE_SIZE, IMAGE_SIZE, 3))

        # 이미지 전처리(aug)는 1장씩 하기때머문에, 반복문을 통해 하나씩 실행하고 그걸 전부 리스트에 담아서 리턴한다.
        for i in range(file_paths_batch.shape[0]):
            
            # 파일 경로에서 이미지를 읽어온 후 BGR에서 RGB로 변환
            image = cv2.cvtColor(cv2.imread(file_paths_batch[i]), cv2.COLOR_BGR2RGB)
            # 원본 이미지를 선언해놓은 이미지 사이즈로 재조정
            image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))

            # augmentation이 있다면 그걸 이미지에 적용
            if self.aug is not None:
                image = self.aug(image=image)['image']

            # preprocess가 전달됐다면 이미지 전처리 적용
            if self.preprocess is not None:
                image = self.preprocess(image)
                    
            # 변환이 다 끝난 이미지 저장
            results_batch[i] = image

        # 처리된 이미지 결과와 타겟을 리턴
        return results_batch, targets_batch

    # 1 epoch가 끝날 때마다 shuffle로 데이터를 섞어주는 함수
    def on_epoch_end(self):
        
        # shuffle 메소드를 사용하여 한 쌍씩(이미지, 타겟) 섞기
        if self.shuffle:
            self.file_paths, self.targets = shuffle(self.file_paths, self.targets)        

In [11]:
import albumentations as A
from tensorflow.keras.applications.xception import preprocess_input as xception_preprocess_input

# 학습에 사용할 경로와 타겟을 values만 가져와서 리스트로 쓰기
train_file_paths = train_df['file_paths'].values
# train_targets = train_df['targets'].values # SparseCategoricalCrossEntropy (레이블이 정수일 때 원핫인코딩을 해줌)
train_targets = pd.get_dummies(train_df['targets']).values # CategoricalCrossEntropy (원핫인코딩이 되어있을때 사용)

validation_file_paths = validation_df['file_paths'].values
# validation_targets = validation_df['targets'].values # SparseCategoricalCrossEntropy (레이블이 정수일 때 원핫인코딩을 해줌)
validation_targets = pd.get_dummies(validation_df['targets']).values # CategoricalCrossEntropy (원핫인코딩이 되어있을때 사용)

test_file_paths = test_df['file_paths'].values
# test_targets = test_df['targets'].values # SparseCategoricalCrossEntropy (레이블이 정수일 때 원핫인코딩을 해줌)
test_targets = pd.get_dummies(test_df['targets']).values # CategoricalCrossEntropy (원핫인코딩이 되어있을때 사용)

# 데이터 증강(augmentation) 정의
aug = A.Compose([
    A.ShiftScaleRotate(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0, p=0.5)
])

# 데이터세트 생성하기
train_dataset = Dataset(train_file_paths, 
                        train_targets, 
                        batch_size=BATCH_SIZE, 
                        aug=aug, 
                        preprocess=xception_preprocess_input, 
                        shuffle=True)

validation_dataset = Dataset(validation_file_paths, 
                        validation_targets, 
                        batch_size=BATCH_SIZE, 
                        preprocess=xception_preprocess_input)

test_dataset = Dataset(test_file_paths, 
                        test_targets, 
                        batch_size=BATCH_SIZE, 
                        preprocess=xception_preprocess_input)

In [12]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.layers import BatchNormalization

from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications import Xception
from tensorflow.keras.applications import MobileNetV2

def create_model(model_name='vgg16', verbose=False):
    input_tensor = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    if model_name == 'vgg16':
        model = VGG16(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'resnet50': # ResNet50, 74.9% ; ResNet50V2, 76.0%
        model = ResNet50V2(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'xception': # Inception을 기초로 한 모델
        model = Xception(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'mobilenet':
        model = MobileNetV2(input_tensor=input_tensor, include_top=False, weights='imagenet')

    x = model.output

    # 분류기
    x = GlobalAveragePooling2D()(x)
    if model_name != 'vgg16':
        x = Dropout(rate=0.5)(x)
    x = Dense(50, activation='relu')(x)
    if model_name != 'vgg16':
        x = Dropout(rate=0.5)(x)
    output = Dense(7, activation='softmax', name='output')(x)
    
    model = Model(inputs=input_tensor, outputs=output)
    
    if verbose:
        model.summary()
    
    return model

In [13]:
from tensorflow.keras.losses import SparseCategoricalCrossentropy, CategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Accuracy

model = create_model(model_name='mobilenet', verbose=True)
# model.compile(optimizer=Adam(), loss=SparseCategoricalCrossentropy(), metrics=['acc'])
model.compile(optimizer=Adam(), loss=CategoricalCrossentropy(), metrics=['acc'])

  model = MobileNetV2(input_tensor=input_tensor, include_top=False, weights='imagenet')


In [15]:
# 레이어에 대한 정보를 가져올 수 있어야 층을 확인하고 Freeze를 시킬 수 있다.
# 모델 레이어스를 인덱싱해서 번호로 정확하게 보면 더 편하다
model.layers

[<InputLayer name=input_layer, built=True>,
 <Conv2D name=Conv1, built=True>,
 <BatchNormalization name=bn_Conv1, built=True>,
 <ReLU name=Conv1_relu, built=True>,
 <DepthwiseConv2D name=expanded_conv_depthwise, built=True>,
 <BatchNormalization name=expanded_conv_depthwise_BN, built=True>,
 <ReLU name=expanded_conv_depthwise_relu, built=True>,
 <Conv2D name=expanded_conv_project, built=True>,
 <BatchNormalization name=expanded_conv_project_BN, built=True>,
 <Conv2D name=block_1_expand, built=True>,
 <BatchNormalization name=block_1_expand_BN, built=True>,
 <ReLU name=block_1_expand_relu, built=True>,
 <ZeroPadding2D name=block_1_pad, built=True>,
 <DepthwiseConv2D name=block_1_depthwise, built=True>,
 <BatchNormalization name=block_1_depthwise_BN, built=True>,
 <ReLU name=block_1_depthwise_relu, built=True>,
 <Conv2D name=block_1_project, built=True>,
 <BatchNormalization name=block_1_project_BN, built=True>,
 <Conv2D name=block_2_expand, built=True>,
 <BatchNormalization name=block_2

In [None]:
# Freeze는 무조건 위에서 아래로

In [None]:
# for layer in model.layers:
#     # 각 레이어가 학습 가능한 상태인지 확인
#     print(layer.trainable)
#     # 전체 Freeze
#     layer.trainable = False

In [None]:
# # 분류기 부분을 제외한 나머지 출력해보기
# model.layers[:-5]

In [16]:
from tensorflow.keras import layers

# 무조건 먼저 전체 다 False로 시작.
for layer in model.layers[:-5]:
    layer.trainable = False

# 학습하기
# model.fit()

for layer in model.layers:
    # 상태를 바꾸고싶은 객체, 클래스를 전달
    if not isinstance(layer, layers.BatchNormalization):
        layer.trainable = True

# 러닝레이트 0.00001 정도를 기본으로 주고 시작할 것
# model.compile(optimizer=Adam(0.00001))
# 학습하기
# model.fit()

In [None]:
# 위를 함수로 만들기

In [17]:
from tensorflow.keras.losses import SparseCategoricalCrossentropy, CategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Accuracy

IMAGE_SIZE = 224
BATCH_SIZE = 64

def fine_tune(datas, model_name, aug, preprocess):
    FIRST_EPOCHS = 10
    SECOND_EPOCHS = 10
    
    # datas 를 튜플로 전달 받아서 언패킹
    train_file_paths, train_targets, \
    validation_file_paths, validation_targets, \
    test_file_paths, test_targets = datas

    # 위에서 만든 dataset에 작업
    train_dataset = Dataset(train_file_paths, 
                            train_targets, 
                            batch_size=BATCH_SIZE, 
                            aug=aug, 
                            preprocess=xception_preprocess_input, 
                            shuffle=True)
    
    validation_dataset = Dataset(validation_file_paths, 
                            validation_targets, 
                            batch_size=BATCH_SIZE, 
                            preprocess=xception_preprocess_input)

    # 모델 만들기
    model = create_model(model_name=model_name, verbose=True)
    model.compile(optimizer=Adam(0.00001), loss=CategoricalCrossentropy(), metrics=['acc'])

    # feature extractor layer 들을 전부 freeze
    for layer in model.layers[:-5]:
        layer.trainable = False

    # 분류기만 훈련
    model.fit(train_dataset, 
                    batch_size=BATCH_SIZE, 
                    epochs=FIRST_EPOCHS, 
                    validation_data=validation_dataset)

    # 배치정규화만 freeze 진행
    for layer in model.layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True

    # 부분 freeze 진행 : 직접 수를 세어 슬라이싱으로 넣어줘야한다.

    # 훈련
    model.compile(optimizer=Adam(0.00001), loss=CategoricalCrossentropy(), metrics=['acc'])
    model.fit(train_dataset, 
                batch_size=BATCH_SIZE, 
                epochs=SECOND_EPOCHS, 
                validation_data=validation_dataset)

    return model, history
    

In [18]:
import albumentations as A
from tensorflow.keras.applications.xception import preprocess_input as mobilenet_preprocess_input

# 학습에 사용할 경로와 타겟을 values만 가져와서 리스트로 쓰기
train_file_paths = train_df['file_paths'].values
train_targets = pd.get_dummies(train_df['targets']).values # CategoricalCrossEntropy

validation_file_paths = validation_df['file_paths'].values
validation_targets = pd.get_dummies(validation_df['targets']).values # CategoricalCrossEntropy

test_file_paths = test_df['file_paths'].values
test_targets = pd.get_dummies(test_df['targets']).values # CategoricalCrossEntropy

# 데이터 증강(augmentation) 정의
aug = A.Compose([
    A.ShiftScaleRotate(p=0.5),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0, p=0.5)
])

# datas 전달 
model, history = fine_tune((train_file_paths, train_targets,
           validation_file_paths, validation_targets,
           test_file_paths, test_targets),
          'mobilenet', 
          aug,
          mobilenet_preprocess_input)

  model = MobileNetV2(input_tensor=input_tensor, include_top=False, weights='imagenet')


Epoch 1/10


ValueError: Arguments `target` and `output` must have the same shape. Received: target.shape=(None, 10), output.shape=(None, 7)

In [19]:
# freeze 확인하기
for i, layer in enumerate(model.layers[:-5]):
    layer.trainable = False
    print(i + 1, '.', layer.name, 'trainable:', layer.trainable)

print('\n######### classifier layers ######### ')
for layer in model.layers[-5:]:
    print(layer.name, 'trainable:', layer.trainable)

1 . input_layer trainable: False
2 . Conv1 trainable: False
3 . bn_Conv1 trainable: False
4 . Conv1_relu trainable: False
5 . expanded_conv_depthwise trainable: False
6 . expanded_conv_depthwise_BN trainable: False
7 . expanded_conv_depthwise_relu trainable: False
8 . expanded_conv_project trainable: False
9 . expanded_conv_project_BN trainable: False
10 . block_1_expand trainable: False
11 . block_1_expand_BN trainable: False
12 . block_1_expand_relu trainable: False
13 . block_1_pad trainable: False
14 . block_1_depthwise trainable: False
15 . block_1_depthwise_BN trainable: False
16 . block_1_depthwise_relu trainable: False
17 . block_1_project trainable: False
18 . block_1_project_BN trainable: False
19 . block_2_expand trainable: False
20 . block_2_expand_BN trainable: False
21 . block_2_expand_relu trainable: False
22 . block_2_depthwise trainable: False
23 . block_2_depthwise_BN trainable: False
24 . block_2_depthwise_relu trainable: False
25 . block_2_project trainable: False
2

In [None]:
# 모델과 히스토리로 그래프 그리기