# Note: this notebook was run on Kaggle with the GPU accelerator enabled

# Get data from Google Drive

In [None]:
# Update gdown
!pip install --upgrade --no-cache-dir gdown

In [None]:
# Download data from google drive
!gdown --id 1pRcJVhzns_6eZz6iq26w6C9UdGeb4u0B

In [None]:
# Unzip data
!unzip -q ./makeup_data.zip -d ./

# Import Module/Library

In [None]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt 

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.vgg19 import VGG19
from PIL import Image

In [None]:
# Show the installed TensorFlow version as the cell output.
tf.__version__

In [None]:
# Turn on memory growth for every physical GPU that TensorFlow can see.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # The memory-growth setting has to be the same on all GPUs.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(f"{len(gpus)} Physical GPUs, {len(logical_gpus)} Logical GPUs")
    except RuntimeError as e:
        # Memory growth can only be configured before the GPUs are
        # initialized; report the error instead of stopping the notebook.
        print(e)


# Config Parameters

In [None]:
# Number of image pairs per batch (passed to Dataset.batch).
BATCH_SIZE  = 1
# Size each image panel is resized to, and the channel count used for the
# model's input shape.
IMG_HEIGHT  = 256
IMG_WIDTH   = 256
IMG_CHANNEL = 3

# Root folder of the makeup / de-makeup dataset; the train/, val/ and test/
# sub-folders are globbed for *.png files.
IMG_PATH = "./makeup_data"
# Shuffle buffer size applied to the training dataset.
BUFFER_SIZE = 100

# Load Data

In [None]:
def load(image_file):
    """Read one dataset image and split it into a (makeup, de-makeup) pair.

    The decoded image is resized to IMG_HEIGHT x (3 * IMG_WIDTH) and treated
    as three panels placed side by side. The middle panel is returned as the
    target and the right-hand panel as the input; the left-hand panel is not
    used.

    Parameters
    ----------
    image_file : string (or scalar string tensor)
        Path of the image file to read.

    Returns
    -------
    input_image: tf.Tensor (tf.float32)  image makeup (not yet normalised)
    target_image: tf.Tensor (tf.float32) image demakeup (not yet normalised)
    """
    # Read and decode the file. The channel count is pinned so that grayscale
    # or RGBA files still yield IMG_CHANNEL channels and the channel axis has
    # a static size for the model.
    encoded = tf.io.read_file(image_file)
    image = tf.image.decode_jpeg(encoded, channels=IMG_CHANNEL)

    # Resize so that each of the three panels is IMG_HEIGHT x IMG_WIDTH.
    image = tf.image.resize(image, (IMG_HEIGHT, IMG_WIDTH * 3))

    # Split the panels: middle -> target, right -> input.
    target_image = image[:, IMG_WIDTH:IMG_WIDTH * 2, :]
    input_image = image[:, IMG_WIDTH * 2:, :]

    # Make sure both outputs are float32.
    input_image = tf.cast(input_image, tf.float32)
    target_image = tf.cast(target_image, tf.float32)

    return input_image, target_image

In [None]:
# Sanity check: load one training sample and look at it.
path = "./makeup_data/train/0.png"
input_image, target_image = load(path)

# Shapes of the input and target panels.
for panel in (input_image, target_image):
    print(panel.shape)


# Divide by 255 so matplotlib receives floats in the [0, 1] range
# (assumes 8-bit pixel values).
plt.figure()
plt.imshow(input_image/255.0)
plt.figure()
plt.imshow(target_image/255.0)

In [None]:
@tf.function()
def random_flip(input_image, target_image):
    """Mirror both images left-to-right together, with probability 0.5.

    Parameters
    ----------
    input_image: tf.Tensor (tf.float32)  image makeup
    target_image: tf.Tensor (tf.float32) image demakeup

    Returns
    -------
    input_image: tf.Tensor (tf.float32)  image makeup, possibly flipped
    target_image: tf.Tensor (tf.float32) image demakeup, flipped iff the
        input was flipped
    """
    # A single random draw decides for the pair, so input and target always
    # stay aligned with each other.
    flip_pair = tf.random.uniform(()) > 0.5
    if flip_pair:
        target_image = tf.image.flip_left_right(target_image)
        input_image = tf.image.flip_left_right(input_image)

    return input_image, target_image

    
