In [1]:
from collections import Counter
import copy

import re
import numpy as np

# for testing
from sklearn import naive_bayes, feature_extraction, feature_selection, preprocessing, metrics

# TF-IDF

In [2]:
corpus = ['this is the first document',
          'this document is the second document',
          'and this is the third one',
          'is this the first document']

In [3]:
class TFIDF:
    def __init__(self):
        self.word_counter = None
        self.index_to_word = None
        self.word_to_index = None
        self.document_frequency = None
        self.n_docs = 0
        self.n_vocab = 0

    def _clean_data(self, data: list[str]) -> list[str]:
        """Cleans and preprocesses data by removing non-alphabet characters and lowering case."""
        return [' '.join(re.findall(r'[a-zA-Z]+', sentence)).lower() for sentence in data]

    def _build_vocab(self, cleaned_data: list[str]) -> list[str]:
        """Builds vocabulary and mappings from cleaned data."""
        word_list = [word for sentence in cleaned_data for word in sentence.split()]
        self.word_counter = dict(sorted(Counter(word_list).items()))
        self.index_to_word = {idx: word for idx, word in enumerate(self.word_counter.keys())}
        self.word_to_index = {word: idx for idx, word in enumerate(self.word_counter.keys())}
        self.n_vocab = len(self.word_counter)

    def _calculate_document_frequency(self, cleaned_data: list[str]):
        """Calculates document frequency for each word."""
        self.document_frequency = {word: 0 for word in self.word_to_index.keys()}
        for sentence in cleaned_data:
          splitted_sentence = sentence.split(" ")
          unique_word = np.unique(splitted_sentence)
          for word in unique_word:
            self.document_frequency[word] += 1

    def _calculate_tfidf(self, sentence: str):
        """Calculates the TF-IDF vector for a single sentence."""
        counter = Counter(sentence.split())
        row_sum = 0
        tfidf_vector = np.zeros(self.n_vocab)

        for j in range(self.n_vocab):
            word = self.index_to_word[j]
            # term frequency only caculated from freq on the sentence
            # same as BoW (sklearn references)
            tf = counter.get(word, 0)
            df = self.document_frequency.get(word, 0)
            # references: https://scikit-learn.org/1.5/modules/generated/sklearn.feature_extraction.text.TfidfTransformer.html
            # adding 1 on n and df -> smoothing
            idf = np.log((1 + self.n_docs) / (1 + df)) + 1
            tf_idf = tf * idf

            row_sum += np.square(tf_idf)
            tfidf_vector[j] = tf_idf

        l2_norm = np.sqrt(row_sum)
        if l2_norm > 0:
            tfidf_vector /= l2_norm

        return tfidf_vector

    def fit(self, data):
        """Fits the model on the data"""
        cleaned_data = self._clean_data(data)
        self.n_docs = len(cleaned_data)
        self._build_vocab(cleaned_data)
        self._calculate_document_frequency(cleaned_data)
        return self

    def transform(self, data):
        """Transforms new data based on the already fitted vocabulary and document frequencies."""
        if self.word_to_index is None or self.document_frequency is None:
            raise ValueError("The TFIDF model must be fitted before calling transform.")

        cleaned_data = self._clean_data(data)
        transformed_matrix = np.array([self._calculate_tfidf(sentence) for sentence in cleaned_data])
        return transformed_matrix

    def fit_transform(self, data):
        self.fit(data)
        return self.transform(data)

In [4]:
tfidf = TFIDF()
X_tfidf = tfidf.fit_transform(corpus)
X_tfidf

array([[0.        , 0.46979139, 0.58028582, 0.38408524, 0.        ,
        0.        , 0.38408524, 0.        , 0.38408524],
       [0.        , 0.6876236 , 0.        , 0.28108867, 0.        ,
        0.53864762, 0.28108867, 0.        , 0.28108867],
       [0.51184851, 0.        , 0.        , 0.26710379, 0.51184851,
        0.        , 0.26710379, 0.51184851, 0.26710379],
       [0.        , 0.46979139, 0.58028582, 0.38408524, 0.        ,
        0.        , 0.38408524, 0.        , 0.38408524]])

In [5]:
tfidf_sklearn = feature_extraction.text.TfidfVectorizer()
X_sklearn = tfidf_sklearn.fit_transform(corpus).toarray()
X_sklearn

