## Sentiment Analysis with Transformers
In this notebook, we will fine-tune a pre-trained transformer model for sentiment analysis using a custom dataset of tweets.

In [None]:
# import tensorflow as tf
# import numpy as np
# import pandas as pd
# from transformers import BertTokenizer, TFBertForSequenceClassification
# from sklearn.model_selection import train_test_split
# import warnings
# warnings.filterwarnings("ignore")

In [None]:
# # Load the dataset
# df = pd.read_csv('./ML Assignment Dataset - Train.csv')
# df.rename(columns={
#     'tweet_text': 'text',
#     'emotion_in_tweet_is_directed_at': 'brand',
#     'is_there_an_emotion_directed_at_a_brand_or_product': 'emotion'
# }, inplace=True)

# # Map the emotion labels to categories
# def map_to_categories(label):
#     if label in ['Negative emotion', 'negative']:
#         return 'negative'
#     elif label in ['Positive emotion', 'positive']:
#         return 'positive'
#     else:
#         return 'neutral'
# df['emotion'] = df['emotion'].apply(map_to_categories)

In [None]:
# df.drop('brand', axis=1, inplace=True)
# df = df.dropna(subset=['text'])

In [None]:
# df.to_csv('wysa.csv', index=False)

### Run the code from here once csv is saved locally

In [1]:
# import os
# from google.colab import drive
# drive.mount('/content/drive')

# # Change directory to the desired folder within the mounted drive
# os.chdir('/content/drive/My Drive/Wysa')


In [2]:
import tensorflow as tf
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import nltk
from nltk.corpus import wordnet
import random

nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\rohan\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\rohan\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [3]:
# Load CSV data
df = pd.read_csv('./wysa.csv')
df.shape

(8588, 2)

In [4]:
df.emotion.value_counts()

neutral     5397
positive    2672
negative     519
Name: emotion, dtype: int64

Custom augmentation of df using synonym replacement meanwhile also handling data imbalance

In [5]:
# Calculate the number of augmentations needed for each class to balance the dataset
emotion_counts = df['emotion'].value_counts()
max_count = emotion_counts.max()
augmentation_factors = {emotion: max_count // count for emotion, count in emotion_counts.items()}

print("Augmentation factors per class:", augmentation_factors)

# Function to augment text data using synonym replacement
def synonym_replacement(text, n):
    words = text.split()
    new_words = words.copy()
    random_word_list = list(set([word for word in words if wordnet.synsets(word)]))
    random.shuffle(random_word_list)
    num_replaced = 0
    for random_word in random_word_list:
        synonyms = wordnet.synsets(random_word)
        if len(synonyms) >= 1:
            synonym = random.choice(synonyms).lemmas()[0].name()
            new_words = [synonym if word == random_word else word for word in new_words]
            num_replaced += 1
        if num_replaced >= n:
            break

    sentence = ' '.join(new_words)
    return sentence

# Augment the dataset with custom augmentation factor
augmented_texts = []
augmented_labels = []

for _, row in df.iterrows():
    text, emotion = row['text'], row['emotion']
    augmented_texts.append(text)
    augmented_labels.append(emotion)

    # Determine how many times to augment each text based on its class
    n_augmentations = augmentation_factors[emotion]

    for _ in range(n_augmentations):  # Augment text n_augmentations times
        aug_text = synonym_replacement(text, n=2)  # Replace up to 2 words
        augmented_texts.append(aug_text)
        augmented_labels.append(emotion)

augmented_df = pd.DataFrame({'text': augmented_texts, 'emotion': augmented_labels})

Augmentation factors per class: {'neutral': 1, 'positive': 2, 'negative': 10}


In [6]:
augmented_df.shape

(24519, 2)

In [7]:
from transformers import TFAutoModel, AutoConfig

# Encode the labels (emotion) into numerical format
label_encoder = LabelEncoder()
augmented_df['label'] = label_encoder.fit_transform(augmented_df['emotion'])

# Splitting the dataset into training and testing sets
train_df, test_df = train_test_split(augmented_df, test_size=0.2)

# Load pre-trained RoBERTa model and tokenizer
roberta_model_name = "cardiffnlp/twitter-roberta-base-emotion-multilabel-latest"

config = AutoConfig.from_pretrained(roberta_model_name)
model = TFAutoModelForSequenceClassification.from_pretrained(roberta_model_name, config = config)

tokenizer = AutoTokenizer.from_pretrained(roberta_model_name)

All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-emotion-multilabel-latest.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.


In [8]:
# Tokenization and dataset preparation
def tokenize_and_format(df):
    tokenized = tokenizer(list(df['text']), padding=True, truncation=True, max_length=512, return_tensors='tf')
    return tokenized.data, tf.convert_to_tensor(df['label'])

train_data, train_labels = tokenize_and_format(train_df)
test_data, test_labels = tokenize_and_format(test_df)

In [9]:
# Create TensorFlow datasets
BATCH_SIZE = 16
train_dataset = tf.data.Dataset.from_tensor_slices((train_data, train_labels)).shuffle(len(train_df)).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_data, test_labels)).batch(BATCH_SIZE)

classifier = model
classifier.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
                   loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                   metrics=['accuracy'])

