In [1]:
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.preprocessing.image import random_rotation, random_shear, random_zoom
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.applications.efficientnet import preprocess_input
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Dropout, Input
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import itertools
import mlflow.tensorflow
import mlflow
import cv2

In [2]:
def prepare_data(data, to_3_channels=True, to_clahe=False):
    """Turn a data frame of pixel strings into image and label arrays.

    Args:
        data: Data frame with an 'emotion' column (values convertible to
            int) and a 'pixels' column holding 48*48 space-separated
            integers per row.
        to_3_channels: When True, the single gray channel is copied three
            times so every image has shape (48, 48, 3); otherwise images
            keep shape (48, 48, 1).
        to_clahe: When True, each image is cast to uint8 and passed through
            OpenCV CLAHE (clipLimit=2) before the channel expansion.

    Returns:
        Tuple ``(image_array, image_label)``: an array of shape
        (len(data), 48, 48, channels) holding the ``preprocess_input``
        result for every row, and an array of integer labels.
    """
    equalizer = cv2.createCLAHE(clipLimit=2)
    expand_channels = to_3_channels == True
    apply_clahe = to_clahe == True
    n_channels = 3 if expand_channels else 1

    image_array = np.zeros(shape=(len(data), 48, 48, n_channels))
    image_label = np.array([int(value) for value in data['emotion']])

    for position, row_key in enumerate(data.index):
        # Parse the space-separated pixel string into a one-channel image.
        pixels = np.fromstring(data.loc[row_key, 'pixels'], dtype=int, sep=' ')
        gray = np.reshape(pixels, (48, 48, 1))  # grayscale: a single channel

        if apply_clahe:
            # CLAHE (Contrast Limited Adaptive Histogram Equalization)
            equalized = equalizer.apply(gray[:, :, 0].astype("uint8"))
            gray = np.reshape(equalized, (48, 48, 1))

        if expand_channels:
            # Replicate the gray plane into three identical channels.
            plane = gray[:, :, 0]
            gray = np.stack([plane, plane, plane], axis=-1)

        image_array[position] = preprocess_input(gray)

    return image_array, image_label


