# Data cleaning of OuluCasia

## Clean badly paired images - rate their pair quality

In [None]:
import os
from PIL import Image
import json
import signal
import matplotlib.pyplot as plt

root_dir1 = 'data/B_OriginalImg/NI/Strong/'
root_dir2 = 'data/B_OriginalImg/VL/Strong/'

# Ratings keyed by "<patient>-<emotion>-<image file>"; every value is a
# (rating, path under root_dir1, path under root_dir2) tuple.
ratings = {}
# Paths under root_dir1 that have no same-named file under root_dir2.
unpaired_images = []

# Key of the most recently rated pair; the 'r' command re-rates it.
prev_pair = None

# Set by the 'e1'..'e5' commands: the chosen rating is then applied, without
# prompting, to every remaining file of the current emotion directory.
rate_em = None

VALID_RATINGS = ('1', '2', '3', '4', '5')


def _show_pair(path1, path2, label):
    """Plot the image at ``path1`` next to the image at ``path2``.

    ``label`` is appended to both subplot titles. The files are opened only
    for the duration of the plot so no file handles are left open.
    """
    fig, axs = plt.subplots(1, 2, figsize=(10, 5))
    with Image.open(path1) as first, Image.open(path2) as second:
        axs[0].imshow(first)
        axs[0].set_title(f'First Directory - {label}')
        axs[1].imshow(second)
        axs[1].set_title(f'Second Directory - {label}')
        plt.show()


# Loop over patient directories
for i, patient_dir in enumerate(sorted(os.listdir(root_dir1))):
    # Skip the first 50 patient directories (in sorted order).
    # NOTE(review): presumably those were rated in earlier sessions (the
    # output file is 'ratings_pt5.json') - confirm before re-running.
    if i < 50:
        continue

    patient_path1 = os.path.join(root_dir1, patient_dir)
    patient_path2 = os.path.join(root_dir2, patient_dir)

    # Loop over emotion directories
    for emotion_dir in sorted(os.listdir(patient_path1)):
        emotion_path1 = os.path.join(patient_path1, emotion_dir)
        emotion_path2 = os.path.join(patient_path2, emotion_dir)

        # Loop over images
        for image_file in sorted(os.listdir(emotion_path1)):
            image_path1 = os.path.join(emotion_path1, image_file)
            image_path2 = os.path.join(emotion_path2, image_file)

            key = f"{patient_dir}-{emotion_dir}-{image_file}"

            # An 'eN' command is active: rate the rest of this emotion
            # directory with N without showing anything.
            if rate_em is not None:
                ratings[key] = (rate_em, image_path1, image_path2)
                continue

            if not os.path.exists(image_path2):
                # Log the unpaired image
                unpaired_images.append(image_path1)
                continue

            label = f'{patient_dir} - {emotion_dir} - {image_file}'

            # Keep prompting until the current pair is rated or skipped.
            while True:
                _show_pair(image_path1, image_path2, label)

                # Get the user's rating
                rating = input("Rate the quality of pairing from 1 (great) to 5 (worst). Press 'x' to continue to the next pair of images. Press 'r' to go back to the previous pair.")

                if rating in VALID_RATINGS:
                    # Store the rating and directories
                    ratings[key] = (rating, image_path1, image_path2)
                    prev_pair = key
                    break
                elif rating == 'x':
                    # Advertised in the prompt: move on and leave this pair
                    # without an entry in `ratings`.
                    break
                elif rating == 'd':
                    # Preview every paired 'jpeg' file of this emotion
                    # directory, then come back to the current pair.
                    for im in sorted(os.listdir(emotion_path1)):
                        im2 = os.path.join(emotion_path2, im)
                        if not im.endswith('jpeg') or not os.path.exists(im2):
                            continue
                        _show_pair(
                            os.path.join(emotion_path1, im),
                            im2,
                            f'{patient_dir} - {emotion_dir} - {im}',
                        )
                elif rating in ('e1', 'e2', 'e3', 'e4', 'e5'):
                    # Rate this pair and the rest of the emotion directory.
                    ratings[key] = (rating[1], image_path1, image_path2)
                    prev_pair = key
                    rate_em = rating[1]
                    break
                elif rating == 'r' and prev_pair is not None:
                    # Re-rate the previous pair under its OWN key. The loop
                    # variables and `key` of the current pair are left alone,
                    # so the next pass of this loop shows the current pair
                    # again and nothing is stored under the wrong key.
                    _, prev_path1, prev_path2 = ratings[prev_pair]
                    _show_pair(prev_path1, prev_path2, prev_pair)
                    new_rating = input("Re-rate the previous pair from 1 (great) to 5 (worst). Any other input keeps its current rating.")
                    if new_rating in VALID_RATINGS:
                        ratings[prev_pair] = (new_rating, prev_path1, prev_path2)
                else:
                    print("Invalid input. Please enter a number from 1 to 5, 'x' to continue, or 'r' to go back.")

        # The 'eN' shortcut only covers a single emotion directory.
        rate_em = None

    # Save the ratings and unpaired images to a JSON file after every
    # patient, so progress survives an interrupted session.
    with open('ratings_pt5.json', 'w') as f:
        json.dump({'ratings': ratings, 'unpaired_images': unpaired_images}, f)