classifier.fit(train_dataset, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x1c31175b3a0>

In [10]:
# Evaluate the model
classifier.evaluate(test_dataset)



[0.30172494053840637, 0.8874388337135315]

Code to save model weights

In [11]:
# import os

# # Define the directory and the file name separately
# directory = '/content/drive/My Drive/Wysa/models'
# file_name = 'roberta_emotion_classifier_weights.h5'
# weights_save_path = os.path.join(directory, file_name)

# # Create the directory if it does not exist
# if not os.path.isdir(directory):
#     os.makedirs(directory)

# # Now save the weights to the specified file path
# classifier.save_weights(weights_save_path)


Code to save the entire model (its architecture, the weights, and even the training configuration)

In [12]:
import os

# Define the directory and the file name separately
directory = '/content/drive/My Drive/Wysa/models'
file_name = 'roberta_emotion_classifier.h5'
model_save_path = os.path.join(directory, file_name)

# Create the directory if it does not exist
if not os.path.isdir(directory):
    os.makedirs(directory)

# Now save the entire model to the specified file path
classifier.save(model_save_path)


NotImplementedError: Saving the model to HDF5 format requires the model to be a Functional model or a Sequential model. It does not work for subclassed models, because such models are defined via the body of a Python method, which isn't safely serializable. Consider saving to the Tensorflow SavedModel format (by setting save_format="tf") or using `save_weights`.

In [14]:
import os

# Define the directory and the file name separately
directory = './models'
file_name = 'roberta_emotion_classifier'
model_save_path = os.path.join(directory, file_name)

# Create the directory if it does not exist
if not os.path.isdir(directory):
    os.makedirs(directory)

# Now save the entire model in the TensorFlow SavedModel format
classifier.save(model_save_path, save_format="tf")



INFO:tensorflow:Assets written to: ./models\roberta_emotion_classifier\assets


INFO:tensorflow:Assets written to: ./models\roberta_emotion_classifier\assets


In [15]:
def predict_emotion(text, model, tokenizer, label_encoder):
    # Tokenize the input text
    tokenized_input = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors='tf')

    # Predict
    prediction = model(tokenized_input)

    # Convert logits to probabilities using softmax
    probabilities = tf.nn.softmax(prediction.logits, axis=1)

    # Get the index of the maximum value
    predicted_label_index = tf.argmax(probabilities, axis=1).numpy()[0]

    # Check if the predicted label index is within the known range
    if predicted_label_index >= len(label_encoder.classes_):
        print(f"Warning: Predicted label index {predicted_label_index} is out of known range.")
        return "Unknown"

    # Convert the index to the corresponding emotion label
    predicted_label = label_encoder.inverse_transform([predicted_label_index])

    return predicted_label

# Example usage
sample_text = "today is a okayish monday."
predicted_emotion = predict_emotion(sample_text, classifier, tokenizer, label_encoder)
print(f"Predicted Emotion: {predicted_emotion}")


Predicted Emotion: ['positive']


In [16]:
def predict_emotion(text, model, tokenizer, label_encoder):
    # Tokenize the input text
    tokenized_input = tokenizer(text, padding=True, truncation=True, max_length=512, return_tensors='tf')
    model_path = './models/roberta_emotion_classifier'

    # Load the model
    model = tf.keras.models.load_model(model_path)

    # Predict
    prediction = model(tokenized_input)

    # Convert logits to probabilities using softmax
    probabilities = tf.nn.softmax(prediction.logits, axis=1)

    # Get the index of the maximum value
    predicted_label_index = tf.argmax(probabilities, axis=1).numpy()[0]

    # Check if the predicted label index is within the known range
    if predicted_label_index >= len(label_encoder.classes_):
        print(f"Warning: Predicted label index {predicted_label_index} is out of known range.")
        return "Unknown"

    # Convert the index to the corresponding emotion label
    predicted_label = label_encoder.inverse_transform([predicted_label_index])

    return predicted_label

# Example usage
sample_text = "today is a okayish monday."
predicted_emotion = predict_emotion(sample_text, classifier, tokenizer, label_encoder)
print(f"Predicted Emotion: {predicted_emotion}")

ValueError: Exception encountered when calling layer "tf_roberta_for_sequence_classification" (type TFRobertaForSequenceClassification).

Could not find matching concrete function to call loaded from the SavedModel. Got:
  Positional arguments (11 total):
    * {'input_ids': <tf.Tensor 'input_ids_1:0' shape=(1, 10) dtype=int32>, 'attention_mask': <tf.Tensor 'input_ids:0' shape=(1, 10) dtype=int32>}
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * False
  Keyword arguments: {}

 Expected these arguments to match one of the following 2 option(s):

Option 1:
  Positional arguments (11 total):
    * {'input_ids': TensorSpec(shape=(None, 5), dtype=tf.int32, name='input_ids/input_ids')}
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * False
  Keyword arguments: {}

Option 2:
  Positional arguments (11 total):
    * {'input_ids': TensorSpec(shape=(None, 5), dtype=tf.int32, name='input_ids/input_ids')}
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * None
    * True
  Keyword arguments: {}

Call arguments received:
  • args=({'input_ids': 'tf.Tensor(shape=(1, 10), dtype=int32)', 'attention_mask': 'tf.Tensor(shape=(1, 10), dtype=int32)'},)
  • kwargs={'training': 'None'}