In [None]:
import json
import os
import random
import time

import pandas as pd
import numpy as np

import cv2
import matplotlib.pyplot as plt


import mediapipe as mp
from mtcnn import MTCNN
from deepface import DeepFace

from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA

from tensorflow import keras
import tensorflow as tf
import keras.backend as K

from keras.optimizers.legacy import Adam
from keras.losses import binary_crossentropy
from keras.callbacks import ModelCheckpoint, EarlyStopping

from skimage.io import imread
from skimage.transform import resize
from skimage.color import gray2rgb

# Part 1 - Select frontal (en face) images

In [None]:
# Root folder with the source images (scanned one sub-folder at a time).
TRAIN_FOLDER = "wiki_crop"
# Output folder that receives the cropped face images.
CROPPED_FOLDER = "cropped/"
# Maps image path -> face-detector output for that image (boxes and other fields).
IMAGES_INFO = dict()

In [None]:
# Initialize MediaPipe Face Mesh.
mp_face_mesh = mp.solutions.face_mesh
# static_image_mode=True: every input is an unrelated photo, so the face detector
# must run on each image instead of tracking landmarks from the previous "frame"
# (the default video mode). min_tracking_confidence is kept for signature parity;
# it has no effect in static mode.
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)
# Drawing helpers (not used by the pose estimation itself).
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(color=(128, 0, 128), thickness=2, circle_radius=1)


