## ResNet50 + ArcFace

In [18]:
from keras.preprocessing.image import ImageDataGenerator


from keras.utils import np_utils, Sequence
from keras.models import Sequential
from keras.layers import *
from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.applications import ResNet50
from keras import optimizers

import os
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

import datetime
import math

from sklearn.metrics import accuracy_score

In [19]:
trainImagePath = '../dataSet/splitImages/train'
validationImagePath = '../dataSet/splitImages/val'
testImagePath = '../dataSet/splitImages/test'
checkpointPath = '../checkpoints/epoch_{epoch:04d}.ckpt'
logsPath = '../logs/fit/'+ datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
bestCheckpointPath = '../model/best/best_model'

batchSize = 32
imageWidth = 224
imageHeight = 224
imageChannel = 3
input_shape = (imageWidth, imageHeight, imageChannel)
n_classes = 44
scale = 30
margin = 0.3
dropout_rate = 0.0
dense_units=512



In [20]:
class ArcMarginProduct(tf.keras.layers.Layer):
    '''
    Implements large margin arc distance.

    Reference:
        https://arxiv.org/pdf/1801.07698.pdf
        https://github.com/lyakaap/Landmark2019-1st-and-3rd-Place-Solution/
            blob/master/src/modeling/metric_learning.py
    '''
    def __init__(self, n_classes, s=30, m=0.50, easy_margin=False,
                 ls_eps=0.0, **kwargs):

        super(ArcMarginProduct, self).__init__(**kwargs)

        self.n_classes = n_classes
        self.s = s
        self.m = m
        self.ls_eps = ls_eps
        self.easy_margin = easy_margin
        self.cos_m = tf.math.cos(m)
        self.sin_m = tf.math.sin(m)
        self.th = tf.math.cos(math.pi - m)
        self.mm = tf.math.sin(math.pi - m) * m
    
    def get_config(self):
        config = super(ArcMarginProduct, self).get_confit()
        config.update({
            'n_classes':self.n_classes,
            's':self.s,
            'm':self.m,
            'ls_eps':self.ls_eps,
            'easy_margin':self.easy_margin,
            'cos_m':self.cos_m,
            'sin_m':self.sin_m,
            'th':self.tf,
            'mm':self.mm
        })
        return config

    def build(self, input_shape):
        super(ArcMarginProduct, self).build(input_shape[0])

        self.W = self.add_weight(
            name='W',
            shape=(int(input_shape[0][-1]), self.n_classes),
            initializer='glorot_uniform',
            dtype='float32',
            trainable=True,
            regularizer=None)

    def call(self, inputs):
        X, y = inputs
        y = tf.cast(y, dtype=tf.int32)
        cosine = tf.matmul(
            tf.math.l2_normalize(X, axis=1),
            tf.math.l2_normalize(self.W, axis=0)
        )
        sine = tf.math.sqrt(1.0 - tf.math.pow(cosine, 2))
        phi = cosine * self.cos_m - sine * self.sin_m
        if self.easy_margin:
            phi = tf.where(cosine > 0, phi, cosine)
        else:
            phi = tf.where(cosine > self.th, phi, cosine - self.mm)
        one_hot = tf.cast(
            tf.one_hot(y, depth=self.n_classes),
            dtype=cosine.dtype
        )
        if self.ls_eps > 0:
            one_hot = (1 - self.ls_eps) * one_hot + self.ls_eps / self.n_classes

        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        output *= self.s
        return output

In [21]:
class Dataset(Sequence) :
    
    def __init__(self,generator) :
        self.generator = generator
    def __len__(self) :
        return len(self.generator)
    def __getitem__(self,idx) :
        x = self.generator[idx] [0]
        y = np.argmax(self.generator[idx] [1],axis=1)
        return (x,y),y

### 이미지 데이터 생성

ImageDataGenerator (train, validation)

In [22]:
trainDataGen = ImageDataGenerator(rescale=1./255,
                                  rotation_range = 30,
                                  width_shift_range=0.1,
                                  height_shift_range=0.1,
                                  shear_range=20,
                                  zoom_range=0.3,
                                  horizontal_flip=False,
                                  vertical_flip=False,
                                  brightness_range=[0.7, 1.3],
                                  fill_mode='nearest',
                                  )

