In [None]:
%%capture
%pip install  pandas transformers datasets matplotlib seaborn scikit-learn


In [None]:
import pandas as pd
import numpy as np

# Load the train and test tables from the local go_emotions parquet files.
train, test = (
    pd.read_parquet(parquet_path)
    for parquet_path in ("go_emotions_train.parquet", "go_emotions_test.parquet")
)

In [None]:
# Column positions 1..28 of the training frame are taken as the emotion label
# names, kept in column order.
emotion_columns = train.columns[1:29]
emotions = emotion_columns.tolist()
emotions

In [None]:
def decode_emotions_to_string(array_of_labels):
    """Return the names of the emotions whose flag in ``array_of_labels`` is 1.

    ``array_of_labels`` is read positionally, in the same order as the
    module-level ``emotions`` list.
    """
    active_names = []
    for position, name in enumerate(emotions):
        # a flag of 1 marks the label as active; anything else is skipped
        if array_of_labels[position] == 1:
            active_names.append(name)
    return active_names


def decode_emotions_to_index(row):
    """Return the positions (within ``emotions``) of the labels set to 1 in ``row``.

    ``row`` is looked up by emotion name, so it can be any mapping-like object
    keyed by the entries of the module-level ``emotions`` list.
    """
    active_positions = []
    for position, name in enumerate(emotions):
        if row[name] == 1:
            active_positions.append(position)
    return active_positions

def decode_logits(row, k=3, alphabetic_sort=False):
    """Return the names of the ``k`` highest-scoring emotions in ``row``.

    Args:
        row: 1-D sequence of scores, positionally aligned with the
            module-level ``emotions`` list.
        k: number of labels to return.  Values <= 0 give an empty list;
            values larger than ``len(row)`` return every label.
        alphabetic_sort: when True the returned names are sorted
            alphabetically; otherwise they are ordered by ascending score
            (the best-scoring label comes last).

    Returns:
        A list of emotion names.
    """
    # Guard: row[-0:] is the whole array, so k == 0 must be handled explicitly.
    if k <= 0:
        return []
    top = np.argsort(row)[-k:]
    names = [emotions[i] for i in top]
    # Sort the names themselves; sorting the indices is only alphabetical when
    # the label list happens to be in alphabetical order.
    return sorted(names) if alphabetic_sort else names



In [None]:
import matplotlib
import matplotlib.pyplot as plt

# Character length of every text; kept as a column because the filtering cell
# below reads it.
train["length_text"] = train["text"].apply(len)
test["length_text"] = test["text"].apply(len)

# Give each split its own axes and title.  Without an explicit `ax`, both
# Series plots are drawn onto the same current axes and cannot be told apart.
fig, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(12, 4))
train["length_text"].plot(kind="hist", ax=ax_train, title="Text length distribution (train)")
test["length_text"].plot(kind="hist", ax=ax_test, title="Text length distribution (test)")
for ax in (ax_train, ax_test):
    ax.set_xlabel("Characters per text")
plt.show()

In [None]:
# Upper bound used by the next cell to filter texts by character length; the
# same value is used further down as the tokenizer's max_length.
max_sequence_length = 200

In [None]:
# Drop rows whose text is longer than max_sequence_length characters.
# .copy() turns each filtered result into an independent frame, so the column
# assignments made in later cells do not raise pandas' SettingWithCopyWarning.
train = train[train["length_text"] <= max_sequence_length].copy()
test = test[test["length_text"] <= max_sequence_length].copy()


In [None]:
import matplotlib
import matplotlib.pyplot as plt

# "length_text" was computed before the filtering step and row filtering does
# not change its values, so it is plotted directly instead of being recomputed
# (re-assigning it on the filtered frame is what triggers SettingWithCopyWarning).
# Each split gets its own axes so the two histograms are not overlaid.
fig, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(12, 4))
train["length_text"].plot(kind="hist", ax=ax_train, title="Text length distribution after filtering (train)")
test["length_text"].plot(kind="hist", ax=ax_test, title="Text length distribution after filtering (test)")
for ax in (ax_train, ax_test):
    ax.set_xlabel("Characters per text")
plt.show()

