In [3]:
!pip install gensim



In [4]:
from __future__ import annotations # For annotation purposes
import re
from math import log

In [5]:
class SentimentClassifier:
    """
    Class abstraction for any sentiment analyser
    """
    def Train(self, data : list[tuple[str,int]]):
        """
        Train the classifier.

        Parameters:
        data : Train data. Should be a list of tuples of (input paragraph, sentiment class)
        """
        pass

    def Predict(self, sentance : str) -> int:
        """
        Predict class for a string
        """
        pass
            
    def Test(self, data : list[str]) -> list[int]:
        """
        Test the classifier on a list of strings. 
        Returns a same size list of ints each predicting the class of the corresponding input
        """
        return [self.Predict(x) for x in data]

    

### 1.a
# Naive Bayes Classifier

In [6]:
def ExtractWords(s : str) -> list[str]:
    """Extract each word from a sentance"""
    return re.findall(r'\b\w+\b', s)

ExtractWords(" Hello! How are you? This is a simple test. It checks if the function works properly!!")

['Hello',
 'How',
 'are',
 'you',
 'This',
 'is',
 'a',
 'simple',
 'test',
 'It',
 'checks',
 'if',
 'the',
 'function',
 'works',
 'properly']

In [7]:
STOPWORDS = set()
try:
    with open("stopwords.txt") as words:
        STOPWORDS.add(words.readline().strip())
except FileNotFoundError:
    STOPWORDS = None

In [8]:
## Bag of words as a datatype
class BagOfWords:
    """
    A bag of words, holds information about the frequency of each word in a document
    """

    def __init__(self, s : str = "", stopwordRemoval : bool = False) -> dict[str:int]:
        """
        Converts a string into a bag of words
        """
        self.bow_dict = {}

        for word in ExtractWords(s):
            if stopwordRemoval and STOPWORDS:
                if word in STOPWORDS:
                    continue
            if word in self.bow_dict:
                self.bow_dict[word] += 1
            else:
                self.bow_dict[word] = 1

    # Modifiers
    def Add(self, s : str | BagOfWords):
        """
        Add 2 bag of words or a bag of words to a string
        """
        if isinstance(s, str):
            s = BagOfWords(s)

        if not isinstance(s, BagOfWords):
            raise TypeError("BagOfWords can only be added to strings or BagOfWords")

        s_dict = s.GetFrequencyDict()
        return_bow = self.copy()
        for word in s_dict:
            if word in self.bow_dict:
                return_bow.bow_dict[word] += s_dict[word]
            else:
                return_bow.bow_dict[word] = s_dict[word]
        
        return return_bow
    
    def __add__(self, other : str | BagOfWords):
        return self.Add(other)

    def RemoveWord(self, word : str):
        if word not in self.bow_dict:
            raise KeyError("Word not in bag of words")

        del self.bow_dict[word]
        
    # Queries
    def GetFrequencyDict(self) -> dict[str:int]:
        """
        Return the bag of words as a frequency dict
        """
        return self.bow_dict
    
    def GetFrequency(self, word) -> int:
        """
        Return the frequency of a word
        """
        if word not in self.bow_dict:
            return 0
        return self.bow_dict[word]

    def GetWords(self) -> list:
        """
        Get all types in this bag of words
        """
        return list(self.bow_dict.keys())
    
    def WordCount(self) -> int:
        """
        Total number of tokens in this bag of words
        """
        return sum(self.bow_dict.values())
    
    # Overrides
    def __str__(self):
        return str(self.bow_dict)
    
    def copy(self) -> BagOfWords:
        new_bow = BagOfWords()
        new_bow.bow_dict = self.bow_dict.copy()
        return new_bow
        
    

