<a href="https://www.kaggle.com/code/bilelhaddad/emotion-detection-using-fine-tuned-bert?scriptVersionId=211721845" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

<a href="https://www.kaggle.com/code/bilelhaddad/emotion-detection-using-fine-tuned-bert?scriptVersionId=211721191" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

## Importing all the necessary libraries

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # df processing, CSV file I/O (e.g. pd.read_csv)

# Input df files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found under the read-only Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    full_paths = (os.path.join(root, name) for name in names)
    for path in full_paths:
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import re
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
 
# remove digits and special characters
def remove_digits(text):
    """Delete digits and unwanted symbols from ``text``.

    Only ASCII letters, whitespace and the punctuation marks
    ``. , ! ? / : ; " '`` are kept; every other character is removed.

    NOTE: a later cell imports a function with this same name from
    ``helper_prabowo_ml``; once that import has run it replaces this definition.

    text:str
    return:str
    """
    # A leading ^ inside [] negates the class: match anything NOT listed.
    # The upper-case range must be A-Z; A-z would also span the ASCII
    # symbols [ \ ] ^ _ ` that sit between 'Z' and 'a'.
    pattern = r'[^a-zA-Z.,!?/:;\"\'\s]'
    return re.sub(pattern, '', text)

def remove_special_characters(text):
    """Delete every character that is not a letter, digit, whitespace or basic punctuation.

    Kept characters: ASCII letters, digits, whitespace and ``. , ! ? / : ; " '``.

    NOTE: a later cell imports a function with this same name from
    ``helper_prabowo_ml``; once that import has run it replaces this definition.

    text:str
    return:str
    """
    # Negated class listing what to keep. The range is A-Z on purpose: A-z
    # would also let the ASCII symbols [ \ ] ^ _ ` through.
    pat = r'[^a-zA-Z0-9.,!?/:;\"\'\s]'
    return re.sub(pat, '', text)



In [None]:
# Load the emotion dataset from the attached Kaggle input
# (the file name suggests it has already been preprocessed — TODO confirm).
df_filtered = pd.read_csv('/kaggle/input/emotion-from-emogo/emotion_preprocessed.csv')
# Show two random rows as a quick sanity check of the loaded frame.
df_filtered.sample(2)
#df_filtered['cleanText_NoPunct'] = df_filtered['cleanText'].apply(remove_punct)

In [None]:
!pip install transformers 

In [None]:
from helper_prabowo_ml import clean_html, remove_links, non_ascii, lower, email_address, removeStopWords, punct, remove_, remove_special_characters, remove_digits
import matplotlib.pyplot as plt
import seaborn as sns
import warnings, re
warnings.filterwarnings("ignore")
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, TFBertModel
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input, GlobalMaxPool1D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from sklearn.metrics import classification_report
from tensorflow.keras.utils import plot_model

In [None]:
# Extra tokens removed in addition to NLTK's English stop-word list.
_CUSTOM_STOPWORDS = ('and','I','A','http','And','So','arnt','This','When','It','many','Many','so','cant','Yes','yes','No','no','These','these','mailto','regards','ayanna','like','email')
# Filled on the first preprocess() call so the stop-word set is built only once.
_STOPWORDS_CACHE = None


def _get_stopwords():
    """Return the combined NLTK + custom stop-word set, building it on first use."""
    global _STOPWORDS_CACHE
    if _STOPWORDS_CACHE is None:
        words = set(stopwords.words("english"))
        words.update(_CUSTOM_STOPWORDS)
        _STOPWORDS_CACHE = words
    return _STOPWORDS_CACHE


