In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
tf.config.run_functions_eagerly(True)

from mtcnn import MTCNN
import cv2
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.imagenet_utils import preprocess_input

import os
import re

from keras.layers import Embedding
from keras.models import Sequential
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Dense, Input, Flatten, Dropout
from tensorflow.keras.models import Model

from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()

from nltk.stem import WordNetLemmatizer
from transformers import BertTokenizer, TFBertForSequenceClassification, TFBertModel


In [2]:
df = pd.read_csv('MELD.Raw/dev_sent_emo.csv',encoding='utf-8')
df['Utterance'] = df.Utterance.str.replace('',"'")
df = df.sample(n=5,ignore_index=True)


In [3]:
face_model = tf.keras.applications.VGG16(input_shape=(224,224,3),include_top=False,weights="imagenet") #include_top=True for predictions, False for embeddings


The following produces detector embeddings without the use of a NN model.

In [4]:
detector = MTCNN()

def preprocess_image(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))
    img = img / 255.0 
    return img

def extract_face_embeddings(frame):
    global face_embedding
    faces = detector.detect_faces(frame)
    face_embeddings = []
    for face in faces:
        x, y, w, h = face['box']
        x1, y1 = max(x, 0), max(y, 0)
        x2, y2 = min(x + w, frame.shape[1]), min(y + h, frame.shape[0])
        cropped_face = frame[y1:y2, x1:x2]
        
        # Preprocess
        preprocessed_face = preprocess_image(cropped_face)
        preprocessed_face = np.expand_dims(preprocessed_face, axis=0)

        return preprocessed_face

        # face_embedding = face_model.predict(preprocessed_face)      # uncomment for predictions
        # face_embeddings.append(np.squeeze(face_embedding))
        
        # return face_embeddings

# Read video
folder_path = 'MELD.Raw/dev_splits_complete/'
one_face_videos = {}

for idx, row in df.iterrows():
    file_name = 'dia' + str(row['Dialogue_ID']) + '_utt' + str(row['Utterance_ID']) + '.mp4'
    video_path = folder_path + file_name
    if os.path.isfile(video_path):
        video_capture = cv2.VideoCapture(video_path)

        single_video_embeddings = []  # List to store embeddings for all frames

        frame_counter = 0

        while frame_counter < 2:
            ret, frame = video_capture.read()
            if not ret:
                break

            # check for more than 1 face
            if len(detector.detect_faces(frame)) == 1:

                # extract face embeddings from each frame
                extracted_embeddings = np.squeeze(extract_face_embeddings(frame))

                single_video_embeddings.append(extracted_embeddings)  # Append embeddings for this frame

                # bounding boxes
                for face in detector.detect_faces(frame):
                    x, y, w, h = face['box']
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

                cv2.imshow('Video', frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                frame_counter += 1

            else:
                continue

        video_capture.release()
        cv2.destroyAllWindows()

        if single_video_embeddings != []:            
            one_face_videos[file_name] = np.squeeze(single_video_embeddings)
        else:
            one_face_videos[file_name] = 'too many faces'






In [5]:
df['many_faces'] = one_face_videos.values()
df['wrong_shape'] = [tf.constant(value).shape for value in one_face_videos.values()]

df = df[(df.many_faces != 'too many faces') & (df.wrong_shape == (2,224,224,3))]


  result = libops.scalar_compare(x.ravel(), y, op)


In [6]:
ohe = OneHotEncoder(categories=[df.Emotion.unique()])
labels = ohe.fit_transform(df.Emotion.to_numpy().reshape(-1,1)).toarray()

The following uses detector embeddings to fine tune the VGG16 model

In [7]:
#this includes transfer learning for vgg16. Switch the top layers to something appropriate. it works tho

for layer in face_model.layers:
    layer.trainable=True

base = face_model.output

flat = Flatten(name="flatten")(base)
fc = (Dense(256, activation='relu'))(flat)
fc_two = (Dense(64, activation='relu'))(fc)
output = (Dense(len(df.Emotion.unique()), activation='softmax'))(fc_two)
vid_pred_model = Model(inputs=face_model.input,outputs=output)

vid_pred_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

vid_train_data = []
for video in [value for value in one_face_videos.values() if tf.constant(value).shape == (2,224,224,3)]:
    video = tf.reduce_mean(video,axis=0)
    vid_train_data.append(video)

vid_train_data = np.array(vid_train_data)

vid_pred_model.fit(vid_train_data,labels)





<keras.callbacks.History at 0x2a60b0041c0>

In [21]:
# video predictions

video_predictions = vid_pred_model.predict(vid_train_data)





In [8]:
# this is designed to extract the embeddings from the fine tuned model

video_embedding_layers = [layer for layer in vid_pred_model.layers[:-4]]

video_embedding_model_input = vid_pred_model.input
video_embedding_model_output = video_embedding_layers[-1].output
video_embedding_model = Model(video_embedding_model_input, video_embedding_model_output)

video_embedding_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [9]:

def preprocess_image(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))
    img = img / 255.0 
    return img

