In [None]:
import glob
from itertools import chain
import os
import random
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from tqdm.notebook import tqdm

print(f"Torch: {torch.__version__}")
device = 'cuda'
use_cuda = torch.cuda.is_available()
print(use_cuda)

In [None]:
PATH='enet_b0_8_best_vgaf.pt'
IMG_SIZE=224
    
test_transforms = transforms.Compose(
    [
        transforms.Resize((IMG_SIZE,IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    ]
)

print(PATH)
feature_extractor_model = torch.load('../face-emotion-recognition/models/affectnet_emotions/'+PATH)

In [None]:
if True:
    classifier_weights=feature_extractor_model.classifier[0].weight.cpu().data.numpy()
    classifier_bias=feature_extractor_model.classifier[0].bias.cpu().data.numpy()
else:
    classifier_weights=feature_extractor_model.classifier.weight.cpu().data.numpy()
    classifier_bias=feature_extractor_model.classifier.bias.cpu().data.numpy()
print(classifier_weights.shape,classifier_weights)
print(classifier_bias.shape,classifier_bias)

In [None]:
feature_extractor_model.classifier=torch.nn.Identity()
feature_extractor_model=feature_extractor_model.to(device)
feature_extractor_model.eval()

In [None]:
def get_probab(features, logits=True):
    x=np.dot(features,np.transpose(classifier_weights))+classifier_bias
    if logits:
        return x
    e_x = np.exp(x - np.max(x,axis=0))
    return e_x / e_x.sum(axis=1)[:,None]

In [None]:
DATA_DIR = '../../../Data/ABAW4/'

In [None]:
print(test_transforms)
data_dir=os.path.join(DATA_DIR,'cropped_aligned')
#data_dir=os.path.join(DATA_DIR,'cropped_aligned')
print(data_dir)
img_names=[]
X_global_features=[]
imgs=[]
for filename in tqdm(os.listdir(data_dir)):
    frames_dir=os.path.join(data_dir,filename)    
    for img_name in os.listdir(frames_dir):
        if img_name.lower().endswith('.jpg'):
            img = Image.open(os.path.join(frames_dir,img_name))
            img_tensor = test_transforms(img)
            if img.size:
                img_names.append(filename+'/'+img_name)
                imgs.append(img_tensor)
                if len(imgs)>=96: #48: #64: #32:        
                    features = feature_extractor_model(torch.stack(imgs, dim=0).to(device))
                    features=features.data.cpu().numpy()
                    
                    if len(X_global_features)==0:
                        X_global_features=features
                    else:
                        X_global_features=np.concatenate((X_global_features,features),axis=0)
                    imgs=[]

if len(imgs)>0:        
    features = feature_extractor_model(torch.stack(imgs, dim=0).to(device))
    features=features.data.cpu().numpy()

    if len(X_global_features)==0:
        X_global_features=features
    else:
        X_global_features=np.concatenate((X_global_features,features),axis=0)

    imgs=[]

In [None]:
X_scores=get_probab(X_global_features)

In [None]:
filename2featuresAll={img_name:(global_features,scores) for img_name,global_features,scores in zip(img_names,X_global_features,X_scores)}
print(len(filename2featuresAll))

In [None]:
def get_image2all(filename):
    with open(os.path.join(DATA_DIR, filename)) as f:
        mtl_lines = f.read().splitlines()
    num_missed=0
    X,y_va,y_expr,y_aus=[],[],[],[]
    masks_va,masks_expr,masks_aus=[],[],[]
    for line in mtl_lines[1:]:
        splitted_line=line.split(',')
        imagename=splitted_line[0]
        valence=float(splitted_line[1])
        arousal=float(splitted_line[2])
        expression=int(splitted_line[3])
        aus=list(map(int,splitted_line[4:]))
        
        mask_VA=(valence>-5 and arousal>-5)
        if not mask_VA:
            valence=arousal=0
            
        mask_expr=(expression>-1)
        if not mask_expr:
            expression=0
            
        mask_aus=min(aus)>=0
        if not mask_aus:
            aus=[0]*len(aus)
        if mask_VA or mask_expr or mask_aus:
            if imagename in filename2featuresAll:
                #X.append(filename2featuresAll[imagename][0])
                X.append(np.concatenate((filename2featuresAll[imagename][0],filename2featuresAll[imagename][1])))
                y_va.append((valence,arousal))
                masks_va.append(mask_VA)
                
                y_expr.append(expression)
                masks_expr.append(mask_expr)
                
                y_aus.append(aus)
                masks_aus.append(mask_aus)
            else:
                num_missed+=1
    X=np.array(X)
    y_va=np.array(y_va)
    y_expr=np.array(y_expr)
    y_aus=np.array(y_aus)
    masks_va=np.array(masks_va).astype(np.float32)
    masks_expr=np.array(masks_expr).astype(np.float32)
    masks_aus=np.array(masks_aus).astype(np.float32)
    print(X.shape,y_va.shape,y_expr.shape,y_aus.shape,masks_va.shape,num_missed)
    return X,y_va,y_expr,y_aus,masks_va,masks_expr,masks_aus

X_train,y_va_train,y_expr_train,y_aus_train,masks_va_train,masks_expr_train,masks_aus_train=get_image2all('training_set_annotations.txt')
X_val,y_va_val,y_expr_val,y_aus_val,masks_va_val,masks_expr_val,masks_aus_val=get_image2all('validation_set_annotations.txt')
TRAIN_VAL=False

In [None]:
(unique, counts) = np.unique(y_expr_train[masks_expr_train==1.].astype(int), return_counts=True)
num_classes=len(unique)
emo_cw=1/counts
emo_cw/=emo_cw.min()
emo_class_weights = {i:cwi for i,cwi in zip(unique,emo_cw)}
print(counts, emo_class_weights, num_classes, unique)
print(emo_cw)

In [None]:
num_labels=y_aus_train.shape[1]
print(num_labels)
from sklearn.utils.class_weight import compute_class_weight
aus_class_weights = np.empty([num_labels, 2])
for i in range(num_labels):
    neg, pos = np.bincount(y_aus_train[masks_aus_train==1., i])
    total = neg + pos
    weight_for_0 = (1 / neg) * (total / 2.0)
    weight_for_1 = (1 / pos) * (total / 2.0)

    aus_class_weights[i][0]=weight_for_0
    aus_class_weights[i][1]=weight_for_1
print(aus_class_weights)

In [None]:
def loss_va(y_true, y_pred):
    res=1-0.5*(CCC(y_true[:,0], y_pred[:,0])+CCC(y_true[:,1], y_pred[:,1]))
    return res

In [None]:
class WeightedExprSCCE(tf.keras.losses.Loss):
    def __init__(self, class_weight, from_logits=False, name='expr_scce'):
        if class_weight is None or all(v == 1. for v in class_weight):
            self.class_weight = None
        else:
            self.class_weight = tf.convert_to_tensor(class_weight,
                dtype=tf.float32)
        self.reduction = tf.keras.losses.Reduction.NONE
        self.unreduced_scce = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=from_logits, name=name,
            reduction=self.reduction)

    def __call__(self, y_true, y_pred, sample_weight=None):
        loss = self.unreduced_scce(y_true, y_pred, sample_weight)
        if self.class_weight is not None:
            weight_mask = tf.gather(self.class_weight, y_true)
            loss = tf.math.multiply(loss, weight_mask)
        loss=K.mean(loss)
        return loss
loss_expr=WeightedExprSCCE(emo_cw)

In [None]:
def get_weighted_loss_aus(weights):
    def weighted_loss(y_true, y_pred):
        y_true=tf.cast(y_true, tf.float32)
        ce=K.binary_crossentropy(y_true, y_pred)
        res=K.mean((weights[:,0]**(1-y_true))*(weights[:,1]**(y_true))*ce)
        return res
    return weighted_loss
loss_aus=get_weighted_loss_aus(aus_class_weights)

In [None]:
metrics=[tf.keras.metrics.AUC(multi_label=True,name='auc'), tf.keras.metrics.BinaryAccuracy(),tf.keras.metrics.Recall(),tf.keras.metrics.Precision()] # 

In [None]:
batch_size=256 #128
img = tf.keras.Input(shape=X_train.shape[1:])
mask1 = tf.keras.Input(shape=(1,))
mask2 = tf.keras.Input(shape=(1,))
mask3 = tf.keras.Input(shape=(1,))
x=img
#x=Dense(128, activation='relu',kernel_regularizer=tf.keras.regularizers.l2(1.0/batch_size))(x)
va_out = Dense(2, activation='tanh',kernel_regularizer=tf.keras.regularizers.l2(1.0/batch_size))(x)
va_out_masked=tf.keras.layers.Multiply(name='va_out')([va_out,mask1])

#x=va_out

expr_out=Dense(8, activation='softmax',kernel_regularizer=tf.keras.regularizers.l2(1.0/batch_size))(x)
expr_out_masked=tf.keras.layers.Multiply(name='expr_out')([expr_out,mask2])
aus_out=Dense(12, activation='sigmoid',kernel_regularizer=tf.keras.regularizers.l2(1.0/batch_size))(x)
aus_out_masked=tf.keras.layers.Multiply(name='aus_out')([aus_out,mask3])

mtlModel=tf.keras.Model(inputs=[img,mask1,mask2,mask3], outputs=[va_out_masked, expr_out_masked,aus_out_masked])

In [None]:
mtlModel.compile(optimizer=Adam(lr=1e-3), loss=[loss_va,loss_expr,loss_aus], metrics=[["mean_absolute_error"], ["accuracy"],metrics])
mtlModel.summary()

save_best_model = SaveBestModel('val_loss',False)
mtlModel.fit([X_train,masks_va_train,masks_expr_train,masks_aus_train],
             [y_va_train,y_expr_train,y_aus_train], batch_size=batch_size, epochs=(1 if TRAIN_VAL else 20), 
             verbose=1, callbacks=[save_best_model], 
             validation_data=([X_val,masks_va_val,masks_expr_val,masks_aus_val],[y_va_val,y_expr_val,y_aus_val]))
best_model_weights = save_best_model.best_model_weights
print(save_best_model.best)

In [None]:
def print_all():
    y_pred_va,y_pred_expr,y_pred_aus=mtlModel.predict([X_val,masks_va_val,masks_expr_val,masks_aus_val])
    print('\nAV')
    gt_V=y_va_val[masks_va_val==1,0]
    gt_A=y_va_val[masks_va_val==1,1]
    pred_V=y_pred_va[masks_va_val==1,0]
    pred_A=y_pred_va[masks_va_val==1,1]
    ccc_V,ccc_A,ccc_VA=metric_for_VA(gt_V,gt_A,pred_V,pred_A)
    print(gt_V.shape,ccc_V,ccc_A,ccc_VA)
    
    print('\nExpression')
    print(y_expr_val[masks_expr_val==1].shape)
    y_pred=np.argmax(y_pred_expr,axis=1)
    print((y_pred==y_expr_val)[masks_expr_val==1].mean())
    f1_expr=f1_score(y_true=y_expr_val[masks_expr_val==1],y_pred=y_pred[masks_expr_val==1], average="macro")
    print(f1_expr)
    print(metric_for_Exp(y_expr_val[masks_expr_val==1],y_pred[masks_expr_val==1]))
    
    print('\nAUs')
    new_pred = ((y_pred_aus >= 0.5) * 1)
    print(new_pred[masks_aus_val==1,:].shape)
    f1_au=np.mean([f1_score(y_true=y_aus_val[masks_aus_val==1,i],y_pred=new_pred[masks_aus_val==1,i]) for i in range(y_pred_aus.shape[1])])
    print(f1_au)
    print(f1_score_max(y_aus_val[masks_aus_val==1,:],y_pred_aus[masks_aus_val==1,:],thresh=np.arange(0.1,1,0.1)))
    
    total=ccc_VA+f1_expr+f1_au
    print('\nTotal',ccc_VA,f1_expr,f1_au,total)

print_all()