def preprocess(text):
    """Clean a raw sentence before it is tokenized.

    Steps, in order: drop stop words, strip e-mail addresses, URLs,
    ``[...]`` spans and HTML tags, remove digits / special characters,
    lower-case the result and collapse repeated spaces.

    Stop-word matching is case-sensitive because it runs before lower-casing,
    which is why the custom list carries both capitalised and lower-case forms.

    text:str
    return:str
    """
    stop_words = _get_stopwords()
    new_str = ' '.join(word for word in text.split() if word not in stop_words)
    # e-mail addresses
    new_str = re.sub(r'[\w\.-]+@[\w\.-]+', '', new_str)
    # Whole URLs must go first: stripping the bare word 'http' beforehand
    # would leave the rest of the address ("://host/path") in the text.
    new_str = re.sub(r'http\S+', '', new_str)
    new_str = re.sub(r'bit\.ly/\S+', '', new_str)  # dot escaped so it only matches "bit.ly"
    # any remaining bare 'http' fragment
    new_str = re.sub(r'http', '', new_str)
    # bracketed spans such as "[text]"
    new_str = re.sub(r'\[.*?\]', '', new_str)
    # HTML tags
    new_str = re.sub(r'<.*?>', '', new_str)
    new_str = remove_digits(new_str)
    new_str = remove_special_characters(new_str)
    new_str = new_str.lower()
    # The removals above can leave runs of several spaces; collapse each run to one.
    new_str = re.sub(r' {2,}', ' ', new_str)
    return new_str

## Computing text-length features for each sample

In [None]:
# Count whitespace-separated words per row; calling len() on the raw string
# would count characters, which is not what the column names describe.
df_filtered['num_words'] = df_filtered['text'].str.split().str.len()
df_filtered['cleanWords'] = df_filtered['cleanText'].str.split().str.len()

In [None]:
df_filtered.head()

In [None]:
# Distribution plot of the `num_words` column of df_filtered.
sns.displot(df_filtered.num_words,height=8,aspect=1.5)

In [None]:
#encoded_labels = {'anger': 0, 'fear': 1, 'joy': 2, 'love': 3, 'sadness': 4, 'surprise': 5} 

In [None]:
# Integer id assigned to each emotion class.
encoded_labels = dict(
    anger=0,
    disgust=1,
    fear=2,
    happiness=3,
    sadness=4,
    surprise=5,
    neutral=6,
)
# Reverse lookup: integer id -> emotion name.
enc2label = dict((idx, name) for name, idx in encoded_labels.items())
enc2label

## Train-Test Split

In [None]:
# Discard every row that has a missing value in any column, then write the
# cleaned frame to clean_df.csv in the working directory.
df_filtered = df_filtered.dropna()
df_filtered.to_csv('clean_df.csv', index=False)

In [None]:
# Shuffled 70/30 split, stratified on the `label` column, with a fixed seed.
train_data, test_data = train_test_split(
    df_filtered,
    test_size=0.3,
    random_state=101,
    shuffle=True,
    stratify=df_filtered['label'],
)

## Loading the Tokenizer class and pretrained BERT model

In [None]:
# Load the pretrained "bert-base-uncased" tokenizer and the matching TensorFlow BERT encoder.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
bert_model = TFBertModel.from_pretrained("bert-base-uncased")

In [None]:
# Distribution plot of the `cleanWords` column of df_filtered.
sns.displot(df_filtered.cleanWords,height=8,aspect=1.5)

In [None]:
df_filtered['cleanWords'].quantile(0.8)

In [None]:
# Fixed sequence length: used by the tokenizer calls for padding/truncation
# and as the shape of the model's Input layers.
max_len = 60


## Text Tokenization

In [None]:
# Tokenize one example sentence with the same settings used for the datasets,
# to inspect the returned input ids and attention mask.
tokenizer(text='So Dick , how about getting some coffee for tonight aaaaaaaaa kifech ?',
                   add_special_tokens=True,
                   return_tensors='tf',
                   max_length=max_len,
                   padding='max_length',
                   truncation=True,
                   return_token_type_ids=False,
                   return_attention_mask=True,
                   verbose=True
                   )

In [None]:
def _encode_texts(texts):
    """Tokenize a list of strings into TensorFlow tensors padded/truncated to ``max_len``.

    texts:list[str]
    return: tokenizer output holding `input_ids` and `attention_mask`
    """
    return tokenizer(
        text=texts,
        add_special_tokens=True,
        return_tensors='tf',
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_token_type_ids=False,
        return_attention_mask=True,
        verbose=True,
    )


# Both splits are encoded with exactly the same tokenizer settings.
X_train = _encode_texts(train_data.cleanText.tolist())
X_test = _encode_texts(test_data.cleanText.tolist())

## Defining the model architecture

