# Mohammed

In [1]:
import numpy as np
import os
import io
import tensorflow as tf
import ipywidgets as widgets
from PIL import Image
from IPython.display import display, clear_output
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.layers import Input, Dense, Lambda, Flatten, Reshape, Conv2D, Conv2DTranspose
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras import backend as K
import random
from ipywidgets import interact, interactive, fixed, interact_manual, VBox, HBox
from PIL import Image


In [2]:
def load_data(directory):
    images = []
    for folder in os.listdir(directory):
        emotion_folder = os.path.join(directory, folder)
        if os.path.isdir(emotion_folder):
            for file in os.listdir(emotion_folder):
                img_path = os.path.join(emotion_folder, file)
                img = load_img(img_path, color_mode='grayscale', target_size=(48, 48))
                img = img_to_array(img)
                img = img / 255.0 
                images.append(img)
    return np.array(images)

train_data = load_data('FER2013/train')
test_data = load_data('FER2013/test')

In [3]:
input_shape = (48, 48, 1)
latent_dim = 6

inputs = Input(shape=input_shape, name='encoder_input')
x = Conv2D(32, 3, activation='relu', strides=2, padding='same')(inputs)
x = Conv2D(64, 3, activation='relu', strides=2, padding='same')(x)
x = Flatten()(x)
x = Dense(16, activation='relu')(x)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

def sampling(args):
    z_mean, z_log_var = args
    batch = K.shape(z_mean)[0]
    dim = K.int_shape(z_mean)[1]
    epsilon = K.random_normal(shape=(batch, dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])


encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')


latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(12 * 12 * 64, activation='relu')(latent_inputs)
x = Reshape((12, 12, 64))(x)
x = Conv2DTranspose(64, 3, activation='relu', strides=2, padding='same')(x)
x = Conv2DTranspose(32, 3, activation='relu', strides=2, padding='same')(x)
outputs = Conv2DTranspose(1, 3, activation='sigmoid', padding='same', name='decoder_output')(x)


decoder = Model(latent_inputs, outputs, name='decoder')


outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae')

reconstruction_loss = binary_crossentropy(K.flatten(inputs), K.flatten(outputs))
reconstruction_loss *= input_shape[0] * input_shape[1]
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')

