In [1]:
import numpy as np
import matplotlib.pyplot as plt
import string
from sklearn.model_selection import train_test_split

In [2]:
# One text file per author; a file's position in this list becomes its class label
input_files = [
    'edgar_allen_poe.txt',
    'robert_frost.txt',
]

In [21]:
input_texts = []
labels = []

# Build the punctuation-stripping table once instead of once per line
punctuation_table = str.maketrans('', '', string.punctuation)

# enumerate yields (index, filename); the index doubles as the class label
# (first file -> label 0, second file -> label 1, ...)
for label, f in enumerate(input_files):
    # The context manager closes the file handle once the file has been read,
    # even if an exception is raised part-way through
    with open(f, encoding='utf-8') as corpus_file:
        for line in corpus_file:
            # Drop the trailing newline/whitespace and normalise case
            line = line.rstrip().lower()
            # Skip blank lines
            if line:
                # Remove punctuation
                line = line.translate(punctuation_table)
                input_texts.append(line)
                labels.append(label)

In [22]:
# Fix the seed so the split -- and every metric computed from it -- is
# reproducible across "Restart Kernel -> Run All". Re-run the notebook to
# refresh the recorded outputs after changing the seed.
train_text, test_text, Ytrain, Ytest = train_test_split(
    input_texts, labels, random_state=42
)

Check how many training and test samples we have

In [23]:
len(Ytrain), len(Ytest)

(1615, 539)

Map word to index

In [24]:
# Index 0 is reserved for '<unk>': words that appear in the test set
# but were never seen in the training set
word2idx = {'<unk>': 0}
# Next free index to hand out to a newly seen word
idx = 1

In [25]:
for text in train_text:
    # Whitespace tokenisation: each word of the line is one token
    for token in text.split():
        if token in word2idx:
            # Already has an index
            continue
        # First occurrence: assign the next free index
        word2idx[token] = idx
        idx += 1

In [37]:
word2idx['as']

1

In [27]:
# convert word as string into integer as the mapping

# Encode every line as a list of vocabulary indices

# Training lines: look each token up directly in the vocabulary
train_text_int = [
    [word2idx[token] for token in text.split()]
    for text in train_text
]

# Test lines: a token missing from the vocabulary falls back to index 0
test_text_int = [
    [word2idx.get(token, 0) for token in text.split()]
    for text in test_text
]

In [28]:
# Init A and Pi Matrices for both models
# A => Transition matrix from V states -> V states
# Pi => Prior Probability -> V states
#  N markov models for n authors
# Vocabulary size, including the '<unk>' entry
V = len(word2idx)

# Every count starts at one instead of zero (add-one smoothing), so no
# transition or initial-word probability ends up exactly zero
# Model for class 0: transition counts (V x V) and initial-word counts (V)
A0, pi0 = np.ones((V, V)), np.ones(V)

# Model for class 1
A1, pi1 = np.ones((V, V)), np.ones(V)

In [29]:
def compute_counts(text_as_int, A, pi):
    """Accumulate first-word and word-to-word transition counts in place.

    Parameters
    ----------
    text_as_int : iterable of iterables of int
        One sequence of word indices per line.
    A : array indexed as A[previous, current]
        Transition counts; incremented once per consecutive pair of words.
    pi : array indexed as pi[first]
        Initial-word counts; incremented once per non-empty sequence.

    Returns
    -------
    None
        ``A`` and ``pi`` are modified in place.
    """
    for sequence in text_as_int:
        previous = None
        for position, current in enumerate(sequence):
            if position == 0:
                # First word of the line contributes to the initial counts
                pi[current] += 1
            else:
                # Every later word contributes one (previous -> current) transition
                A[previous, current] += 1
            previous = current

# zip pairs each encoded training line with its label, so each line can be
# routed to the model of its own class
# (see https://www.programiz.com/python-programming/methods/built-in/zip)
class0_lines = [tokens for tokens, y in zip(train_text_int, Ytrain) if y == 0]
class1_lines = [tokens for tokens, y in zip(train_text_int, Ytrain) if y == 1]

compute_counts(class0_lines, A0, pi0)
compute_counts(class1_lines, A1, pi1)


In [30]:
# Turn the counts into probabilities, in place:
# each row of a transition matrix sums to 1, and each initial vector sums to 1
for transition, initial in ((A0, pi0), (A1, pi1)):
    transition /= transition.sum(axis=1, keepdims=True)
    initial /= initial.sum()

In [31]:
# log A and pi since we don't need the actual probs
logA0 = np.log(A0)
logpi0 = np.log(pi0)

logA1 = np.log(A1)
logpi1 = np.log(pi1)

In [32]:
# compute priors
# How many samples belong to set 0/1 in the training set
count0 = sum(y == 0 for y in Ytrain)
count1 = sum(y == 1 for y in Ytrain)
total = len(Ytrain)
# compute prior probability
p0 = count0 / total
p1 = count1 / total
# need log probablity
logp0 = np.log(p0)
logp1 = np.log(p1)
p0, p1

(0.3430340557275542, 0.6569659442724458)

We can see the class distribution is imbalanced

In [39]:
class Classifier:
    """Bayes classifier built from one first-order Markov model per class.

    Parameters
    ----------
    logAs : sequence
        Per-class log transition matrices, indexed as ``logA[previous, current]``.
    logpis : sequence
        Per-class log initial-word probabilities, indexed as ``logpi[first]``.
    logpriors : sequence
        Per-class log prior probabilities; its length defines the number of
        classes ``K``.
    """

    def __init__(self, logAs, logpis, logpriors):
        self.logAs = logAs
        self.logpis = logpis
        self.logpriors = logpriors
        self.K = len(logpriors)

    def _compute_log_likelihood(self, input_, class_):
        """Return the log-likelihood of one index sequence under one class.

        The first index is scored with the class's initial log-probabilities
        and every following index with the transition log-probability from
        its predecessor. An empty sequence scores 0.
        """
        logA = self.logAs[class_]
        logpi = self.logpis[class_]

        last_idx = None
        logprob = 0

        for idx in input_:
            if last_idx is None:
                # First word of the sequence
                logprob += logpi[idx]
            else:
                # Transition from the previous word to this one
                logprob += logA[last_idx, idx]
            last_idx = idx

        return logprob

    def predict(self, inputs):
        """Return the most probable class index for each sequence in ``inputs``.

        Returns
        -------
        numpy.ndarray of int, shape ``(len(inputs),)``
            For each sequence, the argmax over classes of
            log-likelihood + log-prior. Ties resolve to the lowest class index.
        """
        # Class labels are categorical, so store them as integers, not floats
        predictions = np.zeros(len(inputs), dtype=int)
        for i, input_ in enumerate(inputs):
            posteriors = [
                self._compute_log_likelihood(input_, c) + self.logpriors[c]
                for c in range(self.K)
            ]
            predictions[i] = np.argmax(posteriors)
        return predictions


In [40]:
# Classifier (Transition matrix, initial prob, prior prob)
clf = Classifier([logA0, logA1], [logpi0, logpi1], [logp0, logp1])
Ptrain = clf.predict(train_text_int)
print(f"Train acc: {np.mean(Ptrain == Ytrain)}")

Train acc: 0.9962848297213622


In [41]:
# Score the held-out lines with the same classifier
Ptest = clf.predict(test_text_int)
test_accuracy = np.mean(Ptest == Ytest)
print(f"Test acc: {test_accuracy}")

Test acc: 0.8589981447124304


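Our dataset is imbalanced: roughly 34% of the training lines belong to class 0 and 66% to class 1.

Accuracy alone can be misleading here, so also use metrics that take the imbalance into account, e.g. the confusion matrix and the F1 score.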

In [42]:
from sklearn.metrics import confusion_matrix, f1_score

In [43]:
cm = confusion_matrix(Ytrain, Ptrain)
cm

array([[ 548,    6],
       [   0, 1061]], dtype=int64)

In [45]:
cm_test = confusion_matrix(Ytest, Ptest)
cm_test

array([[ 96,  68],
       [  8, 367]], dtype=int64)

In [46]:
f1_score(Ytrain, Ptrain)

0.9971804511278196

In [47]:
f1_score(Ytest, Ptest)

0.9061728395061728