In [9]:
class NaiveBayesClassifier(SentimentClassifier):
    """
    A Naive Bayes sentiment classifier
    """

    def __init__(self):
        self._priors = {}
        self._likelihoods = {}

        self.classes = set()
        self.words = set()
        self.n = 0

    def Prior(self, c) -> float[0,1]:
        """
        Get prior of a class c
        """
        return self._priors[c]/self.n
    
    def Likelihood(self, d, c) -> float[0,1]:
        """
        Get likelihood of a word d given a class c
        """
        return self._likelihoods[c][d]
    
    def CalculatePriors(self, class_data : list[int]):
        """
        Calculates priors
        """
        # Note that the full prior is not calculated here. 
        # The values here have to be divided by total number of classes to get prior
        # Refer function Prior for that
        for cl in class_data:
            self.classes.add(cl)

            if cl in self._priors:
                self._priors[cl] += 1
            else:
                self._priors[cl] = 1

    def CalculateLikelihood(self, data : list[tuple[str,int]], add_one = True):
        """
        Calculates likelihood

        To be called only after calculating priors
        """
        assert(self.classes)

        class_documents = {}

        # Creating per class super-documents
        for sen, cl in data:
            if cl in class_documents:
                class_documents[cl] += sen
            else:
                class_documents[cl] = BagOfWords(sen)

        # Entire document as a single bag of words
        full_document = sum([bow for bow in class_documents.values()], start = BagOfWords())
        self.words = full_document.GetWords()

        for cl in self.classes:
            word_count = class_documents[cl].WordCount()

            # Smoothening
            if add_one:
                word_count += len(full_document.GetWords())

            self._likelihoods[cl] = {}

            for w in self.words:
                # P(w/cl) = count(w, cl)/Σ count(wi, cl)
                # With Smoothening: P(w/cl) = count(w, cl) + 1 / Σ count(wi, cl) + |V|
                self._likelihoods[cl][w] = ( class_documents[cl].GetFrequency(w) + int(add_one) ) / word_count


        self.class_documents = class_documents

        
    def Train(self, data : list[tuple[str,int]]):
        """
        Train the Bayes classifier.

        Parameters:
        data : Train data. Should be a list of tuples of (input paragraph, sentiment class)
        """
        self.n = len(data)

        self.CalculatePriors([cl for _, cl in data])

        self.CalculateLikelihood(data)

        # PrettyPrint(self.n)
        # PrettyPrint(self._priors)
        # PrettyPrint(self._likelihoods)

    def Predict(self, sentance : str) -> int:
        """
        Predict class for a string
        """
        max_class = None
        max_class_prob = float("-inf")

        words = ExtractWords(sentance)

        for cl in self.classes:
            log_likelihood = 0
            for word in words:
                if word not in self.words:
                    continue
                # log(P(wi/c))
                log_likelihood += log(self.Likelihood(word, cl))

            # log(P(c/w1, w2.. wm)) = log(P(c))   +   Σ log(P(wi/c))
            class_prob = log(self.Prior(cl)) + log_likelihood

            if class_prob > max_class_prob:
                max_class_prob = class_prob
                max_class = cl

        return max_class


### 1.b
# Logistic Regression Classifier

In [10]:
import numpy as np

In [11]:
class FeatureExtractor:
    """
    The abstract class that the logistic regression classifier will use to extract features from a string
    """

    def Extract(self, s : str) -> list:
        """
        Extract the features from a string and return them as a list
        """
        pass

    def Train(self, data : list[str]):
        """
        Compute all features from the given dataset
        """
        pass

    def GetFeatureCount(self) -> int:
        """
        Returns number of features
        """
        pass

class BagOfWordsExtractor(FeatureExtractor):
    """
    A FeatureExtractor that treats the frequency of each word as a feature
    """
    def __init__(self) -> None:
        self.features = []
    
    def Train(self, data: list[str]) -> list:
        self.features = sum([BagOfWords(s, stopwordRemoval=True) for s in data], start=BagOfWords()).GetWords()
    
    def Extract(self, s: str) -> list:
        bow = BagOfWords(s, stopwordRemoval=True)
        features = self.features.copy()
        for i,feature in enumerate(features):
            features[i] = bow.GetFrequency(feature)

        return features
    
    def GetFeatureCount(self) -> int:
        """
        Returns number of features
        """
        return len(self.features)

In [34]:
# Word2Vec
from gensim.models import Word2Vec

