# Import Library

In [None]:
from sklearn.model_selection import StratifiedKFold

import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import os
import glob
import cv2
import pandas as pd
import seaborn as sns

# Data load

In [None]:
# Root directory that dataset and model paths are resolved against.
# `__file__` is not defined when this code runs as a notebook cell or in a
# REPL, so fall back to the current working directory in that case.
try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    BASE_DIR = os.getcwd()

In [None]:
# Mask geometry and batch size used by the cells below.
img_height, img_width = 256, 256
batch_size = 32

# Part names of the segmentation labels.
# NOTE(review): this was commented as "19 categories", but the list holds 18
# names while color_list holds 19 colors — presumably index 0 (background) is
# implicit here; confirm before indexing label_list with a mask value.
label_list = [
    'skin', 'nose', 'eye_g',
    'l_eye', 'r_eye', 'l_brow',
    'r_brow', 'l_ear', 'r_ear',
    'mouth', 'u_lip', 'l_lip',
    'hair', 'hat', 'ear_r',
    'neck_l', 'neck', 'cloth',
]

# One color triple per label index (19 entries, index 0 is black).
color_list = [
    [0, 0, 0],
    [204, 0, 0], [76, 153, 0], [204, 204, 0],
    [51, 51, 255], [204, 0, 204], [0, 255, 255],
    [255, 204, 204], [102, 51, 0], [255, 0, 0],
    [102, 204, 0], [255, 255, 0], [0, 0, 153],
    [0, 0, 204], [255, 51, 153], [0, 204, 204],
    [0, 51, 0], [255, 153, 51], [0, 204, 0],
]

# Original label indices that are kept; position in this list is the
# compact class index used by label_changing().
face_labels = [0, 2, 4, 5, 6, 7, 10, 11, 12]

In [None]:
def crop_face_region(path, margin=10, target_size=256):
    """Crop a label mask to its non-zero region and resize it to a square.

    The image at ``path`` is read, the bounding box of all non-zero pixels is
    cut out, the first channel of that crop is pasted around the center of a
    zero-filled square canvas, and the canvas is resized with
    nearest-neighbour interpolation so label values are not blended.

    Args:
        path: File path of the mask image.
        margin: Number of pixels added to the longer bounding-box side to get
            the canvas side length. Must be at least 2, otherwise the pasted
            crop can run past the canvas edge.
        target_size: Side length of the returned square mask.

    Returns:
        A ``uint8`` array of shape ``(target_size, target_size)``.

    Raises:
        ValueError: If ``margin`` is smaller than 2 or the mask has no
            non-zero pixel.
        FileNotFoundError: If the image cannot be read.
    """
    if margin < 2:
        raise ValueError("margin must be >= 2, got {}".format(margin))

    # cv2.imread signals failure by returning None instead of raising.
    img = cv2.imread(path)
    if img is None:
        raise FileNotFoundError("could not read mask image: {}".format(path))

    # Every non-zero pixel counts as foreground for the bounding box.
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    rows, cols = np.nonzero(gray)
    if rows.size == 0:
        raise ValueError("mask has no non-zero pixels: {}".format(path))

    x_min, x_max = cols.min(), cols.max()
    y_min, y_max = rows.min(), rows.max()
    height, width = y_max - y_min, x_max - x_min

    # crop the target region
    crop = img[y_min:y_max + 1, x_min:x_max + 1]
    side = int(max(width, height)) + margin

    canvas = np.zeros((side, side), dtype=np.uint8)

    # Paste the crop so that it is (approximately) centered on the canvas.
    center = side // 2
    top, left = center - height // 2, center - width // 2
    canvas[top:top + height + 1, left:left + width + 1] = crop[:, :, 0]

    return cv2.resize(canvas, (target_size, target_size),
                      interpolation=cv2.INTER_NEAREST)

def label_changing(anno_mask, mode="celeb2class"):
    """Remap mask values between original label indices and class indices.

    ``face_labels[i]`` is the original label index of compact class ``i``.
    The mask is modified in place and also returned. Values that are not part
    of the mapping are left untouched.

    Args:
        anno_mask: Integer array of label values; mutated in place.
        mode: ``"celeb2class"`` maps original label indices to class indices,
            ``"class2celeb"`` maps class indices back to original indices.

    Returns:
        The same ``anno_mask`` object after remapping.

    Raises:
        ValueError: If ``mode`` is not one of the two supported values.
    """
    if mode not in ("celeb2class", "class2celeb"):
        raise ValueError("unknown mode: {!r}".format(mode))

    if mode == "celeb2class":
        # Ascending order: a class index is never larger than its original
        # index, so a pixel that was already remapped is not matched again.
        for cls_idx, org_idx in enumerate(face_labels):
            anno_mask[anno_mask == org_idx] = cls_idx
    else:
        # Descending order for the same reason in the opposite direction.
        for cls_idx in reversed(range(len(face_labels))):
            anno_mask[anno_mask == cls_idx] = face_labels[cls_idx]
    return anno_mask