In [4]:
vae.fit(train_data, epochs=10, batch_size=32, validation_data=(test_data, None))
vae.save('/vae_model.h5')


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


  saving_api.save_model(


In [5]:
def apply_mask(image, mask_size_ratio, mask_position):
    h, w = image.shape[:2]
    mask_size = int(min(h, w) * mask_size_ratio)
    x = int(w * mask_position[0])
    y = int(h * mask_position[1])
    x_end = min(x + mask_size, w)
    y_end = min(y + mask_size, h)
    masked_image = image.copy()
    masked_image[y:y_end, x:x_end] = 0
    return masked_image

def load_data_with_masks(directory):
    images = []
    for folder in os.listdir(directory):
        emotion_folder = os.path.join(directory, folder)
        if os.path.isdir(emotion_folder):
            for file in os.listdir(emotion_folder):
                img_path = os.path.join(emotion_folder, file)
                img = load_img(img_path, color_mode='grayscale', target_size=(48, 48))
                img = img_to_array(img)
                img = img / 255.0  

                
                mask_size_ratio = random.uniform(0.1, 0.5)  
                mask_position_x = random.uniform(0, 1)
                mask_position_y = random.uniform(0, 1)
                mask_position = (mask_position_x, mask_position_y)

                masked_img = apply_mask(img, mask_size_ratio, mask_position)
                images.append(masked_img)
    return np.array(images)




train_data_masked = load_data_with_masks('FER2013/train')
test_data_masked = load_data_with_masks('FER2013/test')

input_shape = (48, 48, 1)  
latent_dim = 6  


inputs_2 = Input(shape=input_shape, name='encoder_input')
x_2 = Conv2D(32, 3, activation='relu', strides=2, padding='same')(inputs_2)
x_2 = Conv2D(64, 3, activation='relu', strides=2, padding='same')(x_2)
x_2 = Flatten()(x_2)
x_2 = Dense(16, activation='relu')(x_2)
z_mean_2 = Dense(latent_dim, name='z_mean_2')(x_2)
z_log_var_2 = Dense(latent_dim, name='z_log_var_2')(x_2)


def sampling_2(args):
    z_mean_2, z_log_var_2 = args
    batch_2 = K.shape(z_mean_2)[0]
    dim_2 = K.int_shape(z_mean_2)[1]
    epsilon_2 = K.random_normal(shape=(batch_2, dim_2))
    return z_mean_2 + K.exp(0.5 * z_log_var_2) * epsilon_2

z_2 = Lambda(sampling_2, output_shape=(latent_dim,), name='z_2')([z_mean_2, z_log_var_2])


encoder_2 = Model(inputs_2, [z_mean_2, z_log_var_2, z_2], name='encoder_2')


latent_inputs_2 = Input(shape=(latent_dim,), name='z_sampling_2')
x_2 = Dense(12 * 12 * 64, activation='relu')(latent_inputs_2)
x_2 = Reshape((12, 12, 64))(x_2)
x_2 = Conv2DTranspose(64, 3, activation='relu', strides=2, padding='same')(x_2)
x_2 = Conv2DTranspose(32, 3, activation='relu', strides=2, padding='same')(x_2)
outputs_2 = Conv2DTranspose(1, 3, activation='sigmoid', padding='same', name='decoder_output')(x_2)


decoder_2 = Model(latent_inputs_2, outputs_2, name='decoder_2')


outputs_2 = decoder_2(encoder_2(inputs_2)[2])
vae_task2 = Model(inputs_2, outputs_2, name='vae_task2')


reconstruction_loss_2 = binary_crossentropy(K.flatten(inputs_2), K.flatten(outputs_2))
reconstruction_loss_2 *= input_shape[0] * input_shape[1]
kl_loss_2 = 1 + z_log_var_2 - K.square(z_mean_2) - K.exp(z_log_var_2)
kl_loss_2 = K.sum(kl_loss_2, axis=-1)
kl_loss_2 *= -0.5
vae_task2_loss = K.mean(reconstruction_loss_2 + kl_loss_2)
vae_task2.add_loss(vae_task2_loss)
vae_task2.compile(optimizer='adam')


vae_task2.fit(train_data_masked, epochs=10, batch_size=32, validation_data=(test_data_masked, None))
vae_task2.save('/vae_task2_model.h5')


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [7]:
def mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)


# Function to calculate Structural Similarity Index (SSIM)
def calculate_ssim(y_true, y_pred):
    return (y_true, y_pred, data_range==y_true.max() - y_true.min())


# Function to calculate Peak Signal-to-Noise Ratio (PSNR)
def calculate_psnr(y_true, y_pred):
    return (y_true, y_pred, data_range==y_true.max() - y_true.min())


def reconstruct_image(image,  vae_model):
    image = image / 255.0
    image = np.expand_dims(image, axis=2)
    reconstructed_image = vae_model.predict(image[None, ...])[0]
    mse = mean_squared_error(image, reconstructed_image )
    ssim = calculate_ssim(image, reconstructed_image)
    psnr = calculate_psnr(image, reconstructed_image)
    print("MSE = ", mse)
    print("ssim_score",ssim)
    print("psnr_score", psnr)
    return reconstructed_image.squeeze(axis=2)




In [8]:
l1 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 1')
l2 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 2')
l3 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 3')
l4 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 4')
l5 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 5')
l6 = widgets.FloatSlider(min=-3, max=3, step=0.1, description='Dim 6')


image_widget = widgets.Image(format='png', width=300, height=400)


def update_image(change):
    latent_vector = np.array([[l1.value, l2.value, l3.value, l4.value, l5.value, l6.value]])
    reconstructed_img = decoder.predict(latent_vector)
    reconstructed_img = np.squeeze(reconstructed_img, axis=0)
    reconstructed_img = (reconstructed_img * 255).astype(np.uint8)

    
    if reconstructed_img.ndim == 3:
        reconstructed_img = reconstructed_img[:, :, 0]

    
    img = Image.fromarray(reconstructed_img, 'L')
    with io.BytesIO() as output:
        img.save(output, format="PNG")
        contents = output.getvalue()
    image_widget.value = contents


