# CSC5035Z Natural Language Processing
# Assignment 1: Naïve Bayes for Text Classification

**Author: Roger Bukuru**

## How to guide
- First Ensure the following packages are installed on your system
  - **numpy** 
    - installation command: **pip install numpy**
  - **pandas** 
    - installation command: **pip install pandas**
  - **sklearn** 
    - installation command: **pip install sklearn**
- Then sequentially run each code block, the code blocks perform the following tasks:
   - Downloads the Afrisenti dataset
   - Selects the language of choice (Kinyarwanda)
   - Cleans the data
   - Creates a vocabulary
   - Applies word-based and BPE tokenization
   - Implements a simple binary text vectorization
   - Implements the Naïve Bayes classifier
   - Trains classifier based on word-based and BPE features obtained from simple binary text vectorization
   - Predict on both word-based and BPE features
   - Evaluate model on a development data set



# Installations, Imports and Downloads


In [1]:
import os
import warnings
import re
import numpy as np
warnings.filterwarnings("ignore", category = UserWarning)
from collections import defaultdict
import pandas as pd
from sklearn.metrics import classification_report

FS = (8,4) # figure size
RS = 264


In [2]:
PROJECT_DIR = os.getcwd() + '/afrisent-semeval-2023'
print('Current directiory: ', PROJECT_DIR)
PROJECT_GITHUB_URL = 'https://github.com/afrisenti-semeval/afrisent-semeval-2023.git'

if not os.path.isdir(PROJECT_DIR):
  !git clone {PROJECT_GITHUB_URL}
else:
  %cd {PROJECT_DIR}
  !git pull {PROJECT_GITHUB_URL}

Current directiory:  /Users/rogerbukuru/Documents/UCT Masters/MSc Statistics and Data Science/NLP-CSC5035Z/NLPTutsAssignments/Assignment-I/afrisent-semeval-2023
/Users/rogerbukuru/Documents/UCT Masters/MSc Statistics and Data Science/NLP-CSC5035Z/NLPTutsAssignments/Assignment-I/afrisent-semeval-2023
From https://github.com/afrisenti-semeval/afrisent-semeval-2023
 * branch            HEAD       -> FETCH_HEAD
Already up to date.


# Data Loading


In [3]:
# Choose language 
language = 'kin'

In [4]:
# Load data
DATA_DIR = f'{PROJECT_DIR}/data/{language}'
print('Data directory: ', DATA_DIR)

train_df = pd.read_csv(f'{DATA_DIR}/train.tsv', sep='\t', names=['text', 'label'], header=0)
dev_df = pd.read_csv(f'{DATA_DIR}/dev.tsv', sep='\t', names=['text', 'label'], header=0)
test_df = pd.read_csv(f'{DATA_DIR}/test.tsv', sep='\t', names=['text', 'label'], header=0)

print('Train shape: ', train_df.shape)
print('Dev shape: ', dev_df.shape)
print('Test shape: ', test_df.shape)

# Display data
train_df.sample(n=100)

Data directory:  /Users/rogerbukuru/Documents/UCT Masters/MSc Statistics and Data Science/NLP-CSC5035Z/NLPTutsAssignments/Assignment-I/afrisent-semeval-2023/data/kin
Train shape:  (3302, 2)
Dev shape:  (827, 2)
Test shape:  (1026, 2)


Unnamed: 0,text,label
50,@user Wa nkozi y ibibi we Umwanzi w Imana😂😂😂😂,negative
2892,@user Ni byagaciri Kuri nyirabyo ni igihugu mu...,positive
3050,"Isabukuru nziza databuja wanjye ! My blood,Uwi...",positive
1372,@user @user Umuntu ufite @user ikaba yizimya i...,neutral
2071,@user Barayampompa cyane. Uzambwire aho wumvis...,neutral
...,...,...
3291,@user Congratulations... Amatora y'inzego z'ib...,positive
3063,Uri ikimenyetso cy'uko ubuzima bwatsinze urupf...,positive
1364,Turi kuganira ku Iterambere ry'umujyi wa Musan...,neutral
2636,Umunsi mwiza w'ubwigenge (independance day). K...,positive


