In [93]:
import nltk
from nltk import corpus
import numpy as np
from sklearn.metrics import classification_report

# Download the Brown corpus and the universal tagset
nltk.download("brown")
nltk.download("universal_tagset")

# Get the tagged sentences from the Brown corpus
tagged_sentences = list(corpus.brown.tagged_sents(tagset="universal"))

for sentence in tagged_sentences:
    for word in sentence:
        word[0].lower()

[nltk_data] Downloading package brown to /home/labuser/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /home/labuser/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [94]:
# for sentence in tagged_sentences[:10]:
#     for word_tuple in sentence:
#         print(word_tuple[0], end=" ")
#     print()

pos = {
    "ADJ": 1,
    "ADP": 2,
    "ADV": 3,
    "CONJ": 4,
    "DET": 5,
    "NOUN": 6,
    "NUM": 7,
    "PRON": 8,
    "PRT": 9,
    "VERB": 10,
    ".": 11,
    "X": 12,
}

rev_pos = ["", "ADJ", "ADP", "ADV", "CONJ", "DET", "NOUN", "NUM", "PRON", "PRT", "VERB", ".", "X", ""]

# beginning = 0, ending = 13

In [95]:
class HiddenMarkov:
    def __init__(self, tagged_sentences, pos, rev_pos):
        self.tagged_sentences = tagged_sentences
        self.pos = pos
        self.rev_pos = rev_pos
        self.word_map = {}

        self.train_sets = []
        self.test_sets = []
        self.transition_matrix = []
        self.emission_matrices = []
        self.total_transition_matrix = []
        self.total_emission_matrix = []

        pass

    def get_map(self) -> dict:
        id = 0
        for sentence in tagged_sentences:
            for word, _ in sentence:
                if word in self.word_map.keys():
                    continue
                self.word_map[word] = id
                id += 1

        return

    def get_transition_matrix(self, tagged_sentence) -> np.ndarray:
        transition_matrix = np.ones((14, 14))
        for sentence in tagged_sentence:
            n = len(sentence)
            for i in range(n):
                word, tag = sentence[i]
                tag = pos[tag]
                if i == 0:
                    transition_matrix[0][tag] += 1
                elif i < n - 1:
                    transition_matrix[tag][pos[sentence[i + 1][1]]] += 1
                else:
                    transition_matrix[tag][13] += 1

        transition_matrix = transition_matrix / np.sum(
            transition_matrix, axis=1, keepdims=True
        )

        return transition_matrix

    def get_emission_matrix(self, tagged_sentence) -> np.ndarray:
        n = len(self.word_map)
        emission_matrix = np.ones((14, n))
        for sentence in tagged_sentence:
            for word, tag in sentence:
                tag = pos[tag]
                emission_matrix[tag][self.word_map[word]] += 1

        emission_matrix = emission_matrix / np.sum(
            emission_matrix, axis=1, keepdims=True
        )

        return emission_matrix

    def five_fold(self):
        n = len(self.tagged_sentences)
        sz = n // 5
        parts = []
        for i in range(5):
            parts.append(tagged_sentences[i * sz : i * sz + sz])
        for i in range(5):
            self.test_sets.append(parts[i])
            train_set = []
            for j in range(5):
                if j == i:
                    continue
                train_set.extend(parts[j])

            self.train_sets.append(train_set)

        return

    def get_five_fold_matrices(self):
        self.train_sets = []
        self.test_sets = []
        self.transition_matrix = []
        self.emission_matrices = []
        self.five_fold()
        self.get_map()

        self.total_emission_matrix = self.get_emission_matrix(self.tagged_sentences)
        self.total_transition_matrix = self.get_transition_matrix(self.tagged_sentences)        
        for i in range(5):
            self.transition_matrix.append(
                self.get_transition_matrix(self.train_sets[i])
            )
            self.emission_matrices.append(self.get_emission_matrix(self.train_sets[i]))

        return
    
    def viterbi_sentence(self, sentence, transition_matrix : np.ndarray, emmission_matrix : np.ndarray):
        n = len(sentence)
        T = len(self.pos) + 2
        V = np.full((n, T), 0) 
        B = np.full((n, T), -1)
        const = np.log(1/len(self.word_map))

        # first word
        word = sentence[0][0]
        word_id = self.word_map[word] if word in self.word_map else -1
        for tag in range(1, T-1):
            V[0, tag] = np.log(transition_matrix[0, tag]) 
            if word_id != -1:
                V[0, tag]+=np.log(emmission_matrix[tag, word_id])
            else:
                V[0, tag]+=const


        # rest all words
        for i in range(1, n):
            word = sentence[i][0]
            word_id = self.word_map[word] if word in self.word_map else -1
            for tag in range(1, T-1):
                max_prob = -np.inf
                back_id = -1
                for prev in range(1, T-1):
                    prob = V[i-1][prev] + np.log(transition_matrix[prev, tag]) 
                    if prob > max_prob:
                        max_prob = prob
                        back_id = prev
                
                if word_id != -1:
                    max_prob+=(np.log(emmission_matrix[tag, word_id]))
                else:
                    max_prob+=const
                
                if i == n-1:
                    max_prob+=(np.log(transition_matrix[tag, T-1]))

                V[i, tag] = max_prob
                B[i, tag] = back_id
        
        # find last word tag and back propogate
        last_tag = -1
        max_prob = -np.inf
        for tag in range(1, T-1):
            if V[n-1, tag] > max_prob:
                max_prob = V[n-1, tag]
                last_tag = tag
        
        pos_tags = np.full((n), 0)
        pos_tags[n-1] = last_tag
        for i in range(n-2, -1, -1):
            pos_tags[i] = B[i+1, pos_tags[i+1]]
        
        pred_tags = [self.rev_pos[tag] for tag in pos_tags]

        return pred_tags
    
    def viterbi_algo(self, sentence):
        return self.viterbi_sentence(sentence, self.total_emission_matrix, self.total_emission_matrix)


    def accuracy_of_match(self, pred_labels, actual_labels):
        n = len(pred_labels)
        m = 0 
        for i in range(n):
            if pred_labels[i] == actual_labels[i]:
                m+=1 
        acc = (m/n)*100 
        return acc
    
    def classification_report(self):
        predicted = []
        for i in range(5):
            transition_matrix = self.transition_matrix[i]
            emmission_matrix = self.emission_matrices[i]
            for sentence in self.test_sets[i]:
                predicted.extend(self.viterbi_sentence(sentence, transition_matrix, emmission_matrix))
        
        actual = []
        for sentence in self.tagged_sentences:
            for word in sentence:
                actual.append(word[1])
        
        return actual, predicted