array([[0.        , 0.46979139, 0.58028582, 0.38408524, 0.        ,
        0.        , 0.38408524, 0.        , 0.38408524],
       [0.        , 0.6876236 , 0.        , 0.28108867, 0.        ,
        0.53864762, 0.28108867, 0.        , 0.28108867],
       [0.51184851, 0.        , 0.        , 0.26710379, 0.51184851,
        0.        , 0.26710379, 0.51184851, 0.26710379],
       [0.        , 0.46979139, 0.58028582, 0.38408524, 0.        ,
        0.        , 0.38408524, 0.        , 0.38408524]])

In [6]:
assert np.array_equal(X_tfidf, X_sklearn)

# Multinomial NB

In [7]:
class MultinomialNB:
    def __init__(self, alpha: float = 1.0):
        self.alpha = alpha

    def fit(self, X: np.ndarray, y: np.ndarray):
        self.classes = np.unique(y)
        self.class_priors = {}
        self.feature_likelihoods = {}
        self.join_log_likelihoods = []
        self.alpha = 1.0  # additive laplacian smoothing

        # for each class calculate prior and likelihoods
        for cls in self.classes:
            X_cls = X[y == cls]

            # class prior = total this class / total dataset
            self.class_priors[cls] = X_cls.shape[0] / X.shape[0]

            # feature likelihoods = array with cols eq to total feature
            # for each feature, sum all occurrence of that feature
            # divide it by (sum features in that class + total feature)
            self.feature_likelihoods[cls] = (np.sum(X_cls, axis=0) + (self.alpha)) / (
                np.sum(X_cls) + (self.alpha * X.shape[1])
            )

    def predict(self, X):
        predictions = []

        # for each data
        for x in X:
            posteriors = {}

            # for each class
            for cls in self.classes:
                # normal
                # posterior = (self.class_priors[cls])
                # posterior *= np.prod(np.power(self.feature_likelihoods[cls], x))

                # problem with normal is too small result, because fraction * fraction
                # for example 0.3 * 0.5 * ......
                # log version
                log_posterior = np.log(self.class_priors[cls])

                # math time
                # log(A^B) = log(A) * B
                # log(likelihoods ^ x ) = log(likelihoods) * x
                # log(a*b) = log(a) + log(b), then:
                # np.prod => np.sum
                log_posterior += np.sum(np.log(self.feature_likelihoods[cls]) * x)

                # add to class posterior
                posteriors[cls] = np.float64(log_posterior)

            # predict the class by choosing the highst log posterior
            self.join_log_likelihoods.append(list(posteriors.values()))
            predictions.append(max(posteriors, key=posteriors.get))
            # same as np.argmax(list(posteriors.values()))

        return np.array(predictions)

In [8]:
rng = np.random.RandomState(1)
X = rng.randint(5, size=(6, 100))
y = np.array([1, 2, 3, 4, 5, 6])

In [9]:
mnb = MultinomialNB()

In [10]:
mnb.fit(X, y)
print(mnb.predict(X[2:3]))

[3]


In [11]:
clf = naive_bayes.MultinomialNB()
clf.fit(X, y)
pred = (clf.predict_joint_log_proba(X[2:3]))
pred

array([[-980.3129722 , -981.88514255, -911.99414188, -988.58679562,
        -997.02412521, -995.70414588]])

In [12]:
jll = np.array(mnb.join_log_likelihoods)
jll

array([[-980.3129722 , -981.88514255, -911.99414188, -988.58679562,
        -997.02412521, -995.70414588]])

In [13]:
assert np.allclose(jll, pred)

# Chi-Square

In [14]:
class LabelBinarizer:
    def __init__(self):
        self.rank_mapping = None

    def fit(self, y):
        unique_label = np.unique(y)
        self.rank_mapping = {label: rank for rank, label in enumerate(unique_label)}
        return self

    def transform(self, y):
        if len(self.rank_mapping) == 2:
            result = np.zeros((len(y), 1))
            for i in range(len(y)):
                if y[i] == list(self.rank_mapping.keys())[0]:
                    result[i] = 0
                else:
                    result[i] = 1
            return result

        result = np.zeros((len(y), len(self.rank_mapping)))
        for i in range(len(y)):
            if self.rank_mapping.get(y[i], None) is not None:
                col = self.rank_mapping.get(y[i])
                result[i][col] = 1
        return result

    def fit_transform(self, y):
        self.fit(y)
        return self.transform(y)

