In [40]:
import tensorflow as tf
import os
import pandas as pd
import sys
import numpy as np
# Parent of the notebook's working directory; the later cells join it with
# 'models/...' and import the `utils` package from it.
project_root = os.path.dirname(os.getcwd())
# Register the absolute path (not the relative "..") so project imports keep
# resolving if the working directory changes later, and guard the append so
# re-running this cell does not pile up duplicate sys.path entries.
if project_root not in sys.path:
    sys.path.append(project_root)

In [41]:
# Define custom loss
# Default per-class weights; one entry per output class, broadcast along the last axis.
weights = pd.Series([0.104978, 0.328745, 0.566277])


class WeightedCategoricalCrossEntropy(tf.keras.losses.Loss):
    """Categorical cross-entropy with a fixed multiplicative weight per class.

    For each sample the loss is ``-sum_c w_c * y_true_c * log(y_pred_c)``.

    Args:
        weights: Array-like of per-class weights, cast to float32. Must be
            broadcastable against the last axis of ``y_true`` / ``y_pred``.
        name: Name of the loss instance.
        **kwargs: Forwarded to ``tf.keras.losses.Loss`` (e.g. ``reduction``).
    """

    def __init__(self, weights=weights, name='weighted_categorical_crossentropy', **kwargs):
        # Forward name/kwargs so the configured name and reduction are honoured
        # (they used to be dropped silently).
        super().__init__(name=name, **kwargs)
        self.weights = tf.cast(weights, tf.float32)

    def call(self, y_true, y_pred):
        """Return the weighted cross-entropy of each sample.

        The values are NOT averaged here: Keras applies ``sample_weight`` and
        the configured ``reduction`` in ``__call__``. With the default
        reduction this yields the same batch mean as averaging in place.
        """
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Clip y_pred to avoid log(0)
        eps = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, eps, 1 - eps)
        weighted_losses = -self.weights * y_true * tf.math.log(y_pred)
        # Sum over the class axis (last axis, the one the weights broadcast on).
        return tf.reduce_sum(weighted_losses, axis=-1)

    def get_config(self):
        """Serialize the class weights together with the base loss config."""
        config = super().get_config()
        config['weights'] = np.asarray(self.weights).tolist()
        return config
    
    
# Define custom metrics
class PrecisionMulticlass(tf.keras.metrics.Metric):
    """Macro-averaged precision for multi-class predictions.

    Predictions are hardened with ``argmax`` over axis 1 and compared against
    ``y_true``, which is indexed per class column (assumes one-hot labels with
    ``n_class`` columns -- TODO confirm against the training labels).
    True/false positive counts are accumulated per class across batches.

    Args:
        name: Name of the metric instance.
        n_class: Number of classes (columns of ``y_true`` / ``y_pred``).
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, name='precision', n_class=3, **kwargs):
        super().__init__(name=name, **kwargs)
        self.n_class = n_class
        # Not read by update_state/result; kept (and created first) so the
        # variable layout stays identical to previously saved models.
        self.precision = self.add_weight(
            shape=(n_class,),
            name='precision',
            initializer='zeros')
        self.true_positives = self.add_weight(name='true_positives', shape=(self.n_class,), initializer='zeros')
        self.false_positives = self.add_weight(name='false_positives', shape=(self.n_class,), initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate per-class true/false positives for one batch.

        ``sample_weight`` is accepted for API compatibility but ignored.
        """
        y_true = tf.cast(y_true, tf.int64)
        y_pred = tf.cast(tf.one_hot(tf.argmax(y_pred, axis=1), self.n_class), tf.int64)

        # Reduce over the batch axis once instead of looping over classes.
        true_positive = tf.reduce_sum(y_true * y_pred, axis=0)
        false_positive = tf.reduce_sum(
            tf.cast(tf.logical_and(tf.equal(y_true, 0), tf.equal(y_pred, 1)), tf.int64),
            axis=0)

        # Counts are int64 while the accumulators are float: cast explicitly.
        self.true_positives.assign_add(tf.cast(true_positive, self.true_positives.dtype))
        self.false_positives.assign_add(tf.cast(false_positive, self.false_positives.dtype))

    def result(self):
        """Return the mean over classes of TP / (TP + FP + epsilon)."""
        precision_per_class = self.true_positives / (self.true_positives + self.false_positives + tf.keras.backend.epsilon())
        return tf.reduce_mean(precision_per_class)

    def reset_state(self):
        """Zero the accumulated counts."""
        self.true_positives.assign(tf.zeros_like(self.true_positives))
        self.false_positives.assign(tf.zeros_like(self.false_positives))

    def get_config(self):
        """Include ``n_class`` so a non-default class count survives serialization."""
        config = super().get_config()
        config['n_class'] = self.n_class
        return config
        
        