l1.observe(update_image, names='value')
l2.observe(update_image, names='value')
l3.observe(update_image, names='value')
l4.observe(update_image, names='value')
l5.observe(update_image, names='value')
l6.observe(update_image, names='value')


sliders = widgets.VBox([l1, l2, l3, l4, l5, l6])
ui = widgets.HBox([sliders, image_widget])
display(ui)


update_image(None)


HBox(children=(VBox(children=(FloatSlider(value=0.0, description='Dim 1', max=3.0, min=-3.0), FloatSlider(valu…



In [9]:
def load_my_model(path):
    model = load_model(path)
    return model


vae = load_my_model("vae_model.h5")
vae_task2 = load_my_model("vae_task2_model.h5")

def to_array(image):

    image = Image.open(io.BytesIO(image))
    arr = np.array(image)
    return arr

def array_to_bytes(arr):
 
    
    if arr.max() <= 1:
        image = Image.fromarray((arr * 255).astype(np.uint8))
    else:
        image = Image.fromarray((arr).astype(np.uint8))
 
    img_byte_arr = io.BytesIO()
    image.save(img_byte_arr, format='jpeg')

    img_byte_arr = img_byte_arr.getvalue()

    return img_byte_arr


state = {}

state['image'] = np.ones((48,48))
state['masked_image'] = np.copy(state['image'])
state['upload_file'] = widgets.FileUpload(
        accept='jpg',
        multiple=False
    )

state['vae1'] = vae
state['vae2'] = vae_task2



image_np = np.array(state['image'])
masked_image = np.copy(image_np)




size_factor_slider = widgets.FloatSlider(value=0., min=0., max=.5, step=0.05, description='Size Factor:')
start_y_slider = widgets.FloatSlider(value=0., min=0., max=1., step=0.05, description='Start X:')
start_x_slider = widgets.FloatSlider(value=0., min=0., max=1., step=0.05, orientation='vertical' , description=' Start Y:')



output_widget_1 = widgets.Output()
output_widget_2 = widgets.Output()
output_widget_3 = widgets.Output()



def update_image(change):

        start_x = start_x_slider.value
        start_y = start_y_slider.value
        size_factor = size_factor_slider.value


        shape = state['image'].shape



        masked_image = np.copy(state['image'])
        mask_height = int(size_factor * shape[0])
        mask_width = int(size_factor * shape[1])


        start_x = int(start_x  *  (shape[0] - mask_height))
        start_y = int(start_y  *  (shape[1] - mask_width))

        end_x = int( start_x + mask_height )
        end_y = int( start_y + mask_width )
        masked_image[start_x:end_x , start_y:end_y] = 0

        with output_widget_1:
            clear_output(wait=True)


            display(widgets.Image(
                      value=array_to_bytes(masked_image),
                      format='jpeg',
                      width=300,
                      height=400,
                  ))

        with output_widget_2:
            clear_output(wait=True)

            display(widgets.Image(
                      value=array_to_bytes(reconstruct_image(masked_image, state['vae1'])),
                      format='jpeg',
                      width=300,
                      height=400,
                  ))

        with output_widget_3:
            clear_output(wait=True)

            display(widgets.Image(
                      value=array_to_bytes(reconstruct_image(masked_image, state['vae2'])),
                      format='jpg',
                      width=300,
                      height=400,
                  ))




start_x_slider.observe(update_image, names='value')
start_y_slider.observe(update_image, names='value')
size_factor_slider.observe(update_image, names='value')


def show_file_upload(event):
      display(state['upload_file'])


def on_upload_change(change):

    uploader = state['upload_file']
    uploaded_file = uploader.value[0]['content'].tobytes()
    state['image'] = to_array(uploaded_file)

    update_image("")


state['upload_file'].observe(on_upload_change, names='value')


upload_button = widgets.Button(description="upload a file")
upload_button.on_click(show_file_upload)
image_load = VBox([output_widget_1, upload_button])





internal = HBox([start_x_slider, image_load ])
col1 = VBox([size_factor_slider, start_y_slider,   internal])




col2 = VBox([output_widget_2])
col3 = VBox([output_widget_3])



main_box = HBox([col1, col2, col3])
update_image("")
display(main_box)




HBox(children=(VBox(children=(FloatSlider(value=0.0, description='Size Factor:', max=0.5, step=0.05), FloatSli…