def mask_coloring(anno_mask, ver='face'):
    """Turn a single-channel label mask into a 3-channel color image.

    Each label index ``i`` is painted with ``color_list[i]``. With
    ``ver='all'`` every index of ``color_list`` is painted; with
    ``ver='face'`` only the indices listed in ``face_labels`` are. Any other
    ``ver`` paints nothing, so the result stays all zeros.
    """
    rgb_mask = cv2.cvtColor(anno_mask, cv2.COLOR_GRAY2RGB)
    colored = np.zeros_like(rgb_mask)

    if ver == 'all':
        selected = list(range(len(color_list)))
    elif ver == 'face':
        selected = [idx for idx in range(len(color_list)) if idx in face_labels]
    else:
        selected = []

    for idx in selected:
        # A pixel belongs to label idx when all three channels equal idx.
        hit = (rgb_mask == (idx, idx, idx)).all(axis=2)
        colored[hit] = color_list[idx]
    return colored

def map_func(path):
    """Load one mask file and return its file name and class-index mask.

    Intended to be wrapped in ``tf.py_function``: ``path`` is a tensor whose
    ``.numpy()`` value is the encoded file path.

    Returns:
        A ``(name, mask_label)`` pair: the file name without its directory,
        and the cropped mask remapped with ``mode="celeb2class"``.
    """
    full_path = path.numpy().decode()
    # Accept both Windows and POSIX separators; splitting on '\\' alone
    # returns the whole path on non-Windows systems.
    name = full_path.replace('\\', '/').rsplit('/', 1)[-1]
    mask_cropped = crop_face_region(full_path)
    mask_label = label_changing(mask_cropped, mode="celeb2class")
    return name, mask_label

In [None]:
# Glob patterns for the smile / non-smile label masks.
s_dataset = os.path.join(BASE_DIR, "Datasets/GENKI-seg/smile/labels/face_*.png")
ns_dataset = os.path.join(BASE_DIR, "Datasets/GENKI-seg/non_smile/labels/face_*.png")

# File paths in descending lexicographic order, wrapped as tf.data datasets.
s_dataset_path = tf.data.Dataset.from_tensor_slices(
    sorted(glob.glob(s_dataset), reverse=True))
ns_dataset_path = tf.data.Dataset.from_tensor_slices(
    sorted(glob.glob(ns_dataset), reverse=True))

# map_func runs Python/OpenCV code, so it is bridged through tf.py_function;
# each element becomes a (file name, mask) pair.
s_mask_ds = s_dataset_path.map(
    lambda item: tf.py_function(map_func, [item], [tf.string, tf.int32]))
ns_mask_ds = ns_dataset_path.map(
    lambda item: tf.py_function(map_func, [item], [tf.string, tf.int32]))

In [None]:
def _materialize_masks(mask_ds, target):
    """Collect a (name, mask) dataset into arrays in a single pass.

    Iterating the dataset runs the mapped loading function for every element,
    so names and masks are gathered together instead of in two separate
    passes over the same files.

    Returns:
        ``(names, masks, targets)`` where ``targets`` holds ``target`` once
        per element.
    """
    names, masks = [], []
    for name, data in mask_ds:
        names.append(name.numpy().decode('utf-8'))
        masks.append(data.numpy())
    return np.array(names), np.array(masks), np.full(len(names), target)


# Smile masks are labelled 1, non-smile masks 0.
smile_mask_names, smile_mask_ds_arr, smile_target_arr = _materialize_masks(s_mask_ds, 1)
nonsmile_mask_names, nonsmile_mask_ds_arr, nonsmile_target_arr = _materialize_masks(ns_mask_ds, 0)

* Split the data into folds

In [None]:
# Number of cross-validation folds.
NUM_FOLDS = 5
# Fold splitter; no shuffle or random_state is passed, so library defaults apply.
skf = StratifiedKFold(n_splits=NUM_FOLDS)

In [None]:
# 1-based index of the fold whose train/validation split is materialized.
target_fold_num = 3

