In [38]:
import numpy as np
import os

In [39]:
# Hide every CUDA device from this process by pointing the variable at "-1".
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [40]:
import torch
# Report whether PyTorch can see a CUDA device (CUDA_VISIBLE_DEVICES was set to "-1" earlier).
torch.cuda.is_available()

False

In [42]:
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, concatenate
from tensorflow.keras.models import Model

In [43]:
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import log_loss
from sklearn.metrics import f1_score
from sklearn.metrics import precision_recall_fscore_support
from keras import backend as K

In [44]:
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

IMAGEBIND EMBEDDINGS WITH EMOTION AND SENTIMENT SCORES

In [51]:
# Number of target classes; used as the width of the classifier's softmax output layer.
num_classes = 3

In [57]:
# Per-sample input shapes, one per model input.
# The image, text, audio and caption inputs are all 1024-wide vectors.
image_input_shape = text_input_shape = audio_input_shape = caption_input_shape = (1024,)

# Emotion and sentiment inputs are short score vectors.
emotion_input_shape = (10,)
sentiment_input_shape = (3,)

# Cross encoders take the 256-wide output of a modality encoder.
cross_input_shape = (256,)

In [58]:
def create_image_encoder():
    """Build the image encoder: a 1024 -> 512 -> 256 ReLU MLP with dropout.

    The input size is read from the module-level ``image_input_shape``.

    Returns:
        A ``keras.Model`` mapping the "image_input" tensor to the output of
        the last dropout layer (256 units wide).
    """
    image_input = tf.keras.layers.Input(shape=image_input_shape, name="image_input")

    hidden = image_input
    # Every stage is a ReLU dense layer followed by 20% dropout.
    for width in (1024, 512, 256):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
        hidden = tf.keras.layers.Dropout(0.2)(hidden)

    return keras.Model(inputs=image_input, outputs=hidden)

In [59]:
# Text encoder
def create_text_encoder():
    """Build the text encoder: a 1024 -> 512 -> 256 ReLU MLP with dropout.

    The input size is read from the module-level ``text_input_shape``.

    Returns:
        A ``keras.Model`` mapping the "text_input" tensor to the output of
        the last dropout layer (256 units wide).
    """
    text_input = tf.keras.layers.Input(shape=text_input_shape, name="text_input")

    hidden = text_input
    # Every stage is a ReLU dense layer followed by 20% dropout.
    for width in (1024, 512, 256):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
        hidden = tf.keras.layers.Dropout(0.2)(hidden)

    return keras.Model(inputs=text_input, outputs=hidden)

In [60]:
# Audio encoder
def create_audio_encoder():
    """Build the audio encoder: a 1024 -> 512 -> 256 ReLU MLP with dropout.

    The input size is read from the module-level ``audio_input_shape``.

    Returns:
        A ``keras.Model`` mapping the "audio_input" tensor to the output of
        the last dropout layer (256 units wide).
    """
    audio_input = tf.keras.layers.Input(shape=audio_input_shape, name="audio_input")

    hidden = audio_input
    # Every stage is a ReLU dense layer followed by 20% dropout.
    for width in (1024, 512, 256):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
        hidden = tf.keras.layers.Dropout(0.2)(hidden)

    return keras.Model(inputs=audio_input, outputs=hidden)

In [61]:
# Cross encoder (applied to the output of a modality encoder)
def create_cross_encoder():
    """Build a cross encoder: a 256 -> 128 ReLU MLP with dropout.

    The input size is read from the module-level ``cross_input_shape``.

    Returns:
        A ``keras.Model`` mapping the "cross_input" tensor to the output of
        the last dropout layer (128 units wide).
    """
    cross_input = tf.keras.layers.Input(shape=cross_input_shape, name="cross_input")

    hidden = cross_input
    # Every stage is a ReLU dense layer followed by 20% dropout.
    for width in (256, 128):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
        hidden = tf.keras.layers.Dropout(0.2)(hidden)

    return keras.Model(inputs=cross_input, outputs=hidden)