In [96]:
model = HiddenMarkov(tagged_sentences=tagged_sentences, pos=pos, rev_pos=rev_pos)
model.get_five_fold_matrices()

def actual_tags(sentence):
    return [word[1] for word in sentence]

In [97]:
# DEBUGGING CODE
transition_matrix = model.transition_matrix[0]
emmission_matrix = model.emission_matrices[0]
train_data = model.train_sets[0]
test_data = model.test_sets[0]
actual = actual_tags(train_data[6])
pred = model.viterbi_sentence(train_data[6], transition_matrix, emmission_matrix)
print(actual)
print(pred)
print(model.accuracy_of_match(pred, actual))

['DET', 'NOUN', 'ADP', 'NUM', 'ADP', 'DET', 'NOUN', 'VERB', 'NOUN', '.', 'ADP', 'PRON', 'ADV', 'VERB', 'DET', 'NOUN', 'ADV', 'ADP', 'NOUN', 'ADV', 'VERB', 'ADP', 'PRON', 'VERB', 'VERB', 'VERB', 'ADV', 'ADP', 'DET', 'NOUN', 'ADP', 'ADJ', 'NOUN', '.']
['DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', 'VERB', 'NOUN', '.', 'ADP', 'PRON', 'ADV', 'VERB', 'ADP', 'NOUN', 'ADV', 'ADP', 'NOUN', 'ADV', 'VERB', 'ADP', 'PRON', 'VERB', 'VERB', 'VERB', 'ADV', 'ADP', 'DET', 'NOUN', 'ADP', 'ADJ', 'NOUN', '.']
94.11764705882352


In [98]:
actual, predicted = model.classification_report()

In [99]:
report = classification_report(actual, predicted)
print(report)

              precision    recall  f1-score   support

           .       0.97      1.00      0.98    147565
         ADJ       0.85      0.85      0.85     83721
         ADP       0.89      0.97      0.93    144766
         ADV       0.84      0.85      0.85     56239
        CONJ       0.97      0.99      0.98     38151
         DET       0.89      0.99      0.94    137019
        NOUN       0.94      0.87      0.90    275558
         NUM       0.99      0.70      0.82     14874
        PRON       0.84      0.95      0.89     49334
         PRT       0.90      0.83      0.86     29829
        VERB       0.96      0.89      0.93    182750
           X       0.61      0.26      0.36      1386

    accuracy                           0.92   1161192
   macro avg       0.89      0.85      0.86   1161192
weighted avg       0.92      0.92      0.92   1161192



In [103]:
import sklearn.metrics as metrics

print("F1 score : ", metrics.f1_score(actual, predicted, average=None))
print("Accuracy score : ", metrics.accuracy_score(actual, predicted))

F1 score :  [0.98491412 0.8509025  0.93209387 0.84508466 0.97871184 0.93639309
 0.90199898 0.81931219 0.89267798 0.86474106 0.92521884 0.36067244]
Accuracy score :  0.9178378769402477
