### 개와 고양이 데이터 세트의 디렉토리와 파일을 기반으로 메타 정보용 DataFrame 생성 

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

def make_catndog_dataframe():
    paths = []
    dataset_gubuns = []
    label_gubuns = []
    # os.walk()를 이용하여 특정 디렉토리 밑에 있는 모든 하위 디렉토리를 모두 조사. 
    # cat-and-dog 하위 디렉토리 밑에 jpg 확장자를 가진 파일이 모두 이미지 파일임
    # cat-and-dog 밑으로 /train/, /test/ 하위 디렉토리 존재(학습, 테스트 용 이미지 파일들을 가짐)

    for dirname, _, filenames in os.walk('/kaggle/input/cat-and-dog'):
        for filename in filenames:
            # 이미지 파일이 아닌 파일도 해당 디렉토리에 있음.
            if '.jpg' in filename:
                # 파일의 절대 경로를 file_path 변수에 할당. 
                file_path = dirname+'/'+ filename
                paths.append(file_path)
                # 파일의 절대 경로에 training_set, test_set가 포함되어 있으면 데이터 세트 구분을 'train'과 'test'로 분류. 
                if '/training_set/' in file_path:
                    dataset_gubuns.append('train')  
                elif '/test_set/' in file_path:
                    dataset_gubuns.append('test')
                else: dataset_gubuns.append('N/A')

                # 파일의 절대 경로에 dogs가 있을 경우 해당 파일은 dog 이미지 파일이고, cats일 경우는 cat 이미지 파일임. 
                if 'dogs' in file_path:
                    label_gubuns.append('DOG')
                elif 'cats' in file_path:
                    label_gubuns.append('CAT')
                else: label_gubuns.append('N/A')
    
    data_df = pd.DataFrame({'path':paths, 'dataset':dataset_gubuns, 'label':label_gubuns})
    return data_df

In [2]:
pd.set_option('display.max_colwidth', 200)
data_df = make_catndog_dataframe()
print('data_df shape:', data_df.shape)
data_df.head()

data_df shape: (10028, 3)


Unnamed: 0,path,dataset,label
0,/kaggle/input/cat-and-dog/test_set/test_set/dogs/dog.4329.jpg,test,DOG
1,/kaggle/input/cat-and-dog/test_set/test_set/dogs/dog.4223.jpg,test,DOG
2,/kaggle/input/cat-and-dog/test_set/test_set/dogs/dog.4253.jpg,test,DOG
3,/kaggle/input/cat-and-dog/test_set/test_set/dogs/dog.4190.jpg,test,DOG
4,/kaggle/input/cat-and-dog/test_set/test_set/dogs/dog.4354.jpg,test,DOG


### 주요 사용 라이브러리 재 로딩
* Sequence를 상속받은 Dataset 객체를 생성
* 학습/검증/테스트 데이터 세트 분할
* Pretrained 모델 생성

In [3]:
from tensorflow.keras.utils import Sequence
import sklearn 
import cv2

# 배치 크기와 이미지 크기를 전역 변수로 선언 
BATCH_SIZE = 64
IMAGE_SIZE = 160