In [None]:
# Two int32 Keras inputs of length max_len; their names match the keys of the
# dictionaries later passed to model.fit / model.evaluate / model.predict.
input_ids = Input(shape=(max_len,),name='input_ids',dtype=tf.int32)
attention_mask = Input(shape=(max_len,),name='attention_mask',dtype=tf.int32)

In [None]:
# Classification head on top of BERT: pooled token states -> three ReLU Dense
# layers (with one Dropout) -> softmax output.
embeddings = bert_model(input_ids,attention_mask=attention_mask)[0] # 0: final hidden state, 1: pooling output
# Reduce the per-token hidden states to a single vector per sample.
output = GlobalMaxPool1D()(embeddings)
output = Dense(units=128,activation='relu')(output)
output = Dropout(0.1)(output)
output = Dense(units=64,activation='relu')(output)
output = Dense(units=32,activation='relu')(output)
# 7 output units: one per entry of the encoded_labels mapping.
y = Dense(units=7,activation='softmax')(output)

model = Model(inputs=[input_ids,attention_mask],outputs=y)
# NOTE(review): index 2 is presumably the BERT layer (after the two Input
# layers), making BERT itself fine-tunable — confirm with model.summary().
model.layers[2].trainable = True

## Compiling the model

In [None]:
# NOTE(review): the metric is a plain CategoricalAccuracy that is only *named*
# 'balanced_accuracy'; that name is the key later used by EarlyStopping
# ('val_balanced_accuracy') and by the history plots.
# NOTE(review): the `decay` argument may be rejected by newer Keras optimizers —
# confirm against the installed TensorFlow/Keras version.
model.compile(loss=CategoricalCrossentropy(),
              optimizer=Adam(learning_rate=5e-5,epsilon=1e-8,decay=0.01,clipnorm=1.0),
              metrics=CategoricalAccuracy('balanced_accuracy'))

## Encoding the emotion labels

In [None]:
#train_data['Label'] = train_data.Sentiment.map(encoded_labels)
#test_data['Label'] = test_data.Sentiment.map(encoded_labels)

## Generating the model summary and plot

In [None]:
model.summary()

In [None]:
plot_model(model,'model.png',show_shapes=True,dpi=100)

## Training and fine-tuning the pretrained BERT model 

In [None]:
X_train['input_ids'],X_train['attention_mask']

In [None]:
to_categorical(test_data.label)

In [None]:
def create_model_checkpoint(model_name, save_path="./model_checkpoints"):
    """Build a ModelCheckpoint callback writing ``<save_path>/<model_name>.keras``.

    The callback watches ``val_loss`` and is configured with
    ``save_best_only=True``.

    model_name:str
    save_path:str
    return: tf.keras.callbacks.ModelCheckpoint
    """
    checkpoint_file = os.path.join(save_path, model_name + '.keras')
    return tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_file,
        monitor="val_loss",
        verbose=0,
        save_best_only=True,
    )

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop training once validation accuracy has not improved for 2 epochs and
# roll the model back to its best weights.
early_stopping = EarlyStopping(
    monitor='val_balanced_accuracy',
    # Accuracy should be maximised; stating it explicitly avoids relying on
    # Keras inferring the direction from the custom metric name.
    mode='max',
    patience=2,
    restore_best_weights=True,
    verbose=1
)