def processing_image(input_image, target_image):
    """Rescale both images with x / 127.5 - 1 (0 -> -1, 255 -> 1).

    Parameters
    ----------
    input_image: tf.Tensor (tf.float32)  image makeup
    target_image: tf.Tensor (tf.float32) image demakeup

    Returns
    -------
    input_image: tf.Tensor (tf.float32)  rescaled image makeup
    target_image: tf.Tensor (tf.float32) rescaled image demakeup
    """
    # NOTE: the ResNet50-specific preprocess_input is not applied here; both
    # images get the same symmetric scaling.
    half_range = 127.5
    rescaled = [(image / half_range) - 1
                for image in (input_image, target_image)]

    return rescaled[0], rescaled[1]


def load_image_train(image_file):
    """Build one training pair: load, randomly flip, then rescale.

    Parameters
    ----------
    image_file: path of the dataset image, forwarded to load()

    Returns
    -------
    input_image: tf.Tensor (tf.float32)  load image makeup for training
    target_image: tf.Tensor (tf.float32) load image demakeup for training
    """
    # Read the file and split it into (input, target).
    pair = load(image_file)

    # Augmentation: the same random horizontal flip for both images.
    pair = random_flip(*pair)

    # Rescale the pixel values.
    return processing_image(*pair)


def load_image_val(image_file):
    """Build one validation/test pair: load and rescale, no augmentation.

    Parameters
    ----------
    image_file: path of the dataset image, forwarded to load()

    Returns
    -------
    input_image: tf.Tensor (tf.float32)  load image makeup for validation
    target_image: tf.Tensor (tf.float32) load image demakeup for validation
    """
    # Use load() to read the image and split it into (input, target).
    pair = load(image_file)

    # Use processing_image() to rescale the pixel values.
    return processing_image(*pair)