# Print a summary of the ratings
print("Summary of ratings:")
summary = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
# Every value of `ratings` is a (rating, first path, second path) triple;
# only the rating is counted. Unpacking into exactly two names would raise
# ValueError, hence the starred catch-all.
for rating, *_ in ratings.values():
    summary[int(rating)] += 1
for rating, count in summary.items():
    print(f"Number of pairs with rating {rating}: {count}")

print("Unpaired images:")
for image in unpaired_images:
    print(image)

## Extract faces from cleaned data

In [None]:
from PIL import Image
from deepface import DeepFace

# Imported here so the cell also runs on a fresh kernel; the cells above it
# only import os, json, PIL, matplotlib and deepface.
import json
import os

import cv2
import numpy as np

with open('ratings_merged_first_place.json') as f:
    rat_dict = json.load(f)

out_dir_a = 'data/oulucasia-ranked-aligned/trainA/'
out_dir_b = 'data/oulucasia-ranked-aligned/trainB/'
# cv2.imwrite only reports failure through its return value, so make sure
# the target folders exist before writing.
os.makedirs(out_dir_a, exist_ok=True)
os.makedirs(out_dir_b, exist_ok=True)

# Iterate the ratings that were just loaded from the merged file (not the
# in-memory `ratings` dict of the rating session).
for rating, a, b in rat_dict['ratings'].values():
    # Skip entries whose first path ends in 'db'.
    # NOTE(review): presumably Thumbs.db files - confirm.
    if a[-2:] == 'db':
        continue
    # Only the best-rated pairs (rating 1) are extracted.
    if int(rating) != 1:
        continue

    im_a = cv2.imread(a)
    im_b = cv2.imread(b)

    print(a)
    print(b)
    # cv2.imread returns None when a file cannot be read.
    if im_a is None or im_b is None:
        continue

    im_b = cv2.cvtColor(im_b, cv2.COLOR_BGR2RGB)
    im_ext_b = DeepFace.extract_faces(im_b, (224, 224), detector_backend='retinaface', align=True, enforce_detection=True)
    im_ext_a = DeepFace.extract_faces(im_a, (224, 224), detector_backend='retinaface', align=True, enforce_detection=True)
    # Use the first returned face of each image, rescaled by 255 and stored
    # as uint8.
    im_ext_b = (255 * im_ext_b[0]['face']).astype(np.uint8)
    im_ext_a = (255 * im_ext_a[0]['face']).astype(np.uint8)

    # Both outputs share one file name, built from the last three components
    # of path `a`, so the two folders stay aligned by name.
    out_name = '-'.join(a.split('/')[-3:])
    cv2.imwrite(out_dir_a + out_name, im_ext_a)
    cv2.imwrite(out_dir_b + out_name, im_ext_b)

In [None]:
import cv2
import os
from PIL import Image
import numpy as np