In [None]:
# Emoticons, emoji short-codes and the matching emoji characters that are
# registered as extra tokenizer tokens.  The hand-written list repeats several
# entries (e.g. ":clap:", ":wave:", ":anguished:", "🤗"), so it is passed
# through dict.fromkeys() to drop the repeats while keeping first-seen order.
additional_tokens = list(dict.fromkeys([
    # smileys
    ":)", ";)", ":P", ":D", ":(", ":'(", ":O", ":/", ":|", ":*", ":@", ">:(",
    # emojis and their unicode representation
    ":thumbsup:", "👍", ":thumbsdown:", "👎", ":clap:", "👏", ":wave:", "👋", ":pray:", "🙏",
    ":smile:", "😄", ":grinning:", "😀", ":laughing:", "😆", ":sweat_smile:", "😅", ":rofl:", "🤣",
    ":blush:", "😊", ":innocent:", "😇", ":wink:", "😉", ":relieved:", "😌", ":heart_eyes:", "😍",
    ":kissing_heart:", "😘", ":kissing:", "😗", ":kissing_smiling_eyes:", "😙", ":kissing_closed_eyes:", "😚",
    ":yum:", "😋", ":stuck_out_tongue:", "😛", ":stuck_out_tongue_winking_eye:", "😜",
    ":stuck_out_tongue_closed_eyes:", "😝", ":money_mouth_face:", "🤑", ":hugs:", "🤗", ":smirk:", "😏",
    ":unamused:", "😒", ":disappointed:", "😞", ":pensive:", "😔", ":worried:", "😟", ":confused:", "😕",
    ":persevere:", "😣", ":confounded:", "😖", ":tired_face:", "😫", ":weary:", "😩", ":cry:", "😢",
    ":sob:", "😭", ":frowning:", "☹️", ":anguished:", "😧", ":fearful:", "😨", ":cold_sweat:", "😰",
    ":disappointed_relieved:", "😥", ":sweat:", "😓", ":hugging_face:", "🤗", ":thinking:", "🤔",
    ":shushing_face:", "🤫", ":lying_face:", "🤥", ":no_mouth:", "😶", ":neutral_face:", "😐",
    ":expressionless:", "😑", ":grimacing:", "😬", ":rolling_eyes:", "🙄", ":hushed:", "😯",
    ":frowning2:", "☹️", ":anguished:", "😧", ":open_mouth:", "😮", ":astonished:", "😲",
    ":sleeping:", "😴", ":drooling_face:", "🤤", ":sleepy:", "😪", ":dizzy_face:", "😵",
    ":zipper_mouth_face:", "🤐", ":nauseated_face:", "🤢", ":sneezing_face:", "🤧", ":mask:", "😷",
    ":thermometer_face:", "🤒", ":head_bandage:", "🤕", ":smiling_imp:", "😈", ":imp:", "👿",
    ":japanese_ogre:", "👹", ":japanese_goblin:", "👺", ":skull:", "💀", ":ghost:", "👻", ":alien:", "👽",
    ":robot:", "🤖", ":poop:", "💩", ":smiley_cat:", "😺", ":smile_cat:", "😸", ":joy_cat:", "😹",
    ":heart_eyes_cat:", "😻", ":smirk_cat:", "😼", ":kissing_cat:", "😽", ":scream_cat:", "🙀",
    ":crying_cat_face:", "😿", ":pouting_cat:", "😾", ":raised_hands:", "🙌", ":clap:", "👏",
    ":wave:", "👋",
]))

In [None]:
import tensorflow as tf
import keras

from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd

from transformers import BertTokenizerFast

# Load the pretrained 'bert-base-uncased' tokenizer and register the extra
# emoticon / emoji tokens with it.
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
tokenizer.add_tokens(additional_tokens)

max_sequence_length = 200

# Encoding options shared by both splits: pad / truncate every text to
# max_sequence_length tokens and return only the input ids as NumPy arrays.
encoding_options = dict(
    padding='max_length',
    truncation=True,
    max_length=max_sequence_length,
    return_tensors='np',
    return_token_type_ids=False,
    return_attention_mask=False,
)

# Training split: token ids plus the label matrix (one column per emotion).
train_sequences = tokenizer(train['text'].tolist(), **encoding_options)
train_labels = train[emotions].values
print(len(train_sequences['input_ids']))

# Test split, encoded with exactly the same options.
test_sequences = tokenizer(test['text'].tolist(), **encoding_options)
test_labels = test[emotions].values
print(len(test_sequences['input_ids']))

In [None]:
# Display the test label matrix (the `emotions` columns of the test frame).
test_labels

In [None]:
# Peek at the first five encoded test examples next to their label rows.
# NOTE(review): `test_sequences` is the tokenizer's output object, not an array;
# slicing it directly may not return the token-id matrix — confirm. The array
# actually fed to the models is `test_sequences['input_ids']`.
test_sequences[:5], test_labels[:5]