# Data Cleaning

In [5]:
# Discard neutral examples
train_df = train_df[train_df['label'] != 'neutral']
dev_df = dev_df[dev_df['label'] != 'neutral']
test_df = test_df[test_df['label'] != 'neutral']

In [6]:
def clean(text):
    # Replace URLS with [URL]
    text = re.sub(r'http\S+', '[URL]', text)

    # Replace numbers with [NUM]
    text = re.sub(r'\d+', '[NUM]', text)
    
    # Replace @user with ""
    text = re.sub(r'@user\b', '[USR]', text)

    # Remove trailing spaces
    text = text.strip()

    return text

train_df['text'] = train_df['text'].apply(clean)
dev_df['text'] = dev_df['text'].apply(clean)
test_df['text'] = test_df['text'].apply(clean)

# Construct Vocabulary

In [7]:
# Count number of tokens in corpus
def count_tokens(sentences):
    """
    Count number of tokens in corpus

    param: sentences: list of list of tokens e.g. [['This', 'is', 'a', 'sentence'], ['This', 'is', 'another', 'sentence'], ...]
    return:
        count: number of tokens in corpus
    """
    total_tokens = 0
    for sentence in sentences:
        total_tokens += len(sentence)
    return total_tokens

In [8]:
# Collect type counts in corpus
def create_type_counts(sentences):
    """
    Count number of types in corpus

    param: sentences: list of list of tokens e.g. [['This', 'is', 'a', 'sentence'], ['This', 'is', 'another', 'sentence'], ...]
    return:
        type2count: dictionary of type counts in corpus e.g. {'This': 2, 'sentence': 2, ...}
    """
    type2count = {}
    for sentence in sentences:
        for type_ in sentence:
            if type_ not in type2count:
                type2count[type_] = 1
            else:
                current_count = type2count[type_]
                type2count[type_] = current_count +1
    return type2count

In [9]:
# Create vocabulary
def create_vocabulary(type2count, min_count):
    """
    This function creates an indexed vocabulary from vocabulary counts and returns it as a list and a dictionary.

    param:
        type2count: dictionary of type counts in corpus e.g. {'This': 2, 'sentence': 2, ...}
        min_count: minimum count of a word to be included in the vocabulary
    return:
        index2type: list of words in the vocabulary e.g. ['word1', 'word2', 'word3', ...]
        type2index: dictionary mapping words to their index in the index2type vocabulary e.g. {'word1': 0, 'word2': 1, 'word3': 2, ...}
    """
    index2type = []
    type2index = {}
    for type_, count in type2count.items():
        if(count >= min_count):
            index2type.append(type_)
            type2index[type_] = len(index2type) - 1
    return index2type, type2index




# Tokenization

## Word Based Tokenization

In [10]:
def whitespace_tokenize(sentences):
    return [sentence.split() for sentence in sentences]

## Byte-Pair Encoding (BPE) Tokenization


