In [None]:
#import modules
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from keras.layers import Dropout
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils import shuffle
from tqdm.auto import tqdm
from transformers import AutoModelForMaskedLM
from transformers import AutoTokenizer
from transformers import BertTokenizer
from transformers import TFBertModel
warnings.filterwarnings('ignore')

In [None]:
#Load the TwIT dataset: a ';'-separated CSV whose first row is the header.
#The chain below indexes the rows by 'Id' and renames the 'Emozione' column to 'Emotion'.
twit = (
    pd.read_csv(
        r"C:\Users\ridol\OneDrive\Desktop\Emotion Recognition\Dataset\TwIT.csv",
        sep=';',
        header=0,
        encoding='utf8',
        dtype={'Text': 'str', 'Emozione': 'int'},
    )
    .set_index('Id', drop=True)
    .rename(mapper={'Emozione': 'Emotion'}, axis='columns')
)

##Data Exploration
twit.info()
#per-column count of missing values (the original author reports none)
print(twit.isnull().sum())
#number of rows per emotion label
freq = twit.groupby(['Emotion']).count()
print(freq)
#Counts reported by the original author -->
#Happiness(0): 549;  Trust(1):504; Sadness(2):479; Anger(3):513; Fear(4):518; Disgust(5):545;



In [None]:
#Train and Test split
#The shuffle is seeded so that every run of the notebook yields the same split.
#Without a seed, a model saved in an earlier session could later be evaluated
#on rows that were part of its training data.
twit_shuffled = shuffle(twit, random_state=42)
Testing_set = twit_shuffled.iloc[:623]   #first 623 shuffled rows are held out for testing (20%)
Training_set = twit_shuffled.iloc[623:]  #all remaining rows are used for training (80%)

In [None]:
#Fetch the pretrained multilingual (cased) BERT tokenizer from the Hugging Face library
tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")

#Pre-allocate zero-filled buffers (one row per tweet, 64 columns) that will
#receive the token ids and the attention masks of the training and test sets
X_input_ids = np.zeros(shape=(Training_set.shape[0], 64))
X_attn_masks = np.zeros_like(X_input_ids)
X_input_ids_test = np.zeros(shape=(Testing_set.shape[0], 64))
X_attn_masks_test = np.zeros_like(X_input_ids_test)


#Create the function for training data
def generate_training_data(df, ids, masks, tokenizer):
    for i, text in tqdm(enumerate(Training_set['Text'])):
        tokenized_text = tokenizer.encode_plus(
            text,
            max_length=64,
            truncation=True,
            padding='max_length',
            add_special_tokens=True,
            return_tensors='tf'
        )
        ids[i, :] = tokenized_text.input_ids
        masks[i, :] = tokenized_text.attention_mask
    return ids, masks

#fill the pre-allocated buffers with the tokenized training tweets
X_input_ids, X_attn_masks = generate_training_data(
    df=Training_set,
    ids=X_input_ids,
    masks=X_attn_masks,
    tokenizer=tokenizer,
)

#one-hot encode the labels: row i of the 6x6 identity matrix is the encoding of class i,
#so indexing it with the label column gives one encoded row per tweet
labels = np.eye(6)[Training_set['Emotion'].values]

#wrap the three arrays in a tensorflow dataset that yields one (ids, mask, label) triple per tweet
dataset = tf.data.Dataset.from_tensor_slices((X_input_ids, X_attn_masks, labels))

#create the map function
def SentimentDatasetMapFunction(input_ids, attn_masks, labels):
    """Regroup one dataset element into a ``(features, target)`` pair.

    The features are returned as a dict with the keys ``'input_ids'`` and
    ``'attention_mask'``; ``labels`` is passed through unchanged as the target.
    """
    features = dict(input_ids=input_ids, attention_mask=attn_masks)
    return features, labels

dataset = dataset.map(SentimentDatasetMapFunction)


# Shuffle once, then build batches of 8 and drop the incomplete last batch.
# reshuffle_each_iteration=False keeps the shuffled order identical in every epoch:
# with the default (True) the take()/skip() split below would select different
# batches each epoch, so validation samples would leak into the training data.
dataset = dataset.shuffle(1000, reshuffle_each_iteration=False).batch(8, drop_remainder=True)