train set

In [23]:
trainGenSet = trainDataGen.flow_from_directory(
    trainImagePath,
    batch_size=batchSize,
    target_size=(imageWidth,imageHeight),
    class_mode='categorical',
    shuffle=True
)

Found 5601 images belonging to 44 classes.


validation set

In [24]:
validationGenSet = trainDataGen.flow_from_directory(
    validationImagePath,
    batch_size=batchSize,
    target_size=(imageWidth,imageHeight),
    class_mode='categorical'
)

Found 681 images belonging to 44 classes.


ImageDataGenerator (train, validation)

In [25]:
testDataGen = ImageDataGenerator(rescale=1./255)

test set

In [26]:
testGenSet = testDataGen.flow_from_directory(
    testImagePath,
    batch_size=batchSize,
    target_size=(imageWidth,imageHeight),
    class_mode='categorical'
)

Found 739 images belonging to 44 classes.


In [27]:
backbone = tf.keras.applications.ResNet50(
        include_top=False,
        input_shape=input_shape,
        weights=('imagenet')
    )

pooling = tf.keras.layers.GlobalAveragePooling2D(name='head/pooling')
dropout = tf.keras.layers.Dropout(dropout_rate, name='head/dropout')
dense = tf.keras.layers.Dense(dense_units, name='head/dense')

margin = ArcMarginProduct(
            n_classes = n_classes, 
            s = 30, 
            m = 0.5, 
            name='head/arc_margin', 
            dtype='float32'
            )

inp = tf.keras.layers.Input(shape = input_shape, name = 'inp1')
label = tf.keras.layers.Input((), name = 'inp2')

x = backbone(inp)
x = pooling(x)
x = dropout(x)
x = dense(x)
x = margin([x, label])
output = Dense(44, activation='softmax')(x)

model = tf.keras.models.Model(inputs = [inp, label], outputs = [output])

model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 inp1 (InputLayer)              [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 resnet50 (Functional)          (None, 7, 7, 2048)   23587712    ['inp1[0][0]']                   
                                                                                                  
 head/pooling (GlobalAveragePoo  (None, 2048)        0           ['resnet50[0][0]']               
 ling2D)                                                                                          
                                                                                            

In [28]:
model.compile(loss='sparse_categorical_crossentropy',
             optimizer='adam',
             metrics=['accuracy'])

In [29]:
checkpointPath = '../checkpoints/epoch_{epoch:04d}.ckpt'

In [30]:
checkpoint = ModelCheckpoint(
                        checkpointPath, monitor='val_loss', verbose=0, save_best_only=False,
                        save_weights_only=False, mode='auto', save_freq='epoch'
                    )


In [31]:
earlyStopping = [
                    EarlyStopping(monitor='val_loss', patience=10, ),
                    ModelCheckpoint( bestCheckpointPath, monitor='val_loss', save_best_only=True )
                ]

In [32]:
tensorboard = tf.keras.callbacks.TensorBoard(logsPath)

In [33]:
trainDataSet = Dataset(trainGenSet)
valDataSet = Dataset(validationGenSet)

In [34]:
epochs = 200
history = model.fit(
    trainDataSet,
    epochs=epochs,
    validation_data=valDataSet,
    callbacks=[checkpoint,earlyStopping, tensorboard],
    verbose=1
)

Epoch 1/200
  4/176 [..............................] - ETA: 32:38 - loss: 6.7439 - accuracy: 0.0703   

KeyboardInterrupt: 

In [None]:
scores = model.evaluate(testGenSet)
print(scores)

In [None]:
fig, loss_ax = plt.subplots()
acc_ax = loss_ax.twinx()

loss_ax.plot(history.history['loss'], 'y', label = 'train loss')
loss_ax.plot(history.history['val_loss'], 'r', label = 'val loss')
acc_ax.plot(history.history['accuracy'], 'b', label = 'train accuracy')
acc_ax.plot(history.history['val_accuracy'], 'g', label = 'val accuracy')

loss_ax.set_xlabel('epoch')
loss_ax.set_ylabel('loss')
acc_ax.set_xlabel('accuracy')

loss_ax.legend(loc = 'upper left')
acc_ax.legend(loc = 'lower left')

plt.show()

In [None]:
model.save('../model/resnetModel')