smile_mask_folds = skf.split(smile_mask_ds_arr, smile_target_arr)
nonsmile_mask_folds = skf.split(nonsmile_mask_ds_arr, nonsmile_target_arr)

for temp_fold, ((s_train_idx, s_val_idx), (ns_train_idx, ns_val_idx)) in enumerate(
        zip(smile_mask_folds, nonsmile_mask_folds), start=1):
    if temp_fold != target_fold_num:
        continue

    #### train data
    # The training names get their own variables: overwriting
    # smile_mask_names / nonsmile_mask_names with the training subset would
    # make the validation lookups below (whose indices refer to the full
    # arrays) pick the wrong names, and would corrupt the arrays for later
    # cells that split them again.
    smile_names = smile_mask_names[s_train_idx]
    smile_masks = smile_mask_ds_arr[s_train_idx]
    smile_mask_target = smile_target_arr[s_train_idx]

    nonsmile_names = nonsmile_mask_names[ns_train_idx]
    nonsmile_masks = nonsmile_mask_ds_arr[ns_train_idx]
    nonsmile_mask_target = nonsmile_target_arr[ns_train_idx]

    smile_masks = tf.data.Dataset.from_tensor_slices(smile_masks)
    smile_masks = smile_masks.batch(batch_size)

    nonsmile_masks = tf.data.Dataset.from_tensor_slices(nonsmile_masks)
    nonsmile_masks = nonsmile_masks.batch(batch_size)

    #### validation data
    smile_mask_names_val = smile_mask_names[s_val_idx]
    smile_masks_val = smile_mask_ds_arr[s_val_idx]
    smile_mask_target_val = smile_target_arr[s_val_idx]

    nonsmile_mask_names_val = nonsmile_mask_names[ns_val_idx]
    nonsmile_masks_val = nonsmile_mask_ds_arr[ns_val_idx]
    nonsmile_mask_target_val = nonsmile_target_arr[ns_val_idx]

    smile_masks_val = tf.data.Dataset.from_tensor_slices(smile_masks_val)
    smile_masks_val = smile_masks_val.batch(batch_size)

    nonsmile_masks_val = tf.data.Dataset.from_tensor_slices(nonsmile_masks_val)
    nonsmile_masks_val = nonsmile_masks_val.batch(batch_size)
    break

# Set DL Model

In [None]:
OUTPUT_CHANNEL = 9
SAVE_DIR = os.path.join(BASE_DIR, "models/cVAE")

