In [None]:
import numpy as np
import pandas as pd

import string
from sklearn.model_selection import train_test_split
from nltk import word_tokenize

We will classify lines of poetry by author (Edgar Allan Poe vs. Robert Frost): we train one Markov model per author and then plug the likelihoods they assign to a line into Bayes' rule to pick the more probable author.

In [None]:
# Fetch the two poem corpora (one text file per author).
# -nc (no-clobber) skips the download when the file already exists locally.
!wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt
!wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt

In [None]:
# One corpus file per author; a file's position in this list becomes its class label.
input_files = ['edgar_allan_poe.txt', 'robert_frost.txt']

In [None]:
# collect data into lists: one entry per non-empty line, labelled by source file
input_texts = []
labels = []

# translation table that deletes every ASCII punctuation character (built once, reused for all lines)
punctuation_remover = str.maketrans('', '', string.punctuation)

for label, f in enumerate(input_files):
  print(f"{f} corresponds to label {label}")

  # context manager guarantees the file handle is closed
  with open(f) as text_file:
    for line in text_file:
      # normalise: drop trailing whitespace, lowercase, remove punctuation
      line = line.rstrip().lower().translate(punctuation_remover)

      # check emptiness AFTER stripping punctuation: a punctuation-only line
      # would otherwise become an empty sample with no tokens
      if line.split():
        input_texts.append(line)
        labels.append(label)

In [None]:
# Hold out part of the lines for evaluation; the fixed seed makes the split reproducible.
input_train, input_test, Y_train, Y_test = train_test_split(
    input_texts,
    labels,
    random_state=42,
)

# Labels as numpy arrays so they can be compared element-wise later on.
Y_train, Y_test = np.array(Y_train), np.array(Y_test)

In [None]:
# Build the training vocabulary and the token <-> index mappings.
vocab = []      # tokens in order of first appearance
X_train = []    # tokenised training lines
tok2ind = {}    # token -> integer index (0..D-1)
current_ind = 0 # next free index

for line in input_train:
    tokenised_line = word_tokenize(line)
    X_train.append(tokenised_line)
    for tok in tokenised_line:
        # look the token up in the dict (O(1)) rather than scanning the
        # `vocab` list (O(len(vocab)) per token)
        if tok not in tok2ind:
            vocab.append(tok)
            tok2ind[tok] = current_ind
            current_ind += 1

# inverse mapping: integer index -> token
ind2tok = {ind: tok for tok, ind in tok2ind.items()}

D = len(vocab)
print(f'Vocab length: {D}')

In [None]:
# Tokenise the held-out lines with the same tokenizer used for the training lines.
X_test = list(map(word_tokenize, input_test))

Convert tokens to indices:

In [None]:
# every training token is in tok2ind by construction
X_train_ind = [[tok2ind[tok] for tok in line] for line in X_train]
# test tokens that are not in the train vocabulary all share one special
# "unknown" index: D, the first index after the known ones (0..D-1).
# dict.get does the lookup in O(1) instead of rebuilding a key list per token.
X_test_ind = [[tok2ind.get(tok, D) for tok in line] for line in X_test]

vocab_ind = [tok2ind[tok] for tok in vocab]

N = len(X_train_ind) # number of train sequences

Now estimate the initial-state distribution $\pi_i$ and the transition matrix $A_{ij}$ of each model by simple counting:

In [None]:
# counts(state_1 = i): how many lines start with token i
counts_initial_0 = np.zeros(D+1) # +1 to cover the special tokens not present in train set
counts_initial_1 = np.zeros(D+1) # +1 to cover the special tokens not present in train set
# counts(i->j): how many times token i is immediately followed by token j
counts_transition_0 = np.zeros((D+1, D+1))
counts_transition_1 = np.zeros((D+1, D+1))
# counts(i): how many times token i occurs at all
counts_words_0 = np.zeros(D+1)
counts_words_1 = np.zeros(D+1)


def _accumulate_counts(line, counts_initial, counts_transition, counts_words):
    """Add the statistics of one index-encoded line to the count arrays (in place).

    The first token is counted exactly once as the initial state, every token
    is counted once as a word occurrence, and every adjacent pair (i, j) is
    counted once as a transition. Positions are used rather than token values,
    so a first or last token that re-appears inside the line is not counted
    twice, and single-token lines (which have no transitions) still contribute
    their initial state and word count.
    """
    if len(line) == 0:
        return  # nothing to learn from an empty line

    counts_initial[line[0]] += 1

    for ii in line:
        counts_words[ii] += 1

    # [6, 34, 97, 12] -> (6, 34), (34, 97), (97, 12)
    for ii, jj in zip(line, line[1:]):
        counts_transition[ii, jj] += 1