class RecallMulticlass(tf.keras.metrics.Metric):
    """Macro-averaged recall for multi-class predictions.

    Predictions are hardened with ``argmax`` over axis 1 and compared against
    ``y_true``, which is indexed per class column (assumes one-hot labels with
    ``n_class`` columns -- TODO confirm against the training labels).
    True positive / false negative counts are accumulated per class across
    batches.

    Args:
        name: Name of the metric instance.
        n_class: Number of classes (columns of ``y_true`` / ``y_pred``).
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, name='recall', n_class=3, **kwargs):
        super().__init__(name=name, **kwargs)
        self.n_class = n_class
        # Not read by update_state/result; kept (and created first) so the
        # variable layout stays identical to previously saved models.
        self.recall = self.add_weight(
            shape=(n_class,),
            name='recall',
            initializer='zeros')
        self.true_positives = self.add_weight(name='true_positives', shape=(self.n_class,), initializer='zeros')
        self.false_negatives = self.add_weight(name='false_negatives', shape=(self.n_class,), initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate per-class true positives / false negatives for one batch.

        ``sample_weight`` is accepted for API compatibility but ignored.
        """
        y_true = tf.cast(y_true, tf.int64)
        y_pred = tf.cast(tf.one_hot(tf.argmax(y_pred, axis=1), self.n_class), tf.int64)

        # Reduce over the batch axis once instead of looping over classes.
        true_positive = tf.reduce_sum(y_true * y_pred, axis=0)
        false_negative = tf.reduce_sum(
            tf.cast(tf.logical_and(tf.equal(y_true, 1), tf.equal(y_pred, 0)), tf.int64),
            axis=0)

        # Counts are int64 while the accumulators are float: cast explicitly.
        self.true_positives.assign_add(tf.cast(true_positive, self.true_positives.dtype))
        self.false_negatives.assign_add(tf.cast(false_negative, self.false_negatives.dtype))

    def result(self):
        """Return the mean over classes of TP / (TP + FN + epsilon)."""
        recall_per_class = self.true_positives / (self.true_positives + self.false_negatives + tf.keras.backend.epsilon())
        return tf.reduce_mean(recall_per_class)

    def reset_state(self):
        """Zero the accumulated counts."""
        self.true_positives.assign(tf.zeros_like(self.true_positives))
        self.false_negatives.assign(tf.zeros_like(self.false_negatives))

    def get_config(self):
        """Include ``n_class`` so a non-default class count survives serialization."""
        config = super().get_config()
        config['n_class'] = self.n_class
        return config
            

