In [None]:
import glob
import numpy as np
from numpy import linalg as LA
import pandas as pd
from scipy import stats
import os
import tqdm
import matplotlib.pyplot as plt
from PIL import Image
from imageio.v2 import imread
import pingouin as pg
import cleese_stim as cleese
from cleese_stim.engines import FaceWarp
import cv2
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler

In [None]:
# def significant_landmarks(data):
#     test = data.groupby('idx').apply(lambda x: pd.Series({
#         'defX_ttest': pg.ttest(x[x.response==True].defX, x[x.response==False].defX)['p-val'].iloc[0], 
#         'defY_ttest': pg.ttest(x[x.response==True].defY, x[x.response==False].defY)['p-val'].iloc[0]
#     }), include_groups=False)
#     sig = test.loc[(test.defX_ttest < 0.05) | (test.defY_ttest < 0.05)].reset_index()
    
#     return data.loc[data.idx.isin(sig.idx.to_list())]
    

def extract_single_kernel(data_df, feature_id = 'feature', value_id = 'value', response_id = 'response'):
    """Compute a first-order kernel: mean value on True responses minus mean on False.

    For every feature, the entries of ``value_id`` are averaged separately over
    the rows whose ``response_id`` equals True and over those where it equals
    False; the kernel value is the difference between the two means.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Long-format table containing the three columns named below.
    feature_id : str
        Column identifying the feature each row belongs to.
    value_id : str
        Column holding the value to average.
    response_id : str
        Column holding the boolean response.

    Returns
    -------
    pandas.DataFrame
        A single ``kernel_value`` column indexed by feature, sorted by feature.
        The index is always named ``'feature'`` regardless of ``feature_id``.
        Features that lack either response class are omitted, because the two
        halves are inner-joined.
    """
    feature_average = data_df.groupby([feature_id, response_id])[value_id].mean().reset_index()

    # drop=True: the old positional index carries no information, and keeping
    # it would insert an 'index' column that collides (ValueError) whenever
    # feature_id or value_id is itself called 'index'.
    positives = feature_average.loc[feature_average[response_id] == True].reset_index(drop=True)
    negatives = feature_average.loc[feature_average[response_id] == False].reset_index(drop=True)

    kernels = pd.merge(positives, negatives, on=feature_id, suffixes=('_true', '_false'))
    kernels['kernel_value'] = kernels[f'{value_id}_true'] - kernels[f'{value_id}_false']
    kernels = kernels[[feature_id, 'kernel_value']].set_index(feature_id)
    kernels.index.names = ['feature']

    return kernels


def normalize_kernel(kernel):
    """Return a copy of ``kernel`` with ``kernel_value`` divided by its root mean square.

    The input frame is left untouched; only the ``kernel_value`` column of the
    returned copy is rescaled.
    """
    squared = kernel['kernel_value'].pow(2)
    root_mean_square = np.sqrt(squared.mean())

    scaled = kernel.copy()
    scaled['kernel_value'] = scaled['kernel_value'] / root_mean_square
    return scaled