In [11]:
class BPETokenizer():

    def __init__(self, sentences, vocab_size):
        """
        Initialize the BPE tokenizer.

        param:
            sentences (list[str]): list of list of tokens e.g. [['This', 'is', 'a', 'sentence'], ['This', 'is', 'another', 'sentence'], ...]
            vocab_size (int): The desired vocabulary size after training.
        """
        self.sentences = sentences
        self.vocab_size = vocab_size
        self.word_freqs = defaultdict(int)
        self.splits = {}
        self.merges = {}


    def train(self):
        """
        Train the BPE tokenizer by iteratively merging the most frequent pairs of symbols.

        return:
            dict: A dictionary of merges in the format {(a, b): 'ab'}, where 'a' and 'b' are symbols merged into 'ab'.
        """
        # Split corpus
        for sentence in self.sentences:
            for word in sentence:
                self.splits[word] = [char for char in word]
                    
        for i in range(self.vocab_size):
            self.compute_pair_freqs() # compute adjacent pair frequencies
            pair, _ = list(self.word_freqs.items())[0] # most frequent pair
            self.merge_pair(pair[0], pair[1])
            self.merges[pair] = pair[0] + pair[1]
        return self.merges


    def compute_pair_freqs(self):
        """
        Compute the frequency of each pair of symbols in the corpus.

        return:
            dict: A dictionary of pairs and their frequencies in the format {(a, b): frequency}.
        """
        pair_freqs = defaultdict(int)
        for _, split in self.splits.items():
            for i in range(len(split)-1):
                pair = (split[i], split[i+1])
                if pair not in pair_freqs:
                    pair_freqs[pair] = 1
                else:
                    pair_freqs[pair] += 1
        self.word_freqs = pair_freqs
        self.word_freqs = dict(sorted(self.word_freqs.items(), key=lambda x: x[1], reverse=True)) # sort from max to min count
        return self.word_freqs
        
    def merge_pair(self, a, b):
        """
        Merge the given pair of symbols in all words where they appear adjacent.

        param:
            a (str): The first symbol in the pair.
            b (str): The second symbol in the pair.

        return:
            dict: The updated splits dictionary after merging.
        """
        pair = (a,b)
        # Check if valid pair
        if pair in self.word_freqs:
            new_token = a+b
            for word, split in self.splits.items():
                for i in range(len(split)-1):
                    if split[i] == a and split[i+1] == b:
                       split[i] = new_token
                       new_split = list(filter(lambda x: x not in [b], split))
                       self.splits[word] = new_split
        return self.splits

    def tokenize(self, text):
        """
        Tokenize a given text using the trained BPE tokenizer.

        param:
            text (str): The text to be tokenized.

        return:
            list[str]: A list of tokens obtained after applying BPE tokenization.
        """

        pre_tokenized_text = text.split()
        splits_text = [[l for l in word] for word in pre_tokenized_text]

        for pair, merge in self.merges.items():
            for idx, split in enumerate(splits_text):
                i = 0
                while i < len(split) - 1:
                    if split[i] == pair[0] and split[i + 1] == pair[1]:
                        split = split[:i] + [merge] + split[i + 2 :]
                    else:
                        i += 1
                splits_text[idx] = split
        result = sum(splits_text, [])
        return result

# Feature Extraction

## Simple Binary Text Vectorization

In [12]:
def simple_binary_vectorization(sentences, type2index):
    """
    Binary text-vectorization of a list of sentences.

    param: 
     sentences:  list of list of tokens e.g. [['This', 'is', 'a', 'sentence'], ['This', 'is', 'another', 'sentence'], ...]
     type2index: dictionary mapping words to their index in the vocabulary e.g. {'word1': 0, 'word2': 1, 'word3': 2, ...}
    return: 2D NumPy array of a simple-binary encoded sentences.
    """
    vectors = []
    unk_index = type2index['<UNK>']
    for sentence in sentences:
        vector = np.zeros(len(type2index.items()))
        for word in sentence:
            if word in type2index: 
             word_index = type2index[word]
             vector[word_index] = 1
            else: # If word does not exist we use the index of the UNK token.
                vector[unk_index] = 1
        vectors.append(np.array(vector))
    return np.array(vectors)

     

### Word-Based Tokenization Feature Extraction


In [13]:
# Store training data text as list of tweets
train_corpus = train_df['text'].tolist()
tokenized_train_corpus = whitespace_tokenize(train_corpus)
num_tokens = count_tokens(tokenized_train_corpus)
print('Number of tokens in corpus: ', num_tokens)
type2count = create_type_counts(tokenized_train_corpus)

# Sort types by counts
type2count = dict(sorted(type2count.items(), key=lambda x: x[1], reverse=True))
index2type, type2index = create_vocabulary(type2count, min_count=1)
print("Vocab size:", len(index2type))

# We add a special token for unknown words and padding (to make all sentences in the training batch the same length)
type2index['<UNK>'] = len(index2type)
index2type.append('<UNK>')
type2index['<PAD>'] = len(index2type)
index2type.append('<PAD>')
x_train_word_based_feature_matrix = simple_binary_vectorization(tokenized_train_corpus, type2index)