class CVAE(tf.keras.Model):
    """Convolutional variational autoencoder for 256x256 single-channel masks.

    The encoder applies five stride-2 convolutions (256 -> 8 spatially) and a
    dense layer producing ``2 * latent_dim`` values, which are split into the
    mean and log-variance of the latent distribution. The decoder mirrors this
    with five stride-2 transposed convolutions and a final 1x1 convolution
    producing ``OUTPUT_CHANNEL`` logits per pixel.

    Args:
        latent_dim: Size of the latent vector.
        fold_num: Fold number used in checkpoint paths.
        optimizer: Optional optimizer that is saved and restored together with
            the encoder and decoder.
    """

    def __init__(self
                 , latent_dim
                 , fold_num
                 , optimizer=None):

        super(CVAE, self).__init__()
        self.output_dir = SAVE_DIR
        self.latent_dim = latent_dim
        self.fold_num = fold_num
        self.optimizer = optimizer

        self.encoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(256, 256, 1)),
                tf.keras.layers.Conv2D(
                    filters=8, kernel_size=3, strides=(2, 2), padding='same', activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=16, kernel_size=3, strides=(2, 2), padding='same', activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=32, kernel_size=3, strides=(2, 2), padding='same', activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=64, kernel_size=3, strides=(2, 2), padding='same', activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=128, kernel_size=3, strides=(2, 2), padding='same', activation='relu'),
                tf.keras.layers.Flatten(),
                # No activation: the output holds mean and log-variance.
                tf.keras.layers.Dense(latent_dim + latent_dim),
            ]
        )

        self.decoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                tf.keras.layers.Dense(units=8*8*128, activation=tf.nn.relu),
                tf.keras.layers.Reshape(target_shape=(8, 8, 128)),
                tf.keras.layers.Conv2DTranspose(
                    filters=128, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=64, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=32, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=16, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=8, kernel_size=3, strides=2, padding='same',
                    activation='relu'),
                # No activation: per-pixel class logits.
                tf.keras.layers.Conv2D(OUTPUT_CHANNEL, (1, 1)),
            ]
        )

        # Checkpoint attributes must be trackable objects, so the optimizer is
        # only registered when one was actually supplied.
        tracked = {"encoder": self.encoder, "decoder": self.decoder}
        if self.optimizer is not None:
            tracked["optimizer"] = self.optimizer
        self.checkpoints = tf.train.Checkpoint(**tracked)

    def _fold_ckpt_dir(self):
        """Return the directory save_models() writes this fold's checkpoints to."""
        return '{}/fold_{}/checkpoints'.format(self.output_dir, self.fold_num)

    @tf.function
    def sample(self, eps=None):
        """Decode latent vectors into per-pixel class probabilities.

        When ``eps`` is None, 100 latent vectors are drawn from a standard
        normal distribution.
        """
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))

        return self.decode(eps, apply_softmax=True)

    def encode(self, x):
        """Return the ``(mean, logvar)`` halves of the encoder output for ``x``."""
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar, mode='train'):
        """Draw a latent sample ``mean + eps * exp(logvar / 2)``.

        Args:
            mean: Latent means.
            logvar: Latent log-variances, same shape as ``mean``.
            mode: ``'train'`` draws unseeded noise; ``'inference'`` passes
                ``seed=1`` to the noise op.
                NOTE(review): an op-level seed alone may not make repeated
                calls return identical noise — confirm if determinism matters.

        Raises:
            ValueError: If ``mode`` is neither ``'train'`` nor ``'inference'``.
        """
        # tf.shape also works when the batch dimension is not statically known.
        noise_shape = tf.shape(mean)
        if mode == 'train':
            eps = tf.random.normal(shape=noise_shape)
        elif mode == "inference":
            eps = tf.random.normal(shape=noise_shape, seed=1)
        else:
            raise ValueError("unknown mode: {!r}".format(mode))
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_softmax=False):
        """Decode ``z`` into logits, or probabilities when ``apply_softmax`` is set."""
        logits = self.decoder(z)
        if apply_softmax:
            probs = tf.nn.softmax(logits)
            return probs
        return logits

    def save_models(self):
        """Write a numbered checkpoint for this fold under the output directory."""
        self.checkpoints.save(file_prefix='{}/ckpt'.format(self._fold_ckpt_dir()))

    def load_models(self, idx=None):
        """Restore encoder, decoder and (if tracked) optimizer from a checkpoint.

        Args:
            idx: Checkpoint number to restore. When None, the latest
                checkpoint is used.
        """
        if idx is None:
            # Look where save_models() writes first, then in the older
            # '<output_dir>/checkpoints/fold_<n>' layout this method used to
            # read from, so existing checkpoints in either place are found.
            legacy_dir = '{}/checkpoints/fold_{}'.format(self.output_dir, self.fold_num)
            latest = (tf.train.latest_checkpoint(self._fold_ckpt_dir())
                      or tf.train.latest_checkpoint(legacy_dir))
            # As before, nothing is restored when no checkpoint exists.
            self.checkpoints.restore(latest)
        else:
            # The path is passed positionally: the previous `file_prefix`
            # keyword is save()'s parameter name, not restore()'s.
            self.checkpoints.restore('{}/ckpt-{}'.format(self._fold_ckpt_dir(), idx))
        self.encoder = self.checkpoints.encoder
        self.decoder = self.checkpoints.decoder
        self.optimizer = getattr(self.checkpoints, "optimizer", None)

In [None]:
# Model for the selected fold, with the optimizer it is checkpointed with.
vae_model = CVAE(
    latent_dim=10,
    fold_num=target_fold_num,
    optimizer=tf.keras.optimizers.Adam(1e-4),
)

# Alternative loader, kept for reference: vae_model.load_models()
# Load the weights stored as checkpoint number 10 for this fold.
vae_model.load_weights(
    "/".join([SAVE_DIR, 'checkpoints', 'fold_{}'.format(target_fold_num), 'ckpt-10']))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x23c6fa6bdc0>

# Calculate IoU score