In [None]:
def compute_kernel(data, normalize=True):
    """Build the per-landmark deformation kernel for one data set.

    Parameters
    ----------
    data : pandas.DataFrame
        Trial table with the columns ``idx`` (landmark id), ``response``,
        ``defX``/``defY`` (deformations) and ``posX``/``posY`` (positions).
    normalize : bool
        When True, the X and Y kernels are each divided by their own RMS.

    Returns
    -------
    pandas.DataFrame
        Columns ``index``, ``posX``, ``posY``, ``defX``, ``defY`` with one row
        per landmark present in the kernel, on a fresh 0..n-1 row index.
    """
    kernel_x = extract_single_kernel(
        data,
        feature_id='idx',
        value_id='defX',
        response_id='response',
    )
    kernel_y = extract_single_kernel(
        data,
        feature_id='idx',
        value_id='defY',
        response_id='response',
    )

    if normalize:
        kernel_x = normalize_kernel(kernel_x)
        kernel_y = normalize_kernel(kernel_y)

    # Look the positions up by landmark id (first occurrence of each idx)
    # instead of taking the first N rows of `data` by position: the positional
    # slice pairs positions with the wrong landmark whenever the rows are not
    # ordered by idx or a landmark is missing from the kernel.
    positions = (
        data.drop_duplicates(subset='idx')
        .set_index('idx')[['posX', 'posY']]
        .reindex(kernel_x.index)
    )

    kernel = pd.DataFrame({
        'index': kernel_x.index.to_numpy(),
        'posX': positions['posX'].to_numpy(),
        'posY': positions['posY'].to_numpy(),
        'defX': kernel_x.kernel_value.to_numpy(),
        'defY': kernel_y.kernel_value.to_numpy(),
    })

    return kernel


def save_dfmxy(filename, kernel):
    """Append ``kernel`` to ``filename`` as comma-separated text rows.

    The frame is rendered with ``DataFrame.to_string`` (row index hidden) and
    each rendered line has its whitespace-separated fields re-joined with
    commas. The column-name line is part of the rendering, so it is written
    too. The file is opened in append mode: existing content is kept and the
    new rows are added after it.
    """
    rendered = kernel.to_string(index=False, index_names=False)
    csv_lines = [",".join(line.split()) for line in rendered.split('\n')]

    with open(filename, 'a') as out:
        out.writelines(line + "\n" for line in csv_lines)

In [None]:
def get_transformed_img(data1, data2, ntrials, au, scale,
                        ref_ntrials=2000, stim_id='AM04NES', out_dir='./outputs'):
    """Write two kernels as ``.dfmxy`` files and return their mean similarity score.

    Despite its name, this function produces no image: it computes the
    normalised kernels of ``data1`` and ``data2``, scales them, saves each one
    to disk and returns a scalar comparing them.

    Parameters
    ----------
    data1, data2 : pandas.DataFrame
        Trial tables accepted by ``compute_kernel``; ``data1`` is the reference.
    ntrials : int
        Sub-folder of ``out_dir`` that receives the kernel of ``data2``.
    au : str
        Action-unit label used in the output file names.
    scale : float
        Multiplier applied to ``defX`` and ``defY`` of both kernels.
    ref_ntrials : int, optional
        Sub-folder of ``out_dir`` that receives the kernel of ``data1``.
    stim_id : str, optional
        File-name prefix of the written kernels.
    out_dir : str, optional
        Root folder of the outputs; the sub-folders must already exist.

    Returns
    -------
    float
        Mean over landmarks of ``(k1 . k2 + 1) / 2``, where ``k1`` and ``k2``
        are the scaled (defX, defY) vectors of the same landmark.
        NOTE(review): the vectors are RMS-normalised per axis, not unit length,
        so this score is not guaranteed to lie in [0, 1] - confirm the intent.
    """
    kernel1 = compute_kernel(data1, normalize=True)
    kernel2 = compute_kernel(data2, normalize=True)

    # Rescale each kernel and save it under its own trial-count folder.
    for kernel, trial_dir in ((kernel1, ref_ntrials), (kernel2, ntrials)):
        kernel[['defX', 'defY']] = scale * kernel[['defX', 'defY']]
        kernel.set_index('index').to_csv(
            f'{out_dir}/{trial_dir}/{stim_id}_{au}.dfmxy',
            index_label='index',
            header=False,
        )

    # Pair the kernels on landmark id so each dot product compares the same
    # landmark, then evaluate all of them at once instead of row by row.
    paired = kernel1.merge(kernel2, on='index', suffixes=('_1', '_2'))
    dot_products = paired['defX_1'] * paired['defX_2'] + paired['defY_1'] * paired['defY_2']

    return np.mean((dot_products.to_numpy() + 1) / 2)

