In [1]:
# https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt
# https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt

## Importing libraries

In [2]:
import numpy as np
import string

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score

## Preparing data

In [3]:
input_files = [
  'datasets/edgar_allan_poe.txt',
  'datasets/robert_frost.txt',
]

input_txts = []
labels = []

for label, data in enumerate(input_files):
    print(f'{data} belongs to the labels {label}')
    with open(data, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip().lower()
            line = line.translate(str.maketrans('', '', string.punctuation))
            if line != '':
                input_txts.append(line)
                labels.append(label)

datasets/edgar_allan_poe.txt belongs to the labels 0
datasets/robert_frost.txt belongs to the labels 1


In [4]:
input_txts[:5], labels[:5]

(['lo death hath reard himself a throne',
  'in a strange city all alone',
  'far down within the dim west',
  'where the good and the bad and the worst and the best',
  'have gone to their eternal rest'],
 [0, 0, 0, 0, 0])

In [5]:
input_txts[-5:], labels[-5:]

(['to say which buds are leaf and which are bloom',
  'a featherhammer gives a double knock',
  'this eden day is done at two oclock',
  'an hour of winter day might seem too short',
  'to make it worth lifes while to wake and sport'],
 [1, 1, 1, 1, 1])

In [6]:
len(input_txts)

2154

In [7]:
X_train, X_test, y_train, y_test = train_test_split(input_txts, labels, test_size = 0.2, shuffle = True)

In [8]:
X_train[:5], y_train[:5]

(['hath ever told or is it of a thought',
  'smoothlaid like thatch with the heavy dew',
  'ancestral memories might come together',
  'call her nausicaa the unafraid',
  'far down within the dim west'],
 [0, 1, 1, 1, 0])

## Word-to-Index

In [9]:
idx = 1
word2idx = {'<unk>': 0}

for text in X_train:
    tokens = text.split()
    for token in tokens:
        if token not in word2idx:
            word2idx[token] = idx
            idx += 1

In [10]:
word2idx

{'<unk>': 0,
 'hath': 1,
 'ever': 2,
 'told': 3,
 'or': 4,
 'is': 5,
 'it': 6,
 'of': 7,
 'a': 8,
 'thought': 9,
 'smoothlaid': 10,
 'like': 11,
 'thatch': 12,
 'with': 13,
 'the': 14,
 'heavy': 15,
 'dew': 16,
 'ancestral': 17,
 'memories': 18,
 'might': 19,
 'come': 20,
 'together': 21,
 'call': 22,
 'her': 23,
 'nausicaa': 24,
 'unafraid': 25,
 'far': 26,
 'down': 27,
 'within': 28,
 'dim': 29,
 'west': 30,
 'not': 31,
 'that': 32,
 'my': 33,
 'founts': 34,
 'bliss': 35,
 'and': 36,
 'give': 37,
 'up': 38,
 'sleep': 39,
 'before': 40,
 'its': 41,
 'face': 42,
 'by': 43,
 'picking': 44,
 'faded': 45,
 'blue': 46,
 'gourd': 47,
 'grape': 48,
 'luxuriant': 49,
 'grew': 50,
 'as': 51,
 'witch': 52,
 'id': 53,
 'often': 54,
 'milk': 55,
 'bat': 56,
 'too': 57,
 'run': 58,
 'this': 59,
 'last': 60,
 'she': 61,
 'was': 62,
 'shut': 63,
 'in': 64,
 'for': 65,
 'life': 66,
 'lived': 67,
 'whole': 68,
 'we': 69,
 'can': 70,
 'fall': 71,
 'keep': 72,
 'abouncing': 73,
 'on': 74,
 'our': 75,
 '

## Converting data to interger format

In [11]:
X_train_int = []
X_test_int = []

for text in X_train:
  tokens = text.split()
  line_as_int = [word2idx[token] for token in tokens]
  X_train_int.append(line_as_int)

for text in X_test:
  tokens = text.split()
  line_as_int = [word2idx.get(token, 0) for token in tokens]
  X_test_int.append(line_as_int)

In [12]:
X_train_int[:5]

[[1, 2, 3, 4, 5, 6, 7, 8, 9],
 [10, 11, 12, 13, 14, 15, 16],
 [17, 18, 19, 20, 21],
 [22, 23, 24, 14, 25],
 [26, 27, 28, 14, 29, 30]]

In [13]:
X_test_int[:5]

[[31, 32, 136, 766, 1512, 14, 0, 2364],
 [1669, 14, 0, 7, 1845, 0, 1606],
 [299, 204, 89, 822, 11, 32, 203, 204, 89, 210],
 [74, 59, 440, 7, 92, 1342, 64, 14, 488],
 [250, 83, 428, 380, 2548, 64, 14, 681]]

## Calculating A and pi

In [14]:
# initialize A and pi matrices - for both classes
N = len(word2idx)

A0 = np.ones((N, N))
pi0 = np.ones(N)

A1 = np.ones((N, N))
pi1 = np.ones(N)

In [15]:
# compute counts for A and pi
def compute_counts(text_as_int, A, pi):
    for tokens in text_as_int:
        last_idx = None
        for idx in tokens:
            if last_idx is None:
                # it's the first word in a sentence
                pi[idx] += 1
            else:
                # the last word exists, so count a transition
                A[last_idx, idx] += 1

            # update last idx
            last_idx = idx


compute_counts([t for t, y in zip(X_train_int, y_train) if y == 0], A0, pi0)
compute_counts([t for t, y in zip(X_train_int, y_train) if y == 1], A1, pi1)

In [16]:
# normalize A and pi so they are valid probability matrices
A0 /= A0.sum(axis=1, keepdims=False)
pi0 /= pi0.sum()

A1 /= A1.sum(axis=1, keepdims=True)
pi1 /= pi1.sum()

In [17]:
# log A and pi since we don't need the actual probs
logA0 = np.log(A0)
logpi0 = np.log(pi0)

logA1 = np.log(A1)
logpi1 = np.log(pi1)

## Compute priors

In [18]:
count0 = sum(y == 0 for y in y_train)
count1 = sum(y == 1 for y in y_train)
total = len(y_train)
p0 = count0 / total
p1 = count1 / total
logp0 = np.log(p0)
logp1 = np.log(p1)

p0, p1

(0.3308183401044689, 0.669181659895531)

## Build a Classifier

In [19]:
class Classifier:
    def __init__(self, logAs, logpis, logpriors):
        self.logAs = logAs
        self.logpis = logpis
        self.logpriors = logpriors
        self.K = len(logpriors) # number of classes

    def _compute_log_likelihood(self, input_, class_):
        logA = self.logAs[class_]
        logpi = self.logpis[class_]
    
        last_idx = None
        logprob = 0
        for idx in input_:
          if last_idx is None:
            # it's the first token
            logprob += logpi[idx]
          else:
            logprob += logA[last_idx, idx]
          
          # update last_idx
          last_idx = idx
        
        return logprob
  
    def predict(self, inputs):
        predictions = np.zeros(len(inputs))
        for i, input_ in enumerate(inputs):
            posteriors = [self._compute_log_likelihood(input_, c) + self.logpriors[c] for c in range(self.K)]
            pred = np.argmax(posteriors)
            predictions[i] = pred
        return predictions

In [20]:
# each array must be in order since classes are assumed to index these lists
clf = Classifier([logA0, logA1], [logpi0, logpi1], [logp0, logp1])

## Evaluation

**Accuracy**

In [21]:
train_output = clf.predict(X_train_int)
print(f"Train acc: {np.mean(train_output == y_train)}")

Train acc: 0.995937318630296


In [22]:
test_output = clf.predict(X_test_int)
print(f"Test acc: {np.mean(test_output == y_test)}")

Test acc: 0.839907192575406


**Confusion Matrix**

In [23]:
train_cm = confusion_matrix(y_train, train_output)
train_cm

array([[ 563,    7],
       [   0, 1153]], dtype=int64)

In [24]:
test_cm = confusion_matrix(y_test, test_output)
test_cm

array([[ 88,  60],
       [  9, 274]], dtype=int64)

**F1-score**

In [25]:
f1_score(y_train, train_output)

0.9969736273238219

In [26]:
f1_score(y_test, test_output)

0.8881685575364667