In [None]:
# Fine-tune for up to 10 epochs; `r.history` is used by the plotting cells below.
# NOTE(review): the test split is passed as validation data, so early stopping
# and checkpoint selection are driven by the same data that is later reported
# as the test result.
r = model.fit(x={'input_ids': X_train['input_ids'], 'attention_mask': X_train['attention_mask']},
             y=to_categorical(train_data.label),
             epochs=10,
             batch_size=128,
             validation_data=({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']},to_categorical(test_data.label))
              ,callbacks = [early_stopping,
                  create_model_checkpoint(model_name=model.name)]
             
             ) 

In [None]:
# Emotion name -> integer class id.
encoded_labels = dict(
    anger=0,
    disgust=1,
    fear=2,
    happiness=3,
    sadness=4,
    surprise=5,
    neutral=6,
)
# Integer class id -> emotion name, used to decode predictions.
map_num2label = dict(zip(encoded_labels.values(), encoded_labels.keys()))

## Analyzing model performance

In [None]:
# Training vs. validation loss per epoch, read from the fit history.
loss_fig, loss_ax = plt.subplots(figsize=(12,8))
loss_ax.plot(r.history['loss'],'r',label='train loss')
loss_ax.plot(r.history['val_loss'],'b',label='test loss')
loss_ax.set_xlabel('No. of Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Loss Graph')
loss_ax.legend();

In [None]:
# Training vs. validation accuracy per epoch, read from the fit history.
acc_fig, acc_ax = plt.subplots(figsize=(12,8))
acc_ax.plot(r.history['balanced_accuracy'],'r',label='train accuracy')
acc_ax.plot(r.history['val_balanced_accuracy'],'b',label='test accuracy')
acc_ax.set_xlabel('No. of Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_title('Accuracy Graph')
acc_ax.legend();

## Saving the model

In [None]:
model.save("emotion_detector-v2.h5")

## Evaluating the model on the test dataset

In [None]:
# Evaluate on the held-out split. The target is the lowercase `label` column,
# the same one used for stratification and for model.fit; the cell that would
# have created a capitalised `Label` column is commented out.
loss, acc = model.evaluate({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']},to_categorical(test_data.label))
print("Test Categorical Cross-Entropy Loss:",loss)
print("Test Categorical Accuracy:",acc)

In [None]:
# Class probabilities for the held-out split, reduced to the most likely class id.
test_probabilities = model.predict({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']})
test_predictions = np.argmax(test_probabilities,axis=1)
# Ground truth comes from the lowercase `label` column, as in training.
print(classification_report(test_data.label,test_predictions))

The result is quite promising: we obtained an f1-score of almost 80% across the 7 emotion classes.

In [None]:
# Check for the file written by model.save("emotion_detector-v2.h5") above.
os.path.exists('/kaggle/working/emotion_detector-v2.h5')

In [None]:
bert_model = TFBertModel.from_pretrained("bert-base-uncased")

# Path of the model saved earlier in this notebook ("emotion_detector-v2.h5",
# written to the working directory).
model_path = "/kaggle/working/emotion_detector-v2.h5"

# TFBertModel is not a built-in Keras layer, so it has to be supplied through
# custom_objects for the saved model to be rebuilt.
model = tf.keras.models.load_model(
    model_path,
    custom_objects={"TFBertModel": TFBertModel}
)

In [None]:
model.summary()

In [None]:
import os

from transformers import TFBertModel, BertTokenizer
import tensorflow as tf

# Reuse a tokenizer exported to the working directory when one exists;
# otherwise load the same "bert-base-uncased" vocabulary used for training and
# export it, so this cell also works on a fresh session.
tokenizer_dir = "/kaggle/working/tokenizer"
if os.path.isdir(tokenizer_dir):
    tokenizer = BertTokenizer.from_pretrained(tokenizer_dir)
else:
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    tokenizer.save_pretrained(tokenizer_dir)

In [None]:
# Must equal the sequence length the model's Input layers were built with (60).
max_len = 60
text = 'Go away you stupid'


def predict(model,tokenizer,text,max_len=60):
    """Predict the emotion label of a single sentence.

    The text is cleaned with ``preprocess``, tokenized to ``max_len`` tokens,
    passed through ``model`` and the highest-scoring class id is mapped back
    to its name through ``map_num2label``.

    model: Keras model taking [input_ids, attention_mask]
    tokenizer: tokenizer producing `input_ids` and `attention_mask`
    text:str
    max_len:int, padded/truncated sequence length
    return:str, the predicted emotion name (also printed)
    """
    text = preprocess(text)
    inputs = tokenizer(text,max_length=max_len,padding='max_length',truncation=True,return_tensors='tf',return_token_type_ids=False,
                       return_attention_mask=True,
                       verbose=True)
    preds = model.predict([inputs["input_ids"],inputs["attention_mask"]])
    predicted_classes = tf.argmax(preds, axis=1)
    # Convert to a plain int so the dictionary lookup uses a native key type.
    label = map_num2label[int(predicted_classes[0].numpy())]
    print(label)
    return label


# Trailing ';' keeps the notebook from echoing the returned label a second time.
predict(model,tokenizer,text,max_len=max_len);