class Word2VecExtractor(FeatureExtractor):
    """
    A FeatureExtractor that uses Word2Vec embeddings from gensim
    """
    def __init__(self, vector_size=100, window=5, min_count=1, sg=0):
        # Initialize Word2Vec parameters
        self.vector_size = vector_size  # Dimensionality of word vectors
        self.window = window  # Maximum distance between the current and predicted word
        self.min_count = min_count  # Ignores all words with total frequency lower than this
        self.sg = sg  # Training algorithm: 0 for CBOW, 1 for Skip-gram
        self.model = None

    def Train(self, data: list[str]):
        # Train a Word2Vec model on the provided dataset
        tokenized_data = [s.split() for s in data]
        self.model = Word2Vec(
            tokenized_data,
            vector_size=self.vector_size,
            window=self.window,
            min_count=self.min_count,
            sg=self.sg
        )
        self.model.train(tokenized_data, total_examples=len(tokenized_data), epochs=10)  # Adjust epochs as needed

    def Extract(self, s):
        """
        Extract Word2Vec embeddings for the words in the input string `s` and return a list of feature vectors.
        """
        doc = s.split()
        words = [word for word in doc if word in self.model.wv]
        if not words:
            return np.zeros(self.model.vector_size)
        return np.mean([self.model.wv[word] for word in words], axis=0)

    def GetFeatureCount(self):
        """
        Returns the size of the Word2Vec embeddings, which is the length of the feature vectors.
        """
        if self.model is None:
            return 0
        return self.model.vector_size



In [13]:
def SoftMax(c, class_scores) -> float:
    """
    Calculates the softmax of a certain class given the values for all classes
    """
    return np.exp(class_scores[c], sum(np.exp(x) for x in class_scores))

def SoftMaxV(class_scores : list[float]) -> np.ndarray[float]:
    """
    Vectorised SoftMax
    """
    exp_z = np.exp(class_scores - np.max(class_scores, axis=1, keepdims=True))
    return exp_z / exp_z.sum(axis=1, keepdims=True)

def Sigmoid(x):
    """Sigmoid function. Already vectorized"""
    return 1/1+np.exp(x)


The classifier here uses cross entropy loss as objective function and performs gradient descend. Feature extraction and classification function can be provided as input to the model.

Also note that feature extraction is done as a part of training in the model itself and is not done in preprocessing. This allows for increased flexibility in the model (Any arbitrary feature extraction algorithm can be used) but also adds the training overhead of the feature extractor onto the model

In [24]:
class LogisticRegressionClassifier(SentimentClassifier):
    """
    Logistic regression for multiple classes
    """
    def __init__(self, 
                 featureExtractor : FeatureExtractor = BagOfWordsExtractor(), 
                 classificationFunction : callable[[list[float]], list[float]] = SoftMaxV,
                 epochs = 1000, learningRate = 0.1):
        
        self.featureExtractor = featureExtractor
        self.classificationFunction = classificationFunction
        # self.objectiveFunction = 
        self.epochs = epochs
        self.learningRate = learningRate

        self.classes = None
        self.weights = None
        self.bias = None

    def GradientDescend(self, X : np.ndarray, Y : np.ndarray):
        """
        Perform gradient descent on the given dataset to find weights and biases
        """
        for epoch in range(self.epochs):

            # Z = X.W + B
            Z = np.dot(X,self.weights) + self.bias

            # H = σ(W) = Y_calc
            H = self.classificationFunction(Z)

            # c = (-1/M) Σ Yi log(Hi)
            # cost = (-1/self.N)*(sum([np.dot(Y.T,np.log(H))]))     # Cross entropic loss

            if not epoch%100: print(f"[GradDesc] epochs = {epoch}")

            # Gradient = X . (Y_calc - Y_actual)
            dcost = np.dot(X.T, (H - Y)) / self.N       # Derivative of cross entropic loss
            
            # Gradient descent updation
            self.weights -= self.learningRate * dcost
            self.bias -= self.learningRate * np.sum(H - Y, axis=0) / self.N


    def Train(self, data: list[tuple[str, int]]):
        self.N = len(data)

        sentances, classes = list(zip(*data))

        # Extract classes and features
        self.classes = list(set(classes))
        self.featureExtractor.Train(sentances)

        # Initialize weights and biases
        self.weights = np.zeros((self.featureExtractor.GetFeatureCount(), len(self.classes)))
        self.bias = np.zeros((1, len(self.classes)))

        # Initialise input and output matrises for gradient descent
        ## Y_calc = σ(X.W + B)
        Y = np.eye(len(self.classes))[[self.classes.index(y) for y in classes]]
        X = np.array([self.featureExtractor.Extract(x) for x in sentances])

        self.GradientDescend(X, Y)

    def Predict(self, s : str):
        x = self.featureExtractor.Extract(s)
        z = self.classificationFunction(np.dot(x, self.weights) + self.bias)
        return self.classes[np.argmax(z, axis=1)[0]]

        