# proceed line by line; the label tells us which Markov model we are training
for line, label in zip(X_train_ind, Y_train):
    if label:
        _accumulate_counts(line, counts_initial_1, counts_transition_1, counts_words_1)
    else:
        _accumulate_counts(line, counts_initial_0, counts_transition_0, counts_words_0)

In [None]:
# we work with logs for better numerical stability (products of many small
# probabilities would underflow)
# take log(term1) - log(term2), not log(term1/term2)
# finally, we do 'add-one' smoothing to prevent taking log(0): every one of the
# D+1 possible outcomes gets one pseudo-count, so the denominator grows by D+1

# initial-state distribution: normalise by the number of lines observed for
# THIS class (the sum of its initial counts), not by the total number of lines
log_pi_0 = np.log(counts_initial_0 + 1) - np.log(counts_initial_0.sum() + (D+1))
log_pi_1 = np.log(counts_initial_1 + 1) - np.log(counts_initial_1.sum() + (D+1))

# transition matrix: row i must be normalised by the number of transitions
# leaving token i. keepdims=True keeps the row sums as a (D+1, 1) column so the
# subtraction broadcasts per ROW; a flat (D+1,) vector would broadcast along the
# columns and divide A[i, j] by the count of j instead of i.
log_A_0 = np.log(counts_transition_0 + 1) - np.log(counts_transition_0.sum(axis=1, keepdims=True) + (D+1))
log_A_1 = np.log(counts_transition_1 + 1) - np.log(counts_transition_1.sum(axis=1, keepdims=True) + (D+1))

Now we have our trained Markov models for class 0 and 1. We need to plug them into Bayes rule. We want:

p(class|x) = p(x|class) * p(class) / p(x)

We can ignore the evidence p(x): we only need the argmax of the left-hand side over the classes, and p(x) is the same for every class. We still need the prior p(class). It could be dropped only if it were uniform, which is not the case when the classes are unbalanced.

In [None]:
# computing the prior
n_0 = (Y_train == 0).sum()
n_1 = (Y_train == 1).sum()

print(f'n_0: {n_0}, n_1: {n_1}')

log_prior_0 = np.log(n_0) - np.log(n_0 + n_1)
log_prior_1 = np.log(n_1) - np.log(n_0 + n_1)

In [None]:
def compute_log_prob_Markov(line: list[int], log_pi, log_A, log_prior = 0):
    """Compute the log probability of a sequence under a first-order Markov model.

    Args:
        line: tokens turned into indices, e.g. 'I like dogs' -> [34, 9, 51].
        log_pi: 1-D array, log_pi[i] is the log probability that a sequence starts with token i.
        log_A: 2-D array, log_A[i, j] is the log probability of token j following token i.
        log_prior: log prior of the class this model represents, added to the result
            (default 0, i.e. no prior).

    Returns:
        log_pi[first token] + sum of log_A over consecutive token pairs + log_prior.
        For an empty sequence only log_prior is returned (the empty product
        contributes log(1) = 0), instead of failing on line[0].
    """
    if len(line) == 0:
        return log_prior

    initial_prob = log_pi[line[0]]

    # [6, 34, 97, 12] -> (6, 34), (34, 97), (97, 12); empty for single-token lines
    transition_probs = sum(log_A[ii, jj] for ii, jj in zip(line, line[1:]))

    return initial_prob + transition_probs + log_prior

In [None]:
# sanity check: score of the first test line under the class-0 model (prior included)
compute_log_prob_Markov(
    line=X_test_ind[0],
    log_pi=log_pi_0,
    log_A=log_A_0,
    log_prior=log_prior_0,
)

In [None]:
def classify_Markov(line: list[int]):
    """Return the class (0 or 1) whose Markov model, weighted by its prior, scores the line higher.

    line is an index-encoded token sequence. Class 0 wins only when its score
    is strictly greater; otherwise class 1 is returned.
    """
    score_0 = compute_log_prob_Markov(line, log_pi_0, log_A_0, log_prior_0)
    score_1 = compute_log_prob_Markov(line, log_pi_1, log_A_1, log_prior_1)

    return 0 if score_0 > score_1 else 1

In [None]:
def evaluate(lines: list[list[int]], labels: list[int]):
    """Return the accuracy of classify_Markov on the given data.

    lines are index-encoded token sequences and labels their true classes;
    the result is the fraction of lines whose predicted class equals its label.
    """
    predicted = np.array(list(map(classify_Markov, lines)))
    is_correct = predicted == labels

    return is_correct.mean()

In [None]:
# report accuracy on the data the models were fitted on, then on the held-out lines
train_accuracy = evaluate(X_train_ind, Y_train)
print(f'Accuracy on train set: {train_accuracy}')

test_accuracy = evaluate(X_test_ind, Y_test)
print(f'Accuracy on test set: {test_accuracy}')