In [None]:
# Coarse sentiment bucket assigned to every fine-grained emotion label.
emotion_to_coarse = {
    'admiration': 'positive',
    'amusement': 'positive',
    'anger': 'negative',
    'annoyance': 'negative',
    'approval': 'positive',
    'caring': 'positive',
    'confusion': 'negative',
    'curiosity': 'positive',
    'desire': 'positive',
    'disappointment': 'negative',
    'disapproval': 'negative',
    'disgust': 'negative',
    'embarrassment': 'negative',
    'excitement': 'positive',
    'fear': 'negative',
    'gratitude': 'positive',
    'grief': 'negative',
    'joy': 'positive',
    'love': 'positive',
    'nervousness': 'negative',
    'optimism': 'positive',
    'pride': 'positive',
    'realization': 'positive',
    'relief': 'positive',
    'remorse': 'negative',
    'sadness': 'negative',
    'surprise': 'positive',
    'neutral': 'neutral'
}

# One list of emotion names per coarse bucket, in mapping order.
positive_emotions, negative_emotions, neutral_emotions = (
    [name for name, bucket in emotion_to_coarse.items() if bucket == wanted]
    for wanted in ('positive', 'negative', 'neutral')
)

# Per row, count how many active labels fall into each bucket and store the
# count as a new column on the training frame.
for sum_column, members in (
    ('positive_sum', positive_emotions),
    ('negative_sum', negative_emotions),
    ('neutral_sum', neutral_emotions),
):
    train[sum_column] = train[members].sum(axis=1)

train


In [None]:
# Model inputs are the padded token-id matrices; targets are the label
# matrices cast to float32.
X_train, X_test = (encoded['input_ids'] for encoded in (train_sequences, test_sequences))
y_train, y_test = (labels.astype(np.float32) for labels in (train_labels, test_labels))

# Sanity-check the row / element types of the training arrays ...
first_input_row, first_target_row = X_train[0], y_train[0]
print(type(first_input_row), type(first_target_row))
print(type(first_input_row[0]), type(first_target_row[0]))
# ... and the shapes of both splits.
for inputs_arr, targets_arr in ((X_train, y_train), (X_test, y_test)):
    print(inputs_arr.shape, targets_arr.shape)


In [None]:
import tensorflow as tf

# Report how many GPUs TensorFlow can see.  The label promises a count, so
# print the length of the device list rather than the list itself.
gpus = tf.config.list_physical_devices("GPU")
print("Num GPUs Available: ", len(gpus))


In [None]:
num_classes = len(emotions)  # one output unit per emotion label column
embedding_dim = 256  # width of the learned token embeddings
max_sequence_length = 200  # padded / truncated token length fed to the models
# len(tokenizer) counts the base vocabulary plus the tokens registered with
# add_tokens().  tokenizer.vocab_size excludes the added tokens, so an Embedding
# sized from it would not cover the ids assigned to the emoticon / emoji tokens.
tokenizer_vocab_size = len(tokenizer)
tokenizer_vocab_size

## Convolutional Neural Network (CNN)

In [None]:
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense, Dropout, Input, BatchNormalization, LSTM, Bidirectional, Reshape
from keras.optimizers import Adam

# CNN text classifier: token ids -> embeddings -> stacked Conv1D / Dense blocks
# -> global max pooling -> dense head with one sigmoid unit per label.
# The Dense layers placed between convolutions receive 3-D tensors and are
# applied along the last (channel) axis.
# NOTE: when the notebook is run top to bottom, `model` is re-assigned by the
# later model-definition cells, so this CNN is not the model that gets trained.
model = Sequential([
    Input(shape=(max_sequence_length,)),
    # The sequence length is already fixed by Input; the deprecated
    # `input_length` argument is therefore not passed.
    Embedding(input_dim=tokenizer_vocab_size, output_dim=embedding_dim),

    # Block 1: wide convolution followed by per-position dense layers
    Conv1D(filters=128, kernel_size=32, activation='relu'),
    Dropout(0.2),
    Dense(2048, activation='relu'),
    Dense(1512, activation='relu'),
    Dense(1024, activation='relu'),

    # Block 2
    Conv1D(filters=128, kernel_size=16, activation='relu'),
    Dropout(0.2),
    Dense(256, activation='relu'),
    Dense(128, activation='relu'),

    # Block 3
    Conv1D(filters=96, kernel_size=8, activation='relu'),
    Dropout(0.2),
    Dense(128, activation='relu'),

    # Final convolutions, then collapse the sequence axis
    Conv1D(filters=64, kernel_size=5, activation='relu'),
    Conv1D(filters=64, kernel_size=8, activation='relu'),
    GlobalMaxPooling1D(),
    Dropout(0.2),

    # Classification head
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(num_classes, activation='sigmoid'),  # independent probability per label
])
# Summary of the model
model.summary()