In [62]:
# Emotion and sentiment encoder
def create_emotion_sentiment_encoder():
    """Build the joint emotion/sentiment encoder.

    The two inputs (sized by the module-level ``emotion_input_shape`` and
    ``sentiment_input_shape``) are concatenated along axis 1 and passed
    through a 32 -> 16 ReLU MLP with 20% dropout after each dense layer.

    Returns:
        A ``keras.Model`` taking ``[emotion_input, sentiment_input]`` and
        returning the output of the last dropout layer (16 units wide).
    """
    emotion_inputs = tf.keras.layers.Input(shape=emotion_input_shape, name="emotion_input")
    sentiment_inputs = tf.keras.layers.Input(shape=sentiment_input_shape, name="sentiment_input")

    # Fuse both score vectors into a single feature vector.
    fused = tf.keras.layers.concatenate([emotion_inputs, sentiment_inputs], axis=1)

    # The layer names keep their original "text_*" spelling so that anything
    # looking layers up by name keeps working.
    hidden = tf.keras.layers.Dense(32, activation='relu', name="text_output1")(fused)
    hidden = tf.keras.layers.Dropout(0.2, name="text_dropout1")(hidden)
    hidden = tf.keras.layers.Dense(16, activation='relu', name="text_output2")(hidden)
    hidden = tf.keras.layers.Dropout(0.2, name="text_dropout2")(hidden)

    return keras.Model(inputs=[emotion_inputs, sentiment_inputs], outputs=hidden)

In [63]:
# Caption encoder
def create_caption_encoder():
    """Build the caption encoder: a 1024 -> 512 -> 256 ReLU MLP with dropout.

    The input size is read from the module-level ``caption_input_shape``.

    Returns:
        A ``keras.Model`` mapping the "caption_input" tensor to the output of
        the last dropout layer (256 units wide).
    """
    caption_input = tf.keras.layers.Input(shape=caption_input_shape, name="caption_input")

    hidden = caption_input
    # Every stage is a ReLU dense layer followed by 20% dropout.
    for width in (1024, 512, 256):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
        hidden = tf.keras.layers.Dropout(0.2)(hidden)

    return keras.Model(inputs=caption_input, outputs=hidden)

In [65]:
# Hyperparameters

# Candidate values (one list per setting).
learning_rates = [0.01, 0.001, 0.0001]
batch_sizes = [16, 32, 64]
num_of_epochs = [20, 30, 50]

# Layer widths: the projection widths size the Dense layers of the projection heads.
hidden_units = 512
projection_units = 128
projection_units_merge = 1024

# Regularisation and contrastive-loss temperature.
dropout_rate = 0.5
temperature = 0.05

In [66]:
# Build the encoder models that the projection heads and the classifier share.
# Same-modality encoders (image, text, audio).
encoderII = create_image_encoder()
encoderTT = create_text_encoder()
encoderAA = create_audio_encoder()

# Six independent cross encoders, created in the order IT, IA, TI, TA, AI, AT.
(encoderIT, encoderIA, encoderTI,
 encoderTA, encoderAI, encoderAT) = (create_cross_encoder() for _ in range(6))

# Caption and emotion/sentiment encoders.
encoderCP = create_caption_encoder()
encoderES = create_emotion_sentiment_encoder()