# 입력 인자 image_filenames, labels는 모두 numpy array로 들어옴. 
class CnD_Dataset(Sequence):
    def __init__(self, image_filenames, labels, batch_size = BATCH_SIZE, augmentor = None, shuffle = False, pre_func = None):
        '''
        파라미터 설명
        image_filenames: opencv로 image를 로드할 파일의 절대 경로들
        labels: 해당 image의 label들
        batch_size: __getitem__(self, index) 호출 시 마다 가져올 데이터 batch 건수
        augmentor: albumentations 객체
        shuffle: 학습 데이터의 경우 epoch 종료시마다 데이터를 섞을지 여부
        '''
        # 객체 생성 인자로 들어온 값을 객체 내부 변수로 할당. 
        self.image_filenames = image_filenames
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        # train data의 경우 
        self.shuffle = shuffle
        if self.shuffle:
            # 객체 생성시에 한번 데이터를 섞음. 
            #self.on_epoch_end()
            pass
    
    # Sequence를 상속받은 Dataset은 batch_size 단위로 입력된 데이터를 처리함. 
    # __len__()은 전체 데이터 건수가 주어졌을 때 batch_size단위로 몇번 데이터를 반환하는지 나타남
    def __len__(self):
        # batch_size단위로 데이터를 몇번 가져와야하는지 계산하기 위해 전체 데이터 건수를 batch_size로 나누되, 정수로 정확히 나눠지지 않을 경우 1회를 더한다. 
        return int(np.ceil(len(self.image_filenames) / BATCH_SIZE))
    
    # batch_size 단위로 image_array, label_array 데이터를 가져와서 변환한 뒤 다시 반환함
    # 인자로 몇번째 batch 인지를 나타내는 index를 입력하면 해당 순서에 해당하는 batch_size 만큼의 데이타를 가공하여 반환
    # batch_size 갯수만큼 변환된 image_array와 label_array 반환. 
    def __getitem__(self, index):
        # index는 몇번째 batch인지를 나타냄. 
        # batch_size만큼 순차적으로 데이터를 가져오려면 array에서 index*self.batch_size:(index+1)*self.batch_size 만큼의 연속 데이터를 가져오면 됨
        image_name_batch = self.image_filenames[index * self.batch_size : (index + 1) * self.batch_size]
        if self.labels is not None:
            label_batch = self.labels[index * self.batch_size : (index + 1) * self.batch_size]
        
        # 만일 객체 생성 인자로 albumentation으로 만든 augmentor가 주어진다면 아래와 같이 augmentor를 이용하여 image 변환
        # albumentations은 개별 image만 변환할 수 있으므로 batch_size만큼 할당된 image_name_batch를 한 건씩 iteration하면서 변환 수행. 
        # image_batch 배열은 float32 로 설정. 
        image_batch = np.zeros((image_name_batch.shape[0], IMAGE_SIZE, IMAGE_SIZE, 3), dtype = 'float32')
        
        # batch_size에 담긴 건수만큼 iteration 하면서 opencv image load -> image augmentation 변환(augmentor가 not None일 경우)-> image_batch에 담음. 
        for image_index in range(image_name_batch.shape[0]):
            image = cv2.cvtColor(cv2.imread(image_name_batch[image_index]), cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
            if self.augmentor is not None:
                image = self.augmentor(image = image)['image']
            
            # 만일 preprocessing_input이 pre_func인자로 들어오면 이를 이용하여 scaling 적용. 
            if self.pre_func is not None:
                image = self.pre_func(image)
                
            image_batch[image_index] = image
        
        return image_batch, label_batch
    
    # epoch가 한번 수행이 완료 될 때마다 모델의 fit()에서 호출됨. 
    def on_epoch_end(self):
        if(self.shuffle):
            #print('epoch end')
            # 전체 image 파일의 위치와 label를 쌍을 맞춰서 섞어준다. scikt learn의 utils.shuffle에서 해당 기능 제공
            self.image_filenames, self.labels = sklearn.utils.shuffle(self.image_filenames, self.labels)
        else:
            pass

In [4]:
from sklearn.model_selection import train_test_split

# 학습 데이터의 50%를 검증 데이터에 할당. 
def get_train_valid_test(data_df):
    # 학습 데이터와 테스트 데이터용 Dataframe 생성. 
    train_df = data_df[data_df['dataset'] == 'train']
    test_df = data_df[data_df['dataset'] == 'test']

    # 학습 데이터의 image path와 label을 Numpy array로 변환 및 Label encoding
    train_path = train_df['path'].values
    train_label = pd.factorize(train_df['label'])[0]
    
    test_path = test_df['path'].values
    test_label = pd.factorize(test_df['label'])[0]

    tr_path, val_path, tr_label, val_label = train_test_split(train_path, train_label, test_size = 0.5, random_state = 2021)
    print('학습용 path shape:', tr_path.shape, '검증용 path shape:', val_path.shape, 
      '학습용 label shape:', tr_label.shape, '검증용 label shape:', val_label.shape)
    return tr_path, tr_label, val_path, val_label, test_path, test_label

In [5]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam , RMSprop 
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications import Xception, MobileNetV2

def create_model(model_name='mobilenet', verbose = False):
    
    input_tensor = Input(shape = (IMAGE_SIZE, IMAGE_SIZE, 3))
    if model_name == 'vgg16':
        base_model = VGG16(input_tensor = input_tensor, include_top = False, weights = 'imagenet')
    elif model_name == 'resnet50':
        base_model = ResNet50V2(input_tensor = input_tensor, include_top = False, weights = 'imagenet')
    elif model_name == 'xception':
        base_model = Xception(input_tensor = input_tensor, include_top = False, weights = 'imagenet')
    elif model_name == 'mobilenet':
        base_model = MobileNetV2(input_tensor = input_tensor, include_top = False, weights = 'imagenet')
    
    bm_output = base_model.output

    x = GlobalAveragePooling2D()(bm_output)
    if model_name != 'vgg16':
        x = Dropout(rate = 0.5)(x)
    x = Dense(50, activation = 'relu', name = 'fc1')(x)
    # 최종 output 출력을 softmax에서 sigmoid로 변환. 
    output = Dense(1, activation = 'sigmoid', name = 'output')(x)

    model = Model(inputs = input_tensor, outputs = output)
    
    if verbose:
        model.summary()
        
    return model

### Pretrained 모델을 학습하는 함수 생성. 
* learning rate를 조정 및 early stopping callback 제외 
* 반복 횟수는 20회, 초기 learning rate는 0.0001

In [6]:
from tensorflow.keras.applications.mobilenet import preprocess_input as mobile_preprocess_input
   
def train_model(data_df, model_name, augmentor, preprocessing_func):
    # 학습/검증/테스트용 이미지 파일 절대경로와 Label encoding 된 데이터 세트 반환
    tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)
    
    # 학습과 검증용 Sequence Dataset 생성. 
    tr_ds = CnD_Dataset(tr_path, tr_label, batch_size = BATCH_SIZE, augmentor = augmentor, 
                          shuffle = True, pre_func = preprocessing_func)
    val_ds = CnD_Dataset(val_path, val_label, batch_size = BATCH_SIZE, augmentor = None, 
                           shuffle = False, pre_func = preprocessing_func)
    
    # 입력된 model_name에 따라 모델 생성. 
    model = create_model(model_name = model_name)
    # 최종 output 출력을 softmax에서 sigmoid로 변환되었으므로 binary_crossentropy로 변환 
    model.compile(optimizer = Adam(0.0001), loss = 'binary_crossentropy', metrics = ['accuracy'])

    # 2번 iteration내에 validation loss가 향상되지 않으면 learning rate을 기존 learning rate * 0.2로 줄임.  
    #rlr_cb = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, mode='min', verbose=1)
    # 5번 iteration내에 validation loss가 향상되지 않으면 더 이상 학습하지 않고 종료
    #ely_cb = EarlyStopping(monitor='val_loss', patience=7, mode='min', verbose=1)

    N_EPOCHS = 20
    # 학습 수행. 
    history = model.fit(tr_ds, epochs = N_EPOCHS, steps_per_epoch = int(np.ceil(tr_path.shape[0] / BATCH_SIZE)), 
                       validation_data = val_ds, validation_steps = int(np.ceil(val_path.shape[0] / BATCH_SIZE)),
                        verbose = 1)
    
    return model, history