In [None]:
# Training pipeline: list files -> load/augment in parallel -> shuffle -> batch.
train_dataset = (
    tf.data.Dataset.list_files(str(IMG_PATH + '/train/*.png'))
    .map(load_image_train, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [None]:
# Validation pipeline: list files -> load without augmentation -> batch.
val_dataset = (
    tf.data.Dataset.list_files(str(IMG_PATH + '/val/*.png'))
    .map(load_image_val)
    .batch(BATCH_SIZE)
)

In [None]:
# Test pipeline: same loading as validation, reading from the test folder.
test_dataset = (
    tf.data.Dataset.list_files(str(IMG_PATH + '/test/*.png'))
    .map(load_image_val)
    .batch(BATCH_SIZE)
)

# Build Model 

In [None]:
class BuildRes50Unet():
    """Builder for a U-Net style model whose encoder is a pretrained ResNet50."""

    def __init__(self):
        # The builder keeps no state.
        pass

    def conv_block(self, inputs, num_filters):
        """Apply two Conv2D -> LayerNormalization -> LeakyReLU stages.

        Parameters
        ----------
        inputs: tf.Tensor
        num_filters: int
            Number of filters of each 3x3 convolution.

        Returns
        -------
        x: tf.Tensor
            `inputs` after both convolution stages.
        """
        # Normal(mean=0, stddev=0.02) kernel initializer, shared by both convs.
        kernel_init = tf.random_normal_initializer(0., 0.02)

        x = inputs
        for _ in range(2):
            x = layers.Conv2D(num_filters, 3, strides=1, padding='same',
                              kernel_initializer=kernel_init, use_bias=False)(x)
            x = layers.LayerNormalization()(x)
            x = layers.LeakyReLU()(x)

        return x

    def upsample_concate_block(self, inputs, skip_connection, num_filters):
        """Upsample, concatenate the skip connection, then run conv_block.

        Parameters
        ----------
        inputs: tf.Tensor
            Feature map coming from the previous (deeper) stage.
        skip_connection: tf.Tensor
            Encoder feature map concatenated with the upsampled tensor.
        num_filters: int
            Number of filters used by the convolution block.

        Returns
        -------
        x: tf.Tensor
        """
        # Bilinear upsampling (UpSampling2D default size).
        upsampled = layers.UpSampling2D(interpolation='bilinear')(inputs)

        # Merge the upsampled features with the encoder skip connection.
        merged = layers.Concatenate()([upsampled, skip_connection])

        # Refine the merged features with the two-stage convolution block.
        return self.conv_block(merged, num_filters)

    def build_model(self, input_shape):
        """Build the U-Net with an ImageNet-pretrained ResNet50 encoder.

        Parameters
        ----------
        input_shape: Tuple
            Shape of one input image; its last entry is also used as the
            number of output channels.

        Returns
        -------
        model: tf.keras.Model
            Unet model
        """
        # Input layer, also used as the backbone's input tensor.
        inputs = layers.Input(shape=input_shape)

        # Pretrained ResNet50 backbone without its classification head.
        resnet50 = ResNet50(include_top=False, weights="imagenet", input_tensor=inputs)

        # Encoder skip connections, shallowest first. layers[0] is presumably
        # the input layer, i.e. the unprocessed image.
        skips = [resnet50.layers[0].output]
        for layer_name in ("conv1_relu", "conv2_block3_out", "conv3_block4_out"):
            skips.append(resnet50.get_layer(layer_name).output)

        # Bridge: deepest encoder feature map that is used.
        x = resnet50.get_layer("conv4_block6_out").output

        # Decoder: walk the skips from deepest to shallowest.
        for skip, num_filters in zip(reversed(skips), (512, 256, 128, 64)):
            x = self.upsample_concate_block(x, skip, num_filters)

        # Output: 1x1 convolution with tanh, so values lie in [-1, 1].
        outputs = layers.Conv2D(filters=input_shape[-1], kernel_size=(1,1), activation='tanh',
                                kernel_initializer='he_normal', use_bias=False)(x)

        return Model(inputs=[inputs], outputs=[outputs])

In [None]:
# Create the builder, then build the network for
# IMG_HEIGHT x IMG_WIDTH x IMG_CHANNEL inputs.
# NOTE: `model` is the builder object; the Keras model itself is `res50Unet`.
model = BuildRes50Unet()
res50Unet = model.build_model(input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL))

In [None]:
# Print the layer-by-layer summary of the network.
res50Unet.summary()

# Config and Compile Model

## Show Generated Images and Evaluation Function

In [None]:
def evaluate(model, epoch, val_ds):
    """Compute the mean PSNR of `model` over a validation dataset.

    Parameters
    ----------
    model: callable Keras model mapping an input batch to a predicted batch
    epoch: value printed alongside the score (not used in the computation)
    val_ds: iterable of (inputs, targets) batches scaled to [-1, 1]

    Returns
    -------
    psnr_mean: tf.Tensor
        PSNR averaged over every image yielded by `val_ds`.

    Raises
    ------
    ValueError
        If `val_ds` yields no batches.
    """
    psnr_sum = 0.0
    image_count = 0.0
    count = 0

    for inputs, targets in val_ds:
        # NOTE(review): the model is run with training=True during
        # evaluation, so layers with a training-specific behaviour (e.g.
        # normalisation layers) run in training mode - confirm this is
        # intended.
        fake = model([inputs], training=True)

        # Map both batches from [-1, 1] to [0, 1]; one PSNR value per image.
        psnr = tf.image.psnr(fake*0.5 + 0.5, targets*0.5 + 0.5, max_val=1.0)

        # Accumulate per-image values so that a smaller final batch is not
        # over-weighted, as it would be when averaging per-batch means.
        psnr_sum = psnr_sum + tf.math.reduce_sum(psnr)
        image_count = image_count + tf.cast(tf.size(psnr), psnr.dtype)
        count = count + 1

    if count == 0:
        raise ValueError("val_ds yielded no batches; cannot compute PSNR")

    psnr_mean = psnr_sum/image_count
    # `count` is the number of batches that were evaluated.
    print('-------- psnr: ', psnr_mean.numpy(), '   ----- epoch: ', epoch, '  count: ', count)

    return psnr_mean
    
def generate_images(model, inputs, targets, epoch, pnsr):
    """Save and display input / target / prediction for the first batch item.

    Parameters
    ----------
    model: callable Keras model used to produce the prediction
    inputs: batch of input images scaled to [-1, 1]
    targets: batch of target images scaled to [-1, 1]
    epoch: value embedded in the saved file name
    pnsr: value embedded in the saved file name
    """
    # Prediction for the batch (the model is called with training=True).
    fake = model([inputs], training=True)
    plt.figure(figsize=(15,20))

    # Only the first element of the batch is shown.
    panels = [inputs[0], targets[0], fake[0]]
    captions = ['Input', 'Real', 'Predicted']

    # Write the three panels side by side as one 8-bit image.
    strip = np.concatenate(panels, axis=1)
    strip = ((strip * 0.5 + 0.5) * 255).astype(np.uint8)
    Image.fromarray(strip).save(f"./save_result/validation_results/epoch:{epoch} pnsr:{pnsr}.png")

    # Show the same panels in a 1x3 grid, mapped back to [0, 1].
    for position, (caption, panel) in enumerate(zip(captions, panels), start=1):
        plt.subplot(1, 3, position)
        plt.title(caption)
        plt.imshow(panel * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

## Create Folder to save result

In [None]:
# Output folders: root, validation previews, test previews, saved models.
for folder in ("./save_result",
               "./save_result/validation_results",
               "./save_result/testing_results",
               "./save_result/save_model"):
    os.makedirs(folder, exist_ok=True)

## Loss

In [None]:
class PeceptualLoss():
    """Perceptual loss: MSE between VGG19 feature maps of two image batches.

    The VGG19 feature extractor is built on first use and then reused, so
    repeated loss evaluations do not rebuild the network or reload its
    ImageNet weights.
    """

    def __init__(self, input_shape = (256,256,3)):
        self.input_shape = input_shape
        # Cached feature extractor. It is created lazily so that constructing
        # the loss object does not load VGG19.
        self._extractor = None

    def get_extractor(self):
        """Build a new VGG19 feature extractor.

        Uses `self.input_shape` as the VGG19 input shape.

        Returns
        -------
        model:  tf.keras.Model
            pretrained model (vgg19) to extract feature for perceptual loss;
            maps the VGG19 input to the output of its layer at index 20
        """

        vgg = VGG19(input_shape= self.input_shape, include_top=False, weights='imagenet')
        model = tf.keras.Model(vgg.input, vgg.layers[20].output)

        return model

    def _cached_extractor(self):
        """Return the shared extractor, building it the first time it is needed."""
        extractor = getattr(self, "_extractor", None)
        if extractor is None:
            # init_scope lifts model creation out of any tf.function that is
            # currently being traced, so the variables are created only once.
            with tf.init_scope():
                extractor = self.get_extractor()
            self._extractor = extractor
        return extractor

    def percep_loss(self, target_image, fake_image):
        """Mean squared error between VGG19 features of the two batches.

        Parameters
        ----------
        target_image: tf.tensor
            Batch of reference images; axis 3 is read as the channel axis.
        fake_image: tf.tensor
            Batch of generated images; axis 3 is read as the channel axis.

        Returns
        -------
        loss:  tf.tensor
        """

        # Reuse the cached extractor instead of rebuilding VGG19 on each call.
        model_vgg = self._cached_extractor()

        mean_squared_error = tf.keras.losses.MeanSquaredError()

        # Single-channel batches are repeated to three channels for VGG19.
        if target_image.shape[3]==1:
            target_image = tf.concat([target_image, target_image, target_image], 3)

        if fake_image.shape[3]==1:
            fake_image = tf.concat([fake_image, fake_image, fake_image], 3)

        # NOTE(review): images are passed to VGG19 as given; confirm whether
        # vgg19.preprocess_input should be applied first.
        target_image_feature = model_vgg(target_image)
        fake_image_feature = model_vgg(fake_image)

        loss = mean_squared_error(target_image_feature, fake_image_feature)

        return loss

In [None]:
# Init perceptual loss
# This module-level instance is the one generator_loss() uses for
# loss_type="Percep".
per_loss = PeceptualLoss(input_shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL))

In [None]:
def generator_loss(fake, target, loss_type='L1'):
    """Return the generator loss between `fake` and `target`.

    Parameters
    ----------
    fake: generated image batch
    target: reference image batch
    loss_type: "L1" for the mean absolute error, or "Percep" for the
        perceptual loss computed by the module-level `per_loss` object

    Returns
    -------
    loss: scalar loss tensor
    """
    supported = ("L1", "Percep")
    assert loss_type in supported

    if loss_type == "Percep":
        # Feature-space distance computed by the perceptual loss object.
        loss = per_loss.percep_loss(target, fake)
    elif loss_type == "L1":
        # Mean absolute pixel difference.
        loss = tf.reduce_mean(tf.abs(fake-target))

    return loss

## Optimizer

In [None]:
# Adam optimizer used by train_step: learning rate 1e-4, beta_1 = 0.5.
generator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5)

