In [1]:
# Please replace with the correct path to the .h5 file and .json file
model_folder = "./saved_model"
# Please provide a decision boundary, should be stated in the release
decision_boundary = 0.01

In [2]:
# For Docker image tensorflow/tensorflow:2.14.0-gpu-juptyer and latest-gpu-jupyter 
# About the latest tag: sha256:efc25f8ad0ec337e8f4e2de9e7e8e391e6729481c7a7cae4bdea3137da7822c6
!pip install -q emoji
!pip install -q nltk
!pip install -q scikit-learn
!pip install -q transformers
!pip install -q tf-keras

[0m

In [3]:
from typing import Union
from emoji import demojize, is_emoji
from nltk.tokenize import TweetTokenizer
import numpy as np
from numpy import ndarray
from sklearn.metrics import f1_score, recall_score
import tensorflow as tf
from transformers import (
    BertConfig,
    BertTokenizer,
    TFBertForSequenceClassification,
)

2024-05-05 12:58:28.790797: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
def load_text_file(file_path: str) -> list[str]:
    """
    Load a text file and return an array of lines from the file.

    Args:
        file_path: str: The path to the file to load.

    Returns:
        list[str]: An array of lines from the file.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()
    return [line.strip() for line in lines]

In [5]:
# Please replace the path to your test dataset
test_text_path: str = "../dataset/test_text.txt"
test_label_path: str = "../dataset/test_labels.txt"

test_text: list[str] = load_text_file(test_text_path)
test_label: list[str] = load_text_file(test_label_path)

In [6]:
tokenizer = TweetTokenizer()

def text_processing(text: list[str]) -> list[str]:
    """
    Process text data:
    - Remove "@user"
    - Remove "#".
    - Replace "’" and triple dots in one character (…).
    - Tokenize and lowercase.
    - Normalize the tokens and join the line.
    - Replace specific strings.
    - Remove excess space after processing.

    Args:
        text: list[str]: A list of text data.

    Returns:
        list[str]: A list of processed sentences
    """
    processed_text = []
    for line in text:
        # Remove @user
        line = line.replace("@user", "")
        # Remove #
        line = line.replace("#", "")
        # Replace "’" and triple dots in one character (…)
        line = line.replace("’", "'").replace("…", "...")
        # Tokenize and lowercase
        tokens = tokenizer.tokenize(line.lower())
        # Normalize the tokens and join the line
        line = " ".join([normalize_token(token) for token in tokens])
        # Replace specific strings
        line = (
            line.replace("cannot", "can not")
            .replace("can't", "can not")
            .replace("n't ", " not ")
            # Handle cases in English, where when "n't" is replace with
            # " not ", the meaning of the word will be invalid
            .replace("wo not ", "will not ")
            .replace("sha not", "shall not")
            # "ain't" can be "am/is/are not", so it stays
            .replace("ai not ", "ain't ")
        )
        line = (
            line.replace("'m ", " am ")
            .replace("'re ", " are ")
            .replace("'ll ", " will ")
            .replace("'ve ", " have ")
            # 's can mean ownership or "is"
            .replace("'s ", " 's ")
            # 'd can mean "would" or "had"
            .replace("'d ", " 'd ")
        )
        # Remove excess spaces
        line = " ".join(line.split())
        processed_text.append(line)

    return processed_text


def normalize_token(token: str) -> str:
    """
    Normalize the provided token.
    - Replace URLs with "HTTPURL".
    - Replace emojis with respective string.

    Args:
        token: str: The string being normalized

    Return:
        str: The normalized string
    """
    if token.startswith("http") or token.startswith("www"):
        return "HTTPURL"
    elif is_emoji(token):
        return demojize(token)
    else:
        return token

In [7]:
test_text: list[str] = text_processing(test_text)
test_labels = [int(x) for x in test_label]

In [8]:
model = TFBertForSequenceClassification.from_pretrained(model_folder)
model.summary()

2024-05-05 12:58:31.055054: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-05-05 12:58:31.059547: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-05-05 12:58:31.059586: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-05-05 12:58:31.061818: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:984] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-05-05 12:58:31.061857: I external/local_xla/xla/stream_executor

Model: "tf_bert_for_sequence_classification"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bert (TFBertMainLayer)      multiple                  109482240 
                                                                 
 dropout_37 (Dropout)        multiple                  0 (unused)
                                                                 
 classifier (Dense)          multiple                  2307      
                                                                 
Total params: 109484547 (417.65 MB)
Trainable params: 109484547 (417.65 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [9]:
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
test_encodings = tokenizer(
    test_text, padding=True, truncation=True, return_tensors="tf"
)
test_labels = tf.convert_to_tensor(tf.one_hot(test_labels, 3))



In [10]:
def modify_decision_boundary(prediction: ndarray[float], boundary: float) -> list[int]:
    """
    Adjust decision method, make the model tend to predict
    "Netural" more, according to the distribution of the dataset

    If either the score of positive or negative class is in the
    +-boundary range of the netural score, and the remaining class is
    not significant higher that the netural class
    It will be classified as netural

    Args:
        prediction: ndarray[float]: The prediction with socres for each entry.
        boundary: float: The boundary, should be between 0 and 1.

    Return:
        ndarray[int]: The new prediction
    """
    final_result: list[int] = []
    for row in prediction:
        # If the class with the highest score is netural
        if np.max(row) == row[1]:
            final_result.append(1)
            continue

        negative: float = row[0]
        netural: float = row[1]
        positive: float = row[2]

        # Get the netural score range
        min_netural: float = netural * (1 - boundary)
        max_netural: float = netural * (1 + boundary)

        # If score for negative is in netural range and it is the class with highest score
        if min_netural <= negative <= max_netural and np.max(row) == negative:
            final_result.append(1)
        # If score for positive is in netural range and it is the class with highest score
        elif min_netural <= positive <= max_netural and np.max(row) == positive:
            final_result.append(1)
        elif np.max(row) == negative:
            final_result.append(0)
        elif np.max(row) == positive:
            final_result.append(2)

    return final_result

In [11]:
test_prediction = model.predict(test_encodings)
test_labels = np.argmax(test_labels, axis=1)
prediction_matrix: ndarray[float] = test_prediction.logits



#### F1 Score

In [12]:
final_result = modify_decision_boundary(prediction_matrix, decision_boundary)
f1_scores_processed = f1_score(test_labels, final_result, average=None)
macro_average_f1_processed = np.mean(f1_scores_processed)

print("F1 scores for each class:", f1_scores_processed)
print("Macro-averaged F1 score:", macro_average_f1_processed)

F1 scores for each class: [0.72588415 0.67988511 0.68693009]
Macro-averaged F1 score: 0.6975664506096498


#### Recall

In [13]:
recall_per_class = recall_score(test_labels, final_result, average=None)
macro_average_recall = sum(recall_per_class) / len(recall_per_class)

print("Recall for each class:", recall_per_class)
print("Macro-averaged recall:", macro_average_recall)

Recall for each class: [0.77769386 0.61798888 0.76126316]
Macro-averaged recall: 0.7189819660560369