a_pth = 'data/oulucasia-ranked-aligned/trainA/'
b_pth = 'data/oulucasia-ranked-aligned/trainB/'

# Both folders are listed in sorted order and walked in lockstep: the i-th
# file of trainA is shown to the left of the i-th file of trainB.
names_a = sorted(os.listdir(a_pth))
names_b = sorted(os.listdir(b_pth))

for name_a, name_b in zip(names_a, names_b):
    left = cv2.imread(os.path.join(a_pth, name_a))
    # Only the trainB image has its channel order swapped before display.
    right_raw = cv2.imread(os.path.join(b_pth, name_b))
    right = cv2.cvtColor(right_raw, cv2.COLOR_BGR2RGB)

    side_by_side = np.hstack([left, right])
    display(Image.fromarray(side_by_side))

## Splitting cleaned extracted faces to train, test, val

In [None]:
# Notebook inspection cell: echoes the `splits` dict, which is built by the
# splitting cell further down (that cell has to be run first).
splits

In [None]:
# Notebook inspection cell: echoes `rat_dict`, the content loaded from
# 'ratings_merged_first_place.json'.
rat_dict

In [None]:
import random
import json
from pathlib import Path
import shutil

# load the merged ratings
with open('ratings_merged_first_place.json') as f:
    rat_dict = json.load(f)


# Group the rated pairs as im_dict[patient][emotion][image file] -> entry,
# where an entry is the stored [rating, path, path] record.
im_dict = {}
for im_name, entry in rat_dict['ratings'].items():
    # Skip non-image files and every pair rated worse than 1
    # (the rating scale runs from 1 = great to 5 = worst).
    if not im_name.endswith('.jpeg') or int(entry[0]) > 1:
        continue

    # Keys have the form "<patient>-<emotion>-<image file>". Splitting at
    # most twice keeps the file name intact even if it contains a '-'.
    patient, emotion, img_id = im_name.split('-', 2)

    im_dict.setdefault(patient, {}).setdefault(emotion, {})[img_id] = entry
    
# split to train, test, val
train_imgs_ratio = 0.65
test_imgs_ratio = 0.2
val_imgs_ratio = 0.15
splits = {"train_vl": [],"train_ni": [], 'test_vl': [], "test_ni": [], "val_vl": [], "val_ni": []}
for pt in im_dict:
    for em, entries in im_dict[pt].items():
        # Number of images per split for this patient/emotion folder.
        # int() truncates, so a few images of a folder can end up in no
        # split at all.
        n_imgs = len(entries)
        train_imgs = int(train_imgs_ratio * n_imgs)
        test_imgs = int(test_imgs_ratio * n_imgs)
        val_imgs = int(val_imgs_ratio * n_imgs)

        # Draw everything that is needed in one go and cut the result into
        # three consecutive slices: random.sample returns the items in
        # selection order, so every slice is itself a random sample and the
        # three slices cannot overlap.
        picked = random.sample(list(entries), train_imgs + test_imgs + val_imgs)
        _train = picked[:train_imgs]
        _test = picked[train_imgs:train_imgs + test_imgs]
        _val = picked[train_imgs + test_imgs:]

        # Element 1 of an entry feeds the '*_vl' list, element 2 the '*_ni'
        # list; both lists of a split stay index-aligned.
        for split_name, chosen in (('train', _train), ('test', _test), ('val', _val)):
            splits[f'{split_name}_vl'] += [entries[i][1] for i in chosen]
            splits[f'{split_name}_ni'] += [entries[i][2] for i in chosen]

# save images to the folder
target_folder = Path("data/X-oulucasia-ranked-aligned-splitted/")
source_fp = "data/oulucasia-ranked-aligned"


def _copy_extracted_face(orig_path, source_subdir, domain, split_name, label):
    """Copy one extracted face into ``target_folder/<domain>/<split_name>/``.

    The file name is the last three components of ``orig_path`` joined with
    '-'; the file is read from ``source_fp/<source_subdir>/``. ``label`` is
    printed in front of ``orig_path`` as a progress message.

    Raises whatever ``shutil.copy`` raises (e.g. FileNotFoundError) when the
    extracted file does not exist.
    """
    file_name = "-".join(Path(orig_path).parts[-3:])
    new_fp = target_folder / domain / split_name / file_name
    print(label, orig_path)
    # Create all the possible directories and copy the file
    new_fp.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(Path(source_fp) / source_subdir / file_name, new_fp)