def get_head_pos(image_path, tresh):
    """Classify the head orientation of the first face detected in an image.

    Six Face Mesh landmarks (indices 1, 33, 61, 199, 263, 291) are fed to
    ``cv2.solvePnP``; the resulting rotation is decomposed into angles that
    are compared against ``tresh``.

    Args:
        image_path: Path of the image file to analyse.
        tresh: Angle threshold; the head counts as "Forward" while both the
            x and y angle magnitudes stay within it.

    Returns:
        One of "Forward", "Looking Left", "Looking Right", "Looking Down",
        "Looking Up", or "idk" when the image cannot be read, no face is
        found, or the pose cannot be solved.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals a missing / unreadable / non-image file with None.
        return "idk"

    # Face Mesh expects RGB input; OpenCV loads BGR.
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    rgb_image.flags.writeable = False
    results = face_mesh.process(rgb_image)

    if not results.multi_face_landmarks:
        return "idk"

    img_h, img_w = image.shape[:2]
    landmark_ids = (33, 263, 1, 61, 291, 199)

    # Only the first detected face is evaluated.
    face_landmarks = results.multi_face_landmarks[0]
    face_2d = []
    face_3d = []
    for idx, lm in enumerate(face_landmarks.landmark):
        if idx in landmark_ids:
            # Landmarks are normalised; scale them to pixel coordinates.
            x, y = int(lm.x * img_w), int(lm.y * img_h)
            face_2d.append([x, y])
            face_3d.append([x, y, lm.z])

    # Convert to numpy arrays for OpenCV.
    face_2d = np.array(face_2d, dtype=np.float64)
    face_3d = np.array(face_3d, dtype=np.float64)

    focal_length = 1 * img_w
    # Intrinsics are [[fx, 0, cx], [0, fy, cy], [0, 0, 1]]: the principal point
    # is (width / 2, height / 2), i.e. cx comes from the width, cy from the height.
    cam_matrix = np.array([[focal_length, 0, img_w / 2],
                           [0, focal_length, img_h / 2],
                           [0, 0, 1]])
    distortion_matrix = np.zeros((4, 1), dtype=np.float64)

    # Solve the PnP problem.
    success, rotation_vec, translation_vec = cv2.solvePnP(
        face_3d, face_2d, cam_matrix, distortion_matrix
    )
    if not success:
        return "idk"

    rmat, _jac = cv2.Rodrigues(rotation_vec)
    angles = cv2.RQDecomp3x3(rmat)[0]

    # Scale factor kept from the original implementation; `tresh` is
    # compared against these scaled values.
    x = angles[0] * 360
    y = angles[1] * 360

    # Horizontal deviation takes precedence over vertical deviation.
    if y < -tresh:
        return "Looking Left"
    if y > tresh:
        return "Looking Right"
    if x < -tresh:
        return "Looking Down"
    if x > tresh:
        return "Looking Up"
    return "Forward"

In [None]:
# Paths of all images whose head pose is classified as "Forward".
result_list = []
for folder in os.listdir(TRAIN_FOLDER):
    # Only sub-directories contain images.
    images_path = os.path.join(TRAIN_FOLDER, folder)
    if not os.path.isdir(images_path):
        continue

    for img_name in os.listdir(images_path):
        # Skip hidden files such as ".DS_Store"; they are not images.
        if img_name.startswith('.'):
            continue

        image_path = os.path.join(images_path, img_name)
        if get_head_pos(image_path, 15) != "Forward":
            continue

        result_list.append(image_path)
        # Report progress once per 100 accepted images. The check lives inside
        # the accept branch so the same count is not printed again for every
        # rejected image that follows.
        if len(result_list) % 100 == 0:
            print(len(result_list))

In [None]:
# # alternative save as txt

# # File path
# file_path = 'full_face_list.txt'

# # Write the list to a file
# with open(file_path, 'w') as file:
#     for item in result_list:
#         file.write(f"{item}\n")

# Part 2 - Crop image with MTCNN

## Detect box of faces

In [None]:
# # opening the file in read mode 
# my_file = open(file_path, "r") 
  
# # reading the file 
# data = my_file.read() 
  
# # splitting the text into a list
# # wherever a newline ('\n') is seen.
# data_into_list = data.split("\n")
# my_file.close() 

# # if you also have ".DS_Store" file
# data_into_list.pop(-1)

In [None]:
# Measure how long the detection pass takes.
start_time = time.time()

# Model for face detection.
detector = MTCNN()

for image_path in result_list:
    bgr_image = cv2.imread(image_path)
    if bgr_image is None:
        # cv2.imread returns None for unreadable files; cvtColor would crash on it.
        print("Could not read image, skipping:", image_path)
        continue

    # MTCNN expects RGB input; OpenCV loads BGR.
    img = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

    # Store the raw detector output for this image.
    IMAGES_INFO[image_path] = detector.detect_faces(img)

print("Time of execution:", time.time()-start_time)

In [None]:
# # alternative save as json
# # Specify the filename to save the dictionary as a JSON file
# filename = 'IMAGES_INFO_FINAL.json'

# # Writing JSON data
# with open(filename, 'w') as f:
#     json.dump(IMAGES_INFO, f)

# print(f"Dictionary saved as JSON in {filename}")

## Cropping and saving new images

In [None]:
# file = open(filename)

# IMAGES_INFO = json.load(file)

In [None]:
# Crop the first detected face of every image and save it to CROPPED_FOLDER.
# size_mean collects (rows, columns) of every saved crop.
size_mean = ([], [])

# cv2.imwrite does not create missing directories; it just fails to write.
os.makedirs(CROPPED_FOLDER, exist_ok=True)

for image_path, detections in IMAGES_INFO.items():
    if not detections:
        print('pass: no face detected in', image_path)
        continue

    # MTCNN boxes are [x, y, width, height].
    x, y, box_w, box_h = detections[0]['box']
    x_end, y_end = x + box_w, y + box_h
    # The detector may report coordinates slightly outside the image; negative
    # values would be interpreted as "from the end" by numpy slicing.
    x, y = max(x, 0), max(y, 0)
    x_end, y_end = max(x_end, 0), max(y_end, 0)

    # Read the image into memory.
    image = cv2.imread(image_path)
    if image is None:
        print('pass: could not read', image_path)
        continue

    cropped_image = image[y:y_end, x:x_end]
    if cropped_image.size == 0:
        print('pass: empty crop for', image_path)
        continue

    out_path = os.path.join(CROPPED_FOLDER, os.path.basename(image_path))
    if not cv2.imwrite(out_path, cropped_image):
        print('pass: could not write', out_path)
        continue

    size_mean[0].append(cropped_image.shape[0])
    size_mean[1].append(cropped_image.shape[1])

In [None]:
# Mean size of the cropped images: one mean per list collected in size_mean.
tuple(np.mean(side_lengths) for side_lengths in size_mean)

# Part 3 - Classify images with DeepFace to segment them into groups

In [None]:
# key: image file name, value: DeepFace classification results
result_dict = {}

# Drop hidden entries (e.g. ".DS_Store"). Filtering into a new list avoids
# removing items from the list being iterated, which skips the entry that
# follows each removed one.
all_images = [name for name in os.listdir(CROPPED_FOLDER) if not name.startswith('.')]

for ind, image_name in enumerate(all_images):
    image_path = os.path.join(CROPPED_FOLDER, image_name)
    objs = DeepFace.analyze(
        img_path=image_path,
        actions=['age', 'gender', 'race', 'emotion'],
        # Images are already cropped to the face, so do not fail when the
        # internal detector finds nothing.
        enforce_detection=False
    )
    # Only the first analysed face is kept.
    first_face = objs[0]
    result_dict[image_name] = {
        'age': first_face['age'],
        'gender': first_face['dominant_gender'],
        'race': first_face['dominant_race'],
        'emotion': first_face['dominant_emotion'],
    }
    # Progress report every 100 images.
    if ind % 100 == 0:
        print(ind)

In [None]:
# # alternative save as json
# # Specify the filename to save the dictionary as a JSON file
# filename = 'crop_photo_classes.json'

# # Writing JSON data
# with open(filename, 'w') as f:
#     json.dump(result_dict, f)

# print(f"Dictionary saved as JSON in {filename}")

In [None]:
# Lists of image names for the different classes.
# The classification results live in `result_dict` (built by the DeepFace cell,
# or re-loaded from its JSON dump); the previously referenced name `data` is
# never defined in this notebook.

men = []
women = []

age_25 = []       # age <= 25
age_25_30 = []    # 25 < age <= 30
age_30_40 = []    # 30 < age <= 40
age_50 = []       # age > 40

asian = []
white = []
middle_eastern = []
indian = []
latino = []
black = []

angry = []
fear = []
neutral = []
sad = []
disgust = []
happy = []
surprise = []

# Label -> target list. Labels missing from a table fall back to the same
# list the original if/elif chains used in their final `else` branch.
_race_lists = {
    "asian": asian,
    "white": white,
    "middle eastern": middle_eastern,
    "indian": indian,
    "latino hispanic": latino,
}
_emotion_lists = {
    "happy": happy,
    "neutral": neutral,
    "fear": fear,
    "sad": sad,
    "angry": angry,
    "surprise": surprise,
}

for key, attrs in result_dict.items():
    (men if attrs["gender"] == "Man" else women).append(key)
    _race_lists.get(attrs["race"], black).append(key)
    _emotion_lists.get(attrs["emotion"], disgust).append(key)

    age = attrs["age"]
    if age <= 25:
        age_25.append(key)
    elif age <= 30:
        age_25_30.append(key)
    elif age <= 40:
        age_30_40.append(key)
    else:
        age_50.append(key)

# Part 4 - Fit Autoencoders

## Conv Autoencoder

In [None]:
# Seed used for the train/validation split.
RANDOM_SEED = 42

# Training parameters.
BATCH_SIZE = 32
N_EPOCHS = 15
VALID_SIZE = 0.1            # share of images held out for validation
IMAGE_RESIZE = (256, 256)   # default image size produced by the data generator

# Prefix (folder) for saved model weights.
MODEL_WEIGHTS_PATH = 'models_weigths/'

In [None]:
# Collect the cropped images, ignoring hidden entries such as ".DS_Store".
# Filtering into a new list avoids popping from the list while iterating it
# (which skips the entry after each removed one). Sorting makes the seeded
# split reproducible, because os.listdir returns names in arbitrary order.
all_images = sorted(
    name for name in os.listdir(CROPPED_FOLDER) if not name.startswith('.')
)

# Split images into train and validation file lists.
train_filenames, valid_filenames = train_test_split(
    all_images,
    test_size=VALID_SIZE,
    random_state=RANDOM_SEED
)

In [None]:
# Report how many files ended up in each split.
for _split_label, _split_count in (
    ('Train sample size: ', len(train_filenames)),
    ('Valid sample size: ', len(valid_filenames)),
):
    print(_split_label, _split_count)

### Data generator

In [None]:
#Data generator for images

class generator(keras.utils.Sequence):
    """Batch generator that feeds images to the autoencoders.

    In train mode each batch is ``(images, images)`` (input and target are the
    same array) and only full batches are produced. In predict mode each batch
    is ``(images, filenames)`` and the final partial batch is included.

    Args:
        folder: Directory that contains the image files.
        filenames: File names (relative to ``folder``) to serve.
        batch_size: Number of images per batch; must be at least 1.
        image_size: Target size passed to ``skimage.transform.resize``.
        shuffle: Shuffle the file order at construction and after every epoch.
        predict: Switch between train mode (False) and predict mode (True).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """

    def __init__(self, folder, filenames, batch_size=BATCH_SIZE, image_size=IMAGE_RESIZE, shuffle=True, predict=False):
        super().__init__()
        if batch_size < 1:
            # Otherwise this surfaces later as a ZeroDivisionError in __len__.
            raise ValueError("batch_size must be >= 1 (is the filename list empty?)")
        self.folder = folder
        # Keep a private copy: shuffling must not reorder the caller's list,
        # which is shared between several generators.
        self.filenames = list(filenames)
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.predict = predict
        self.on_epoch_end()

    def _read_image(self, filename):
        """Load one image as a resized float RGB array."""
        img = imread(os.path.join(self.folder, filename))

        if len(img.shape) == 2:
            # Grayscale file: replicate into three channels.
            img = gray2rgb(img)
        elif img.shape[2] == 4:
            # Drop the alpha channel so every image has three channels.
            img = img[..., :3]

        # resize() on float input keeps the value range, hence the explicit /255.
        return resize(img.astype(np.float32), self.image_size, mode='reflect')/255.0

    def __load__(self, filename):
        """Load an image for train mode."""
        return self._read_image(filename)

    def __loadpredict__(self, filename):
        """Load an image for predict mode (same preprocessing as train mode)."""
        return self._read_image(filename)

    def __getitem__(self, index):
        """Return batch number ``index``."""
        filenames = self.filenames[index*self.batch_size:(index+1)*self.batch_size]
        if self.predict:
            # predict mode: return images and their file names
            imgs = np.array([self.__loadpredict__(filename) for filename in filenames])
            return imgs, filenames
        # train mode: the autoencoder target is the input itself
        imgs = np.array([self.__load__(filename) for filename in filenames])
        return imgs, imgs

    def on_epoch_end(self):
        """Reshuffle the file order when shuffling is enabled."""
        if self.shuffle:
            random.shuffle(self.filenames)

    def __len__(self):
        """Return the number of batches per epoch."""
        if self.predict:
            # return everything, including the last partial batch
            return int(np.ceil(len(self.filenames) / self.batch_size))
        # return full batches only
        return int(len(self.filenames) / self.batch_size)

In [None]:
# Train and valid data generators.
# The file names were listed from CROPPED_FOLDER, so they must be loaded from
# there; under TRAIN_FOLDER the images live in sub-folders and these paths do
# not exist.
train_gen = generator(CROPPED_FOLDER, train_filenames)
valid_gen = generator(CROPPED_FOLDER, valid_filenames)

### Conv stacked Autoencoder

In [None]:
# Encoder: seven Conv2D + MaxPool2D stages; the filter count doubles per stage
# while every pooling step halves the spatial size.
_encoder_layers = [keras.layers.Input(shape=(256, 256, 3))]
for _filters in (16, 32, 64, 128, 256, 512, 1024):
    _encoder_layers.append(
        keras.layers.Conv2D(_filters, kernel_size=3, padding="SAME", activation="selu")
    )
    _encoder_layers.append(keras.layers.MaxPool2D(pool_size=2))
conv_encoder = keras.models.Sequential(_encoder_layers)

# Decoder: seven stride-2 transposed convolutions; the last one outputs three
# channels through a sigmoid.
_decoder_layers = [
    keras.layers.Conv2DTranspose(512, kernel_size=3, strides=2, padding="SAME", activation="selu",
                                 input_shape=[2, 2, 1024])
]
for _filters in (256, 128, 64, 32, 16):
    _decoder_layers.append(
        keras.layers.Conv2DTranspose(_filters, kernel_size=3, strides=2, padding="SAME", activation="selu")
    )
_decoder_layers.append(
    keras.layers.Conv2DTranspose(3, kernel_size=3, strides=2, padding="SAME", activation="sigmoid")
)
conv_decoder = keras.models.Sequential(_decoder_layers)

# Full autoencoder: encoder followed by decoder.
conv_ae = keras.models.Sequential([conv_encoder, conv_decoder])

In [None]:
# Compile the conv autoencoder: MSE reconstruction loss, Adam with lr 1e-4.
conv_ae.compile(loss='mse', optimizer=Adam(learning_rate=1e-4))

In [None]:
# Callbacks: save the best weights (lowest val_loss) and stop early after
# 5 epochs without improvement.
weight_path = MODEL_WEIGHTS_PATH + 'auto_encoder' + '_weights_best.h5'

checkpoint = ModelCheckpoint(
    weight_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)
early = EarlyStopping(monitor="val_loss", mode="min", patience=5)

callbacks_list = [checkpoint, early]

In [None]:
# Train the conv autoencoder; the returned History object is used for plotting.
loss_history = conv_ae.fit(
    train_gen,
    validation_data=valid_gen,
    epochs=N_EPOCHS,
    callbacks=callbacks_list,
)

In [None]:
# Train vs. validation loss per epoch.
_fig, _ax = plt.subplots(figsize=(12, 4))
for _history_key, _curve_label in (("loss", "Train loss"), ("val_loss", "Valid loss")):
    _ax.plot(loss_history.epoch, loss_history.history[_history_key], label=_curve_label)
_ax.legend()

## Sparse Autoencoder

In [None]:
# Train and valid data generators for the dense (sparse) autoencoder.
# The file names were listed from CROPPED_FOLDER, so load them from there
# rather than from TRAIN_FOLDER, where the images sit in sub-folders.
train_gen_dence = generator(CROPPED_FOLDER, train_filenames, image_size=(70, 50))
valid_gen_dence = generator(CROPPED_FOLDER, valid_filenames, image_size=(70, 50))

In [None]:
# Encoder: flatten the 70x50x3 image, shrink through dense SELU layers to a
# 250-unit sigmoid code, then apply an L1 activity penalty to that code.
_sparse_encoder_layers = [keras.layers.Flatten(input_shape=[70, 50, 3])]
for _units in (5000, 2000, 1000, 500):
    _sparse_encoder_layers.append(keras.layers.Dense(_units, activation="selu"))
_sparse_encoder_layers.append(keras.layers.Dense(250, activation="sigmoid"))
# Alternatively, activity_regularizer=keras.regularizers.l1(1e-3) could be set
# on the previous layer.
_sparse_encoder_layers.append(keras.layers.ActivityRegularization(l1=1e-3))
sparse_l1_encoder = keras.models.Sequential(_sparse_encoder_layers)

# Decoder: mirror of the encoder, ending in a sigmoid layer reshaped to an image.
_sparse_decoder_layers = [keras.layers.Dense(500, activation="selu", input_shape=[250])]
for _units in (1000, 2000, 5000):
    _sparse_decoder_layers.append(keras.layers.Dense(_units, activation="selu"))
_sparse_decoder_layers.append(keras.layers.Dense(70 * 50 * 3, activation="sigmoid"))
_sparse_decoder_layers.append(keras.layers.Reshape([70, 50, 3]))
sparse_l1_decoder = keras.models.Sequential(_sparse_decoder_layers)

# Full autoencoder: encoder followed by decoder.
sparse_l1_ae = keras.models.Sequential([sparse_l1_encoder, sparse_l1_decoder])

In [None]:
# Compile the sparse autoencoder: MSE reconstruction loss, Adam with lr 1e-4.
sparse_l1_ae.compile(loss='mse', optimizer=Adam(learning_rate=1e-4))

In [None]:
# Callbacks for the sparse autoencoder: save the best weights (lowest val_loss)
# and stop early after 5 epochs without improvement.
weight_path = MODEL_WEIGHTS_PATH + 'auto_encoder_sparse' + '_weights_best.h5'

checkpoint = ModelCheckpoint(
    weight_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)
early = EarlyStopping(monitor="val_loss", mode="min", patience=5)

callbacks_list = [checkpoint, early]

In [None]:
# Train the sparse autoencoder; the returned History object is used for plotting.
loss_history = sparse_l1_ae.fit(
    train_gen_dence,
    validation_data=valid_gen_dence,
    epochs=N_EPOCHS,
    callbacks=callbacks_list,
)

In [None]:
# Train vs. validation loss per epoch.
_fig, _ax = plt.subplots(figsize=(12, 4))
for _history_key, _curve_label in (("loss", "Train loss"), ("val_loss", "Valid loss")):
    _ax.plot(loss_history.epoch, loss_history.history[_history_key], label=_curve_label)
_ax.legend()

## PCA

In [None]:
# Train and valid data generators for PCA.
# The file names were listed from CROPPED_FOLDER, so load them from there
# rather than from TRAIN_FOLDER, where the images sit in sub-folders.
# A single 3000-image training batch is used as the PCA fitting sample.
train_gen_pca = generator(CROPPED_FOLDER, train_filenames, batch_size=3000, image_size=(200, 150))
valid_gen_pca = generator(CROPPED_FOLDER, valid_filenames, image_size=(200, 150))

In [None]:
pca = PCA(n_components=3000)

# The generator only yields full batches in train mode, so with fewer files
# than its batch size it yields nothing. Fail with a clear message instead of
# hitting an undefined `imgs` further down.
if len(train_gen_pca) == 0:
    raise ValueError(
        "train_gen_pca yields no full batch: not enough training images for its batch size"
    )

# Fit PCA on the first batch only (train mode returns (images, images)).
imgs = train_gen_pca[0][0]

# One flattened row per image.
flatten_imgs = imgs.reshape(len(imgs), -1)
pca.fit(flatten_imgs)

# Part 5 - Present results

In [None]:
# File names of the images to combine; fill this in before running the cells below.
test_filenames = list()

## Conv Autoencoder

In [None]:
# Single-batch predict generator over the images to combine. The images are
# cropped faces, so they are read from CROPPED_FOLDER (under TRAIN_FOLDER the
# files sit in sub-folders and plain file names do not resolve).
test_gen = generator(CROPPED_FOLDER, test_filenames, predict=True, batch_size=len(test_filenames))

In [None]:
# Predict-mode batches are (images, filenames); keep the images of the last batch.
for imgs_for_print, _batch_names in test_gen:
    pass

# Encode every image, average the codes, and decode the averaged code.
pred_for_mix = conv_encoder.predict(imgs_for_print)

# keepdims=True retains the leading batch axis the decoder expects.
mean = np.mean(pred_for_mix, axis=0, keepdims=True)
#mean = np.median(pred_for_mix, axis=0, keepdims=True)

pred = conv_decoder.predict(mean)



In [None]:
# Show the decoded mix, resized to 200x150 for display.
_mixed_face = resize(pred[0], (200, 150))
plt.imshow(_mixed_face)
plt.show()

## Sparse Autoencoder

In [None]:
# Single-batch predict generator (70x50 images) for the sparse autoencoder.
# The images are cropped faces, so they are read from CROPPED_FOLDER (under
# TRAIN_FOLDER the files sit in sub-folders and plain file names do not resolve).
test_gen_sparse = generator(CROPPED_FOLDER, test_filenames, predict=True, batch_size=len(test_filenames),
                            image_size=(70,50))

In [None]:
# Predict-mode batches are (images, filenames); keep the images of the last batch.
for imgs_for_print, _batch_names in test_gen_sparse:
    pass

# Encode every image, average the codes, and decode the averaged code.
pred_for_mix = sparse_l1_encoder.predict(imgs_for_print)

# keepdims=True retains the leading batch axis the decoder expects.
mean = np.mean(pred_for_mix, axis=0, keepdims=True)
#mean = np.median(pred_for_mix, axis=0, keepdims=True)

pred = sparse_l1_decoder.predict(mean)



In [None]:
# Show the decoded mix, resized to 200x150 for display.
_mixed_face = resize(pred[0], (200, 150))
plt.imshow(_mixed_face)
plt.show()

## PCA

In [None]:
# Single-batch predict generator (200x150 images) for PCA.
# The images are cropped faces, so they are read from CROPPED_FOLDER (under
# TRAIN_FOLDER the files sit in sub-folders and plain file names do not resolve).
test_gen_pca = generator(CROPPED_FOLDER, test_filenames, predict=True, batch_size=len(test_filenames),
                            image_size=(200,150))

In [None]:
# Take the first batch from the PCA generator (200x150 images). Using
# `test_gen` here would feed 256x256 images, whose flattened length does not
# match the features the PCA was fitted on.
for items in test_gen_pca:
    imgs = items[0]
    break

# One flattened row per image, projected onto the principal components.
flatten_imgs = imgs.reshape(len(imgs), -1)
flatten_imgs_post = pca.transform(flatten_imgs)

In [None]:
# Average the PCA codes, map the average back to pixel space, and restore the
# 200x150x3 image layout.
_mean_code = np.mean(flatten_imgs_post, axis=0)
mean = pca.inverse_transform(_mean_code).reshape(200, 150, 3)

In [None]:
# Show the PCA-based mix.
_fig, _ax = plt.subplots()
_ax.imshow(mean)
plt.show()