def extract_face_embeddings(frame):
    global face_embedding
    faces = detector.detect_faces(frame)
    face_embeddings = []
    for face in faces:
        x, y, w, h = face['box']
        x1, y1 = max(x, 0), max(y, 0)
        x2, y2 = min(x + w, frame.shape[1]), min(y + h, frame.shape[0])
        cropped_face = frame[y1:y2, x1:x2]
        
        # Preprocess
        preprocessed_face = preprocess_image(cropped_face)
        preprocessed_face = np.expand_dims(preprocessed_face, axis=0)


        face_embedding = video_embedding_model.predict(preprocessed_face) 
        face_embeddings.append(np.squeeze(face_embedding))
        
        return face_embeddings

# Read video
folder_path = 'MELD.Raw/dev_splits_complete/'
video_embeddings = {}

for idx, row in df.iterrows():
    file_name = 'dia' + str(row['Dialogue_ID']) + '_utt' + str(row['Utterance_ID']) + '.mp4'
    video_path = folder_path + file_name
    if os.path.isfile(video_path):
        video_capture = cv2.VideoCapture(video_path)

        single_video_embeddings = []  # List to store embeddings for all frames

        frame_counter = 0

        while frame_counter < 2:
            ret, frame = video_capture.read()
            if not ret:
                break

            # check for more than 1 face
            if len(detector.detect_faces(frame)) == 1:

                # extract face embeddings from each frame
                extracted_embeddings = np.squeeze(extract_face_embeddings(frame))

                single_video_embeddings.append(extracted_embeddings)  # Append embeddings for this frame

                # bounding boxes
                for face in detector.detect_faces(frame):
                    x, y, w, h = face['box']
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

                cv2.imshow('Video', frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                frame_counter += 1

            else:
                continue

        video_capture.release()
        cv2.destroyAllWindows()

        if single_video_embeddings != []:            
            video_embeddings[file_name] = np.squeeze(single_video_embeddings)
        else:
            video_embeddings[file_name] = 'too many faces'




In [10]:
def preprocess(text):
    t = text.lower()
    t = re.sub('\d+',r'',t)
    t = re.sub(r'\W+',r' ',t)
    return t

lemmatizer = WordNetLemmatizer()



df['prepro'] = [' '.join([lemmatizer.lemmatize(preprocess(txt))])
                 .strip() for txt in df['Utterance']]

In [11]:
#THIS GETS PREDICTIONS

model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = TFBertModel.from_pretrained(model_name)

input_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='input_ids')
token_type_ids = tf.keras.Input(shape=(None,), dtype=tf.int32, name='token_type_ids')
attention_mask = tf.keras.Input(shape=(None,), dtype=tf.int32, name='attention_mask')

bert_outputs = model(
    input_ids=input_ids,
    token_type_ids=token_type_ids,
    attention_mask=attention_mask
)

pooled_output = bert_outputs.pooler_output
dense_layer = tf.keras.layers.Dense(256, activation='relu')(pooled_output)
output_layer = tf.keras.layers.Dense(len(df.Emotion.unique()), activation='softmax')(dense_layer)


text_pred_model = tf.keras.Model(
    inputs=[input_ids, token_type_ids, attention_mask],
    outputs=output_layer
)


# Data and labels
train_texts = list(df.prepro.values)
train_labels = list(labels)

# Tokenize
train_encodings = tokenizer(train_texts, truncation=True, padding=True)

# Convert labels to tensors
train_labels = tf.convert_to_tensor(train_labels)

# Split inputs
train_inputs = {
    'input_ids': np.array(train_encodings['input_ids']),
    'token_type_ids': np.array(train_encodings['token_type_ids']),
    'attention_mask': np.array(train_encodings['attention_mask'])
}

# Create TensorFlow Dataset
train_dataset = tf.data.Dataset.from_tensor_slices((
    train_inputs,
    train_labels
)).shuffle(len(train_inputs)).batch(3,drop_remainder=True)  # Adjust batch size as needed. will return error if batch_size > len(train_inputs)

# Optimizer and loss function
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5)
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

# Compiling
text_pred_model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'], run_eagerly=True)

# Fine-tuning
text_pred_model.fit(train_dataset, epochs=3, verbose=1)