In [None]:
class CalAnomalyScore:
    """Score how well a model reconstructs individual mask components.

    Args:
        model: Object providing ``encode``, ``reparameterize`` and ``sample``,
            as used by ``select_and_score``.
    """

    def __init__(self, model):
        self.model = model

    def iou_calculation(self, org_cmps, seg_cmps):
        """Return the IoU of each (original, reconstructed) component pair.

        Non-zero pixels count as foreground. A pair in which both masks are
        empty scores 0.

        Args:
            org_cmps: Sequence of component masks from the original label.
            seg_cmps: Sequence of component masks from the reconstruction,
                aligned with ``org_cmps``.

        Returns:
            A list with one IoU value per pair.
        """
        iou = []
        for org_c, seg_c in zip(org_cmps, seg_cmps):
            org_fg = np.asarray(org_c) != 0
            seg_fg = np.asarray(seg_c) != 0
            # NumPy counting works for any integer dtype of the inputs.
            union = np.count_nonzero(org_fg | seg_fg)
            if union == 0:
                iou.append(0)
            else:
                iou.append(np.count_nonzero(org_fg & seg_fg) / union)

        return iou

    def select_and_score(self, dataset, target_labels=(6,), mode='iou'):
        """Reconstruct every mask in ``dataset`` and score selected labels.

        Args:
            dataset: Iterable of mask batches.
            target_labels: Class indices to score; one score per index.
            mode: ``'iou'`` uses ``iou_calculation``; ``'spsim'`` uses a
                ``shape_similarity`` method, which this class does not define
                and must therefore be provided by a subclass.

        Returns:
            A list with one entry per mask, each holding the per-label scores.

        Raises:
            ValueError: If ``mode`` is not ``'iou'`` or ``'spsim'``.
            NotImplementedError: If ``mode`` is ``'spsim'`` and no
                ``shape_similarity`` method is available.
        """
        if mode not in ('iou', 'spsim'):
            raise ValueError("unknown mode: {!r}".format(mode))
        if mode == 'spsim' and not hasattr(self, 'shape_similarity'):
            raise NotImplementedError(
                "mode='spsim' requires a shape_similarity(org_cmps, seg_cmps) method")

        target_labels = list(target_labels)
        scores = []
        for label in dataset:
            mean, logvar = self.model.encode(tf.expand_dims(label, axis=-1))
            # NOTE(review): reparameterize is called with its default mode, so
            # the latent sample presumably includes random noise and scores
            # can differ between runs — confirm this is intended.
            z = self.model.reparameterize(mean, logvar)
            predictions = self.model.sample(z)
            predictions = tf.argmax(predictions, axis=-1)

            for org, seg in zip(label, predictions):
                org, seg = np.asarray(org), np.asarray(seg)
                # One binary mask per requested label, for both images.
                org_cmps = [(org == idx).astype(org.dtype) for idx in target_labels]
                seg_cmps = [(seg == idx).astype(seg.dtype) for idx in target_labels]

                if mode == 'iou':
                    scores.append(self.iou_calculation(org_cmps, seg_cmps))
                else:
                    scores.append(self.shape_similarity(org_cmps, seg_cmps))

        return scores

    def plot_anomaly_score(self, data1, data2, metric_name, fig_title):
        """Plot the score histograms of two groups with their means.

        Args:
            data1: ``(scores, legend_label)`` pair, drawn in green.
            data2: ``(scores, legend_label)`` pair, drawn in purple.
            metric_name: Name used in the x-axis label.
            fig_title: Figure title.
        """
        sns.set(rc={'figure.figsize': (20, 5)}, font_scale=2)
        sns.set_style("whitegrid")
        sns.histplot(data=data1[0], kde=True, label=data1[1], color="green", element='step')
        sns.histplot(data=data2[0], kde=True, label=data2[1], color="purple", element='step')
        plt.xlabel('{} Score'.format(metric_name))
        plt.legend()
        plt.title(fig_title)
        # Dashed vertical lines mark each group's mean score.
        plt.axvline(x=np.mean(data1[0]), color='green', linestyle='--')
        plt.axvline(x=np.mean(data2[0]), color="purple", linestyle='--')

        plt.show()

In [None]:
# Scorer bound to the model instance created above.
cas = CalAnomalyScore(model = vae_model)

In [None]:
smile_mask_folds = skf.split(smile_mask_ds_arr, smile_target_arr)
nonsmile_mask_folds = skf.split(nonsmile_mask_ds_arr, nonsmile_target_arr)

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs('./iou', exist_ok=True)

