In [1]:
import os
import random
from collections import defaultdict, Counter
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [2]:
# Relative paths (under ./data) to the tagged corpus file and the tag-list file.
# Both are handed to Dataset when it is constructed.
brown_path = os.path.join("data", "brown-universal.txt")
tags_path = os.path.join("data", "tags-universal.txt")

In [3]:
class Subset(object):
    """A selection of sentences taken from a larger sentence dictionary.

    Args:
        full_dataset: Mapping from sentence key to a dict holding the lists
            ``'sentence_words'`` and ``'sentence_tags'``.
        keys: Ordered sequence of the sentence keys that belong to this subset.
        tags: The tag set; stored as-is on the instance.

    Attributes:
        dataset: ``{key: sentence_dict}`` restricted to ``keys``.
        keys: The keys passed in (defines integer-index order).
        tags: The tag set passed in.
        vocab: List of distinct words in the subset, in first-seen order.
        X: List of word lists, one per key, in ``keys`` order.
        Y: List of tag lists, one per key, in ``keys`` order.
    """

    def __init__(self, full_dataset, keys, tags):
        self.full_dataset = full_dataset
        self.dataset = {}
        self.keys = keys
        self.tags = tags
        self.vocab = []
        self.X = []
        self.Y = []

        self._clean_up()
        self._generate_vocab()

    def _clean_up(self):
        """Copy the selected sentences out of the full dataset and fill X / Y."""
        for key in self.keys:
            sentence = self.full_dataset[key]
            self.dataset[key] = sentence
            self.X.append(sentence['sentence_words'])
            self.Y.append(sentence['sentence_tags'])
        # Drop the reference to the parent mapping once the subset is built.
        del self.full_dataset

    def _generate_vocab(self):
        """Collect the distinct words of the subset into ``self.vocab``."""
        # dict.fromkeys de-duplicates in linear time while preserving
        # first-seen order, so the result equals the list produced by repeated
        # "if word not in list: append" without the quadratic scan.
        self.vocab = list(dict.fromkeys(
            word
            for sentence in self.dataset.values()
            for word in sentence['sentence_words']
        ))

    def __len__(self):
        """Return the number of sentences in the subset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return a sentence dict by integer position or by sentence key.

        Raises:
            IndexError: If an integer index is out of range for ``keys``.
            AssertionError: If ``idx`` is neither an int nor a known key.
        """
        if isinstance(idx, int):
            return self.dataset[self.keys[idx]]

        # Membership is checked against the dict (same keys as self.keys)
        # to avoid a linear scan of the key list.
        assert isinstance(idx, str) and idx in self.dataset
        return self.dataset[idx]

    def __iter__(self):
        """Yield the sentence dicts in ``keys`` order."""
        for position in range(len(self.dataset)):
            yield self[position]

class Dataset(object):
    """Loads a tagged corpus from disk and splits it into train/test subsets.

    Args:
        brown_path: Path of the corpus file. Every non-empty line whose last
            tab-separated field is not a known tag starts a new sentence and
            is used as that sentence's key; every other non-empty line is read
            as ``word<TAB>tag``. All text is lower-cased.
        tags_path: Path of a file listing one tag per line.
        train_test_split: Fraction of sentences assigned to the training subset.
        seed: Seed passed to ``random.seed`` before shuffling; ``None`` leaves
            the global random state untouched.

    Attributes:
        tags: Lower-cased tag names read from ``tags_path``.
        dataset: ``{key: {'sentence_words': [...], 'sentence_tags': [...]}}``.
        keys: Sentence keys; shuffled in place when the split is made.
        training_dataset / testing_dataset: The two ``Subset`` objects.
        datasets: ``(training_dataset, testing_dataset)``.
    """

    def __init__(self, brown_path, tags_path, train_test_split=0.8, seed=28934897):
        self.brown_path = brown_path
        self.tags_path = tags_path
        self.train_test_split = train_test_split
        self.seed = seed

        self.tags = []
        self.dataset = {}
        self.keys = []

        self._prepare_tags()
        self._prepare_brown()
        self.datasets = self._split_dataset()

    def _prepare_tags(self):
        """Read the tag list, lower-cased, one tag per non-blank line."""
        with open(self.tags_path) as f:
            raw_tag_data = f.read()
        # splitlines() copes with '\r\n' endings, and dropping blank lines keeps
        # a trailing newline from producing an empty-string "tag".
        self.tags = [
            line.strip()
            for line in raw_tag_data.lower().splitlines()
            if line.strip()
        ]

    def _prepare_brown(self):
        """Parse the corpus file into ``self.dataset`` and ``self.keys``."""
        key = ''

        # Use the path given to this instance, not a module-level global.
        with open(self.brown_path) as f:
            for raw_line in f:
                line = raw_line.replace('\n', '').replace('\r', '').lower()
                if len(line) == 0:
                    continue

                fields = line.split('\t')
                if fields[-1] not in self.tags:
                    # Not a word/tag pair: this line is a sentence header.
                    key = line
                    self.dataset[key] = {'sentence_words': [], 'sentence_tags': []}
                    self.keys.append(key)
                else:
                    self.dataset[key]['sentence_words'].append(fields[0])
                    self.dataset[key]['sentence_tags'].append(fields[1])

    def _split_dataset(self):
        """Shuffle the keys and build the training and testing subsets.

        Returns:
            The tuple ``(training_dataset, testing_dataset)``.
        """
        # `is not None` so that 0 is honoured as a seed.
        if self.seed is not None:
            random.seed(self.seed)
        # NOTE: shuffles self.keys in place, so integer indexing on this
        # object follows the shuffled order afterwards.
        random.shuffle(self.keys)
        split = int(self.train_test_split * len(self.keys))

        self.training_dataset = Subset(self.dataset, self.keys[:split], self.tags)
        self.testing_dataset = Subset(self.dataset, self.keys[split:], self.tags)
        return self.training_dataset, self.testing_dataset

    def __len__(self):
        """Return the number of sentences loaded."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return a sentence dict by integer position or by sentence key.

        Raises:
            IndexError: If an integer index is out of range for ``keys``.
            AssertionError: If ``idx`` is neither an int nor a known key.
        """
        if isinstance(idx, int):
            return self.dataset[self.keys[idx]]

        assert isinstance(idx, str) and idx in self.dataset
        return self.dataset[idx]

    def __iter__(self):
        """Yield the sentence dicts in ``keys`` order."""
        for position in range(len(self.dataset)):
            yield self[position]
    
