In [None]:
import numpy as np
import pandas as pd
import os

def make_catndog_dataframe():
    paths = []
    dataset_gubuns = []
    label_gubuns = []
    
    for dirname, _, filenames in os.walk('/kaggle/input/cat-and-dog'):
        for filename in filenames:
            if '.jpg' in filename:
                file_path = dirname+'/'+filename
                paths.append(file_path)
                if '/training_set' in file_path:
                    dataset_gubuns.append('train')
                elif '/test_set/' in file_path:
                    dataset_gubuns.append('test')
                else: dataset_gubuns.append('N/A')
                    
                if 'dogs' in file_path:
                    label_gubuns.append('DOG')
                elif 'cats' in file_path:
                    label_gubuns.append('CAT')
                else: label_gubuns.append('N/A')
                    
    data_df = pd.DataFrame({'path':paths, 'dataset':dataset_gubuns, 'label':label_gubuns})
    return data_df
            

In [None]:
pd.set_option('display.max_colwidth', 200)
data_df = make_catndog_dataframe()
print('data_df shape:', data_df.shape)
data_df.head()

In [None]:
list_a = [1, 2, 3, 4, 5]
print(len(list_a)) # __len__(self) 호출
print(list_a[0]) # __getitem__(self, index)

In [None]:
from tensorflow.keras.utils import Sequence
import sklearn
import cv2

BATCH_SIZE = 64
IMAGE_SIZE = 224

