In [1]:
!pip install -q -U 'tensorflow-text==2.8.*'
!pip install -q tf-models-official==2.7.0
!pip install tensorflow_addons
!pip install -U --no-cache-di gdown --pre

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.9/4.9 MB[0m [31m49.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m498.0/498.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m23.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m462.3/462.3 KB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.8/5.8 MB[0m [31m44.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m46.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 KB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m118.9/118.9 KB[0m [31m11.7 MB/s[0m eta

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!cp '/content/drive/My Drive/Dataset/demake_up_data.zip' '/content'
!unzip /content/demake_up_data.zip

In [2]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow.keras as keras

from tensorflow.keras import layers
from tensorflow.keras.models import Model

 The versions of TensorFlow you are currently using is 2.8.4 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [3]:
# CONFIG PARAMETERS
BATCH_SIZE = 16                       # images per batch in every tf.data pipeline
IMG_HEIGHT = 224                      # model input height
IMG_WIDTH = 224                       # model input width
IMG_CHANNELS = 3                      # channels requested when decoding images
BUFFER_SIZE = BATCH_SIZE * 10         # shuffle buffer size for the training set
IMG_PATH = '/content/demake_up_data'  # root folder of the extracted dataset
EPOCHS = 20                           # number of training epochs
SEED = 69                             # single seed shared by NumPy and TensorFlow

# Seed both libraries: NumPy drives the file-list shuffling, while TensorFlow
# drives the random flip augmentation, dataset shuffling and weight init.
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Data processing

In [None]:
def load(img_file):
  """Read one (makeup, non-makeup) image pair from disk.

  Args:
    img_file: a two-element sequence/tensor of file paths, ordered as
      [makeup_path, non_makeup_path].

  Returns:
    A tuple (makeup_img, non_img) of float32 tensors decoded with
    IMG_CHANNELS channels. Pixel values are not rescaled here.
  """
  # tf.split yields two length-1 pieces, hence the [0] when reading below.
  makeup_path, non_path = tf.split(img_file, 2)

  def _read_as_float(path_piece):
    # NOTE(review): the sample cell feeds .png files through decode_jpeg;
    # TF documents this op as also accepting PNG input - confirm.
    raw_bytes = tf.io.read_file(path_piece[0])
    pixels = tf.image.decode_jpeg(raw_bytes, channels=IMG_CHANNELS)
    return tf.cast(pixels, tf.float32)

  return _read_as_float(makeup_path), _read_as_float(non_path)

In [None]:
# Sanity check: load the first training pair, print its shapes and display it.
sample_pair_paths = [f'{IMG_PATH}/train/makeup/0.png',
                     f'{IMG_PATH}/train/non-makeup/0.png']
makeup_img, non_img = load(sample_pair_paths)
print(makeup_img.shape)
print(non_img.shape)

# load() returns float values in the 0-255 range; imshow expects floats in [0, 1].
plt.figure(figsize=(10, 8))
plt.imshow(makeup_img / 255.0)

plt.figure(figsize=(10, 8))
plt.imshow(non_img / 255.0)

In [None]:
@tf.function
def random_flip(makeup_img, non_img):
  """Mirror both images left-right together with probability 0.5.

  A single random draw decides the flip for the pair, so the makeup and
  non-makeup images always receive the same transformation.
  """
  should_mirror = tf.random.uniform(()) > 0.5
  return tf.cond(
      should_mirror,
      # Random mirroring
      lambda: (tf.image.flip_left_right(makeup_img),
               tf.image.flip_left_right(non_img)),
      lambda: (makeup_img, non_img))

def processing_image(makeup_img, non_img):
  """Divide both images by 255.0 and return them as a (makeup, non) tuple."""
  max_pixel_value = 255.0
  return makeup_img / max_pixel_value, non_img / max_pixel_value

def load_image_train(image_file):
  """Training-time loader: read the pair, apply the random flip, then rescale.

  Args:
    image_file: two file paths, [makeup_path, non_makeup_path].

  Returns:
    (makeup_img, non_img) after load -> random_flip -> processing_image.
  """
  raw_pair = load(image_file)
  augmented_pair = random_flip(*raw_pair)
  return processing_image(*augmented_pair)

def load_image_val(image_file):
  """Validation/test loader: read the pair and rescale it, with no augmentation.

  Args:
    image_file: two file paths, [makeup_path, non_makeup_path].

  Returns:
    (makeup_img, non_img) after load -> processing_image.
  """
  raw_pair = load(image_file)
  return processing_image(*raw_pair)

In [None]:
def prepare_data(path):
  """Pair every file in a makeup folder with its non-makeup counterpart.

  The counterpart lives in a sibling folder whose name is the makeup folder's
  name with 'makeup' replaced by 'non-makeup', and has the same file name.
  Only the last directory component is rewritten, so a 'makeup' substring in
  an ancestor directory or in a file name no longer corrupts the path.

  Args:
    path: directory holding the makeup images (a trailing slash is allowed).

  Returns:
    A list of [makeup_path, non_makeup_path] pairs, sorted by file name so
    that a later seeded shuffle is reproducible (os.listdir order is arbitrary).
  """
  parent_dir, makeup_dir_name = os.path.split(os.path.normpath(path))
  non_makeup_dir = os.path.join(
      parent_dir, makeup_dir_name.replace('makeup', 'non-makeup'))
  return [[os.path.join(path, file_name),
           os.path.join(non_makeup_dir, file_name)]
          for file_name in sorted(os.listdir(path))]

# Build the [makeup, non-makeup] path pairs for each split.
train_data_list = prepare_data(IMG_PATH + '/train/makeup/')
val_data_list = prepare_data(IMG_PATH + '/val/makeup/')
test_data_list = prepare_data(IMG_PATH + '/test/makeup')

# Shuffle every split in place, in train -> val -> test order, using the
# NumPy generator seeded in the config cell.
for split_pairs in (train_data_list, val_data_list, test_data_list):
  np.random.shuffle(split_pairs)

In [None]:
# NOTE(review): every dataset below yields (makeup_img, non_img) pairs, while
# evaluate/train_step/fit unpack three tensors (makeup, non-makeup, hr).
# Confirm where the high-resolution target is supposed to come from.

# Training pipeline: shuffle -> load + augment -> batch.
train_dataset = tf.data.Dataset.from_tensor_slices(train_data_list)
# The shuffled dataset used to be assigned to a misspelled name
# (`train_datatset`) and was therefore discarded. Shuffling the file paths
# before decoding keeps the shuffle buffer small in memory.
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.map(load_image_train,
                                  num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.batch(BATCH_SIZE)


# Validation pipeline: load -> batch (no augmentation, no shuffling).
val_dataset = tf.data.Dataset.from_tensor_slices(val_data_list)
val_dataset = val_dataset.map(load_image_val)
val_dataset = val_dataset.batch(BATCH_SIZE)


# Test pipeline: same as validation.
test_dataset = tf.data.Dataset.from_tensor_slices(test_data_list)
test_dataset = test_dataset.map(load_image_val)
test_dataset = test_dataset.batch(BATCH_SIZE)

# Model Implementation

In [4]:
class ResNetBlock():
    """Functional-API builders for ResNet-style blocks with instance norm.

    Every method takes a Keras tensor and returns a Keras tensor; the class
    holds no state.
    """

    def init_block(self, inputs):
        """Stem: zero-pad by 3, 7x7 conv (64 filters, stride 2), norm, GELU."""
        x = keras.layers.ZeroPadding2D(padding=3)(inputs)
        x = keras.layers.Conv2D(64, kernel_size=(7, 7), strides=2)(x)
        x = tfa.layers.InstanceNormalization()(x)
        x = keras.layers.Activation('gelu')(x)
        return x

    def identity_block(self, inputs, n_filters):
        """Residual block that keeps the spatial size.

        `inputs` is added to the block output unchanged, so it must already
        have `n_filters` channels.
        """
        x = keras.layers.Conv2D(n_filters, kernel_size=(3, 3), padding='same', use_bias=False)(inputs)
        x = tfa.layers.InstanceNormalization()(x)
        x = keras.layers.Activation('gelu')(x)
        x = keras.layers.Conv2D(n_filters, kernel_size=(3, 3), padding='same', use_bias=False)(x)
        x = tfa.layers.InstanceNormalization()(x)
        skip_connection = keras.layers.Add()([inputs, x])
        # NOTE(review): this is the only 'relu' in the class; every other
        # activation is 'gelu'. Confirm whether that is intentional.
        x = keras.layers.Activation('relu')(skip_connection)
        return x

    def projection_block(self, inputs, filters, strides=2):
        """Residual block that changes resolution/width.

        The shortcut is a strided 1x1 conv + instance norm so it matches the
        shape of the main path before the addition.
        """
        x = keras.layers.Conv2D(filters=filters, kernel_size=(3, 3), padding='same', strides=strides, use_bias=False)(inputs)
        x = tfa.layers.InstanceNormalization()(x)
        x = keras.layers.Activation('gelu')(x)
        x = keras.layers.Conv2D(filters=filters, kernel_size=(3, 3), padding='same', use_bias=False)(x)
        x = tfa.layers.InstanceNormalization()(x)
        shortcut = keras.layers.Conv2D(filters=filters, kernel_size=(1, 1), padding='same', strides=strides, use_bias=False)(inputs)
        shortcut = tfa.layers.InstanceNormalization()(shortcut)
        skip_connection = keras.layers.Add()([shortcut, x])
        x = keras.layers.Activation('gelu')(skip_connection)
        return x

    def build_block(self, inputs, filters, n_iter, projection_block=False):
        """Stack one optional projection block and `n_iter` identity blocks.

        Args:
            inputs: input tensor.
            filters: number of filters for every conv in the stage.
            n_iter: number of identity blocks to append.
            projection_block: when truthy, start the stage with a stride-2
                projection block.

        Returns:
            The output tensor of the last block (or `inputs` unchanged when
            no block is requested).
        """
        layer = inputs
        if projection_block:
            layer = self.projection_block(layer, filters, strides=2)
        # `_` instead of `iter`, so the builtin is not shadowed.
        for _ in range(n_iter):
            layer = self.identity_block(layer, filters)
        return layer


In [52]:
class MultiTasking():
    """Base class for the output heads of the multi-task model."""

    def task(self, input_feature=None):
        """Placeholder head; subclasses override it to build their output.

        Accepts the same `input_feature` argument as the subclasses so all
        heads can be called uniformly. The base implementation builds nothing
        and returns None.
        """
        return None

    def conv_block(self, inputs, num_filters):
        """3x3 'same' conv -> instance normalization -> GELU."""
        x = layers.Conv2D(filters=num_filters, kernel_size=(3, 3), padding="same")(inputs)
        x = tfa.layers.InstanceNormalization()(x)
        x = layers.Activation('gelu')(x)
        return x


class Task1(MultiTasking):
    """Head producing a 3-channel sigmoid image at the input feature's resolution."""

    def __init__(self):
        pass

    def task(self, input_feature):
        """Apply conv blocks of 32 then 16 filters, then a 1x1 sigmoid conv.

        Returns a 3-channel tensor; it is wired as the model's first output.
        """
        features = input_feature
        for width in (32, 16):
            features = self.conv_block(features, width)
        to_image = layers.Conv2D(3, kernel_size=(1, 1), activation='sigmoid')
        return to_image(features)

class Task2(MultiTasking):
    """Head that upsamples the feature map 4x and emits a 3-channel sigmoid image."""

    def __init__(self):
        pass

    def task(self, input_feature):
        """Two stride-2 transposed convs (32, then 16 filters) and a 1x1 sigmoid conv.

        Returns a 3-channel tensor; it is wired as the model's second output.
        """
        upsampled = input_feature
        for width in (32, 16):
            upsampled = layers.Conv2DTranspose(
                width, kernel_size=(2, 2), strides=2, padding='same')(upsampled)
        to_image = layers.Conv2D(3, kernel_size=(1, 1), activation='sigmoid')
        return to_image(upsampled)

In [54]:
class Res34UNetMultiTask():
    """U-Net with a ResNet-style encoder and two output heads (Task1, Task2)."""

    def __init__(self):
        self.res34 = ResNetBlock()
        self.task1 = Task1()
        self.task2 = Task2()

    def conv_block(self, inputs, num_filters):
        """3x3 'same' conv -> instance normalization -> GELU."""
        x = layers.Conv2D(filters=num_filters, kernel_size=(3, 3), padding="same")(inputs)
        x = tfa.layers.InstanceNormalization()(x)
        x = layers.Activation('gelu')(x)
        return x

    def downsampling(self, inputs, n_filters, n_iter, projection_block=False):
        """Build one encoder stage.

        Returns:
            (down, branch): the stage output, and the same tensor passed
            through Dropout(0.3) for use as a skip connection.
        """
        down = self.res34.build_block(inputs, n_filters, n_iter, projection_block)
        branch = keras.layers.Dropout(0.3)(down)
        return down, branch

    def upsampling(self, up_inputs, branch_inputs, n_filters):
        """Upsample 2x with a transposed conv, concatenate the skip tensor, then conv_block."""
        x_up = keras.layers.Conv2DTranspose(n_filters, kernel_size=(2, 2), padding='same', strides=2)(up_inputs)
        x = keras.layers.concatenate([x_up, branch_inputs])
        x = self.conv_block(x, n_filters)
        return x

    def build_model(self, input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)):
        """Assemble the Keras model.

        Args:
            input_shape: (height, width, channels) of the input image. The
                default previously listed width before height, which only
                worked because both are equal.

        Returns:
            A keras Model mapping the input image to [outputs1, outputs2],
            produced by Task1 and Task2 from the same final feature map.
        """
        # Inputs
        inputs = keras.layers.Input(shape=input_shape)

        # Encoder: ResNet34-style stages with (3, 4, 6, 3) identity blocks
        init_block = self.res34.init_block(inputs)
        pre_second_block = keras.layers.ZeroPadding2D(padding=1)(init_block)
        pre_second_block = keras.layers.MaxPooling2D(pool_size=(3, 3), strides=2)(pre_second_block)
        down2, branch2 = self.downsampling(pre_second_block, 64, 3, False)
        down3, branch3 = self.downsampling(down2, 128, 4, True)
        # The dropout branches of the last two stages are not used below.
        down4, _ = self.downsampling(down3, 256, 6, True)

        # Bridge
        down5, _ = self.downsampling(down4, 512, 3, True)
        bridge = down5

        # Decoder
        # NOTE(review): this skip uses `down4` (no dropout) while the next two
        # use the dropout branches `branch3`/`branch2` - confirm intent.
        up4 = self.upsampling(bridge, down4, 512)
        up3 = self.upsampling(up4, branch3, 256)
        up2 = self.upsampling(up3, branch2, 128)
        up1 = self.upsampling(up2, init_block, 64)
        init_branch = keras.layers.Conv2D(64, kernel_size=(1, 1), padding='same', strides=1)(inputs)
        up0 = self.upsampling(up1, init_branch, 64)

        # Output: one more 2x upsampling, with a skip built from the raw input
        # upsampled 2x, so the final feature map is twice the input resolution.
        first_feature = layers.Conv2D(64, kernel_size=(3, 3), padding='same')(inputs)
        first_feature = layers.Conv2DTranspose(64, kernel_size=(2, 2), padding='same', strides=2)(first_feature)
        final_feature = self.upsampling(up0, first_feature, 64)
        outputs1 = self.task1.task(final_feature)
        outputs2 = self.task2.task(final_feature)
        model = Model(inputs=inputs, outputs=[outputs1, outputs2])
        return model

In [55]:
# Instantiate the builder and create the Keras model.
model = Res34UNetMultiTask()
# Keras image inputs are (height, width, channels); the call used to pass
# width before height, which only worked because both are 224.
res34unet = model.build_model(input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
res34unet.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_23 (InputLayer)          [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 zero_padding2d_44 (ZeroPadding  (None, 230, 230, 3)  0          ['input_23[0][0]']               
 2D)                                                                                              
                                                                                                  
 conv2d_1088 (Conv2D)           (None, 112, 112, 64  9472        ['zero_padding2d_44[0][0]']      
                                )                                                           

# Training

In [None]:
def evaluate(model, epoch, dataset):
    """Compute the mean PSNR of both model outputs over a dataset.

    Args:
        model: model returning (pred_non, pred_sr) for a batch of makeup images.
        epoch: epoch index, used only in the printed summary.
        dataset: iterable of (makeup_img, non_img, hr_img) batches with pixel
            values in [0, 1] (PSNR is computed with max_val=1.0).

    Returns:
        (psnr_non_mean, psnr_sr_mean): scalar tensors holding the per-image
        mean PSNR of the two outputs against non_img and hr_img.

    Raises:
        ValueError: if the dataset yields no images.
    """
    psnr_non_total = 0.0
    psnr_sr_total = 0.0
    image_count = 0
    count = 0  # number of batches, reported in the summary line
    for makeup_img, non_img, hr_img in dataset:
        pred_non, pred_sr = model([makeup_img], training=False)

        # tf.image.psnr returns one value per image in the batch.
        psnr_non = tf.image.psnr(pred_non, non_img, max_val=1.0)
        psnr_sr = tf.image.psnr(pred_sr, hr_img, max_val=1.0)

        # Accumulate per-image sums so that a smaller final batch is not
        # over-weighted, as it was when averaging per-batch means.
        psnr_non_total += tf.math.reduce_sum(psnr_non)
        psnr_sr_total += tf.math.reduce_sum(psnr_sr)
        image_count += int(tf.size(psnr_non))
        count += 1

    if image_count == 0:
        raise ValueError('evaluate() received an empty dataset; cannot compute PSNR')

    psnr_non_mean = psnr_non_total / image_count
    psnr_sr_mean = psnr_sr_total / image_count
    print('-------- psnr_non: ', psnr_non_mean.numpy(),  'psnr_sr: ', psnr_sr_mean.numpy(), '   ----- epoch: ', epoch, '  count: ', count)

    return psnr_non_mean, psnr_sr_mean
    

def _show_image_row(images, titles, figsize=(15, 20)):
    """Display `images` side by side in a fresh figure, one title per image."""
    plt.figure(figsize=figsize)
    n_cols = len(images)
    for col, (image, title) in enumerate(zip(images, titles), start=1):
        plt.subplot(1, n_cols, col)
        plt.title(title)
        plt.imshow(image)
        plt.axis('off')
    plt.show()


def generate_images(model, makeup_img, non_img, hr_img):
    """Plot the model's predictions for the first sample of a batch.

    Shows two rows: (input, non-makeup target, first model output) and
    (hr target, second model output). Each row gets its own explicit figure;
    previously the second row was drawn without creating one and relied on
    implicit pyplot state.

    Args:
        model: model returning (pred_non, pred_sr) for a batch of makeup images.
        makeup_img: batch of input images.
        non_img: batch of non-makeup targets.
        hr_img: batch of targets for the second output.
    """
    pred_non, pred_sr = model([makeup_img], training=False)

    _show_image_row([makeup_img[0], non_img[0], pred_non[0]],
                    ['Input', 'Non-makeup', 'Predicted'])
    _show_image_row([hr_img[0], pred_sr[0]],
                    ['Target', 'Pred_SR'])

In [None]:
from official.nlp import optimization  # to create AdamW optimizer
# Learning-rate schedule bookkeeping: one step per training batch, with the
# first 10% of all steps used as warm-up.
steps_per_epoch = train_dataset.cardinality().numpy()
num_train_steps = EPOCHS * steps_per_epoch
num_warmup_steps = int(num_train_steps * 0.1)

# AdamW optimizer from the TF Model Garden helper.
init_lr = 1e-2
generator_optimizer = optimization.create_optimizer(
    init_lr=init_lr,
    num_train_steps=num_train_steps,
    num_warmup_steps=num_warmup_steps,
    optimizer_type='adamw',
)

In [None]:
from tqdm import tqdm

@tf.function
def train_step(model, makeup_img, non_img, hr_img):
    """Run one optimization step and return the combined loss.

    Both losses are the mean squared error scaled by 100; the loss on the
    first output (vs. non_img) is weighted twice as much as the loss on the
    second output (vs. hr_img). Weights are updated through the module-level
    `generator_optimizer`.
    """
    with tf.GradientTape() as grad_tape:
        # Forward pass in training mode.
        pred_non, pred_sr = model([makeup_img], training=True)
        non_error = pred_non - non_img
        sr_error = pred_sr - hr_img
        loss_non = 100 * tf.reduce_mean(tf.square(non_error))
        loss_sr = 100 * tf.reduce_mean(tf.square(sr_error))
        loss = loss_sr + 2 * loss_non

    weights = model.trainable_variables
    weight_grads = grad_tape.gradient(loss, weights)
    generator_optimizer.apply_gradients(zip(weight_grads, weights))

    return loss

    
    
def fit(model, train_ds, epochs, val_ds):
    """Train `model` for `epochs` epochs and evaluate after each one.

    After every epoch the mean training loss is printed and the model is
    evaluated on `val_ds`. Whenever the average of the two validation PSNRs
    improves on the best value so far, predictions for the first validation
    batch are plotted.

    Args:
        model: model to train, called through `train_step`.
        train_ds: iterable of (makeup_img, non_img, hr_img) training batches.
        epochs: number of passes over `train_ds`.
        val_ds: validation dataset with the same element structure.
    """
    best_psnr = 0.0
    for epoch in range(epochs):
        # Train
        total_loss = 0.0
        # Counted per epoch. The counter used to accumulate across epochs, so
        # each epoch's reported loss was divided by all steps seen so far.
        steps_in_epoch = 0
        for makeup_img, non_img, hr_img in tqdm(train_ds):
            loss = train_step(model, makeup_img, non_img, hr_img)
            total_loss = total_loss + loss
            steps_in_epoch += 1
        # max(..., 1) avoids a division by zero when train_ds is empty.
        total_loss = total_loss / max(steps_in_epoch, 1)
        print('epoch: {}   loss: {}'.format(epoch, total_loss))

        # Average the two validation PSNRs into a single score.
        psnr = tf.reduce_mean(evaluate(model, epoch, val_ds))
        if best_psnr < psnr:
            best_psnr = psnr

            for makeup_img, non_img, hr_img in val_ds.take(1):
                generate_images(model, makeup_img, non_img, hr_img)


In [None]:
from time import time
# Train the model and print the wall-clock duration in seconds.
fit_started_at = time()
fit(res34unet, train_dataset, EPOCHS, val_dataset)
elapsed_seconds = time() - fit_started_at
print(elapsed_seconds)