### MobileNetV2로 모델 학습 및 성능 평가. 
* 학습 시간 단축을 위해 학습 데이터 크기를 줄임. 학습/검증/테스트로 쪼개질 데이터를 전체 데이터의 30%로 설정

In [7]:
from tensorflow.keras.applications.mobilenet import preprocess_input as mobile_preprocess_input
import tensorflow as tf

# 학습/검증/테스트로 쪼개질 데이터를 전체 데이터의 30%로 설정. 
input_df, _ = train_test_split(data_df, test_size = 0.7, random_state = 2021)

mobile_model, mobile_history = train_model(input_df, 'mobilenet', None, mobile_preprocess_input)

학습용 path shape: (1202,) 검증용 path shape: (1202,) 학습용 label shape: (1202,) 검증용 label shape: (1202,)


2022-02-01 16:58:10.653238: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-02-01 16:58:10.776719: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-02-01 16:58:10.777840: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-02-01 16:58:10.780334: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5


2022-02-01 16:58:15.881271: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 1/20


2022-02-01 16:58:22.677196: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8005


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [8]:
test_df = data_df[data_df['dataset'] == 'test']

# 테스트 데이터의 image path와 label을 Numpy array로 변환 및 Label encoding
test_path = test_df['path'].values
test_label = pd.factorize(test_df['label'])[0]