# 입력 인자 image_filenames, labels는 모두 numpy array로 들어옴
class CnD_Dataset(Sequence):
    
    # 객체 생성 인자로 들어온 값을 객체 내부 변수로 할당.
    def __init__(self, image_filenames, labels, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=None):
        
        # 객체 생성 인자로 들어온 값을 객체 내부 변수로 할당.
        self.image_filenames = image_filenames
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.shuffle = shuffle
        # train data의 경우
        if self.shuffle:
            # 객체 생성시에 한번 데이터를 섞음
            #self.on_epoch_end()
            pass
        
    # Sequence를 상속받은 Dataset은 batch_size 단위로 입력된 데이터를 처리함
    # __len__()은 전체 데이터 건수가 주어졌을 때 batch_size단위로 몇번 데이터를 반환하는지 나타냄

    def __len__(self):
        # batch_size 단위로 데이터를 몇번 가져와야하는지 계산하기 위해 전체 데이터 건수를 batch_size로 나누되, 정수로 정확히 나눠지지 않을 경우 1회를 더한다.
        return int(np.ceil(len(self.labels)/self.batch_size))
    
    def __getitem__(self, index):
    # batch_size 단위로 image_array, label_array 데이터를 가져와서 변환한 뒤 다시 반환함
    # 인자로 몇번째 batch 인지를 나타내는 index를 입력하면 해당 순서에 해당하는 batch_size 만큼의 데이터를 가공하여 반환
    # batch_size 갯수만큼 변환된 image_array와 label_array 반환
    
        # index는 몇번째 batch인지를 나타냄.
        # batch_size만큼 순차적으로 데이터를 가져오려면 array에서 index*self.batch_size:(index+1)*self.batch_size 만큼의 연속 데이터를 가져오면 됨
        image_name_batch = self.image_filenames[index*self.batch_size:(index+1)*self.batch_size]
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]
        
        # 만일 객체 생성 albumentation으로 만든 augmentor가 주어진다면 아래와 같이 augmentor를 이용하여 image 변환
        # albumentations은 개별 image만 변환할 수 있으므로 batch_size만큼 할당된 image_name_batch를 한 건씩 iteration 하면서 변환 수행
        # image_batch 배열은 float32로 설정
        image_batch = np.zeros((image_name_batch.shape[0], IMAGE_SIZE, IMAGE_SIZE, 3))
        # batch_size에 담긴 건수만큼 iteration 하면서 opencv image load => image augmentation 변환(augmentor가 not None일 경우) => image_batch에 담음
        for image_index in range(image_name_batch.shape[0]):
            image = cv2.cvtColor(cv2.imread(image_name_batch[image_index]), cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']
            
            # 만일 preprocessing_input이 pre_func 인자로 들어오면 이를 이용하여 scaling 적용
            if self.pre_func is not None:
                image = self.pre_func(image)
            
            image_batch[image_index] = image
        
        return image_batch, label_batch
    
    
    # epoch가 한번 수행이 완료 될때마다 모델의 fit()에서 호출됨.
    def on_epoch_end(self):
        if(self.shuffle):
            #print('epoch end')
            # 전체 image 파일의 위치와 label를 쌍을 맞춰서 섞어준다. scikit learn의 utils.shuffle에서 해당 기능 제공
            self.image_filenames, self.labels = sklearn.utils.shuffle(self.image_filenames, self.labels)
        else:
            pass
    

In [None]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Dense , Conv2D , Dropout , Flatten , Activation, MaxPooling2D , GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam , RMSprop 
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ReduceLROnPlateau , EarlyStopping , ModelCheckpoint , LearningRateScheduler
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications import Xception, MobileNetV2

def create_model(model_name='vgg16', verbose=False):
    
    input_tensor = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    if model_name == 'vgg16':
        base_model = VGG(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'resnet50':
        base_model = ResNet50V2(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'xception':
        base_model = Xception(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'mobilenet':
        base_model = MobileNetV2(input_tensor=input_tensor, include_top=False, weights='imagenet')
        
    bm_output = base_model.output
    
    x = GlobalAveragePooling2D()(bm_output)
    if model_nema != 'vgg16':
        x = Dropout(rate=0.5)(x)
    x = Dense(50, activation='sigmoid', name='output')(x)
    
    model = Model(inputs=input_tensor, outputs=output)
    
    if verbose:
        model.summary()
    
    return model

In [None]:
from tensorflow.keras.applications.xception import preprocess_input as xcp_preprocess_input
from tensorflow.keras.applications.mobilenet import preprocess_input as mobile_preprocess_input

def get_train_valid_test(data_df):
    # 학습 데이터와 테스트 데이터용 Dataframe 생성
    train_df = data_df[data_df['dataset']=='train']
    test_df = data_df[data_df['dataset']=='test']
    
    # 학습 데이터의 image path와 label을 Numpy array로 변환 및 Label encoding
    train_path = train_df['path'].values
    train_label = pd.factorize(train_df['label'])[0]
    
    test_path = test_df['path'].values
    test_label = pd.factorize(test_df['label'])[0]
    
    tr_path, val_path, tr_label, val_label = train_test_split(train_path, train_label, test_size=0.15, random_state=2021)
    
    return tr_path, tr_label, val_path, val_label, test_path, test_label

def do_train_evaluation(data_df, model_name, augmentor, preprocessing_func):
    # 학습/검증/테스트용 이미지 파일 절대경로와 Label encoding된 데이터 세트 반환
    tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)
    
    # 학습과 검증용 Sequence Dataset 생성
    tr_ds = CnD_Dataset(tr_path, tr_label, batch_size=BATCH_SIZE, augmentor=augmentor,
                       shuffle=True, pre_func=preprocessing_func)
    val_ds = CnD_Dataset(val_path, val_label, batch_size=BATCH_SIZE, augmentor=None,
                        shuffle=False, pre_func=preprocessing_func)
    
    # 입력된 model_name에 따라 모델 생성
    model = create_model(model_name=model_name)
    model.compile(optimizer=Adam(0.001), loss='binary_crossentropy', metrics=['accuracy'])
    
    rlr_cb = ReducLROnPlateau(monitor='val_loss', factor=0.2, patience=3, mode='min', verbose=1)
    ely_cb = EarlyStopping(monitor='val_loss', patience=7, mode='min', verbose=1)
    
    N_EPOCHS = 15
    history = model.fit(tr_ds, epochs=N_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/BATCH_SIZE)),
                       validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/BATCH_SZIE)),
                       callbacks=([rlr_cb, ely_cb]), verbose=1)
    
    # 테스트용 Sequence Dataset 생성 후 evaluation 수행.
    test_ds = CnD_Dataset(test_path, test_label, batch_size=BATCH_SIZE, augmentor=None,
                         shuffle=False, pre_func=preprocessing_func)
    
    evaluation_result = model.evaluate(test_ds)
    print(evaluation_result)
    return history, evaluation_result

In [None]:
# tf.keras.backend.clear_session()

cnd_augmentor = A.Compose([
    A.HoriznotalFlip(p=0.3),
    A.ShiftScaleRotate(p=0.2),
    A.RandomBrightnessContrast(p=0.2)
])

history, result = do_train_evaluation(data_df, 'mobilenet', cnd_augmentor, mobile_preprocess_input )