In [None]:
# The saved model references the custom loss and metrics by class name, so
# each class is registered under its own name when deserializing.
model = tf.keras.models.load_model(
    os.path.join(project_root, 'models', 'bi_lstm_model'),
    custom_objects={
        custom_cls.__name__: custom_cls
        for custom_cls in (
            WeightedCategoricalCrossEntropy,
            PrecisionMulticlass,
            RecallMulticlass,
        )
    },
)
model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_4 (Embedding)     (None, None, 300)         8861400   
                                                                 
 bidirectional_2 (Bidirecti  (None, None, 128)         186880    
 onal)                                                           
                                                                 
 lstm_layer (LSTM)           (None, 64)                49408     
                                                                 
 dense1 (Dense)              (None, 64)                4160      
                                                                 
 dense2 (Dense)              (None, 16)                1040      
                                                                 
 output (Dense)              (None, 3)                 51        
                                                      

### Try to predict from text

In [80]:
# Example sentence (deliberately containing insults) that the following cells
# clean, tokenize, index and feed to the loaded model.
input_text = "However this sentence is very insulting you should go fuck yourself bitch this is a test sentence just to see if the model works but you still are a dumb ass !"

In [None]:
# Add the parent directory to sys.path
import sys
import os
import re
from utils.artifacts import REGEX_REMOVE, REGEX_REPLACE
from textblob import TextBlob
import signal
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords, wordnet, words, webtext, gutenberg, brown
from utils.cleaning import process_batch
from utils.artifacts import slang_dict

# NLTK resources: fetch (quietly) every dataset the cleaning step relies on.
for _nltk_resource in ('stopwords', 'wordnet', 'words', 'punkt_tab', 'webtext', 'gutenberg', 'brown'):
    nltk.download(_nltk_resource, quiet=True)

stop_words = stopwords.words('english')

# Lower-cased union of the word lists of all reference corpora; tokens outside
# this set are later replaced by '<UNK>'.
combined_corpus = {
    entry.lower()
    for reader in (words, wordnet, webtext, gutenberg, brown)
    for entry in reader.words()
}

def clean_data(batch, stop_words=stop_words, slang_dict=slang_dict):
    """Clean, spell-correct, tokenize and lemmatize the ``text`` column of a frame.

    Args:
        batch: pandas DataFrame with a string column ``text``.
        stop_words: Stop-word collection. Currently unused because stop-word
            removal is disabled; kept so existing callers keep working.
        slang_dict: Mapping from a slang token to its replacement.

    Returns:
        A copy of ``batch`` with two extra columns: ``corrected_text`` (cleaned
        and spell-corrected string) and ``tokens`` (list of lemmatized tokens,
        with out-of-corpus tokens replaced by ``'<UNK>'``). The input frame is
        left untouched.
    """
    # One lemmatizer for the whole batch instead of one per row.
    lemmatizer = WordNetLemmatizer()

    def clean_text(text: str) -> str:
        # Apply REGEX_REMOVE and REGEX_REPLACE
        for pattern in REGEX_REMOVE:
            text = re.sub(pattern, "", text)
        for pattern, repl in REGEX_REPLACE.items():
            text = re.sub(pattern, repl, text)

        # Apply additional text cleaning steps
        text = re.sub(r'^RT @\w+: ', '', text)                      # retweet prefix
        text = re.sub(r'http\S+', ' ', text)                        # URLs
        text = re.sub(r'\b\w*jpeg\w*\b|\b\w*jpg\w*\b', '', text)    # image file names
        text = re.sub(r'\n', ' ', text)
        text = re.sub(r'@\w+', '<PERSON>', text)                    # user mentions
        # Punctuation removal also strips the angle brackets of '<PERSON>'.
        text = re.sub(r'[^\w\s]', '', text)
        text = re.sub(r'\d+', '', text)
        # NOTE(review): this drops BOTH copies of an immediately repeated word
        # ("very very good" -> " good"); presumably only the duplicate was
        # meant to go (replacement r'\1'). Left as is so preprocessing stays
        # identical to whatever the model was trained with -- confirm.
        text = re.sub(r'\b(\w+)\b\s+\1\b', '', text)
        text = text.strip().lower()
        # Replace every run of non-ASCII characters by a space. (A former
        # follow-up pass over [\x80-\xFF] could never match after this one.)
        text = re.sub(r'[^\x00-\x7F]+', ' ', text)
        return text

    def correct_text(text: str, stop_words, slang_dict: dict) -> str:
        tokens = text.split()
        tokens = [slang_dict.get(word, word) for word in tokens]
        # Stop-word removal is intentionally disabled:
        # tokens = [word for word in tokens if word not in stop_words]
        tokens = [word for word in tokens if len(word) < 15]
        return str(TextBlob(' '.join(tokens)).correct())

    def lemma_text(tokens: list) -> list:
        # Lemmatize as verb, then noun, then adjective.
        for pos in ('v', 'n', 'a'):
            tokens = [lemmatizer.lemmatize(token, pos) for token in tokens]
        return tokens

    def replace_unknown_tokens(tokens: list) -> list:
        return [token if token in combined_corpus else '<UNK>' for token in tokens]

    def combined_cleaning(text: str) -> str:
        return correct_text(clean_text(text), stop_words, slang_dict)

    def tokenize(text: str) -> list:
        tokens = word_tokenize(text, preserve_line=True)
        tokens = lemma_text(tokens)
        return replace_unknown_tokens(tokens)

    # Work on a copy so the caller's frame is not modified and re-running the
    # calling cell stays idempotent.
    result = batch.copy()
    result['corrected_text'] = result['text'].apply(combined_cleaning)
    result['tokens'] = result['corrected_text'].apply(tokenize)
    return result