p = 0.8
train_size = int((len(Training_set)//8)*p) #number of batches of 8 samples, of which 80% are used for training

#Train and validation set
#the first train_size batches are used for training; only their order is reshuffled at each epoch
train_dataset = dataset.take(train_size).shuffle(train_size)
#the remaining batches are used for validation
val_dataset = dataset.skip(train_size)



In [None]:
#BERT model
bert_model= TFBertModel.from_pretrained('bert-base-multilingual-cased') # bert base model with pretrained weights

# defining 2 input layers for input_ids and attn_masks (sequences of 64 integers each)
input_ids = tf.keras.layers.Input(shape=(64,), name='input_ids', dtype='int32')
attention_masks = tf.keras.layers.Input(shape=(64,), name='attention_mask', dtype='int32')

# index 0 of the BERT output is the per-token sequence output (3D), index 1 the pooled output (2D)
bert_embds = bert_model.bert(input_ids, attention_mask=attention_masks)[1]
intermediate_layer = tf.keras.layers.Dense(64, activation='relu', name='intermediate_layer')(bert_embds)
# every layer comes from tf.keras: mixing layers of the standalone `keras` package
# into a tf.keras graph can fail when the two packages are not the same implementation
intermediate_layer = tf.keras.layers.Dropout(0.2)(intermediate_layer)
output_layer = tf.keras.layers.Dense(6, activation='softmax', name='output_layer')(intermediate_layer) # softmax -> one probability per emotion class


emotional_model = tf.keras.Model(inputs=[input_ids, attention_masks], outputs=output_layer)
emotional_model.summary()

#optimizer, loss-function and accuracy of the emotional model
# NOTE(review): `decay` is only accepted by the legacy optimizers in recent
# TensorFlow releases - confirm against the installed version.
optim = tf.keras.optimizers.Adam(learning_rate=2e-5, decay=1e-6)
loss_func = tf.keras.losses.CategoricalCrossentropy()  # targets are one-hot encoded
acc = tf.keras.metrics.CategoricalAccuracy('accuracy')

emotional_model.compile(optimizer=optim, loss=loss_func, metrics=[acc])



#Training the model
###### ALERT ##### the following code needs a good CPU or GPU, otherwise training the model takes a long time

hist = emotional_model.fit(
    train_dataset,
    validation_data= val_dataset,
    epochs=6  #8-10 epochs are better but need a good GPU
)

#persist the trained model in the 'emotional_model' directory
emotional_model.save('emotional_model')


In [None]:
#Prediction
emotional_model = tf.keras.models.load_model('emotional_model')

tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")


def prepare_data(input_text, tokenizer):
    """Encode one text into the feature dict expected by the model.

    The text is padded/truncated to 64 tokens and returned as a dict with the
    keys 'input_ids' and 'attention_mask', both cast to float64.
    """
    token = tokenizer.encode_plus(
        input_text,
        max_length=64,
        truncation=True,
        padding='max_length',
        add_special_tokens=True,
        return_tensors='tf'
    )
    # NOTE(review): the tensors are cast to float64 although the model inputs are
    # declared int32 - presumably to match the dtype the saved model was trained
    # with; confirm before changing.
    return {
        'input_ids': tf.cast(token.input_ids, tf.float64),
        'attention_mask': tf.cast(token.attention_mask, tf.float64)
    }


#Text -> label mapping of the test set. It is no longer used for the evaluation
#(a dict keeps only one entry per distinct text) but is kept for any later cell that reads it.
dictionary_test_set = dict(zip(Testing_set['Text'], Testing_set['Emotion']))

#Labels and texts are taken straight from the test frame, so every row is
#evaluated and the two lists are aligned row by row even when a tweet occurs twice
y_true = Testing_set['Emotion'].tolist()

y_predict=[]
for k in Testing_set['Text']:
    tokenized_input_text= prepare_data(k, tokenizer)
    probs= emotional_model.predict(tokenized_input_text)
    predicted_class = np.argmax(probs[0])  #index of the most probable class
    y_predict.append(predicted_class)
    print(k, predicted_class)


In [None]:
#Class names, listed in the order of their integer label (0 to 5)
labels_6 = ['Happiness','Trust','Sadness','Anger','Fear','Disgust']

#Print the Classification report
#labels= pins the six classes, so the report is still produced (and the names stay
#aligned) when one class occurs neither in the true labels nor in the predictions
print('\tClassification Report for BERT:\n\n',
      classification_report(y_true, y_predict,
                            labels=list(range(len(labels_6))),
                            target_names=labels_6))

#Print Confusion Matrix
def print_cf1(y_test, y_hat):
    """Plot the confusion matrix of the six emotion classes as an annotated heatmap.

    Parameters
    ----------
    y_test : sequence of int
        True class labels (0 to 5).
    y_hat : sequence of int
        Predicted class labels (0 to 5).

    Rows of the heatmap are the true classes, columns the predicted classes;
    both axes are labelled with the names in ``labels_6``.
    """
    # labels= keeps the matrix 6x6 even when a class never occurs, so it always
    # matches the six row/column names below
    cm = confusion_matrix(y_test, y_hat, labels=list(range(len(labels_6))))
    sns.set(font_scale= 1.4, color_codes=True, palette="deep")
    # naming the columns lets seaborn put each tick label at the centre of its cell
    sns.heatmap(pd.DataFrame(cm, index=labels_6, columns=labels_6),
                annot = True,
                annot_kws = {"size":16},
                fmt="d",
                cmap="YlGnBu")
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted Value")
    plt.xticks(rotation=45)  # only rotate the labels, keep the tick positions set by seaborn
    plt.ylabel("True Value")
    plt.show()

print_cf1(y_true, y_predict)