# Constructing Dataset reads both files and builds the train/test subsets.
data = Dataset(brown_path=brown_path, tags_path=tags_path)

In [4]:
class MostFrequentTagger(object):
    """Baseline tagger: each known word gets the tag it carried most often.

    ``dataset`` must expose ``X`` (list of word lists) and ``Y`` (list of tag
    lists, aligned with ``X``). After construction, ``table`` maps every word
    seen in ``X`` to its most common tag.
    """

    def __init__(self, dataset):
        self.dataset = dataset
        self.freq_counter = self._frequency_counter(dataset.X, dataset.Y)
        self.table = {}

        self._generate_table()

    def _frequency_counter(self, seq_A, seq_B):
        """Count, for every item of ``seq_A``, the items of ``seq_B`` paired with it."""
        tallies = defaultdict(Counter)

        for position, items_a in enumerate(seq_A):
            items_b = seq_B[position]
            for left, right in zip(items_a, items_b):
                tallies[left][right] += 1

        return tallies

    def _generate_table(self):
        """Fill ``self.table`` with the single most common tag of each word."""
        self.table.update({
            word: tag_counts.most_common(1)[0][0]
            for word, tag_counts in self.freq_counter.items()
        })

    def predict(self, sentence):
        """Return ``['<start>', tag..., '<end>']`` for a list of words.

        Words missing from the lookup table are tagged ``'nan'``.
        """
        tagged = [self.table.get(word) or 'nan' for word in sentence]
        return ['<start>', *tagged, '<end>']
    
# Build the baseline lookup table from the training subset only.
mft = MostFrequentTagger(data.training_dataset)

In [5]:
def visualize_predictions(dataset, model, num_samples=3):
    """Print predicted and actual tag sequences for the first few sentences.

    Args:
        dataset: Object exposing ``keys`` (a sliceable sequence of sentence
            keys) and key-based indexing that returns a dict with
            ``'sentence_words'`` and ``'sentence_tags'``.
        model: Object whose ``predict(words)`` returns the tag list to print.
        num_samples: How many sentences (taken from the front of
            ``dataset.keys``) to show. Defaults to 3; ``None`` shows all.
    """
    for key in dataset.keys[:num_samples]:
        print('---------------------------')
        print('Sentence ID: ', key)

        sample = dataset[key]
        print('Sentence: ', sample['sentence_words'])
        print()

        prediction = model.predict(sample['sentence_words'])
        # Wrap the gold tags in the same markers the printed predictions use.
        actual = ['<start>', *sample['sentence_tags'], '<end>']

        print('Prediction: \n\r', prediction)
        print('Actual: \n\r', actual)
    print('---------------------------')
    