## Training and Evaluation

In [None]:
@tf.function
def train_step(inputs, targets):
    """Run one optimisation step of `res50Unet` on a single batch.

    Parameters
    ----------
    inputs: batch of input images
    targets: batch of target images

    Returns
    -------
    loss: the value of generator_loss (default loss type) for this batch
    """
    weights = res50Unet.trainable_variables

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        prediction = res50Unet([inputs], training=True)
        loss = generator_loss(prediction, targets)

    # Back-propagate and update the model weights.
    grads = tape.gradient(loss, weights)
    generator_optimizer.apply_gradients(zip(grads, weights))

    return loss

    
    
def fit(train_ds, epochs, val_ds):
    """Train `res50Unet`, keeping the checkpoint with the best validation PSNR.

    After every epoch the mean training loss (multiplied by 100) is printed
    and the model is evaluated on `val_ds`. Whenever the PSNR improves, the
    model is saved to an .h5 file and one validation batch is rendered with
    generate_images().

    Parameters
    ----------
    train_ds: iterable of (inputs, targets) training batches
    epochs: int, number of passes over `train_ds`
    val_ds: tf.data.Dataset of (inputs, targets) validation batches

    Raises
    ------
    ValueError
        If `train_ds` yields no batches.
    """
    best_pnsr = 0.0
    for epoch in range(epochs):
        # Train. Batches are counted while iterating, so datasets whose
        # cardinality is unknown (where len() fails) are supported too.
        total_loss = 0.0
        num_batches = 0
        for inputs, targets in train_ds:
            loss = train_step(inputs, targets)
            total_loss = total_loss + loss
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_ds yielded no batches; nothing to train on")

        total_loss = (total_loss/num_batches)*100
        print('epoch: {}   loss: {}'.format(epoch, total_loss))

        # Validate, and checkpoint only when the score improves.
        pnsr = evaluate(res50Unet, epoch, val_ds)
        if best_pnsr < pnsr:
            best_pnsr = pnsr
            res50Unet.save("./save_result/save_model/model-resnet-50-unet.h5")
            for inputs, targets in val_ds.take(1):
                generate_images(res50Unet, inputs, targets, epoch, best_pnsr)
                print("Save result successfully!")


