In [12]:
import numpy as np
import os
import random
import pandas as pd
import re
from collections import defaultdict
import math

# Utils

In [119]:
def clean_data(review):
    if isinstance(review, str):
        no_punc = re.sub(r'[^\w\s]', '', review)
        return ''.join([i for i in no_punc if not i.isdigit()])
    return ''

def read_dataset():
    df = pd.read_csv("./data/emotions_labeled.csv")
    return df

def train_test_split(set1, set2, test_size, random_state=21):
    np.random.seed(random_state)
    indices = np.arange(len(set1))
    np.random.shuffle(indices)
    
    test_size = int(test_size * len(set1))
    test_indices = indices[:test_size]
    train_indices = indices[test_size:]
    
    if isinstance(set1, pd.DataFrame):
        x_train = set1.iloc[train_indices]
        x_test = set1.iloc[test_indices]
    else:
        x_train = set1[train_indices]
        x_test = set1[test_indices]

    y_train = set2[train_indices]
    y_test = set2[test_indices]
    
    return x_train, x_test, y_train, y_test

emotions_label = {
    0: 'sadness',
    1: 'joy',
    2: 'love', 
    3: 'anger', 
    4: 'fear',
    5: 'surprise'
}

# Vectorizer

In [88]:
class TfVectorizer:
    def __init__(self, max_features= None):
        self.max_features = max_features
        self.words = {}
        self.idFreq = {}
        
    def _tokenize(self, text):
        return re.findall(r'\b\w+\b', text.lower())
    
    def _extract_punctuation_features(self, text):
        exclam_count = text.count('!') / len(text)
        question_count = text.count('?') / len(text)
        return [exclam_count, question_count]
    
    def fit(self, raw_texts):
        dict = defaultdict(int)
        texts_count = len(raw_texts)
        token_count = []
        for text in raw_texts:
            tokenized = set(self._tokenize(text))
            token_count += [tokenized]
            for token in tokenized:
                dict[token] += 1
        
        sorted_terms = sorted(dict.items(), key= lambda x: -x[1])
        if self.max_features:
            sorted_terms = sorted_terms[:self.max_features]
            
        self.words = {term: idx for idx, (term, _) in enumerate(sorted_terms)}
        self.idFreq = {
            term: math.log(1 + texts_count) / (1 + dict[term]) + 1
            for term in self.words
        }
        
        return self
    
    def transform(self, raw_texts):
        rows = []
        for text in raw_texts:
            dict = defaultdict(int)
            tokenized = self._tokenize(text)
            for token in tokenized:
                if token in self.words:
                    dict[token] += 1
                    
            max_dict = max(dict.values()) if dict else 1
            row = np.zeros(len(self.words))
            
            for token, count in dict.items():
                if token in self.words:
                    dict_val = count / max_dict
                    idFreq_val = self.idFreq[token]
                    idx = self.words[token]
                    row[idx] = dict_val * idFreq_val
                    
            punctuation_features = self._extract_punctuation_features(text)
            row = np.concatenate([row, punctuation_features])
                    
            rows.append(row)
        return np.array(rows)
    
    def fit_transform(self, raw_texts):
        self.fit(raw_texts)
        return self.transform(raw_texts)
                
        

# Train test split

In [89]:
vectorizer = TfVectorizer(max_features=1000)
dataset = read_dataset()

x_train, x_test, y_train, y_test = train_test_split(dataset['text'], dataset['label'],  test_size=0.2, random_state=21)

x_train = vectorizer.fit_transform(x_train)
x_test = vectorizer.transform(x_test)

# ANN

In [90]:
class ANN:
    def __init__(self, 
                 input_size, 
                 hidden_size,
                 output_size,
                 learning_rate= .01,
                 epochs= 100,
                 print= True):
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2. / input_size)
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size)  * np.sqrt(2. / hidden_size)
        self.b2 = np.zeros((1, output_size))
        self.print = print
        self.loss_list = []
        
    def fit(self, X, y):
        for epoch in range(self.epochs):
            z1 = np.dot(X, self.W1) + self.b1
            a1 = relu(z1)
            z2 = np.dot(a1, self.W2) + self.b2
            a2 = self.softmax(z2)
            
            loss = -np.mean(np.sum(y * np.log(a2 + 1e-8), axis=1))
            self.loss_list += [loss]
            if self.print:
                print(f"Epoch {epoch}) -> Loss: {loss:.6f}")
            
            dz2 = a2 - y
            dW2 = np.dot(a1.T, dz2)
            db2 = np.sum(dz2, axis=0, keepdims=True)

            dz1 = np.dot(dz2, self.W2.T) * relu_deriv(a1)
            dW1 = np.dot(X.T, dz1)
            db1 = np.sum(dz1, axis=0, keepdims=True)

            self.W2 -= self.learning_rate * dW2
            self.b2 -= self.learning_rate * db2
            self.W1 -= self.learning_rate * dW1
            self.b1 -= self.learning_rate * db1
    
    def predict(self, X):
        z1 = np.dot(X, self.W1) + self.b1
        a1 = relu(z1)
        z2 = np.dot(a1, self.W2) + self.b2
        a2 = self.softmax(z2)
        return np.argmax(a2, axis= 1)
    
    def softmax(self, z):
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))  
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

def relu(x):
    return np.maximum(0, x)

def relu_deriv(x):
    return (x > 0).astype(float)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_deriv(x):
    return x * (1 - x)

def to_one_hot(labels, num_classes):
    return np.eye(num_classes)[labels.reshape(-1)]


# Train model

In [109]:
# y_train = y_train.to_numpy().reshape(-1, 1)
# 
from sklearn.preprocessing import OneHotEncoder

encoder = OneHotEncoder(sparse_output=False)
y_train_onehot = encoder.fit_transform(y_train.reshape(-1, 1))

classifier = ANN(input_size=1002, hidden_size=128, epochs= 10, output_size= 6, learning_rate= 1e-5)
classifier.fit(x_train, y_train_onehot)

Epoch 0) -> Loss: 1.786638
Epoch 1) -> Loss: 1.641715
Epoch 2) -> Loss: 1.681869
Epoch 3) -> Loss: 1.584947
Epoch 4) -> Loss: 1.569892
Epoch 5) -> Loss: 1.568530
Epoch 6) -> Loss: 1.566445
Epoch 7) -> Loss: 1.569076
Epoch 8) -> Loss: 1.568035
Epoch 9) -> Loss: 1.574361


# Test trained model

In [110]:
y_predict = classifier.predict(x_test)
# y_test = y_test.to_numpy().reshape(-1, 1)
y_test_idx = np.argmax(y_test, axis=1)

accuracy = np.mean(y_predict == y_test_idx)
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.9929


# Given the text, the model predicted 

In [120]:
phrase = 'By choosing a bike over a car, I’m reducing my environmental footprint. Cycling promotes eco-friendly transportation, and I’m proud to be part of that movement..'

new_data = pd.DataFrame({'text': [phrase]})
new_data['text'] = new_data['text'].apply(clean_data)

x_new = vectorizer.transform(new_data['text'])

predictions = classifier.predict(x_new)

print("Predicted emotion: ", emotions_label[predictions[0]])

Predicted emotion:  joy