## Testing

In [15]:
import csv
import time


In [16]:
def PrettyPrint(o, depth=0, end="\n"):
    """
    For prettyprinting datatypes
    """
    if isinstance(o, dict):
        print("\t"*depth + "{")
        for key in o:
            PrettyPrint(key, depth+1, end=":\n")
            PrettyPrint(o[key], depth+2)
        print("\t"*depth + "}")
    elif isinstance(o, list):
        print("\t"*depth + "[")
        for value in o:
            PrettyPrint(value, depth+1)
        print("\t"*depth + "]")
    elif isinstance(o, BagOfWords):
        PrettyPrint(o.bow_dict)
    else:
        print("\t"*depth, end="")
        print(o, end=end)

In [17]:
def ConfusionMatrix(predicted : list, expected : list, classes : set) -> dict[dict]:
    """
    Creates an nxn confusion matrix (n being the number of classes)

    :param predicted: List of predicted labels
    :param expected: List of true labels
    :param classes: Set of unique class labels
    :return: A 2D dictionary representing the confusion matrix
    """
    confusion_matrix = {true_class: {predicted_class: 0 for predicted_class in classes} for true_class in classes}
    
    for true_label, predicted_label in zip(expected, predicted):
        confusion_matrix[predicted_label][true_label] += 1

    return confusion_matrix

def GetPrecisions(confusion_matrix : dict[dict]) -> dict:
    precisions = {}

    for true_class in confusion_matrix:
        precisions[true_class] = confusion_matrix[true_class][true_class] / sum( confusion_matrix[cl][true_class] for cl in confusion_matrix )

    return precisions

def GetRecalls(confusion_matrix : dict[dict]) -> dict:
    recalls = {}

    for true_class in confusion_matrix:
        recalls[true_class] = confusion_matrix[true_class][true_class] / sum(confusion_matrix[true_class].values())

    return recalls

def GetAccuracy(confusion_matrix : dict[dict]) -> float:
    total = 0
    correct = 0
    for predicted_label in confusion_matrix:
        for true_label in confusion_matrix[predicted_label]:
            if true_label == predicted_label:
                correct += confusion_matrix[predicted_label][true_label]
            total += confusion_matrix[predicted_label][true_label]

    return correct/total

def PrintConfusionMatrix(confusion_matrix : dict[dict]):
    """
    Prints a Confusion matrix as an n x n table.

    :param confusion_matrix: A 2D dictionary representing the confusion matrix
    """
    LEN = 10

    classes = sorted(confusion_matrix.keys())
    
    # Print the header row
    header = [' '*LEN] + classes
    header_line = " | ".join(label.center(LEN) for label in header)
    print(header_line)
    print("-" * len(header_line))
    
    for true_class in classes:
        row = [ true_class.ljust(LEN) ] + [str(confusion_matrix[true_class][predicted_class]).center(LEN) for predicted_class in classes]
        row_line = " | ".join(str(cell) for cell in row)
        print(row_line)

    print()
    print("Precisions:")
    PrettyPrint(GetPrecisions(confusion_matrix), depth=1)
    print("Recalls:")
    PrettyPrint(GetRecalls(confusion_matrix), depth=1)
    print("Accuracy:")
    PrettyPrint(GetAccuracy(confusion_matrix))


In [18]:
# Toy example
train_filename = "data/toy_train_1.csv"
test_filename = "data/toy_test_1.csv"

with open(train_filename) as file:
    csv_file = csv.reader(file)
    train_data = []
    for line in csv_file:
        train_data.append(tuple(line))

with open(test_filename) as file:
    csv_file = csv.reader(file)
    test_data = []
    for line in csv_file:
        test_data.append(tuple(line))

test_sentences, test_classes = list(zip(*test_data))

In [39]:
# Test file
filename = "data/cleaned_data_150k.csv" # This version is cut down to 10k entries

with open(filename) as file:
    csv_file = csv.reader(file)
    data = []
    for line in csv_file:
        data.append(tuple(line))

data.pop(0) # Header

n = len(data)

# Train test split in 5:1 ratio
split_fraction = 0.2
test_data, train_data = data[:int(n*split_fraction)], data[int(n*split_fraction):]

test_sentences, test_classes = list(zip(*test_data))