In [None]:
# Action units included in the sweep.
au_list = ['AU12']
# Not evaluated yet: 'AU14', 'AU15', 'AU17', 'AU23', 'AU24', 'AU25', 'AU26', 'AU28', 'AU43'

# Reference data set that every smaller run is compared against.
ideal = pd.read_csv('./outputs/2000/df_M.csv')

# Parallel result columns: one entry per (trial count, action unit) pair.
ntrials, aus, sim = [], [], []

for n in tqdm.tqdm(np.arange(100, 1100, step=100)):
    d = pd.read_csv(f'./outputs/{n}/df_M.csv')

    for au in au_list:
        # The significance-based landmark filter (significant_landmarks) is
        # disabled, so every landmark of the action unit is used.
        ideal_data = ideal[ideal['au'] == au]
        au_data = d[d['au'] == au]
        dot_prod = get_transformed_img(ideal_data, au_data, n, au, scale=1)

        ntrials.append(n)
        aus.append(au)
        sim.append(dot_prod)

In [None]:
# Gather the sweep results into one long-format table. The 'diff' column
# holds the score returned by get_transformed_img for each run.
df = pd.DataFrame(dict(zip(('ntrials', 'au', 'diff'), (ntrials, aus, sim))))
df

In [None]:
# Score as a function of trial count, with the y-axis pinned to [0, 1].
ax = sns.lineplot(x='ntrials', y='diff', data=df)
ax.set_ylim(0, 1)

In [None]:
def show_transform(dfmxy_file, kernel, img_file, config_file, scale):
    """Apply a scaled kernel to an image and show original and result side by side.

    Parameters
    ----------
    dfmxy_file : str
        Path the scaled kernel is written to before being loaded by FaceWarp.
        The transformed image is saved next to it, with the same base name and
        a ``.jpg`` extension.
    kernel : pandas.DataFrame
        Kernel with the columns ``index``, ``posX``, ``posY``, ``defX`` and
        ``defY``. It is copied, so the caller's frame is not modified.
    img_file : str
        Path of the image to transform.
    config_file : str
        Path of the configuration file handed to ``cleese.process_file``.
    scale : float
        Multiplier applied to ``defX`` and ``defY`` before warping.
    """
    kernel = kernel.copy()

    # Rescale the deformations and write them where FaceWarp can read them.
    kernel.defX = scale * kernel.defX
    kernel.defY = scale * kernel.defY
    kernel.set_index('index').to_csv(dfmxy_file, index_label = 'index', header=None)

    dfmxy = FaceWarp.load_dfmxy(dfmxy_file)
    transformed = cleese.process_file(
        FaceWarp,
        img_file,
        config_file,
        dfmxy=dfmxy
    )
    original_img = Image.open(img_file)
    transformed_img = Image.fromarray(transformed)

    # Swap only the file extension. A plain str.replace('dfmxy', 'jpg') would
    # also rewrite any 'dfmxy' inside directory names, and would leave the
    # path unchanged when the kernel file has a different extension.
    transformed_img.save(os.path.splitext(dfmxy_file)[0] + '.jpg')

    fig, axs = plt.subplots(ncols=2)
    axs[0].imshow(original_img)
    axs[1].imshow(transformed_img, zorder=0)
    axs[0].axis('off')
    axs[1].axis('off')

# Reload the AU12 kernel saved for the 500-trial run (the file has no header
# row, so the column names are set here) and preview it at 5x amplitude.
kernel = (
    pd.read_csv('./outputs/500/AM04NES_AU12.dfmxy', header=None)
    .set_axis(['index', 'posX', 'posY', 'defX', 'defY'], axis=1)
)
show_transform(
    dfmxy_file='./outputs/500/test.dfmxy',
    kernel=kernel,
    img_file='./images/AM04NES.jpg',
    config_file='./configs/mediapipe.toml',
    scale=5,
)