In [None]:
# Number of training epochs, then launch training with per-epoch validation.
EPOCHS = 150
fit(train_dataset, EPOCHS, val_dataset)

## Testing Results

In [None]:
# Render and save up to 15 test batches. A single take(15) pass yields
# distinct samples; restarting the (by default reshuffled) file listing for
# every picture could return the same sample more than once.
for x, (inputs, targets) in enumerate(test_dataset.take(15)):
    # The model is called with training=True, as in the validation previews.
    fake = res50Unet([inputs], training=True)
    plt.figure(figsize=(15,20))

    # Only the first element of the batch is shown.
    display_list = [inputs[0], targets[0], fake[0]]
    title = ['Input Left', 'Real Left', 'Predicted Left']

    # Save the three panels side by side as one 8-bit image.
    output = np.concatenate(display_list, axis=1)
    output = (output * 0.5 + 0.5) * 255
    output = output.astype(np.uint8)
    Image.fromarray(output).save(f"./save_result/testing_results/testing_result_{x}.png")

    # Show the same panels, mapped back to [0, 1].
    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        plt.imshow(display_list[i] * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

## Save Results

In [None]:
# Save the model as it is at this point in the notebook. The path has no
# file extension, unlike the ".h5" checkpoint written by fit().
res50Unet.save("./save_result/save_model/model-resnet-50-unet-final")

In [None]:
!zip -r save_result.zip save_result

# Load the saved model for inference

In [None]:
# Load Model
# Rebinds `res50Unet` to a model restored from an .h5 checkpoint
# (presumably a Kaggle input dataset, judging by the ../input/ path).
from tensorflow import keras
res50Unet = keras.models.load_model('../input/model-unet-resnet50-l1/model-resnet-50-unet.h5')

In [None]:
# Display up to 15 test batches with the loaded model. A single take(15)
# pass yields distinct samples; restarting the (by default reshuffled) file
# listing for every picture could return the same sample more than once.
for x, (inputs, targets) in enumerate(test_dataset.take(15)):
    # The model is called with training=True, as elsewhere in this notebook.
    fake = res50Unet([inputs], training=True)
    plt.figure(figsize=(15,20))

    # Only the first element of the batch is shown.
    display_list = [inputs[0], targets[0], fake[0]]
    title = ['Input Left', 'Real Left', 'Predicted Left']

    # Show the panels mapped from [-1, 1] back to [0, 1].
    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        plt.imshow(display_list[i] * 0.5 + 0.5)
        plt.axis('off')
    plt.show()