# Every split reads from the same two source folders ("trainA"/"trainB");
# only the destination sub-folder depends on the split.
for split_name in ("train", "test", "val"):
    for t, t2 in zip(splits[f"{split_name}_vl"], splits[f"{split_name}_ni"]):
        # The check is on the ORIGINAL image paths stored in the ratings,
        # not on the extracted face files that get copied.
        if not Path(t).exists() or not Path(t2).exists():
            continue
        _copy_extracted_face(t, "trainA", "A", split_name, f"{split_name} vl")
        _copy_extracted_face(t2, "trainB", "B", split_name, f"{split_name} ni")

## _Exploring various possibilities of aligning faces_

In [None]:
import cv2
import json
from retinaface import RetinaFace
import matplotlib.pyplot as plt
from IPython.display import display
from PIL import Image
import numpy as np
from matplotlib import cm
import dlib
from deepface import DeepFace

# Path of the dlib shape-predictor model file (per its file name, an
# 81-point face landmark model).
predictor_path = "notebooks/shape_predictor_81_face_landmarks.dat"
# dlib's frontal face detector; not referenced by the other visible cells.
detector = dlib.get_frontal_face_detector()
# Landmark predictor loaded from the model file above; the exploration cell
# calls it as predictor(image, dlib.rectangle(...)).
predictor = dlib.shape_predictor(predictor_path)

In [None]:
def print_crosses_on_image(image, points, color=(255, 0, 0), size=3, thickness=1):
    """Mark every point with a small cross and return the image.

    Args:
        image: Image array handed to ``cv2.line``. The lines are drawn on
            the array that is passed in, so pass a copy if the original has
            to stay untouched.
        points: Iterable of points; element 0 is used as x and element 1 as
            y, each truncated to ``int``.
        color: Line colour passed through to ``cv2.line``.
        size: Length of each arm of the cross, in pixels from the centre.
        thickness: Line thickness passed through to ``cv2.line``.

    Returns:
        The image returned by the last ``cv2.line`` call, or ``image``
        itself when ``points`` is empty.
    """
    for point in points:
        cx, cy = int(point[0]), int(point[1])

        # (start, end) of the horizontal arm, then of the vertical arm.
        arms = (
            ((cx - size, cy), (cx + size, cy)),
            ((cx, cy - size), (cx, cy + size)),
        )
        for start, end in arms:
            image = cv2.line(image, start, end, color, thickness)

    return image

In [None]:
import cv2
import json
import numpy as np
from PIL import Image
from deepface import DeepFace

with open('ratings_merged_first_place.json') as f:
    rat_dict = json.load(f)