def calc_accuracy(X, Y, model):
    """Return the fraction of tags that ``model`` predicts correctly.

    Args:
        X: Iterable of word lists (one per sentence).
        Y: Iterable of gold tag lists, aligned with ``X``.
        model: Object whose ``predict(words)`` returns a tag list with one
            leading and one trailing marker around the per-word tags.

    Returns:
        ``correct / total`` as a float, or ``0.0`` when there are no tags to
        score (empty input), instead of raising ``ZeroDivisionError``.
    """
    correct = 0
    total_tags = 0

    for sentence, actual_tags in zip(X, Y):
        # Strip the leading/trailing markers so positions line up with the gold tags.
        predicted_tags = model.predict(sentence)[1:-1]
        for predicted_tag, actual_tag in zip(predicted_tags, actual_tags):
            if predicted_tag == actual_tag:
                correct += 1
            total_tags += 1

    if total_tags == 0:
        return 0.0
    return correct / total_tags

In [6]:
# Show baseline predictions next to the gold tags for a few held-out sentences.
visualize_predictions(data.testing_dataset, mft)

---------------------------
Sentence ID:  b100-49401
Sentence:  ['``', "i'll", 'shore', 'be', 'needing', 'ye', 'both', 'on', 'the', 'pull', 'out', "o'", 'the', 'canyon', "''", '.']

Prediction: 
 ['<start>', '.', 'prt', 'noun', 'verb', 'verb', 'pron', 'det', 'adp', 'det', 'verb', 'prt', 'adp', 'det', 'noun', '.', '.', '<end>']
Actual: 
 ['<start>', '.', 'prt', 'noun', 'verb', 'verb', 'pron', 'det', 'adp', 'det', 'noun', 'prt', 'adp', 'det', 'noun', '.', '.', '<end>']
---------------------------
Sentence ID:  b100-18537
Sentence:  ['she', 'named', '48', 'items', ',', 'and', 'said', 'there', 'were', '``', 'many', 'more', 'things', 'which', 'it', 'would', 'take', 'too', 'long', 'to', 'write', "''", '.']

Prediction: 
 ['<start>', 'pron', 'verb', 'num', 'noun', '.', 'conj', 'verb', 'prt', 'verb', '.', 'adj', 'adv', 'noun', 'det', 'pron', 'verb', 'verb', 'adv', 'adj', 'prt', 'verb', '.', '.', '<end>']