In [93]:
# One-row frame holding the sentence in the `text` column read by clean_data.
dataframe = pd.DataFrame({'text': [input_text]})

# Run the cleaning pipeline and display the resulting frame.
cleaned_data = clean_data(dataframe, stop_words=stop_words, slang_dict=slang_dict)
cleaned_data

Unnamed: 0,text,corrected_text,tokens
0,However this sentence is very insulting you sh...,however this sentence is very insulting you sh...,"[however, this, sentence, be, very, insult, yo..."


In [None]:
# Feature engineering for inference. Candidate for extraction into
# `feature_engineering(data, max_length: int = 170, vocab_size: int = 29538)`.
from tensorflow.keras.utils import pad_sequences

data = cleaned_data.copy()

# Vocabulary size and vocabulary list for embedding
embedding_matrix = np.load(os.path.join(project_root, 'models', 'embedding_matrix_300.npy'))
# NOTE: allow_pickle=True unpickles arbitrary objects; only load a trusted vocab file.
vocab = list(np.load(os.path.join(project_root, 'models', 'vocab.npy'), allow_pickle=True))
vocab.append("<PAD>")
vocab_size = len(vocab)

# Token to index
token_to_index = {token: idx for idx, token in enumerate(vocab)}
# Resolve the fallback index once instead of once per token.
unk_index = token_to_index["<UNK>"]

# Convert text tokens to index
data['tokens_index'] = data['tokens'].apply(
    lambda tokens: [token_to_index.get(token, unk_index) for token in tokens])

# Pad sequences
max_length = 170

# NOTE(review): pad_sequences pads with 0 by default, whereas "<PAD>" was
# appended at the END of the vocabulary (index vocab_size - 1). Kept as is on
# the assumption that training padded the same way -- confirm.
# The result gets its own name so the imported `pad_sequences` function is not
# overwritten by its own output.
padded_sequences = pad_sequences(
    data['tokens_index'].tolist(), maxlen=max_length, padding='post', truncating='post')
data['padded_tokens'] = [list(row) for row in padded_sequences]

# Convert to tensor for inference. The padded array is already dense and
# rectangular, so no ragged round-trip is needed.
data_tensor = tf.convert_to_tensor(padded_sequences, dtype=tf.int32)

In [95]:
# Run inference on the padded index tensor; the output has one row per input
# text and one value per output unit (3, per the model summary above).
model.predict(data_tensor)



array([[0.98801464, 0.0088652 , 0.00312017]], dtype=float32)