dev_corpus = dev_df['text'].tolist()
tokenized_dev_corpus = whitespace_tokenize(dev_corpus)
x_dev_word_based_feature_matrix = simple_binary_vectorization(tokenized_dev_corpus, type2index)

test_corpus = test_df['text'].tolist()
tokenized_test_corpus = whitespace_tokenize(test_corpus)
x_test_word_based_feature_matrix = simple_binary_vectorization(tokenized_test_corpus, type2index)



Number of tokens in corpus:  30894
Vocab size: 13674


### BPE Tokenization Feature Extraction

In [14]:
# Train BPE
bpe = BPETokenizer(tokenized_train_corpus, vocab_size=1000 )
merges = bpe.train()


# Apply to our dataset
train_df['bpe_text'] = train_df['text'].apply(lambda x: ' '.join(bpe.tokenize(x)))
dev_df['bpe_text'] = dev_df['text'].apply(lambda x: ' '.join(bpe.tokenize(x)))
test_df['bpe_text'] = test_df['text'].apply(lambda x: ' '.join(bpe.tokenize(x)))
train_df.head()

bpe_train_corpus = train_df['bpe_text'].tolist()
tokenized_bpe_train_corpus = whitespace_tokenize(bpe_train_corpus)
num_tokens = count_tokens(tokenized_bpe_train_corpus)
print('Number of tokens in corpus: ', num_tokens)
bpe_type2count = create_type_counts(tokenized_bpe_train_corpus)

# Sort types by counts
bpe_type2count = dict(sorted(bpe_type2count.items(), key=lambda x: x[1], reverse=True))
bpe_index2type, bpe_type2index = create_vocabulary(bpe_type2count, min_count=1)
print("Vocab size:", len(bpe_index2type))

bpe_type2index['<UNK>'] = len(bpe_index2type)
bpe_index2type.append('<UNK>')
bpe_type2index['<PAD>'] = len(bpe_index2type)
bpe_index2type.append('<PAD>')

x_train_bpe_feature_matrix = simple_binary_vectorization(tokenized_bpe_train_corpus, bpe_type2index)

bpe_dev_corpus = dev_df['bpe_text'].tolist()
tokenized_bpe_dev_corpus = whitespace_tokenize(bpe_dev_corpus)
x_dev_bpe_feature_matrix = simple_binary_vectorization(tokenized_bpe_dev_corpus, bpe_type2index)

bpe_test_corpus = test_df['bpe_text'].tolist()
tokenized_bpe_test_corpus = whitespace_tokenize(bpe_test_corpus)
x_test_bpe_feature_matrix = simple_binary_vectorization(bpe_test_corpus, bpe_type2index)


Number of tokens in corpus:  80362
Vocab size: 1171


# Model Training


In [15]:
class NaiveBayesClassifier():
    
    def __init__(self, x_train, y_train, type2index):
        self.class_priors = {}
        self.likelihoods_probs = {}
        self.word_counts = {}
        self.vocabulary = type2index
        self.Y_train = y_train
        self.X_train = x_train
        self.vocab_size = len(type2index.items())
    
    def train(self):
        """
         Trains the Naive Bayes model by computing class priors and likelihood probabilities.
        """
        # Calculate class prior probabilities
        self.compute_prior_probs()
        # Calculate likelihood probabilities
        self.compute_likelihood_probs() 
    
    def compute_prior_probs(self):
        """
        Calculates the prior probability for each class in the training data.
        """
        positive_sentences = 0
        for label in self.Y_train:
            if label == 1:
                positive_sentences += 1
        self.class_priors[1] = positive_sentences/len(self.Y_train)
        self.class_priors[0] = 1 - self.class_priors[1]
        
    def compute_likelihood_probs(self):
        """
        Calculates the likelihood probabilities for each word in the vocabulary, given each class, using Laplace smoothing.
        """
        self.word_counts = {0: np.zeros(self.vocab_size), 1: np.zeros(self.vocab_size)}
        for sentence_vector, label  in zip(self.X_train, self.Y_train):
            self.word_counts[label] += sentence_vector
        for class_ in self.word_counts:
            # Compute based on Laplace Transform
            self.likelihoods_probs[class_] = (self.word_counts[class_] + 1)/ (sum(self.word_counts[class_]) + self.vocab_size)
            
         
    def predict(self,x):
        """
        Predicts the class labels for a set of input documents.

        param:
            x: A list or array of document feature vectors (binary vectors representing word occurrences).

        return:
            A list of predicted class labels (0 or 1).
        """
        predictions = []
        for sentence_vector in x: # text vectorization of sentence
            posteriors = {}
            for class_  in self.class_priors:
                log_posterior = np.log(self.class_priors[class_])
                for i in range(len(sentence_vector)):
                    if sentence_vector[i] == 1:
                        log_posterior += np.log(self.likelihoods_probs[class_][i])
                posteriors[class_] = log_posterior
            predicted_class = max(posteriors, key=posteriors.get)
            predictions.append(predicted_class)
        return predictions
    
        