## Transformer Encoder

In [None]:
import tensorflow as tf
from keras.models import Model
from keras.layers import Input, Dense, Dropout, LayerNormalization, Embedding, MultiHeadAttention, GlobalAveragePooling1D
from keras.optimizers import Adam

embedding_dim = 256  # width of the learned token embeddings
max_sequence_length = 200  # padded / truncated token length fed to the model
# len(tokenizer) includes the tokens registered with add_tokens();
# tokenizer.vocab_size does not, which would leave the Embedding too small for
# the ids of the added emoticon / emoji tokens.
tokenizer_vocab_size = len(tokenizer)
num_classes = len(emotions)  # one output unit per emotion label column

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one Transformer encoder block to ``inputs`` and return the result.

    The block is: layer norm -> multi-head self-attention -> dropout ->
    residual add, followed by layer norm -> Dense(ff_dim, relu) -> dropout ->
    Dense back to the input width -> residual add.

    Args:
        inputs: tensor whose last dimension is the model width.
        head_size: ``key_dim`` passed to ``MultiHeadAttention``.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward sub-layer.
        dropout: rate used for the attention dropout and both Dropout layers.
    """
    # --- self-attention sub-layer -------------------------------------
    normed_inputs = LayerNormalization(epsilon=1e-6)(inputs)
    attention = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)
    attended = attention(normed_inputs, normed_inputs)
    attended = Dropout(dropout)(attended)
    res = attended + inputs

    # --- position-wise feed-forward sub-layer -------------------------
    normed_res = LayerNormalization(epsilon=1e-6)(res)
    hidden = Dense(ff_dim, activation="relu")(normed_res)
    hidden = Dropout(dropout)(hidden)
    # project back to the input width so the residual add is shape-compatible
    projected = Dense(inputs.shape[-1])(hidden)
    return projected + res

def build_transformer_model():
    """Build the Transformer-encoder classifier.

    Architecture: token ids -> Embedding -> two ``transformer_encoder`` blocks
    -> global average pooling over the sequence axis -> dropout ->
    Dense(64, relu) -> dropout -> Dense(num_classes, sigmoid).

    Reads the module-level ``max_sequence_length``, ``tokenizer_vocab_size``,
    ``embedding_dim`` and ``num_classes`` at call time.

    Returns:
        An uncompiled Keras ``Model``.
    """
    # Use the shared sequence-length constant instead of a hard-coded 200 so the
    # input shape always matches the tokenizer's max_length.
    inputs = Input(shape=(max_sequence_length,))
    # `input_length` is deprecated and redundant with the Input shape.
    x = Embedding(input_dim=tokenizer_vocab_size, output_dim=embedding_dim)(inputs)
    x = transformer_encoder(x, head_size=256, num_heads=8, ff_dim=384, dropout=0.3)
    # NOTE(review): num_heads=256 here versus 8 in the first block — with
    # head_size=256 this makes the attention projections very large; confirm
    # this is intentional and not a typo.
    x = transformer_encoder(x, head_size=256, num_heads=256, ff_dim=384, dropout=0.3)
    x = GlobalAveragePooling1D()(x)
    x = Dropout(0.3)(x)

    x = Dense(64, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(num_classes, activation='sigmoid')(x)

    return Model(inputs, outputs)

# Build the Transformer model.
# NOTE: this rebinds `model`, replacing the CNN assigned to the same name in
# the previous section.
model = build_transformer_model()

# Print the layer-by-layer summary of the model
model.summary()


In [None]:
# Sanity check: run the (not yet trained) model on the first training example
# and compare its three highest-scoring labels with the true labels.
result = model.predict(X_train[0:1])
decoded = decode_logits(result[0], k=3, alphabetic_sort=True)

print(result)
print(decoded)
print(decode_emotions_to_string(y_train[0]))

## Hierarchical Model

In [None]:

from keras.models import Model
from keras.layers import Input, Dense, Embedding, LSTM, Dropout, concatenate, Flatten


# Shared trunk: token ids -> embeddings -> dropout -> one flat feature vector.
input_layer = Input(shape=(max_sequence_length,))
# The embedding width is `embedding_dim`; the original passed the sequence
# length here, which only worked because any positive integer is accepted.
embedding_layer = Embedding(input_dim=tokenizer_vocab_size, output_dim=embedding_dim)(input_layer)
dropout_layer = Dropout(rate=0.5)(embedding_layer)
flat_layer = Flatten()(dropout_layer)

# Coarse-grained head: 3-way softmax (positive / negative / neutral).
# NOTE(review): this tensor feeds the fine-grained head but is not listed in
# the Model outputs below, so it is never trained against coarse labels.
coarse_output_layer = Dense(units=3, activation='softmax')(flat_layer)

# Fine-grained head: sees the coarse prediction concatenated with the trunk
# features and emits one sigmoid unit per emotion label.
fine_output_layer = Dense(units=128, activation='relu')(concatenate([coarse_output_layer, flat_layer]))
fine_output_layer = Dropout(rate=0.5)(fine_output_layer)
fine_output_layer = Dense(units=num_classes, activation='sigmoid')(fine_output_layer)


# Rebinds `model`: this is the model compiled and trained by the cells below.
model = Model(inputs=input_layer, outputs=fine_output_layer)
model.summary()

## Train Model

In [None]:
# Shapes of the training inputs and targets that are passed to model.fit.
X_train.shape, y_train.shape

In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# y_train is treated as a multi-label binary matrix (samples x labels).
# For every label column, compute 'balanced' weights for the two classes
# 0 (label absent) and 1 (label present).
binary_classes = np.array([0, 1])
class_weights = [
    compute_class_weight(class_weight='balanced', classes=binary_classes, y=y_train[:, label_idx])
    for label_idx in range(y_train.shape[1])
]

# Nested mapping: label index -> {0: weight when absent, 1: weight when present}.
# This is the structure read by multi_label_weighted_binary_crossentropy.
class_weight_dict = {
    label_idx: {0: weight_pair[0], 1: weight_pair[1]}
    for label_idx, weight_pair in enumerate(class_weights)
}

print(class_weight_dict)



In [None]:
def multi_label_weighted_binary_crossentropy(class_weight_dict):
    """Build a class-weighted binary cross-entropy loss for multi-label targets.

    Args:
        class_weight_dict: mapping ``label_index -> {0: w_absent, 1: w_present}``
            with one entry for every output column, keyed ``0 .. n_labels - 1``.

    Returns:
        A function ``loss(y_true, y_pred)`` that returns a scalar tensor: the
        mean, over every sample and label, of the element-wise binary
        cross-entropy multiplied by the weight of that element's true class.
    """
    n_labels = len(class_weight_dict)
    absent_weights = [float(class_weight_dict[i][0]) for i in range(n_labels)]
    present_weights = [float(class_weight_dict[i][1]) for i in range(n_labels)]

    def loss(y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        w_absent = tf.constant(absent_weights, dtype=y_pred.dtype)
        w_present = tf.constant(present_weights, dtype=y_pred.dtype)

        # Element-wise cross-entropy, shape (batch, n_labels).  The previous
        # implementation called tf.keras.losses.binary_crossentropy on a 1-D
        # label column; that function averages over the last axis, which for a
        # 1-D input is the batch axis, so it returned one scalar per label and
        # the per-sample weights only rescaled that batch mean.
        eps = 1e-7  # keeps log() finite for predictions of exactly 0 or 1
        clipped = tf.clip_by_value(y_pred, eps, 1.0 - eps)
        bce = -(y_true * tf.math.log(clipped) + (1.0 - y_true) * tf.math.log(1.0 - clipped))

        # Weight of each element's true class, broadcast over the batch axis.
        weights = y_true * w_present + (1.0 - y_true) * w_absent

        # Mean over samples and labels (equals the average of per-label means).
        return tf.reduce_mean(weights * bce)
    return loss

# Compile the model with the custom multi-label loss function


In [None]:
def scheduler(epoch, lr, warmup=3, steps=10):
    """Return the learning rate to use for ``epoch`` given the current ``lr``.

    Args:
        epoch: zero-based epoch index.
        lr: learning rate used in the previous epoch.
        warmup: number of initial epochs during which the rate grows by 30%.
        steps: after warm-up, every epoch that is a multiple of ``steps``
            decays the rate by 10%; all other epochs decay it by 1%.

    Returns:
        The new learning rate as a float.
    """
    # Honour the `warmup` argument (it was previously ignored in favour of a
    # hard-coded 3, which is still the default).
    if epoch < warmup:
        return lr * 1.3
    if epoch % steps == 0:
        return lr * 0.9
    return lr * 0.99


In [None]:
import matplotlib.pyplot as plt

# Preview the learning rate produced by `scheduler` over 20 epochs, starting
# from 1e-4.  NOTE: the preview passes steps=5, whereas the scheduler callback
# built further down hands `scheduler` to Keras without overriding its defaults.
lr, lrs = 1e-4, []
for epoch in range(20):
    lr = scheduler(epoch, lr, 3, 5)
    lrs += [lr]
    print(epoch, lr)

# plot learning rate schedule
fig, ax = plt.subplots()
ax.plot(lrs)
ax.set_xlabel('Epoch')
ax.set_ylabel('Learning rate')
ax.set_title('Learning rate schedule')
plt.show()

In [None]:
# Display the test inputs and targets; this pair is passed to model.fit as
# validation_data below.
(X_test, y_test)

In [None]:
from matplotlib import pyplot as plt

from keras.callbacks import LearningRateScheduler

import datetime

# TensorBoard callback writing to a time-stamped log directory.
# NOTE: it is created here but not included in the callbacks passed to model.fit.
log_dir = "./logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1, write_images=True, embeddings_freq=1)


# Learning rate scheduler callback (calls `scheduler` with its default arguments)
lr_scheduler = keras.callbacks.LearningRateScheduler(scheduler)
initial_lr = 1e-4

# NOTE: this AUC metric object is created but not added to the compiled metrics.
auc = keras.metrics.AUC(name='auc')

# For a multi-unit sigmoid output the 'accuracy' shortcut resolves to
# categorical (argmax) accuracy, which is not meaningful for multi-label
# targets.  BinaryAccuracy thresholds every label independently; it keeps the
# name 'accuracy' so the history keys used by the plotting cell are unchanged.
model.compile(
    optimizer=Adam(learning_rate=initial_lr),
    loss=multi_label_weighted_binary_crossentropy(class_weight_dict),
    metrics=[keras.metrics.BinaryAccuracy(name='accuracy'), 'Precision', 'Recall'],
)


In [None]:

# Train the model.  The test split doubles as validation data and the
# learning-rate scheduler is the only callback.
fit_options = dict(
    epochs=100,
    batch_size=16,
    validation_data=(X_test, y_test),
    callbacks=[lr_scheduler],
)
history = model.fit(X_train, y_train, **fit_options)


In [None]:
# Show the metric values recorded on the object returned by model.fit
# (the plotting cell below reads its 'loss' / 'accuracy' / 'val_*' entries).
history.history

In [None]:

# Plot the training history.  Loss and accuracy live on different scales, so
# each gets its own axes; only the accuracy panel is limited to [0, 1] (the
# shared axis used before clipped any loss value above 1 and labelled it
# "Accuracy").
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

ax_loss.plot(history.history['loss'], label='loss')
ax_loss.plot(history.history['val_loss'], label='val_loss')
ax_loss.set(xlabel='Epoch', ylabel='Loss', title='Loss')
ax_loss.legend(loc='upper right')

ax_acc.plot(history.history['accuracy'], label='accuracy')
ax_acc.plot(history.history['val_accuracy'], label='val_accuracy')
ax_acc.set(xlabel='Epoch', ylabel='Accuracy', ylim=(0, 1), title='Accuracy')
ax_acc.legend(loc='lower right')

plt.show()



In [None]:

# Evaluate the model on the test set.  evaluate() returns the loss followed by
# the metrics in the order they were given to model.compile.
loss, accuracy, precision, recall  = model.evaluate(X_test, y_test)
print(f'Test Loss: {loss}')
print(f'Test Accuracy: {accuracy}')
# Precision and recall were computed but never reported before.
print(f'Test Precision: {precision}')
print(f'Test Recall: {recall}')


In [None]:

# `auc_ths` was never defined anywhere in the notebook, so this cell raised a
# NameError.  Compute it here as the per-label ROC AUC of the model's test
# predictions (average=None returns one score per label column).
# NOTE(review): this assumes per-label AUC is what `auc_ths` was meant to hold.
from sklearn.metrics import roc_auc_score

test_probabilities = model.predict(X_test)
auc_ths = roc_auc_score(y_test, test_probabilities, average=None)
print(f'Test AUC: {auc_ths.tolist()}')


In [None]:
# Displays the AUC metric object created in the compile cell.
# NOTE(review): that object is not in the metrics list passed to model.compile
# and no cell updates it, so it presumably holds no evaluation result — confirm.
auc