In [67]:
def create_classifier(trainable=True):
    """Assemble and compile the multimodal classifier on top of the shared encoders.

    The model wires the module-level encoders together:

    * image / text / audio inputs go through ``encoderII`` / ``encoderTT`` /
      ``encoderAA``;
    * each of those outputs feeds two cross encoders (``encoderIT``/``encoderIA``,
      ``encoderTI``/``encoderTA``, ``encoderAI``/``encoderAT``);
    * emotion + sentiment inputs go through ``encoderES`` and the caption
      input through ``encoderCP``;
    * the eight resulting feature vectors are concatenated and passed to a
      1024 -> 512 -> 256 ReLU head (30% dropout after each layer) ending in a
      ``num_classes``-way softmax.

    Args:
        trainable: Value assigned to ``layer.trainable`` for every layer of
            ``encoderES`` and ``encoderCP``. The remaining encoders are not
            modified here.
            NOTE(review): confirm that leaving the other encoders untouched
            is intended.

    Returns:
        A ``keras.Model`` named "multimodal_classifier", compiled with Adam,
        sparse categorical cross-entropy and sparse categorical accuracy.

    Note:
        The optimizer reads the module-level name ``learning_rate``, which
        must be defined before this function is called.
        NOTE(review): only the ``learning_rates`` list is visible in the
        cells reviewed; confirm where ``learning_rate`` is bound.
    """
    # Freeze or unfreeze the emotion/sentiment and caption encoders in place.
    for layer in encoderES.layers:
        layer.trainable = trainable

    for layer in encoderCP.layers:
        layer.trainable = trainable

    # Inputs for each modality
    image_input = tf.keras.layers.Input(shape=image_input_shape, name="image_input")
    text_input = tf.keras.layers.Input(shape=text_input_shape, name="text_input")
    audio_input = tf.keras.layers.Input(shape=audio_input_shape, name="audio_input")
    emotion_input = tf.keras.layers.Input(shape=emotion_input_shape, name="emotion_input")
    sentiment_input = tf.keras.layers.Input(shape=sentiment_input_shape, name="sentiment_input")
    caption_input = tf.keras.layers.Input(shape=caption_input_shape, name="caption_input")

    # Same-modality features
    image_features_II = encoderII(image_input)
    audio_features_AA = encoderAA(audio_input)
    text_features_TT = encoderTT(text_input)

    # Cross encoders operate on the same-modality features
    image_features_IT = encoderIT(image_features_II)
    image_features_IA = encoderIA(image_features_II)
    text_features_TI = encoderTI(text_features_TT)
    text_features_TA = encoderTA(text_features_TT)
    audio_features_AI = encoderAI(audio_features_AA)
    audio_features_AT = encoderAT(audio_features_AA)

    # encoderES concatenates emotion and sentiment internally, so the raw
    # inputs are passed as a pair.
    emotion_features_ES = encoderES([emotion_input, sentiment_input])
    caption_features_CP = encoderCP(caption_input)

    # Concatenate all branch outputs into one feature vector
    merge = tf.keras.layers.concatenate(
        [
            image_features_IT,
            image_features_IA,
            text_features_TI,
            text_features_TA,
            audio_features_AI,
            audio_features_AT,
            emotion_features_ES,
            caption_features_CP,
        ],
        axis=1,
    )

    # Classification head on top of the merged features
    x = tf.keras.layers.Dense(1024, activation="relu")(merge)
    x = tf.keras.layers.Dropout(0.3)(x)
    x = tf.keras.layers.Dense(512, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    x = tf.keras.layers.Dense(256, activation="relu")(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(x)

    # Define and compile the classifier model
    model = keras.Model(
        inputs=[image_input, text_input, audio_input, emotion_input, sentiment_input, caption_input],
        outputs=outputs,
        name="multimodal_classifier",
    )
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

In [68]:
class SupervisedContrastiveLoss_cross(keras.losses.Loss):
    """Supervised contrastive loss between two stacked groups of embeddings.

    ``feature_vectors`` is split into two equal halves along axis 0; the
    first half is compared against the second half, and pairs whose labels
    match are treated as positives.

    Args:
        temperature: Divisor applied to the similarities before
            exponentiation. It is forwarded to ``supervised_NT_Xent_tf``.
        name: Optional name passed to ``keras.losses.Loss``.
    """

    def __init__(self, temperature=1, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Compute the loss for one batch.

        Args:
            labels: Integer class labels, one per row of each half of
                ``feature_vectors`` (any trailing singleton axis is flattened).
            feature_vectors: Two groups of embeddings stacked along axis 0.
            sample_weight: Accepted for Keras compatibility; not used.

        Returns:
            Scalar tensor with the mean per-sample loss.
        """
        # Split the stacked batch back into its two views
        zi, zj = tf.split(feature_vectors, num_or_size_splits=2, axis=0)

        # Normalize feature vectors
        zi_normalized = tf.math.l2_normalize(zi, axis=1)
        zj_normalized = tf.math.l2_normalize(zj, axis=1)

        # reshape (rather than squeeze) keeps a 1-D label vector even when
        # the batch holds a single sample.
        flat_labels = tf.reshape(labels, [-1])

        return SupervisedContrastiveLoss_cross.supervised_NT_Xent_tf(
            zi_normalized, zj_normalized, flat_labels, tau=self.temperature
        )

    @staticmethod
    def supervised_NT_Xent_tf(zi, zj, labels, tau=1):
        """
        Calculates the supervised contrastive loss of the input data using NT_Xent.

        For every row ``i`` the loss is ``-log(P_i / N_i)``, where ``P_i`` sums
        ``exp(sim[i, j] / tau)`` over columns ``j`` with the same label as ``i``
        and ``N_i`` sums it over columns with a different label.

        Args:
            zi: First group of embeddings, shape = (batch_size, features)
            zj: Second group of embeddings, must have the same shape as zi
            labels: Tensor of shape (batch_size,) with integer labels representing the class of each pair
            tau: Temperature parameter (a constant), default = 1.

        Returns:
            loss: Scalar tensor, the per-row loss averaged over the batch
        """
        epsilon = 1e-8

        # Compute cosine similarities between all pairs in zi and zj
        cosine_sim = tf.keras.losses.CosineSimilarity(axis=-1, reduction=tf.keras.losses.Reduction.NONE)
        sim_matrix = -cosine_sim(tf.expand_dims(zi, 1), tf.expand_dims(zj, 0))  # Shape: (batch_size, batch_size)

        # Min-max rescale the similarity matrix over the whole batch
        sim_min = tf.reduce_min(sim_matrix)
        sim_max = tf.reduce_max(sim_matrix)
        sim_matrix = (sim_matrix - sim_min) / (sim_max - sim_min + epsilon)

        # Positive pairs share a label; everything else is a negative pair
        positive_mask = tf.equal(tf.expand_dims(labels, 1), tf.expand_dims(labels, 0))
        positive_weights = tf.cast(positive_mask, sim_matrix.dtype)
        negative_weights = 1.0 - positive_weights

        # Masked row sums replace the per-row Python loop, so the computation
        # is a fixed set of tensor ops regardless of batch size.
        exp_sim = tf.math.exp(sim_matrix / tau)
        numerator = tf.reduce_sum(exp_sim * positive_weights, axis=1)
        denominator = tf.reduce_sum(exp_sim * negative_weights, axis=1) + epsilon  # Avoid division by zero

        # Safeguard against log(0) or log(very small value)
        fraction = numerator / (denominator + epsilon)
        per_sample_loss = -tf.math.log(tf.maximum(fraction, epsilon))

        # Average loss over all samples
        return tf.reduce_mean(per_sample_loss)

In [69]:
class SupervisedContrastiveLoss_self(keras.losses.Loss):
    """Supervised contrastive loss of a batch of embeddings against itself.

    Every embedding is compared with every embedding of the same batch, and
    pairs whose labels match are treated as positives. Because both sides are
    the same tensor, each sample's similarity with itself also falls under
    the positive mask.

    Args:
        temperature: Divisor applied to the similarities before
            exponentiation. It is forwarded to ``supervised_NT_Xent_tf``.
        name: Optional name passed to ``keras.losses.Loss``.
    """

    def __init__(self, temperature=1, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Compute the loss for one batch.

        Args:
            labels: Integer class labels, one per row of ``feature_vectors``
                (any trailing singleton axis is flattened).
            feature_vectors: Batch of embeddings, one row per sample.
            sample_weight: Accepted for Keras compatibility; not used.

        Returns:
            Scalar tensor with the mean per-sample loss.
        """
        # Normalize feature vectors
        zi_normalized = tf.math.l2_normalize(feature_vectors, axis=1)

        # reshape (rather than squeeze) keeps a 1-D label vector even when
        # the batch holds a single sample.
        flat_labels = tf.reshape(labels, [-1])

        return SupervisedContrastiveLoss_self.supervised_NT_Xent_tf(
            zi_normalized, zi_normalized, flat_labels, tau=self.temperature
        )

    @staticmethod
    def supervised_NT_Xent_tf(zi, zj, labels, tau=1):
        """
        Calculates the supervised contrastive loss of the input data using NT_Xent.

        For every row ``i`` the loss is ``-log(P_i / N_i)``, where ``P_i`` sums
        ``exp(sim[i, j] / tau)`` over columns ``j`` with the same label as ``i``
        and ``N_i`` sums it over columns with a different label.

        Args:
            zi: First group of embeddings, shape = (batch_size, features)
            zj: Second group of embeddings, must have the same shape as zi
            labels: Tensor of shape (batch_size,) with integer labels representing the class of each pair
            tau: Temperature parameter (a constant), default = 1.

        Returns:
            loss: Scalar tensor, the per-row loss averaged over the batch
        """
        epsilon = 1e-8

        # Compute cosine similarities between all pairs in zi and zj
        cosine_sim = tf.keras.losses.CosineSimilarity(axis=-1, reduction=tf.keras.losses.Reduction.NONE)
        sim_matrix = -cosine_sim(tf.expand_dims(zi, 1), tf.expand_dims(zj, 0))  # Shape: (batch_size, batch_size)

        # Min-max rescale the similarity matrix over the whole batch
        sim_min = tf.reduce_min(sim_matrix)
        sim_max = tf.reduce_max(sim_matrix)
        sim_matrix = (sim_matrix - sim_min) / (sim_max - sim_min + epsilon)

        # Positive pairs share a label; everything else is a negative pair
        positive_mask = tf.equal(tf.expand_dims(labels, 1), tf.expand_dims(labels, 0))
        positive_weights = tf.cast(positive_mask, sim_matrix.dtype)
        negative_weights = 1.0 - positive_weights

        # Masked row sums replace the per-row Python loop, so the computation
        # is a fixed set of tensor ops regardless of batch size.
        exp_sim = tf.math.exp(sim_matrix / tau)
        numerator = tf.reduce_sum(exp_sim * positive_weights, axis=1)
        denominator = tf.reduce_sum(exp_sim * negative_weights, axis=1) + epsilon  # Avoid division by zero

        # Safeguard against log(0) or log(very small value)
        fraction = numerator / (denominator + epsilon)
        per_sample_loss = -tf.math.log(tf.maximum(fraction, epsilon))

        # Average loss over all samples
        return tf.reduce_mean(per_sample_loss)

In [70]:
def add_projection_head_cross(encoder1,encoder2,encoder3, encoder4):
    """Wrap two encoder chains with projection layers and stack their outputs.

    Args:
        encoder1: Model applied to the first 1024-wide input.
        encoder2: Model applied to the second 1024-wide input.
        encoder3: Model applied to the output of ``encoder1``.
        encoder4: Model applied to the output of ``encoder2``.

    Returns:
        A Keras model taking ``[input_1, input_2]`` whose output is the two
        ``projection_units``-wide ReLU projections concatenated along axis 0
        (the batch axis), first chain first.
    """
    first_input = tf.keras.layers.Input(shape=(1024,), name="input_1")
    second_input = tf.keras.layers.Input(shape=(1024,), name="input_2")

    # First stage: one encoder per input
    first_encoded = encoder1(first_input)
    second_encoded = encoder2(second_input)

    # Second stage: a further encoder on top of each first-stage output
    first_crossed = encoder3(first_encoded)
    second_crossed = encoder4(second_encoded)

    # Separate projection layer per chain
    first_projection = tf.keras.layers.Dense(projection_units, activation="relu")(first_crossed)
    second_projection = tf.keras.layers.Dense(projection_units, activation="relu")(second_crossed)

    # Stack along the batch axis so a loss can split the result in two again
    stacked = tf.concat([first_projection, second_projection], axis=0)
    return tf.keras.models.Model(inputs=[first_input, second_input], outputs=stacked)

In [71]:
def add_projection_head_merge(encoder1, encoder2, encoder3):
    """Wrap three encoders with one shared projection over their merged output.

    Args:
        encoder1: Model applied to the first 1024-wide input.
        encoder2: Model applied to the second 1024-wide input.
        encoder3: Model applied to the third 1024-wide input.

    Returns:
        A Keras model taking ``[input_1, input_2, input_3]`` and returning a
        ``projection_units_merge``-wide ReLU projection of the three encoder
        outputs concatenated along axis 1.
    """
    first_input = tf.keras.layers.Input(shape=(1024,), name="input_1")
    second_input = tf.keras.layers.Input(shape=(1024,), name="input_2")
    third_input = tf.keras.layers.Input(shape=(1024,), name="input_3")

    # Encode each input with its own encoder
    first_encoded = encoder1(first_input)
    second_encoded = encoder2(second_input)
    third_encoded = encoder3(third_input)

    # Join the three feature vectors side by side
    merged = tf.keras.layers.concatenate([first_encoded, second_encoded, third_encoded], axis=1)

    projection = tf.keras.layers.Dense(projection_units_merge, activation="relu")(merged)
    return tf.keras.models.Model(
        inputs=[first_input, second_input, third_input], outputs=projection
    )

In [72]:
def add_projection_head_emotion_sentiment(encoder1):
    """Wrap a two-input encoder with a projection layer.

    Args:
        encoder1: Model called with a ``[10-wide, 3-wide]`` pair of inputs.

    Returns:
        A Keras model taking ``[input_1, input_2]`` and returning a
        ``projection_units``-wide ReLU projection of the encoder output.
    """
    first_input = tf.keras.layers.Input(shape=(10,), name="input_1")
    second_input = tf.keras.layers.Input(shape=(3,), name="input_2")

    # The encoder receives both inputs as a pair
    encoded = encoder1([first_input, second_input])

    projection = tf.keras.layers.Dense(projection_units, activation="relu")(encoded)
    return tf.keras.models.Model(inputs=[first_input, second_input], outputs=projection)

In [73]:
def add_projection_head_caption(encoder1):
    """Wrap a single-input encoder with a projection layer.

    Args:
        encoder1: Model applied to the 1024-wide input.

    Returns:
        A Keras model taking ``input_1`` and returning a
        ``projection_units``-wide ReLU projection of the encoder output.
    """
    single_input = tf.keras.layers.Input(shape=(1024,), name="input_1")

    encoded = encoder1(single_input)

    projection = tf.keras.layers.Dense(projection_units, activation="relu")(encoded)
    return tf.keras.models.Model(inputs=single_input, outputs=projection)

In [74]:
def get_f1(y_true, y_pred): #taken from old keras source code
    """Batch-level F1 score built from rounded, clipped tensors.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor, multiplied element-wise with ``y_true``.

    Returns:
        Scalar tensor ``2 * P * R / (P + R + eps)``, where precision ``P`` and
        recall ``R`` are each stabilised by ``K.epsilon()``.
    """
    def _count(values):
        # Clip to [0, 1], round to 0/1 and add everything up.
        return K.sum(K.round(K.clip(values, 0, 1)))

    hits = _count(y_true * y_pred)
    actual = _count(y_true)
    predicted = _count(y_pred)

    precision = hits / (predicted + K.epsilon())
    recall = hits / (actual + K.epsilon())
    return 2*(precision*recall)/(precision+recall+K.epsilon())