## Word-Based Tokenization: Model Training

In [16]:
y_train = train_df["label"]
y_train = np.array(y_train.map({"positive": 1, "negative": 0}))
x_train = x_train_word_based_feature_matrix

# Train Model
nb_word_based_tokenization = NaiveBayesClassifier(x_train,y_train, type2index)
nb_word_based_tokenization.train()


## BPE Tokenization: Model Training

In [17]:
x_train_bpe = x_train_bpe_feature_matrix

nb_bpe = NaiveBayesClassifier(x_train_bpe,y_train, bpe_type2index)
nb_bpe.train()

# Prediction

In [18]:
def prediction_accuracy(predictions, y_test):
    """
    Calculates the accuracy of a set of predictions against the true labels.
    
    param:
        predictions: A list or array containing predicted class labels (e.g., 0 or 1).
        y_test: A list or array containing the true class labels.

    return:
        float: The accuracy of the predictions, expressed as a percentage (0 to 100).
    """
    correct_predictions = 0
    for i in range(len(predictions)):
        if wb_predictions[i] == y_test[i]:
            correct_predictions += 1
    prediction_acc = correct_predictions/len(y_test)
    return prediction_acc*100

## Word-Based Tokenization Prediction

In [19]:
y_test = test_df["label"]
y_test = np.array(y_test.map({"positive": 1, "negative": 0}))
y_test = y_test.tolist()
x_test = x_test_word_based_feature_matrix
wb_predictions = nb_word_based_tokenization.predict(x_test)
prediction_accuracy(wb_predictions, y_test)


80.09478672985783

## BPE Tokenization Prediction

In [20]:
# Predict
x_test_bpe = x_test_bpe_feature_matrix
bpe_test_predictions = nb_bpe.predict(x_test_bpe)
prediction_accuracy(bpe_test_predictions, y_test)

80.09478672985783

# Model Evaluation

We evaluate the performance of our model on a development set.

## Word-Based Tokenization Model Evaluation

In [21]:
y_dev = dev_df["label"]
y_dev = np.array(y_dev.map({"positive": 1, "negative": 0}))
y_dev = y_dev.tolist()
x_dev = x_dev_word_based_feature_matrix

wb_eval_predictions = nb_word_based_tokenization.predict(x_dev)
print(classification_report(y_dev, wb_eval_predictions))


              precision    recall  f1-score   support

           0       0.78      0.81      0.80       287
           1       0.75      0.72      0.73       225

    accuracy                           0.77       512
   macro avg       0.76      0.76      0.76       512
weighted avg       0.77      0.77      0.77       512


## BPE Tokenization Model Evaluation

In [22]:
x_dev_bpe = x_dev_bpe_feature_matrix

bpe_eval_predictions = nb_word_based_tokenization.predict(x_dev_bpe)
print(classification_report(y_dev, bpe_eval_predictions))

              precision    recall  f1-score   support

           0       0.55      0.73      0.63       287
           1       0.41      0.24      0.30       225

    accuracy                           0.52       512
   macro avg       0.48      0.49      0.47       512
weighted avg       0.49      0.52      0.49       512