print("Summary of ratings:")
ratings = rat_dict['ratings']
summary = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
for k, (rating, a, b) in enumerate(ratings.values()):
    # Only look at every 20th pair to keep the output manageable.
    if k % 20 != 0:
        continue

    # Skip entries whose first path ends in 'db'.
    # NOTE(review): presumably Thumbs.db files - confirm.
    if a[-2:] == 'db':
        continue

    summary[int(rating)] += 1
    # Only the best-rated pairs (rating 1) are visualised.
    if int(rating) != 1:
        continue

    im_a = cv2.imread(a)
    im_b = cv2.imread(b)

    print(a)
    print(b)
    # cv2.imread returns None when a file cannot be read.
    if im_a is None or im_b is None:
        continue

    im_b = cv2.cvtColor(im_b, cv2.COLOR_BGR2RGB)
    im_ext_b = DeepFace.extract_faces(im_b, (224, 224), detector_backend='retinaface', align=False, enforce_detection=True)
    im_ext_a = DeepFace.extract_faces(im_a, (224, 224), detector_backend='retinaface', align=False, enforce_detection=True)
    # First returned face of each image, rescaled by 255 and stored as uint8.
    im_ext_b = cv2.cvtColor((255 * im_ext_b[0]['face']).astype(np.uint8), cv2.COLOR_BGR2RGB)
    im_ext_a = (255 * im_ext_a[0]['face']).astype(np.uint8)

    # --------------
    # Landmark-based alignment: predict landmarks on both face crops, using
    # the whole crop as the face box. dlib rectangles take inclusive
    # right/bottom coordinates, hence the "- 1" for BOTH crops.
    # (Alternative tried earlier: use only landmarks range(27, 48).)
    h_b, w_b = im_ext_b.shape[:2]
    h_a, w_a = im_ext_a.shape[:2]
    shape = predictor(im_ext_b, dlib.rectangle(0, 0, w_b - 1, h_b - 1))
    dl_ni_landmarks = np.array([[shape.part(p).x, shape.part(p).y] for p in range(shape.num_parts)]).astype(int)
    shape = predictor(im_ext_a, dlib.rectangle(0, 0, w_a - 1, h_a - 1))
    dl_vl_landmarks = np.array([[shape.part(p).x, shape.part(p).y] for p in range(shape.num_parts)]).astype(int)

    # Homography mapping the landmarks of crop b onto those of crop a.
    H, mask = cv2.findHomography(dl_ni_landmarks, dl_vl_landmarks)
    if H is None:
        print("No homography could be estimated for this pair, skipping it.")
        continue
    # dsize is (width, height) of the output; the warped image lives in the
    # coordinate frame of crop a, so it takes that crop's size.
    dl_ni_img_new = cv2.warpPerspective(im_ext_b, H, (w_a, h_a), borderValue=[0, 0, 0])

    # 50/50 overlay of crop a and the warped crop b.
    img1 = np.copy(im_ext_a)
    img2 = np.copy(dl_ni_img_new)
    alpha = 0.5
    dst = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0)

    # Left to right: warped crop with the landmarks of crop a drawn on a
    # copy of it, the plain warped crop, the overlay.
    display(Image.fromarray(np.concatenate([print_crosses_on_image(np.copy(dl_ni_img_new), dl_vl_landmarks), dl_ni_img_new, dst], axis=1)))
    # --------------

    # Reference view without any warping: 70/30 overlay of the raw crops.
    img1 = np.copy(im_ext_b)
    img2 = np.copy(im_ext_a)

    # Combine the two images with transparency
    alpha = 0.7
    dst = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0)

    display(Image.fromarray(np.concatenate([im_ext_a, im_ext_b, dst], axis=1)))


for rating, count in summary.items():
    print(f"Number of pairs with rating {rating}: {count}")

In [None]:
import cv2

# Load the two images
img1 = cv2.imread('image1.jpg')
img2 = cv2.imread('image2.jpg')

# cv2.imread signals an unreadable file by returning None instead of raising.
if img1 is None or img2 is None:
    raise FileNotFoundError("Could not read 'image1.jpg' and/or 'image2.jpg'")
# The element-wise difference and the vertical stacking below both need
# images of identical shape.
if img1.shape != img2.shape:
    raise ValueError(f"Image shapes differ: {img1.shape} vs {img2.shape}")

# Compute the absolute difference between the two images. (A plain
# saturating subtraction would clamp every pixel where img2 is brighter
# than img1 to 0 and miss those changes.)
difference = cv2.absdiff(img1, img2)

# Create a mask by thresholding the difference image. THRESH_BINARY sets the
# mask to 255 where the difference is ABOVE the Otsu threshold, i.e. where
# the images actually differ (the inverted variant marks the similar areas).
gray = cv2.cvtColor(difference, cv2.COLOR_BGR2GRAY)
ret, mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)

# Highlight the differences between the two images in red (BGR order)
img1[mask != 0] = [0, 0, 255]
img2[mask != 0] = [0, 0, 255]

# Combine the two images vertically
combined = cv2.vconcat([img1, img2])

# Display the result
cv2.imshow('Comparison', combined)
cv2.waitKey(0)
cv2.destroyAllWindows()