# NOTE(review): every fold's split is scored with the single model held by
# `cas` — confirm that scoring all folds with the same weights is intended.
for fold_no, ((s_train_idx, s_val_idx), (ns_train_idx, ns_val_idx)) in enumerate(
        zip(smile_mask_folds, nonsmile_mask_folds), start=1):

    #### train data
    smile_names = smile_mask_names[s_train_idx]
    smile_masks = smile_mask_ds_arr[s_train_idx]
    smile_mask_target = smile_target_arr[s_train_idx]

    nonsmile_names = nonsmile_mask_names[ns_train_idx]
    nonsmile_masks = nonsmile_mask_ds_arr[ns_train_idx]
    nonsmile_mask_target = nonsmile_target_arr[ns_train_idx]

    smile_masks = tf.data.Dataset.from_tensor_slices(smile_masks)
    smile_masks = smile_masks.batch(batch_size)

    nonsmile_masks = tf.data.Dataset.from_tensor_slices(nonsmile_masks)
    nonsmile_masks = nonsmile_masks.batch(batch_size)

    #### validation data
    smile_names_val = smile_mask_names[s_val_idx]
    smile_masks_val = smile_mask_ds_arr[s_val_idx]
    smile_mask_target_val = smile_target_arr[s_val_idx]

    nonsmile_names_val = nonsmile_mask_names[ns_val_idx]
    nonsmile_masks_val = nonsmile_mask_ds_arr[ns_val_idx]
    nonsmile_mask_target_val = nonsmile_target_arr[ns_val_idx]

    smile_masks_val = tf.data.Dataset.from_tensor_slices(smile_masks_val)
    smile_masks_val = smile_masks_val.batch(batch_size)

    nonsmile_masks_val = tf.data.Dataset.from_tensor_slices(nonsmile_masks_val)
    nonsmile_masks_val = nonsmile_masks_val.batch(batch_size)

    # Per-mask IoU for class indices 1..8 (class 0 is skipped).
    sw_iou_train = cas.select_and_score(smile_masks, target_labels=range(1,9))
    nsw_iou_train = cas.select_and_score(nonsmile_masks, target_labels=range(1,9))

    sw_iou_val = cas.select_and_score(smile_masks_val, target_labels=range(1,9))
    nsw_iou_val = cas.select_and_score(nonsmile_masks_val, target_labels=range(1,9))

    train_s_name = pd.DataFrame(smile_names)
    train_ns_name = pd.DataFrame(nonsmile_names)

    train_sw_iou = pd.DataFrame(sw_iou_train)
    train_nsw_iou = pd.DataFrame(nsw_iou_train)

    val_s_name = pd.DataFrame(smile_names_val)
    val_ns_name = pd.DataFrame(nonsmile_names_val)

    val_sw_iou = pd.DataFrame(sw_iou_val)
    val_nsw_iou = pd.DataFrame(nsw_iou_val)

    # File name in the first column, followed by one IoU column per label.
    train_sw = pd.concat([train_s_name, train_sw_iou], axis=1)
    train_nsw = pd.concat([train_ns_name, train_nsw_iou], axis=1)

    val_sw = pd.concat([val_s_name, val_sw_iou], axis=1)
    val_nsw = pd.concat([val_ns_name, val_nsw_iou], axis=1)

    train_sw.to_csv('./iou/train_sw_iou(fold{})_withName.csv'.format(fold_no), header=False, index=False)
    train_nsw.to_csv('./iou/train_nsw_iou(fold{})_withName.csv'.format(fold_no), header=False, index=False)

    val_sw.to_csv('./iou/test_sw_iou(fold{})_withName.csv'.format(fold_no), header=False, index=False)
    val_nsw.to_csv('./iou/test_nsw_iou(fold{})_withName.csv'.format(fold_no), header=False, index=False)

    print(train_sw.head())

               0         0         1         2         3         4         5  \
0  face_1729.png  0.895676  0.560976  0.652893  0.718363  0.845039  0.615848   
1  face_1728.png  0.895823  0.492386  0.486967  0.637989  0.787647  0.753160   
2  face_1727.png  0.894855  0.622103  0.600939  0.592186  0.731492  0.860233   
3  face_1726.png  0.909529  0.629902  0.588491  0.666316  0.706568  0.333333   
4  face_1725.png  0.868358  0.625800  0.736620  0.578577  0.590186  0.686787   

          6         7  
0  0.664569  0.868949  
1  0.656355  0.790425  
2  0.831717  0.805712  
3  0.528796  0.722964  
4  0.473467  0.819813  
               0         0         1         2         3         4         5  \
0  face_2162.png  0.944961  0.627551  0.398977  0.673645  0.728736  0.889650   
1  face_2161.png  0.924706  0.644970  0.504298  0.704861  0.785248  0.819780   
2  face_2160.png  0.882685  0.816358  0.748188  0.797289  0.692573  0.875870   
3  face_2159.png  0.933083  0.776699  0.876374  0.62509