def build_model(preModel=EfficientNetB0, num_classes=7):
    """Assemble and compile a classifier on top of a pretrained backbone.

    Args:
        preModel: Keras application constructor used as the feature
            extractor; it is called with ImageNet weights, no top,
            a 48x48x3 input and max pooling.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A compiled ``tf.keras.Model`` (Adam optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    backbone = preModel(
        include_top=False,
        weights='imagenet',
        input_shape=(48, 48, 3),
        pooling='max',
        classifier_activation='softmax',
    )

    # Softmax head attached directly to the pooled backbone output.
    head = Dense(num_classes, activation="softmax", name="main_output")
    predictions = head(backbone.output)

    classifier = tf.keras.Model(backbone.input, predictions)
    classifier.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=['accuracy'],
    )
    return classifier


def resize_image(img_array, output_shape=(224, 224)):
    """Resize an image with OpenCV.

    Args:
        img_array: Image array accepted by ``cv2.resize``.
        output_shape: Target size, forwarded unchanged as the second
            argument of ``cv2.resize``.
            NOTE(review): OpenCV documents that argument as (width, height)
            - confirm callers pass non-square sizes in that order.

    Returns:
        The array returned by ``cv2.resize``.
    """
    return cv2.resize(img_array, output_shape)


def augmentation_image(img_array):
    """Augment one image with a random rotation, a random shear and a zoom.

    The image is passed through ``random_rotation`` (range 30 degrees),
    ``random_shear`` (intensity 20) and ``random_zoom``; all three are told
    that the channel axis is axis 2.  Both bounds of the zoom range are 0.8,
    so the zoom factor is effectively fixed.

    This function deliberately does not touch any random seed.  It used to
    call ``tf.random.set_seed`` on every invocation, which reset TensorFlow's
    global random state once per image.
    NOTE(review): the Keras image helpers sample with ``np.random`` in the
    Keras versions checked, so that call did not make the augmentation
    reproducible either - for repeatable results call ``np.random.seed(...)``
    once before augmenting.

    Args:
        img_array: A single image array with channels on axis 2.

    Returns:
        The transformed image array.
    """
    img_array = random_rotation(img_array, rg=30, channel_axis=2)  # rotation
    img_array = random_shear(img_array, intensity=20, channel_axis=2)  # shear
    img_array = random_zoom(img_array, zoom_range=(
        0.8, 0.8), channel_axis=2)  # zoom
    return img_array


def auto_augmentation(X_train, y_train, class_sample_size, ratio=1):
    """Create augmented samples that top each class up towards a target size.

    The target is ``ratio * max(class_sample_size)``.  For class ``i`` (its
    position in ``class_sample_size``, compared directly against the values
    in ``y_train``) the shortfall ``int(target - size)`` images are drawn
    with replacement from that class and passed through
    ``augmentation_image``.  Classes already at or above the target get no
    extra samples.

    Args:
        X_train: Array of images, indexed along axis 0.
        y_train: Array of integer class labels aligned with ``X_train``.
        class_sample_size: Current number of samples per class, ordered by
            class label.
        ratio: Fraction of the largest class size to fill each class up to.

    Returns:
        Tuple ``(X_aug, y_aug)`` holding only the newly generated images and
        their labels.  When nothing needs to be generated, ``X_aug`` is an
        empty array with the per-image shape of ``X_train`` and ``y_aug`` is
        an empty integer array.

    Raises:
        ValueError: If a class needs extra samples but has none in
            ``y_train`` to draw from.
    """
    target_size = ratio * np.max(class_sample_size)
    X_train_aug_array = []
    y_train_aug_array = []
    for label, size in enumerate(class_sample_size):
        # Clamp at zero: with ratio < 1 (or a class above the target) the
        # shortfall is negative, which np.random.choice rejects.
        fill_size = max(0, int(target_size - size))
        if fill_size == 0:
            continue
        candidates = np.flatnonzero(y_train == label)
        if candidates.size == 0:
            raise ValueError(
                "class %d needs %d augmented samples but has no samples "
                "in y_train" % (label, fill_size))
        samples = np.random.choice(candidates, fill_size)
        for image in X_train[samples]:
            X_train_aug_array.append(augmentation_image(image))
            y_train_aug_array.append(label)

    if not X_train_aug_array:
        # Keep the per-image shape so the result can still be concatenated
        # with the original training arrays.
        return (np.empty((0,) + np.shape(X_train)[1:]),
                np.empty((0,), dtype=int))
    return np.array(X_train_aug_array), np.array(y_train_aug_array)


def plot_one_emotion(data, img_arrays, img_labels, label=0):
    """Plot up to five images of one emotion class side by side.

    Args:
        data: Data frame with an 'emotion' column.  Its rows are assumed to
            be in the same order as ``img_arrays`` / ``img_labels`` (as
            produced by ``prepare_data(data)``) - TODO confirm for other
            callers.
        img_arrays: Image array indexed by row position; channel 0 of each
            image is displayed in gray scale.
        img_labels: Label array indexed by row position; each title is
            looked up in the module-level ``emotions`` mapping.
        label: Emotion class whose samples are shown.
    """
    n_panels = 5
    fig, axs = plt.subplots(1, n_panels, figsize=(25, 12))
    fig.subplots_adjust(hspace=.2, wspace=.2)
    axs = axs.ravel()

    # Use row positions rather than data.index labels: a filtered frame
    # (e.g. a 'PublicTest' subset) keeps its original index labels, which do
    # not line up with arrays that were filled row by row.
    positions = np.flatnonzero(np.asarray(data['emotion'] == label))[:n_panels]

    for ax, pos in zip(axs, positions):
        ax.imshow(img_arrays[pos][:, :, 0], cmap='gray')
        ax.set_title(emotions[img_labels[pos]])
        ax.set_xticklabels([])
        ax.set_yticklabels([])

    # Fewer than five matches: blank the unused panels instead of failing.
    for ax in axs[len(positions):]:
        ax.axis('off')


def step_decay(epoch, *, lr_init=0.001, drop=0.5, epochs_drop=5,
               warm_up_epoch=0):
    """Step-decay learning-rate schedule with an optional linear warm-up.

    While ``epoch + 1 < warm_up_epoch`` the rate ramps linearly towards
    ``lr_init``; afterwards it is multiplied by ``drop`` once every
    ``epochs_drop`` epochs.  With the defaults there is no warm-up.

    The tuning knobs are keyword-only on purpose: ``LearningRateScheduler``
    first tries ``schedule(epoch, lr)`` and falls back to ``schedule(epoch)``
    on ``TypeError``, so a second positional parameter would silently
    receive the optimizer's current learning rate.

    Args:
        epoch: Zero-based epoch index.
        lr_init: Base learning rate.
        drop: Multiplicative factor applied at every step.
        epochs_drop: Number of epochs between two drops.
        warm_up_epoch: Epoch count (one-based) below which warm-up applies.

    Returns:
        The learning rate for ``epoch`` as a float.
    """
    step = epoch + 1
    if step < warm_up_epoch:
        # Warm-up: scale the base rate (previously this scaled `drop`).
        return lr_init * (step / warm_up_epoch)
    # Multiply the base rate by `drop` once per completed `epochs_drop` block.
    return lr_init * (drop ** int(step / epochs_drop))


def exponential_decay(epoch):
    """Exponential learning-rate schedule.

    Computes ``0.001 * exp(-0.1 * (epoch + 1))``.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate for ``epoch`` as a plain Python float (the value
        type ``LearningRateScheduler`` documents for its schedule function),
        rather than a TensorFlow tensor.
    """
    lr_init = 0.001
    decay_rate = 0.1
    return float(lr_init * np.exp(-decay_rate * (epoch + 1)))


def polynomial_decay(epoch):
    """Polynomial (power 2) learning-rate decay from 0.001 down to 0.0001.

    The rate follows
    ``(lr_init - lr_end) * (1 - step / decay_steps) ** 2 + lr_end`` with
    ``step = min(epoch + 1, decay_steps)``, so it reaches ``lr_end`` after
    ``decay_steps`` epochs and stays there.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate for ``epoch`` as a float.
    """
    lr_init = 0.001
    lr_end = 0.0001
    decay_steps = 10
    power = 2
    # Clamp the step: without it the squared term grows again once
    # epoch + 1 exceeds decay_steps and the rate climbs past lr_init.
    global_step = min(epoch + 1, decay_steps)
    return (lr_init - lr_end) * ((1 - global_step / decay_steps) ** power) + lr_end


# Emotion label index -> display name.
emotions = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']))

In [3]:
def prepare_data(data, to_3_channels=True, to_clahe=False):
    """Turn a data frame of pixel strings into image and label arrays.

    Args:
        data: Data frame with an 'emotion' column (values convertible to
            int) and a 'pixels' column holding 48*48 space-separated
            integers per row.
        to_3_channels: When True, the single gray channel is copied three
            times so every image has shape (48, 48, 3); otherwise images
            keep shape (48, 48, 1).
        to_clahe: When True, each image is cast to uint8 and passed through
            OpenCV CLAHE (clipLimit=2) before the channel expansion.

    Returns:
        Tuple ``(image_array, image_label)``: an array of shape
        (len(data), 48, 48, channels) holding the ``preprocess_input``
        result for every row, and an array of integer labels.
    """
    equalizer = cv2.createCLAHE(clipLimit=2)
    expand_channels = to_3_channels == True
    apply_clahe = to_clahe == True
    n_channels = 3 if expand_channels else 1

    image_array = np.zeros(shape=(len(data), 48, 48, n_channels))
    image_label = np.array([int(value) for value in data['emotion']])

    for position, row_key in enumerate(data.index):
        # Parse the space-separated pixel string into a one-channel image.
        pixels = np.fromstring(data.loc[row_key, 'pixels'], dtype=int, sep=' ')
        gray = np.reshape(pixels, (48, 48, 1))  # grayscale: a single channel

        if apply_clahe:
            # CLAHE (Contrast Limited Adaptive Histogram Equalization)
            equalized = equalizer.apply(gray[:, :, 0].astype("uint8"))
            gray = np.reshape(equalized, (48, 48, 1))

        if expand_channels:
            # Replicate the gray plane into three identical channels.
            plane = gray[:, :, 0]
            gray = np.stack([plane, plane, plane], axis=-1)

        image_array[position] = preprocess_input(gray)

    return image_array, image_label


def build_model(preModel=EfficientNetB0, num_classes=7):
    """Assemble and compile a classifier on top of a pretrained backbone.

    Args:
        preModel: Keras application constructor used as the feature
            extractor; it is called with ImageNet weights, no top,
            a 48x48x3 input and max pooling.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A compiled ``tf.keras.Model`` (Adam optimizer, categorical
        cross-entropy loss, accuracy metric).
    """
    backbone = preModel(
        include_top=False,
        weights='imagenet',
        input_shape=(48, 48, 3),
        pooling='max',
        classifier_activation='softmax',
    )

    # Softmax head attached directly to the pooled backbone output.
    head = Dense(num_classes, activation="softmax", name="main_output")
    predictions = head(backbone.output)

    classifier = tf.keras.Model(backbone.input, predictions)
    classifier.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=['accuracy'],
    )
    return classifier


def resize_image(img_array, output_shape=(224, 224)):
    """Resize an image with OpenCV.

    Args:
        img_array: Image array accepted by ``cv2.resize``.
        output_shape: Target size, forwarded unchanged as the second
            argument of ``cv2.resize``.
            NOTE(review): OpenCV documents that argument as (width, height)
            - confirm callers pass non-square sizes in that order.

    Returns:
        The array returned by ``cv2.resize``.
    """
    return cv2.resize(img_array, output_shape)


def augmentation_image(img_array):
    """Augment one image with a random rotation, a random shear and a zoom.

    The image is passed through ``random_rotation`` (range 30 degrees),
    ``random_shear`` (intensity 20) and ``random_zoom``; all three are told
    that the channel axis is axis 2.  Both bounds of the zoom range are 0.8,
    so the zoom factor is effectively fixed.

    This function deliberately does not touch any random seed.  It used to
    call ``tf.random.set_seed`` on every invocation, which reset TensorFlow's
    global random state once per image.
    NOTE(review): the Keras image helpers sample with ``np.random`` in the
    Keras versions checked, so that call did not make the augmentation
    reproducible either - for repeatable results call ``np.random.seed(...)``
    once before augmenting.

    Args:
        img_array: A single image array with channels on axis 2.

    Returns:
        The transformed image array.
    """
    img_array = random_rotation(img_array, rg=30, channel_axis=2)  # rotation
    img_array = random_shear(img_array, intensity=20, channel_axis=2)  # shear
    img_array = random_zoom(img_array, zoom_range=(
        0.8, 0.8), channel_axis=2)  # zoom
    return img_array


def auto_augmentation(X_train, y_train, class_sample_size, ratio=1):
    """Create augmented samples that top each class up towards a target size.

    The target is ``ratio * max(class_sample_size)``.  For class ``i`` (its
    position in ``class_sample_size``, compared directly against the values
    in ``y_train``) the shortfall ``int(target - size)`` images are drawn
    with replacement from that class and passed through
    ``augmentation_image``.  Classes already at or above the target get no
    extra samples.

    Args:
        X_train: Array of images, indexed along axis 0.
        y_train: Array of integer class labels aligned with ``X_train``.
        class_sample_size: Current number of samples per class, ordered by
            class label.
        ratio: Fraction of the largest class size to fill each class up to.

    Returns:
        Tuple ``(X_aug, y_aug)`` holding only the newly generated images and
        their labels.  When nothing needs to be generated, ``X_aug`` is an
        empty array with the per-image shape of ``X_train`` and ``y_aug`` is
        an empty integer array.

    Raises:
        ValueError: If a class needs extra samples but has none in
            ``y_train`` to draw from.
    """
    target_size = ratio * np.max(class_sample_size)
    X_train_aug_array = []
    y_train_aug_array = []
    for label, size in enumerate(class_sample_size):
        # Clamp at zero: with ratio < 1 (or a class above the target) the
        # shortfall is negative, which np.random.choice rejects.
        fill_size = max(0, int(target_size - size))
        if fill_size == 0:
            continue
        candidates = np.flatnonzero(y_train == label)
        if candidates.size == 0:
            raise ValueError(
                "class %d needs %d augmented samples but has no samples "
                "in y_train" % (label, fill_size))
        samples = np.random.choice(candidates, fill_size)
        for image in X_train[samples]:
            X_train_aug_array.append(augmentation_image(image))
            y_train_aug_array.append(label)

    if not X_train_aug_array:
        # Keep the per-image shape so the result can still be concatenated
        # with the original training arrays.
        return (np.empty((0,) + np.shape(X_train)[1:]),
                np.empty((0,), dtype=int))
    return np.array(X_train_aug_array), np.array(y_train_aug_array)


def plot_one_emotion(data, img_arrays, img_labels, label=0):
    """Plot up to five images of one emotion class side by side.

    Args:
        data: Data frame with an 'emotion' column.  Its rows are assumed to
            be in the same order as ``img_arrays`` / ``img_labels`` (as
            produced by ``prepare_data(data)``) - TODO confirm for other
            callers.
        img_arrays: Image array indexed by row position; channel 0 of each
            image is displayed in gray scale.
        img_labels: Label array indexed by row position; each title is
            looked up in the module-level ``emotions`` mapping.
        label: Emotion class whose samples are shown.
    """
    n_panels = 5
    fig, axs = plt.subplots(1, n_panels, figsize=(25, 12))
    fig.subplots_adjust(hspace=.2, wspace=.2)
    axs = axs.ravel()

    # Use row positions rather than data.index labels: a filtered frame
    # (e.g. a 'PublicTest' subset) keeps its original index labels, which do
    # not line up with arrays that were filled row by row.
    positions = np.flatnonzero(np.asarray(data['emotion'] == label))[:n_panels]

    for ax, pos in zip(axs, positions):
        ax.imshow(img_arrays[pos][:, :, 0], cmap='gray')
        ax.set_title(emotions[img_labels[pos]])
        ax.set_xticklabels([])
        ax.set_yticklabels([])

    # Fewer than five matches: blank the unused panels instead of failing.
    for ax in axs[len(positions):]:
        ax.axis('off')


def step_decay(epoch, *, lr_init=0.001, drop=0.5, epochs_drop=5,
               warm_up_epoch=0):
    """Step-decay learning-rate schedule with an optional linear warm-up.

    While ``epoch + 1 < warm_up_epoch`` the rate ramps linearly towards
    ``lr_init``; afterwards it is multiplied by ``drop`` once every
    ``epochs_drop`` epochs.  With the defaults there is no warm-up.

    The tuning knobs are keyword-only on purpose: ``LearningRateScheduler``
    first tries ``schedule(epoch, lr)`` and falls back to ``schedule(epoch)``
    on ``TypeError``, so a second positional parameter would silently
    receive the optimizer's current learning rate.

    Args:
        epoch: Zero-based epoch index.
        lr_init: Base learning rate.
        drop: Multiplicative factor applied at every step.
        epochs_drop: Number of epochs between two drops.
        warm_up_epoch: Epoch count (one-based) below which warm-up applies.

    Returns:
        The learning rate for ``epoch`` as a float.
    """
    step = epoch + 1
    if step < warm_up_epoch:
        # Warm-up: scale the base rate (previously this scaled `drop`).
        return lr_init * (step / warm_up_epoch)
    # Multiply the base rate by `drop` once per completed `epochs_drop` block.
    return lr_init * (drop ** int(step / epochs_drop))


def exp_decay(epoch):
    """Exponential learning-rate schedule.

    Computes ``0.001 * exp(-0.1 * (epoch + 1))``.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate for ``epoch`` as a plain Python float (the value
        type ``LearningRateScheduler`` documents for its schedule function),
        rather than a TensorFlow tensor.
    """
    lr_init = 0.001
    decay_rate = 0.1
    return float(lr_init * np.exp(-decay_rate * (epoch + 1)))


def poly_decay(epoch):
    """Polynomial (power 2) learning-rate decay from 0.001 down to 0.0001.

    The rate follows
    ``(lr_init - lr_end) * (1 - step / decay_steps) ** 2 + lr_end`` with
    ``step = min(epoch + 1, decay_steps)``, so it reaches ``lr_end`` after
    ``decay_steps`` epochs and stays there.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate for ``epoch`` as a float.
    """
    lr_init = 0.001
    lr_end = 0.0001
    decay_steps = 10
    power = 2
    # Clamp the step: without it the squared term grows again once
    # epoch + 1 exceeds decay_steps and the rate climbs past lr_init.
    global_step = min(epoch + 1, decay_steps)
    return (lr_init - lr_end) * ((1 - global_step / decay_steps) ** power) + lr_end


# Emotion label index -> display name.
emotions = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']))

In [4]:
# Load the raw CSV from a hard-coded local path.
df_raw = pd.read_csv("D:/mycodes/AIFER/data/FER2013/fer2013.csv")
# Data preprocessing.
# NOTE(review): the original comment mentioned CLAHE, but prepare_data is
# called with its default to_clahe=False here, so no CLAHE is applied.
# Rows with Usage == 'Training' form the training split and rows with
# Usage == 'PublicTest' form the validation split.
X_train, y_train = prepare_data(df_raw[df_raw['Usage'] == 'Training'])
X_val, y_val = prepare_data(df_raw[df_raw['Usage'] == 'PublicTest'])

In [5]:
# Load the raw CSV from a hard-coded local path.
df_raw = pd.read_csv("D:/mycodes/AIFER/data/FER2013/fer2013.csv")
# Data preprocessing.
# NOTE(review): the original comment mentioned CLAHE, but prepare_data is
# called with its default to_clahe=False here, so no CLAHE is applied.
# Rows with Usage == 'Training' form the training split and rows with
# Usage == 'PublicTest' form the validation split.
X_train, y_train = prepare_data(df_raw[df_raw['Usage'] == 'Training'])
X_val, y_val = prepare_data(df_raw[df_raw['Usage'] == 'PublicTest'])

In [6]:
# Smoke test: build the network and run one training image through it.
model = build_model()
prob_res = model(X_train[:1])
prob_res = prob_res.numpy()
print("EFN build successfully!")

EFN build successfully!


In [7]:
# Scratch check: zip(...) is lazy, so as the last expression of the cell it
# only displays the zip object itself, not the (name, schedule) pairs.
zip(["step_decay","exp_decay","poly_decay"],[step_decay,exp_decay,poly_decay])

<zip at 0x2257e4f4c88>

In [8]:
# Show each schedule name next to the function object it refers to.
for name, schedule in [("step_decay", step_decay),
                       ("exp_decay", exp_decay),
                       ("poly_decay", poly_decay)]:
    print(name, schedule)

step_decay <function step_decay at 0x000002254BDF2168>
exp_decay <function exp_decay at 0x000002254BDF2288>
poly_decay <function poly_decay at 0x000002254BDF23A8>


In [9]:
epochs = 30
batch_size = 32

# The one-hot targets are the same for every run, so encode them once.
# Passing num_classes ties their width to the model's output layer even if
# a split happens to lack the highest label.
num_classes = len(emotions)
y_train_oh = to_categorical(y_train, num_classes=num_classes)
y_val_oh = to_categorical(y_val, num_classes=num_classes)

# Autologging only needs to be switched on once, not once per run.
mlflow.tensorflow.autolog()

# Keep every run's History; `hist1` alone only survives for the last run.
histories = {}

# Train one fresh model per learning-rate schedule, each in its own MLflow run.
for lr_name, lr_fn in zip(["step_decay","exp_decay","poly_decay"],[step_decay,exp_decay,poly_decay]):
    lrate = LearningRateScheduler(lr_fn)
    callbacks_list = [lrate]
    model = build_model(num_classes=num_classes)
    # The context manager ends the run on exit, so no explicit
    # mlflow.end_run() is needed afterwards.
    with mlflow.start_run(experiment_id=1, run_name=lr_name):
        hist1 = model.fit(X_train, y_train_oh, validation_data=(X_val, y_val_oh),
                        epochs=epochs, batch_size=batch_size, callbacks=callbacks_list)
    histories[lr_name] = hist1

Epoch 1/30
Instructions for updating:
use `tf.profiler.experimental.stop` instead.
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: C:\Users\USER\AppData\Local\Temp\tmpozim3ztw\model\data\model\assets
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21

In [10]:
# Completion marker: prints "Done" when this cell runs.
print("Done")

Done