Some layers from the model checkpoint at bert-base-uncased were not used when initializing TFBertModel: ['nsp___cls', 'mlm___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at bert-base-uncased.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x2a648c37b80>

In [12]:
#Prediction format

input_texts = df.Utterance.values 

input_encodings = tokenizer.batch_encode_plus(input_texts, truncation=True, padding='max_length', max_length=128, return_tensors="tf")

text_predictions = text_pred_model.predict({
    'input_ids': input_encodings['input_ids'],
    'token_type_ids': input_encodings['token_type_ids'],
    'attention_mask': input_encodings['attention_mask']
})

predicted_emotions = np.argmax(text_predictions, axis=1)

emotion_labels = df.Emotion.unique() 

predicted_emotion_labels = [emotion_labels[idx] for idx in predicted_emotions]

for text, label in zip(input_texts, predicted_emotion_labels):
    print(f"Text: {text}, Predicted emotion: {label}")





Text: Ah! Ah! I forgot my jacket!, Predicted emotion: neutral
Text: G-sharp., Predicted emotion: neutral
Text: I just wanna clarify this: are you outing Mr. Peanut?, Predicted emotion: neutral
Text: Ow!, Predicted emotion: anger
Text: Oh! I thought you guys got married in uh, January?, Predicted emotion: neutral


In [13]:
# THIS GETS EMBEDDINGS

text_embeddings_model = tf.keras.Model(inputs=text_pred_model.input, outputs=bert_outputs.last_hidden_state)

text_embeddings = text_embeddings_model.predict(train_inputs)





In [14]:
con_ten = []
video_embeddings_array = np.array([embedding for embedding in one_face_videos.values() if embedding != 'too many faces'])

for i in range(len(df)):
    con_ten.append(np.concatenate([text_embeddings.ravel(),video_embeddings_array.ravel()]))

df['concat'] = con_ten

  video_embeddings_array = np.array([embedding for embedding in one_face_videos.values() if embedding != 'too many faces'])


In [15]:
fusion_data = np.squeeze(np.array([con_ten]))
fusion_data

array([[0.03952489, 0.34696823, 0.22742915, ..., 0.30196078, 0.18431373,
        0.11764706],
       [0.03952489, 0.34696823, 0.22742915, ..., 0.30196078, 0.18431373,
        0.11764706],
       [0.03952489, 0.34696823, 0.22742915, ..., 0.30196078, 0.18431373,
        0.11764706],
       [0.03952489, 0.34696823, 0.22742915, ..., 0.30196078, 0.18431373,
        0.11764706],
       [0.03952489, 0.34696823, 0.22742915, ..., 0.30196078, 0.18431373,
        0.11764706]])

In [16]:
# Model for early fusion

fusion_model = Sequential()


fusion_model.add(Dense(256, activation='relu'))
fusion_model.add(Dropout(0.3))
fusion_model.add(Dense(128, activation='relu'))
fusion_model.add(Dropout(0.3))
fusion_model.add(Dense(64, activation='relu'))
fusion_model.add(Dropout(0.3))
fusion_model.add(Dense(len(df.Emotion.unique()), activation='softmax'))

fusion_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

fusion_model.fit(fusion_data,labels)





<keras.callbacks.History at 0x2a648dff100>

In [40]:
fusion_model.predict(fusion_data)





array([[1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.]], dtype=float32)

In [35]:
# Late fusion

def final_preds(text_model_predictions, visual_model_predictions, df):
    final_predictions = []

    for text_pred, vis_pred in zip(text_model_predictions, visual_model_predictions):
        text_max = np.argmax(text_pred)
        vis_max = np.argmax(vis_pred)

        # Equal predictions
        if text_max == vis_max:
            final_prediction = df.Emotion.unique()[text_max]
            print('Equal predictions')

        # Different predictions, lean towards higher confidence
        else:
            final_prediction_value = np.maximum(a := np.round(text_pred[text_max], 8), 
                                                b := np.round(vis_pred[vis_max], 8))

            try:
                print(f'Text: {a}, Visual: {b}')
                final_prediction = df.Emotion.unique()[np.where(np.round(text_pred, 8) == final_prediction_value)[0][0]]
                print('Text wins')
            except IndexError:
                final_prediction = df.Emotion.unique()[np.where(np.round(vis_pred, 8) == final_prediction_value)[0][0]]
                print('Visual wins')

        final_predictions.append(final_prediction)

    return final_predictions


In [39]:
final_preds(text_predictions, video_predictions, df)

Equal predictions
Equal predictions
Equal predictions
Text: 0.37251195311546326, Visual: 0.6713597774505615
Visual wins
Equal predictions


['neutral', 'neutral', 'neutral', 'neutral', 'neutral']