In [15]:
label_binarizer = LabelBinarizer()

In [16]:
lb = preprocessing.LabelBinarizer()

In [17]:
y = [1, 2, 6, 4, 2]
assert np.array_equal(label_binarizer.fit_transform(y), lb.fit_transform(y))

In [18]:
y = ['yes', 'no', 'no', 'yes']
assert np.array_equal(label_binarizer.fit_transform(y), lb.fit_transform(y))

In [19]:
def chi_square(X: np.ndarray, y: np.ndarray):
    y = LabelBinarizer().fit_transform(y)

    observed = np.dot(y.T, X)

    class_prob = np.mean(y, axis=0).reshape(1, -1)
    feature_count = np.sum(X, axis=0).reshape(1, -1)
    expected = np.dot(class_prob.T, feature_count)

    chi_square = np.sum((observed - expected) ** 2 / expected, axis=0)
    return chi_square

In [20]:
X = np.array([[1, 1, 3],
              [0, 1, 5],
              [5, 4, 1],
              [6, 6, 2],
              [1, 4, 0],
              [0, 0, 0]])
y = np.array([1, 1, 0, 0, 2, 2])

In [21]:
chi_square = chi_square(X, y)
chi_square


array([15.38461538,  6.5       ,  8.90909091])

In [22]:
chi2_stats, _ = feature_selection.chi2(X, y)
chi2_stats

array([15.38461538,  6.5       ,  8.90909091])

In [23]:
assert np.array_equal(chi_square, chi2_stats)

In [24]:
def split_data(X: np.ndarray, y: np.ndarray, test_size=0.2, random_state=None):
    """
    Split data into train, test
    """
    if random_state:
        np.random.seed(random_state)

    indices = np.random.permutation(X.shape[0])
    test_size = int(X.shape[0] * test_size)
    test_indices = indices[:test_size]
    train_indices = indices[test_size:]

    X_train, X_test = X[train_indices], X[test_indices]
    y_train, y_test = y[train_indices], y[test_indices]

    return X_train, X_test, y_train, y_test

In [25]:
def accuracy_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Calculate accuracy score. 
    Formula: (TP + TN) / (TP + FP + TN + FN)
    """
    return float(np.mean(y_true == y_pred))

In [26]:
y_pred = [0, 2, 1, 3]
y_true = [0, 1, 2, 3]

In [27]:
assert metrics.accuracy_score(y_true, y_pred) == accuracy_score(np.array(y_true), np.array(y_pred))

In [28]:
def cross_validation(
    model: MultinomialNB,
    X: np.ndarray,
    y: np.ndarray,
    k_folds: int = 5,
    random_state: int = 42,
) -> list[float]:
    """
    Cross validation for MultinomialNB model
    """
    # shuffle data
    np.random.seed(random_state)
    indices = np.arange(len(y))
    np.random.shuffle(indices)

    X, y = X[indices], y[indices]

    # split data into k folds
    fold_size = len(y) // k_folds
    scores = []

    for i in range(k_folds):
        start = i * fold_size
        end = start + fold_size if i != k_folds - 1 else len(y)

        # per iteration, split data into train and validation
        X_valid, y_valid = X[start:end], y[start:end]
        X_train = np.concatenate([X[:start], X[end:]], axis=0)
        y_train = np.concatenate([y[:start], y[end:]], axis=0)

        # model to avoid data leakage between folds
        model_clone = copy.deepcopy(model)

        # tran and evaluate
        model_clone.fit(X_train, y_train)
        y_pred = model_clone.predict(X_valid)
        score = accuracy_score(y_valid, y_pred)
        scores.append(score)

    return scores

In [29]:
X = rng.randint(5, size=(10, 100))
y = np.array([1, 2, 3, 4, 1, 2, 3, 4, 1, 2])

In [30]:
mnb_model = MultinomialNB()
k_folds = 5
cv = cross_validation(mnb_model, X, y, k_folds=k_folds)
cv

[0.5, 0.5, 0.0, 0.5, 0.0]

In [31]:
assert len(cv) == k_folds