Actual: 
 ['<start>', 'pron', 'verb', 'num', 'noun', '.', 'conj', 'verb', 'prt', 'verb', '

In [7]:
# Score the baseline tagger on the training subset first, then the testing subset.
training_acc, testing_acc = (
    calc_accuracy(split.X, split.Y, mft)
    for split in (data.training_dataset, data.testing_dataset)
)

print(f'Training Accuracy: {training_acc*100:.2f}%')
print(f'Testing Accuracy: {testing_acc*100:.2f}%')

Training Accuracy: 95.56%
Testing Accuracy: 93.19%


In [8]:
class HiddenMarkovModelWrapper(object):
    """Builds a tag-sequence HMM from counts over a tagged dataset.

    Args:
        dataset: Object exposing ``X`` (list of word lists), ``Y`` (aligned
            list of tag lists), ``tags`` (the tag set) and ``vocab`` (the
            known words).
        model: An empty pomegranate ``HiddenMarkovModel``; states and
            transitions are added to it and it is baked during construction.

    Attributes:
        freq_counter: ``{tag: Counter({word: count})}`` emission counts.
        unigram_table: ``Counter`` of tag occurrences.
        bigram_table: ``Counter`` of ``(tag, next_tag)`` occurrences.
        starting_table / ending_table: ``Counter`` of the first / last tag of
            every non-empty sentence.
    """

    def __init__(self, dataset, model):
        self.dataset = dataset
        self.model = model

        # Cached as a set so predict() does a constant-time lookup per word
        # instead of scanning the vocabulary list.
        self._known_words = set(self.dataset.vocab)

        self.freq_counter = self._frequency_counter(self.dataset.Y, self.dataset.X)
        self.unigram_table = self._get_unigram_count()
        self.bigram_table = self._get_bigram_count()
        self.starting_table = self._get_starting_sentence_count()
        self.ending_table = self._get_ending_sentence_count()

        self._prepare_model()

    def _get_unigram_count(self):
        """Count how often each tag occurs across all sentences."""
        return Counter(tag for tags in self.dataset.Y for tag in tags)

    def _get_bigram_count(self):
        """Count each ``(tag, following_tag)`` pair within a sentence."""
        return Counter(
            pair
            for tags in self.dataset.Y
            for pair in zip(tags, tags[1:])
        )

    def _get_starting_sentence_count(self):
        """Count the first tag of every sentence; empty sentences are skipped."""
        return Counter(tags[0] for tags in self.dataset.Y if tags)

    def _get_ending_sentence_count(self):
        """Count the last tag of every sentence; empty sentences are skipped."""
        return Counter(tags[-1] for tags in self.dataset.Y if tags)

    def _frequency_counter(self, seq_A, seq_B):
        """Count, for every item of ``seq_A``, the items of ``seq_B`` paired with it."""
        counter = defaultdict(Counter)

        for i in range(len(seq_A)):
            for a, b in zip(seq_A[i], seq_B[i]):
                counter[a][b] += 1

        return counter

    def _prepare_model(self):
        """Add one state per observed tag plus all transitions, then bake."""
        # A tag that never occurs in the data has no emission counts and a
        # zero denominator, so it gets no state instead of a ZeroDivisionError.
        observed_tags = [
            tag for tag in self.dataset.tags if self.unigram_table[tag] > 0
        ]

        states = {}
        for tag in observed_tags:
            tag_total = self.unigram_table[tag]
            # P(word | tag) = count(tag, word) / count(tag)
            emission_prob = {
                word: count / tag_total
                for word, count in self.freq_counter[tag].items()
            }
            states[tag] = State(DiscreteDistribution(emission_prob), tag)

        self.model.add_states(list(states.values()))

        # P(tag | start) = sentences starting with tag / number of sentences.
        # sum(values()) is used instead of Counter.total(), which needs 3.10+.
        sentence_total = sum(self.starting_table.values())
        for tag in observed_tags:
            start_prob = self.starting_table[tag] / sentence_total
            self.model.add_transition(self.model.start, states[tag], start_prob)

        # P(end | tag) = sentences ending with tag / count(tag)
        for tag in observed_tags:
            end_prob = self.ending_table[tag] / self.unigram_table[tag]
            self.model.add_transition(states[tag], self.model.end, end_prob)

        # P(next_tag | tag) = count(tag, next_tag) / count(tag)
        for (current_tag, next_tag), pair_count in self.bigram_table.items():
            transition_prob = pair_count / self.unigram_table[current_tag]
            self.model.add_transition(states[current_tag], states[next_tag], transition_prob)

        self.model.bake()

    def predict(self, X):
        """Return ``['<start>', tag..., '<end>']`` for a list of words.

        Words outside the dataset vocabulary are replaced by the string
        ``'nan'`` before decoding.
        """
        # NOTE(review): presumably pomegranate treats 'nan' as a missing
        # observation rather than an ordinary symbol — confirm against its docs.
        sentence = [word if word in self._known_words else 'nan' for word in X]

        _, state_path = self.model.viterbi(sentence)

        predictions = ['<start>']
        # The first and last path entries are dropped; each remaining entry
        # carries the decoded state at index 1.
        predictions.extend(state[1].name for state in state_path[1:-1])
        predictions.append('<end>')

        return predictions
    
# Build the HMM from training-subset counts; the wrapper fills and bakes the model it is given.
hmm = HiddenMarkovModelWrapper(data.training_dataset, HiddenMarkovModel(name="base-hmm-tagger"))

In [9]:
# Show HMM predictions next to the gold tags for a few held-out sentences.
visualize_predictions(data.testing_dataset, hmm)

---------------------------
Sentence ID:  b100-49401
Sentence:  ['``', "i'll", 'shore', 'be', 'needing', 'ye', 'both', 'on', 'the', 'pull', 'out', "o'", 'the', 'canyon', "''", '.']

Prediction: 
 ['<start>', '.', 'prt', 'noun', 'verb', 'verb', 'pron', 'det', 'adp', 'det', 'verb', 'prt', 'adp', 'det', 'noun', '.', '.', '<end>']
Actual: 
 ['<start>', '.', 'prt', 'noun', 'verb', 'verb', 'pron', 'det', 'adp', 'det', 'noun', 'prt', 'adp', 'det', 'noun', '.', '.', '<end>']
---------------------------
Sentence ID:  b100-18537
Sentence:  ['she', 'named', '48', 'items', ',', 'and', 'said', 'there', 'were', '``', 'many', 'more', 'things', 'which', 'it', 'would', 'take', 'too', 'long', 'to', 'write', "''", '.']

Prediction: 
 ['<start>', 'pron', 'verb', 'num', 'noun', '.', 'conj', 'verb', 'prt', 'verb', '.', 'adj', 'adj', 'noun', 'det', 'pron', 'verb', 'verb', 'adv', 'adj', 'prt', 'verb', '.', '.', '<end>']
Actual: 
 ['<start>', 'pron', 'verb', 'num', 'noun', '.', 'conj', 'verb', 'prt', 'verb', '

In [10]:
# Score the HMM tagger on the training subset first, then the testing subset.
training_acc, testing_acc = (
    calc_accuracy(split.X, split.Y, hmm)
    for split in (data.training_dataset, data.testing_dataset)
)

print(f'Training Accuracy: {training_acc*100:.2f}%')
print(f'Testing Accuracy: {testing_acc*100:.2f}%')

Training Accuracy: 97.41%
Testing Accuracy: 96.02%