In [40]:
# Naive Bayes
print("Naive Bayes Classifier")
print("Initializing model")
model = NaiveBayesClassifier()
print("Training model")
start_time = time.time()
model.Train(train_data)
print(f"Training dataset size: {len(train_data)}")
print(f"Training time: {time.time() - start_time}s")

print("Testing")
start_time = time.time()
predictions = model.Test(test_sentences)
print(f"Training dataset size: {len(test_sentences)}")
print(f"Testing time: {time.time() - start_time}s")

# print(model.n)
# PrettyPrint(model._priors)
# print()
# PrettyPrint(model._likelihoods)
# [print(x.ljust(40), "|", y.ljust(9), "|", z) for x,y,z in (zip(test_sentences, test_classes, predictions))]

# is_correct = [int(x == test_classes[i]) for i,x in enumerate(predictions)]

print("Confusion Matrix")
PrintConfusionMatrix(ConfusionMatrix(predictions, test_classes, model.classes))

Naive Bayes Classifier
Initializing model
Training model
Training dataset size: 120000
Training time: 50.095990896224976s
Testing
Training dataset size: 30000
Testing time: 369.9198033809662s
Confusion Matrix
           |     0      |     1      |     2     
-------------------------------------------------
0          |    8642    |    829     |    273    
1          |    626     |    8154    |    789    
2          |    619     |    1004    |    9064   

Precisions:
	{
		1:
			0.8164613998197657
		2:
			0.8951214694844953
		0:
			0.8740770709011834
	}
Recalls:
	{
		1:
			0.8521266590030306
		2:
			0.8481332459998129
		0:
			0.8869047619047619
	}
Accuracy:
0.862


Logistic regression with Word2Vec feature extraction and softmax classification function

In [41]:
# Logistic regressiong

print("Logistic Regression Classifier")
print("Initializing model")
model = LogisticRegressionClassifier( featureExtractor=Word2VecExtractor(), 
                                     epochs=1000,
                                     learningRate=0.01)
print("Training model")
start_time = time.time()
model.Train(train_data)
print(f"Training dataset size: {len(train_data)}")
print(f"Training time: {time.time() - start_time}s")

print("Testing")
start_time = time.time()
predictions = model.Test(test_sentences)
print(f"Training dataset size: {len(test_sentences)}")
print(f"Testing time: {time.time() - start_time}s")

# print(model.n)
# PrettyPrint(model._priors)
# print()
# PrettyPrint(model._likelihoods)
# [print(x.ljust(40), "|", y.ljust(9), "|", z) for x,y,z in (zip(test_sentences, test_classes, predictions))]

# is_correct = [int(x == test_classes[i]) for i,x in enumerate(predictions)]
print("Confusion Matrix")
PrintConfusionMatrix(ConfusionMatrix(predictions, test_classes, model.classes))

Logistic Regression Classifier
Initializing model
Training model
[GradDesc] epochs = 0
[GradDesc] epochs = 100
[GradDesc] epochs = 200
[GradDesc] epochs = 300
[GradDesc] epochs = 400
[GradDesc] epochs = 500
[GradDesc] epochs = 600
[GradDesc] epochs = 700
[GradDesc] epochs = 800
[GradDesc] epochs = 900
Training dataset size: 120000
Training time: 282.44109439849854s
Testing
Training dataset size: 30000
Testing time: 11.4303560256958s
Confusion Matrix
           |     0      |     1      |     2     
-------------------------------------------------
0          |    9049    |    988     |    567    
1          |    388     |    7796    |    675    
2          |    450     |    1203    |    8884   

Precisions:
	{
		1:
			0.7806147992390107
		2:
			0.8773454473632234
		0:
			0.9152422372812784
	}
Recalls:
	{
		1:
			0.880009030364601
		2:
			0.8431242289076587
		0:
			0.8533572236891739
	}
Accuracy:
0.8576333333333334


# Some observations
- It can be seen that the naive bayes model has a slightly higher accuracy got this dataset while being slower over all. However, it is to note that the logistic regression has many hyperparameters (epochs, learning rate, number of feature vectors etc) the tuning of which will likely improve the model performace considerably. Perhaps a cross validation algorithm may prove useful
- The Naive Bayes model spends more time during training than tesing. While the logistic regression model spends most of its time training and testing in rather quick. This may likely be a difference between a generative and a discriminative model.