test_ds = CnD_Dataset(test_path, test_label, batch_size = BATCH_SIZE, augmentor = None, 
                       shuffle = False, pre_func = mobile_preprocess_input)

mobile_model.evaluate(test_ds)



[0.07461676746606827, 0.9817103147506714]

### Pretrained 모델의 Layer별 구조 보기

In [9]:
model = create_model(model_name = 'mobilenet')
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 160, 160, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 80, 80, 32)   864         input_2[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 80, 80, 32)   128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 80, 80, 32)   0           bn_Conv1[0][0]                   
____________________________________________________________________________________________

In [10]:
# 모델의 전체 layer출력
#print(type(model.layers))
#print(model.layers)
model.layers

[<keras.engine.input_layer.InputLayer at 0x7fdb241a6290>,
 <keras.layers.convolutional.Conv2D at 0x7fdb9ee23b50>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7fdb9faa2d50>,
 <keras.layers.advanced_activations.ReLU at 0x7fdb24140710>,
 <keras.layers.convolutional.DepthwiseConv2D at 0x7fdb24305f90>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7fdb241bbb50>,
 <keras.layers.advanced_activations.ReLU at 0x7fdb24140ed0>,
 <keras.layers.convolutional.Conv2D at 0x7fdb9c071d10>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7fdb9ee12590>,
 <keras.layers.convolutional.Conv2D at 0x7fdb24157910>,
 <keras.layers.normalization.batch_normalization.BatchNormalization at 0x7fdb24152a90>,
 <keras.layers.advanced_activations.ReLU at 0x7fdb241572d0>,
 <keras.layers.convolutional.ZeroPadding2D at 0x7fdb2416a850>,
 <keras.layers.convolutional.DepthwiseConv2D at 0x7fdb9c0f4110>,
 <keras.layers.normalization.batch_normalizati

In [11]:
# 마지막 4번째에서 마지막 Layer 보기
model.layers[-4 :]


[<keras.layers.pooling.GlobalAveragePooling2D at 0x7fdb0cf93cd0>,
 <keras.layers.core.Dropout at 0x7fdb0ced15d0>,
 <keras.layers.core.Dense at 0x7fdb9c0563d0>,
 <keras.layers.core.Dense at 0x7fdb0ce8ac50>]

### Layer별 trainable 속성 확인

In [12]:
for layer in model.layers:
    print(layer.name, 'trainable:', layer.trainable)

input_2 trainable: True
Conv1 trainable: True
bn_Conv1 trainable: True
Conv1_relu trainable: True
expanded_conv_depthwise trainable: True
expanded_conv_depthwise_BN trainable: True
expanded_conv_depthwise_relu trainable: True
expanded_conv_project trainable: True
expanded_conv_project_BN trainable: True
block_1_expand trainable: True
block_1_expand_BN trainable: True
block_1_expand_relu trainable: True
block_1_pad trainable: True
block_1_depthwise trainable: True
block_1_depthwise_BN trainable: True
block_1_depthwise_relu trainable: True
block_1_project trainable: True
block_1_project_BN trainable: True
block_2_expand trainable: True
block_2_expand_BN trainable: True
block_2_expand_relu trainable: True
block_2_depthwise trainable: True
block_2_depthwise_BN trainable: True
block_2_depthwise_relu trainable: True
block_2_project trainable: True
block_2_project_BN trainable: True
block_2_add trainable: True
block_3_expand trainable: True
block_3_expand_BN trainable: True
block_3_expand_rel

In [13]:
for layer in model.layers[: -4]:
    layer.trainable = False
    print(layer.name, 'trainable:', layer.trainable)

print('\n### final 4 layers ### ')
for layer in model.layers[-4 :]:
    print(layer.name, 'trainable:', layer.trainable)

input_2 trainable: False
Conv1 trainable: False
bn_Conv1 trainable: False
Conv1_relu trainable: False
expanded_conv_depthwise trainable: False
expanded_conv_depthwise_BN trainable: False
expanded_conv_depthwise_relu trainable: False
expanded_conv_project trainable: False
expanded_conv_project_BN trainable: False
block_1_expand trainable: False
block_1_expand_BN trainable: False
block_1_expand_relu trainable: False
block_1_pad trainable: False
block_1_depthwise trainable: False
block_1_depthwise_BN trainable: False
block_1_depthwise_relu trainable: False
block_1_project trainable: False
block_1_project_BN trainable: False
block_2_expand trainable: False
block_2_expand_BN trainable: False
block_2_expand_relu trainable: False
block_2_depthwise trainable: False
block_2_depthwise_BN trainable: False
block_2_depthwise_relu trainable: False
block_2_project trainable: False
block_2_project_BN trainable: False
block_2_add trainable: False
block_3_expand trainable: False
block_3_expand_BN traina

### Pretrained 모델에 Fine Tuning 적용
* 1단계는 10회 epochs로 Feature Extractor layer들의 trainable=False로 설정하여 학습에서 제외하고 Classification layer들만 학습. 
* 2단계는 10회 epochs로 전체 layer를 trainable=True로 설정하되 BatchNormalization layer는 계속 제외함. learing rate는 1단계의 1/10 수준으로 줄임. 

In [14]:
from tensorflow.keras import layers

def train_model_fine_tune(data_df, model_name, augmentor, preprocessing_func):
    # 학습/검증/테스트용 이미지 파일 절대경로와 Label encoding 된 데이터 세트 반환
    tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)
    
    # 학습과 검증용 Sequence Dataset 생성. 
    tr_ds = CnD_Dataset(tr_path, tr_label, batch_size = BATCH_SIZE, augmentor = augmentor, 
                          shuffle = True, pre_func = preprocessing_func)
    val_ds = CnD_Dataset(val_path, val_label, batch_size = BATCH_SIZE, augmentor = None, 
                           shuffle = False, pre_func = preprocessing_func)
    
    # 입력된 model_name에 따라 모델 생성. 
    model = create_model(model_name = model_name)
    # 최종 output 출력을 softmax에서 sigmoid로 변환되었으므로 binary_crossentropy로 변환 
    model.compile(optimizer = Adam(0.0001), loss = 'binary_crossentropy', metrics = ['accuracy'])
    
    # feature extractor layer들을 freeze
    for layer in model.layers[: -4]:
        layer.trainable = False
    
    FIRST_EPOCHS = 10
    SECOND_EPOCHS = 10
    # 1단계 fine tuning 학습 수행. 
    history = model.fit(tr_ds, epochs = FIRST_EPOCHS, steps_per_epoch = int(np.ceil(tr_path.shape[0] / BATCH_SIZE)), 
                       validation_data = val_ds, validation_steps = int(np.ceil(val_path.shape[0] / BATCH_SIZE)),
                       verbose = 1)
    # 전체 layer들을 unfreeze, 단 batch normalization layer는 그대로 freeze
    for layer in model.layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True
    # 2단계는 learning rate를 기존 보다 1/10 감소    
    model.compile(optimizer = Adam(0.00001), loss = 'binary_crossentropy', metrics = ['accuracy'])    
    history = model.fit(tr_ds, epochs = SECOND_EPOCHS, steps_per_epoch = int(np.ceil(tr_path.shape[0] / BATCH_SIZE)), 
                       validation_data = val_ds, validation_steps = int(np.ceil(val_path.shape[0] / BATCH_SIZE)),
                       verbose = 1)
    
    return model, history

In [15]:
mobile_model_tuned, mobile_tuned_history = train_model_fine_tune(input_df, 'mobilenet', None, mobile_preprocess_input)

학습용 path shape: (1202,) 검증용 path shape: (1202,) 학습용 label shape: (1202,) 검증용 label shape: (1202,)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [16]:
mobile_model_tuned.evaluate(test_ds)



[0.09405174106359482, 0.9782501459121704]