In [1]:
def read_data(filename):
	"""Read tagged sentence data"""
	from collections import namedtuple, OrderedDict
	Sentence = namedtuple("Sentence", "words tags")
	with open(filename, 'r') as f:
		content = f.read()
		sentences = content.split("\n\n")
		sentences_split_by_words = [sen.split("\n") for sen in sentences]

	stored_dict = OrderedDict()
	for sentence in sentences_split_by_words:
		# try:
		# 	assert sentence[0][0] == 'b'
		# except Exception: 
		# 	continue
		word_tags = []
		for word in sentence[1:]:
			word_tag = word.split("\t")
			# if word_tag[0] != '.':
			assert len(word_tag) == 2
			assert word_tag[1].upper() == word_tag[1]
			word_tags.append(word_tag)
		if len(word_tags) == 0:
			continue
		temp = list(zip(*word_tags))
		if len(temp) != 2:
			pass
		assert len(temp) == 2
		# print(temp)
		l1, l2 = temp
		s = Sentence(l1, l2)
		stored_dict[sentence[0]] = s
	return stored_dict
		

In [2]:
stored_dict = read_data("brown-universal.txt")
stored_dict
len(stored_dict)

57340

In [3]:
# count = 0
# for k in stored_dict:
# 	if stored_dict[k].words[0] == '.':
# 		count+=1
# 		print(stored_dict[k].words)
# print(f"count = {count}")

In [3]:
def read_tags(filename):
	"""Read a list of word tag classes"""
	with open(filename, 'r') as f:
		tags = f.read().split("\n")
	return frozenset(tags)

tag_set = read_tags('tags-universal.txt')

In [4]:
tag_set

frozenset({'.',
           'ADJ',
           'ADP',
           'ADV',
           'CONJ',
           'DET',
           'NOUN',
           'NUM',
           'PRON',
           'PRT',
           'VERB',
           'X'})

In [5]:
from itertools import chain
from collections import namedtuple

class Subset(namedtuple("BaseSet", "sentences keys vocab X tagset Y N stream")):
	def __new__(cls, sentences, keys):
		word_sequences = tuple([sentences[k].words for k in keys])
		tag_sequences = tuple([sentences[k].tags for k in keys])
		wordset = frozenset(chain(*word_sequences))
		tagset = frozenset(chain(*tag_sequences))
		N = sum(1 for _ in chain(*word_sequences))
		stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
		return super().__new__(cls,
                               {k: sentences[k] for k in keys},
							   keys,
							   wordset,
							   word_sequences,
							   tagset,
							   tag_sequences,
							   N,
							   stream.__iter__
							   )
	
	def __len__(self):
		return len(self.sentences)
	
	def __iter__(self):
		return iter(self.sentences.items())

In [6]:
from itertools import chain
from collections import namedtuple
import random

class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream")):
	def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=112890):
		tagset = read_tags(tagfile)
		sentences = read_data(datafile)
		keys = tuple(sentences.keys())
		wordset = frozenset(chain(*[sentence.words for sentence in sentences.values()]))
		word_sequences = tuple([sentences[k].words for k in keys])
		tag_sequences = tuple([sentences[k].tags for k in keys])
		N = sum(1 for _ in chain(*(sen.words for sen in sentences.values())))
	
		# split data into train/test sets
		_keys = list(keys)
		if seed is not None: 
			random.seed(seed)
		random.shuffle(_keys)
		split = int(train_test_split * len(_keys))
		training_data = Subset(sentences, _keys[:split])
		testing_data = Subset(sentences, _keys[split:])
		stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
		return super().__new__(cls, dict(sentences), keys,
							   wordset,
							   word_sequences,
							   tagset,
							   tag_sequences,
							   training_data,
							   testing_data,
							   N,
							   stream.__iter__)
	
	def __len__(self):
		return len(self.sentences)
	
	def __iter__(self):
		return iter(self.sentences.items())


In [7]:
data = Dataset("./tags-universal.txt", "./brown-universal.txt", train_test_split=.8)

print(f"There are {len(data)} sentences in the corpus.")
print(f"There are {len(data.training_set)} sentences in the training set.")
print(f"There are {len(data.testing_set)} sentences in the testing set.")

There are 57340 sentences in the corpus.
There are 45872 sentences in the training set.
There are 11468 sentences in the testing set.


In [9]:
key = 'b100-38532'
print(f"Sentence: {key}")
print(f"words:\n\t{data.sentences[key].words}")
print(f"tags:\n\t{data.sentences[key].tags}")

Sentence: b100-38532
words:
	('Perhaps', 'it', 'was', 'right', ';', ';')
tags:
	('ADV', 'PRON', 'VERB', 'ADJ', '.', '.')


In [8]:
print(f"There are a total of {data.N} samples of {len(data.vocab)} unique words in the corpus.")
print(f"There are {data.training_set.N} samples of {len(data.training_set.vocab)} unique words in the training set.")
print(f"There are {data.testing_set.N} samples of {len(data.testing_set.vocab)} unique words in the testing set.")
print(f"There are {len(data.testing_set.vocab - data.training_set.vocab)} words in the test set that are missing in the training set.")

There are a total of 1161192 samples of 56057 unique words in the corpus.
There are 928458 samples of 50536 unique words in the training set.
There are 232734 samples of 25112 unique words in the testing set.
There are 5521 words in the test set that are missing in the training set.


In [9]:
# Accessing word and tag sequences
for i in range(2):
	print(f"Sentence {i+1}: {data.X[i]}")
	print(f"Labels {i+1}: {data.Y[i]}")
	print()

Sentence 1: ('Mr.', 'Podger', 'had', 'thanked', 'him', 'gravely', ',', 'and', 'now', 'he', 'made', 'use', 'of', 'the', 'advice', '.')
Labels 1: ('NOUN', 'NOUN', 'VERB', 'VERB', 'PRON', 'ADV', '.', 'CONJ', 'ADV', 'PRON', 'VERB', 'NOUN', 'ADP', 'DET', 'NOUN', '.')

Sentence 2: ('But', 'there', 'seemed', 'to', 'be', 'some', 'difference', 'of', 'opinion', 'as', 'to', 'how', 'far', 'the', 'board', 'should', 'go', ',', 'and', 'whose', 'advice', 'it', 'should', 'follow', '.')
Labels 2: ('CONJ', 'PRT', 'VERB', 'PRT', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'ADP', 'ADV', 'ADV', 'DET', 'NOUN', 'VERB', 'VERB', '.', 'CONJ', 'DET', 'NOUN', 'PRON', 'VERB', 'VERB', '.')


In [10]:
# Accessing (word, tag) samples
print("\nStream (word, tag) pairs:\n")
for i, pair in enumerate(data.stream()):
	print("\t", pair)
	if i > 25:
		break


Stream (word, tag) pairs:

	 ('Mr.', 'NOUN')
	 ('Podger', 'NOUN')
	 ('had', 'VERB')
	 ('thanked', 'VERB')
	 ('him', 'PRON')
	 ('gravely', 'ADV')
	 (',', '.')
	 ('and', 'CONJ')
	 ('now', 'ADV')
	 ('he', 'PRON')
	 ('made', 'VERB')
	 ('use', 'NOUN')
	 ('of', 'ADP')
	 ('the', 'DET')
	 ('advice', 'NOUN')
	 ('.', '.')
	 ('But', 'CONJ')
	 ('there', 'PRT')
	 ('seemed', 'VERB')
	 ('to', 'PRT')
	 ('be', 'VERB')
	 ('some', 'DET')
	 ('difference', 'NOUN')
	 ('of', 'ADP')
	 ('opinion', 'NOUN')
	 ('as', 'ADP')
	 ('to', 'ADP')


# Simple tagger
### Simply choose the tag most frequently assigned to each word

In [11]:
from itertools import chain
def pair_counts(sequences_A, sequences_B):
	"""
	Returns a dictionary keyed to each unique value in the first sequence list that counts the number of occurrences of the corresponding value from the second sequences list.
	For example, if sequences_A is tags and sequences_B is the corresponding
    words, then if 1244 sequences contain the word "time" tagged as a NOUN, then should return a dictionary such that pair_counts[NOUN][time] == 1244

	"""
	pair_count = {}
	assert len(sequences_A) == len(sequences_B)
	for w1, w2 in zip(chain(*sequences_A), chain(*sequences_B)):
		if w1 in pair_count:
			if w2 in pair_count[w1]:
				pair_count[w1][w2] += 1
			else:
				pair_count[w1][w2] = 1
		else:
			pair_count[w1] = dict()
			pair_count[w1][w2] = 1
	return pair_count

emission_counts = pair_counts(data.training_set.Y, data.training_set.X)
	

In [12]:
emission_counts.keys()

dict_keys(['ADV', 'NOUN', '.', 'VERB', 'ADP', 'ADJ', 'CONJ', 'DET', 'PRT', 'NUM', 'PRON', 'X'])

In [13]:
# most-freqent-class tagger
# find the most frequent class label for each word in the training data
# MFCTagger class is meant to be similar to the interface of the Pomegranate HMM models so that they can be used interchangeably
from collections import namedtuple, defaultdict

FakeState = namedtuple("FakeState", "name")

class MFCTagger:
	
	missing = FakeState(name="<MISSING>")
	
	def __init__(self, table):
		self.table = defaultdict(lambda: MFCTagger.missing)
		self.table.update({
			word: FakeState(name=tag) for word, tag in table.items()
		})
	
	def viterbi(self, seq):
		"""
		This method simplifies predictions by matching the Pomegranate viterbi() interface
		"""
		return 0., list(enumerate(["<start>"] + [self.table[w] for w in seq] + ["<end>"]))
	
word_counts = pair_counts(data.training_set.Y, data.training_set.X)
mfc_table = {}
for word in data.training_set.vocab:
	most_frequent_count = -1
	most_frequent_tag = None
	for tag in data.training_set.tagset:
		try:
			count = word_counts[tag][word]
			if count > most_frequent_count:
				most_frequent_count = count
				most_frequent_tag = tag
				mfc_table[word] = most_frequent_tag 
		except KeyError:
			continue
			


In [15]:
i = 0
for word, tag in mfc_table.items():
	print(f"{word}: {tag}")
	i += 1
	if i > 10: break

poverty-stricken: ADJ
styled: VERB
payroll: NOUN
spinach: NOUN
they: PRON
rushes: VERB
Future: ADJ
Hammer: NOUN
$14,000: NOUN
aborigine: NOUN
Sheriff: NOUN


In [16]:
def replace_unknown(sequence):
	"""Return a copy of the input sequence where each unknown word is replaced by the literal string value 'nan'. Pomegranate will ignore these vvalues during computation."""
	return [w if w in data.training_set.vocab else 'nan' for w in sequence]

def simplify_decoding(X, model):
	"""X should be a 1-D sequence of observations for the model to predict"""
	_, state_path = model.viterbi(replace_unknown(X))
	return [state[1].name for state in state_path[1:-1]]

In [17]:
# Example decoding sequences with MFC Tagger
mfc_model = MFCTagger(mfc_table)
for key in data.testing_set.keys[:3]:
	print(f"Sentence Key: {key}")
	print(f"Sentence: {data.sentences[key].words}")
	print(f"Predicted labels: \n-------------------")
	print(simplify_decoding(data.sentences[key].words, mfc_model))
	print()
	print("Actual labels:\n--------------------")
	print(data.sentences[key].tags)
	print()
	
	

Sentence Key: b100-28144
Sentence: ('and', 'August', '15', ',', 'November', '15', ',', 'February', '17', ',', 'and', 'May', '15', ',', '(', 'Cranston', ')', '.')
Predicted labels: 
-------------------
['CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.']

Actual labels:
--------------------
('CONJ', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'NOUN', 'NUM', '.', 'CONJ', 'NOUN', 'NUM', '.', '.', 'NOUN', '.', '.')

Sentence Key: b100-23146
Sentence: ('She', 'had', 'the', 'opportunity', 'that', 'few', 'clever', 'women', 'can', 'resist', ',', 'of', 'showing', 'her', 'superiority', 'in', 'argument', 'over', 'a', 'man', '.')
Predicted labels: 
-------------------
['PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.', 'ADP', 'VERB', 'DET', 'NOUN', 'ADP', 'NOUN', 'ADP', 'DET', 'NOUN', '.']

Actual labels:
--------------------
('PRON', 'VERB', 'DET', 'NOUN', 'ADP', 'ADJ', 'ADJ', 'NOUN', 'VERB', 'VERB', '.',

In [18]:
# Evaluating model accuracy
def accuracy(X, Y, model):
	"""
	Calculate the prediction accuracy by using the model to decode each sequence in the input X and comparing the prediction with the true labels in Y.
	
	The X should be an array whose first dimensions is the number of sentences to test, and each element of the array should be an iterable of the words in the sequence. The arrays X and Y should have the exact same shape.
	"""
	correct = total_predictions = 0
	for observations, actual_tags in zip(X, Y):
		# if there's any exception, count the full sentence as an error (which makes this a conservative estimate)
		try:
			most_likely_tags = simplify_decoding(observations, model)
			correct += sum(p==t for p, t in zip(most_likely_tags, actual_tags))
		except:
			pass
		total_predictions += len(observations)
	return correct / total_predictions

In [19]:
# evaluate the accuracy of the MFC tagger
mfc_training_acc = accuracy(data.training_set.X, data.training_set.Y, mfc_model)
print(f"training accuracy mfc_model: {100 * mfc_training_acc:.2f}%")

mfc_testing_acc = accuracy(data.testing_set.X, data.testing_set.Y, mfc_model)
print(f"testing accuracy mfc_model: {100 * mfc_testing_acc:.2f}%")

assert mfc_training_acc >= 0.955
assert mfc_testing_acc >= 0.925
from IPython.core.display import HTML
HTML('<div class="alert alert-block alert-success">MFC tagger accuracy looks correct!</div>')

training accuracy mfc_model: 95.72%
testing accuracy mfc_model: 93.02%


# Hidden Markov Model tagger


In [20]:
def unigram_counts(sequences):
	"""Return a dictionary keyed to each unique value in the input sequence list that counts the number of occurrences of the value in the sequences list.  The sequences collection should be a 2-dimensional array.
	
	For example, if the tag NOUN appears 275558 times over all the input sequences, then should return a dictionary such that dict[NOUN] == 275558"""
	d = {}
	for seq in sequences:
		for t in seq:
			if t in d:
				d[t] += 1
			else:
				d[t] = 1
	return d

tag_unigrams = unigram_counts(data.training_set.Y)
tag_unigrams
# len(data.training_set.Y)
# sum(tag_unigrams.values())

{'ADV': 44877,
 'NOUN': 220632,
 '.': 117757,
 'VERB': 146161,
 'ADP': 115808,
 'ADJ': 66754,
 'CONJ': 30537,
 'DET': 109671,
 'PRT': 23906,
 'NUM': 11878,
 'PRON': 39383,
 'X': 1094}

In [21]:
# compute the transition counts from a certain tag to another type of tag
def bigram_counts(sequences):
	"""Return a dictionary keyed to each unique pair of values in the input sequences list that counts the number of occurrences of pair in the sequences list. The input should be a 2-dimensional array."""
	d = {}
	for seq in sequences:
		for idx, t in enumerate(seq[:-1]):
			pair = (seq[idx], seq[idx+1])
			if pair in d:
				d[pair] += 1
			else:
				d[pair] = 1
	return d

tag_bigrams = bigram_counts(data.training_set.Y)
tag_bigrams
# sum(tag_bigrams.values())

{('ADV', 'NOUN'): 1478,
 ('NOUN', '.'): 62639,
 ('.', 'ADV'): 5124,
 ('ADV', '.'): 7577,
 ('.', 'VERB'): 9041,
 ('VERB', 'ADP'): 24927,
 ('ADP', 'ADJ'): 9533,
 ('ADJ', 'NOUN'): 43664,
 ('NOUN', 'CONJ'): 13185,
 ('CONJ', 'VERB'): 6012,
 ('VERB', 'ADJ'): 8423,
 ('.', 'DET'): 8008,
 ('DET', 'VERB'): 7062,
 ('ADJ', 'PRT'): 1301,
 ('PRT', 'ADP'): 2189,
 ('ADP', 'NUM'): 3467,
 ('NUM', 'NOUN'): 4524,
 ('.', 'PRON'): 5448,
 ('PRON', 'VERB'): 27860,
 ('VERB', 'PRT'): 9556,
 ('PRT', 'VERB'): 14886,
 ('VERB', 'NOUN'): 14230,
 ('NOUN', 'NUM'): 1783,
 ('NUM', '.'): 3210,
 ('.', 'NUM'): 1412,
 ('.', '.'): 12588,
 ('ADP', 'ADV'): 1805,
 ('ADV', 'NUM'): 597,
 ('DET', 'NOUN'): 68785,
 ('CONJ', 'DET'): 4636,
 ('NOUN', 'VERB'): 34972,
 ('ADP', 'NOUN'): 29965,
 ('ADP', 'DET'): 52841,
 ('NOUN', 'ADP'): 53884,
 ('CONJ', 'NOUN'): 7502,
 ('.', 'NOUN'): 9782,
 ('VERB', '.'): 11699,
 ('VERB', 'VERB'): 26957,
 ('.', 'ADP'): 7595,
 ('ADV', 'DET'): 3309,
 ('DET', 'ADJ'): 26236,
 ('NOUN', 'DET'): 3425,
 ('ADJ', '.'

In [22]:
# must also taking into account these "<start> -> word type" transitions
def starting_counts(sequences):
	"""Return a dictionary keyed to each value in the input sequences list that counts the number of occurrences where that value is at the beginning of a sequence"""
	d = {}
	for seq in sequences:
		if len(seq) == 0: continue
		beginning_word = seq[0]
		if beginning_word in d:
			d[beginning_word] += 1
		else:
			d[beginning_word] = 1
	return d

tag_starts = starting_counts(data.training_set.Y)
tag_starts

{'ADV': 4185,
 'ADP': 5583,
 'ADJ': 1582,
 'PRT': 1718,
 'DET': 9763,
 'PRON': 7318,
 'NOUN': 6469,
 'CONJ': 2282,
 '.': 4107,
 'NUM': 760,
 'VERB': 2080,
 'X': 25}

In [23]:
# count the instances where a sentence with a given tag
def ending_counts(sequences):
	"""Return a dictionary keyed to each unique value in the input sequences list that counts the number of occurrences where that value is at the end of a sequence."""
	d = {}
	for seq in sequences:
		if len(seq) == 0: continue
		ending_word = seq[-1]
		if ending_word in d:
			d[ending_word] += 1
		else:
			d[ending_word] = 1
	return d

tag_ends = ending_counts(data.training_set.Y)
tag_ends

{'.': 44936,
 'NOUN': 722,
 'NUM': 63,
 'VERB': 75,
 'ADJ': 25,
 'ADV': 16,
 'ADP': 7,
 'DET': 14,
 'CONJ': 2,
 'PRON': 4,
 'PRT': 7,
 'X': 1}

In [47]:
# building HMM tagger
# the DenseHMM class in pomegranate no longer seems to support viterbi algorithm
# so wrap it around MyHMM which supports viterbi
from pomegranate.hmm import DenseHMM
import numpy as np
class MyHMM(DenseHMM):
	
	def viterbi(self, X_input):
		import numpy as np
		X_shape = np.array(X_input).shape
		if len(X_shape) != 3:
			X = np.array([[[X_input[i]] for i in range(len(X_input))]])
		assert len(X.shape) == 3
		assert X.shape[0] == 1
		assert X.shape[2] == 1
		sequence_length = len(X[0])
		assert sequence_length > 0
		# l_message = self.forward(X)
		num_states = len(self.distributions)
		previous_max_l = np.array([
			[None for i in range(sequence_length)]
			for j in range(num_states)
		])
		max_l_log = np.array([
			[float("-inf") for i in range(sequence_length)]
			for j in range(num_states)
		])
		# initialize the first "column" in viterbi algorithm - looping through each row
		for r in range(num_states):
			max_l_log[r][0] = self.starts[r].item() + np.log(list(self.distributions[r].parameters())[1][0][X[0, 0, 0]])
		# go through each "column"
		for c in range(1, sequence_length):
			# go through each state in that column, use the values from previous column to update this state in this column
			for r in range(num_states):
				# compute the log-probabilities leading to this state in this column
				# temp_arr = []
				# for prev_r in range(num_states):
				# 	prev_l_log = max_l_log[prev_r][c-1]
				# 	transition_prob_log = self.edges[prev_r][r]
				# 	temp_arr.append(prev_l_log + transition_prob_log)
				# temp_arr = np.array(temp_arr)
				temp_arr = np.array([max_l_log[prev_r][c - 1] + self.edges[prev_r][r] for prev_r in range(num_states)])				
				# determine the max
				prev_r_max = np.argmax(temp_arr)
				# update the values and previous_max_l table
				prob_of_this_observation = list(self.distributions[r].parameters())[1][0][X[0, c, 0]].item()
				
				max_l_log[r][c] = temp_arr[prev_r_max] + np.log(prob_of_this_observation)
				previous_max_l[r][c] = prev_r_max
		# max_l_log = np.array(max_l_log)
		last_col = max_l_log[:, -1]
		max_row = np.argmax(last_col)
		
		most_likely_path = []
		most_likely_path.append(max_row)
		current = max_row
		for c in range(sequence_length-1, 0, -1):
			prev_r = previous_max_l[current][c]
			assert prev_r is not None
			most_likely_path.append(prev_r)
			current = prev_r
		
		return np.exp(last_col[max_row]), most_likely_path[::-1]

In [25]:
len(data.tagset)

12

In [26]:
# implementation of a basic HMM tagger

from pomegranate.distributions import Categorical
# must assign an index/ID to each tag and each word first, because pomegranate needs those indices for distributions or the states
dict_id_to_tag = {}
dict_tag_to_id = {}
for id, tag in enumerate(data.tagset):
	dict_id_to_tag[id] = tag
	dict_tag_to_id[tag] = id
dict_id_to_word = {}
dict_word_to_id = {}
for id, word in enumerate(data.vocab):
	dict_id_to_word[id] = word
	dict_word_to_id[word] = id
	

In [107]:
# emission probability distributions
emission_distributions = {}
for tag in data.training_set.tagset:
	probs_for_this_tag = []
	for word_id in range(len(data.vocab)):
		word = dict_id_to_word[word_id]
		if word in emission_counts[tag]:
			prob = emission_counts[tag][word] / tag_unigrams[tag]
		else:
			prob = 10**(-10)
		probs_for_this_tag.append(prob)
	assert len(probs_for_this_tag) == len(data.vocab)
	probs_for_this_tag = np.array(probs_for_this_tag)
	probs_for_this_tag = probs_for_this_tag / np.sum(probs_for_this_tag)
	assert abs(sum(probs_for_this_tag) - 1) < 0.00000001 
	emission_distributions[tag] = Categorical([probs_for_this_tag])

# adding these distributions to hmm model
hmm_model = MyHMM()
hmm_model.add_distributions([emission_distributions[dict_id_to_tag[i]] for i in range(len(data.training_set.tagset))])

  return torch.tensor(value)


In [108]:
num_of_tags = len(data.training_set.tagset)
# get the count of total number of "transitions" - this is equals to the total number of words - the number of "ending words," essentially: num of all words - num of sentences. This will be the denominator in the transition probabilities
count_all_transitions = data.training_set.N - len(data.training_set.X)

for tag1_id in range(num_of_tags):
	for tag2_id in range(num_of_tags):
		tag1 = dict_id_to_tag[tag1_id]
		tag2 = dict_id_to_tag[tag2_id]
		# retrieve the counts of transition from tag1 to tag2
		try:
			count_transitions = tag_bigrams[(tag1, tag2)]
		except KeyError:
			count_transitions = 1
			count_all_transitions += 1
		# compute the probability
		prob = count_transitions / count_all_transitions
		# add the transition probability to the HMM
		if prob > 0:
			dist_tag1 = emission_distributions[tag1]
			dist_tag2 = emission_distributions[tag2]
			# print(f"adding edge between {dist_tag1} and {dist_tag2}: prob = {prob}")
			hmm_model.add_edge(dist_tag1, dist_tag2, prob)

In [109]:
# add edges from starting state to each tag
count_sentences = len(data.training_set.X)
num_of_tags = len(data.training_set.tagset)
start_probs = []
for tag_id in range(num_of_tags):
	tag = dict_id_to_tag[tag_id]
	prob = tag_starts[tag] / count_sentences
	start_probs.append(prob)
	dist_tag = emission_distributions[tag]
	hmm_model.add_edge(hmm_model.start, dist_tag, prob)
assert abs(sum(start_probs) - 1) < 0.00000001

In [110]:
# add edges to model.end
count_sentences = len(data.training_set.X)
num_of_tags = len(data.training_set.tagset)
end_probs = []
for tag_id in range(num_of_tags):
	tag = dict_id_to_tag[tag_id]
	prob = tag_ends[tag] / count_sentences
	end_probs.append(prob)
	dist_tag = emission_distributions[tag]
	hmm_model.add_edge(dist_tag, hmm_model.end, prob)
assert abs(sum(end_probs) - 1) < 0.00000001

In [111]:
# now testing the HMM model

xx = data.training_set.X[0]
def convert_words_to_ids(xx):
	sentence_converted_to_word_id = [dict_word_to_id[word] for word in xx]
	return sentence_converted_to_word_id

def convert_ids_to_tags(tag_id_list):
	converted_to_tags = [dict_id_to_tag[tag_id] for tag_id in tag_id_list]
	return converted_to_tags

for i in range(5):
	xx = data.training_set.X[i]
	_, tag_id_list = hmm_model.viterbi(convert_words_to_ids(xx))
	print(xx)
	# print(tag_id_list)
	print(convert_ids_to_tags(tag_id_list))
	print()
	print()

('Whenever', 'artists', ',', 'indeed', ',', 'turned', 'to', 'actual', 'representations', 'or', 'molded', 'three-dimensional', 'figures', ',', 'which', 'were', 'rare', 'down', 'to', '800', 'B.C.', ',', 'they', 'tended', 'to', 'reflect', 'reality', '(', 'see', 'Plate', '6a', ',', '9b', ')', ';', ';')
['ADV', 'NOUN', '.', 'ADV', '.', 'VERB', 'ADP', 'ADJ', 'NOUN', 'CONJ', 'VERB', 'ADJ', 'NOUN', '.', 'DET', 'VERB', 'ADJ', 'PRT', 'ADP', 'NUM', 'NOUN', '.', 'PRON', 'VERB', 'PRT', 'VERB', 'NOUN', '.', 'VERB', 'NOUN', 'NUM', '.', 'NUM', '.', '.', '.']

('For', 'almost', 'two', 'months', ',', 'the', 'defendant', 'and', 'the', 'world', 'heard', 'from', 'individuals', 'escaped', 'from', 'the', 'grave', 'about', 'fathers', 'and', 'mothers', ',', 'graybeards', ',', 'adolescents', ',', 'babies', ',', 'starved', ',', 'beaten', 'to', 'death', ',', 'strangled', ',', 'machine-gunned', ',', 'gassed', ',', 'burned', '.')
['ADP', 'ADV', 'NUM', 'NOUN', '.', 'DET', 'NOUN', 'CONJ', 'DET', 'NOUN', 'VERB', 'ADP'

In [121]:
# Evaluating model accuracy
def hmm_accuracy(X, Y, model):
	"""
	Calculate the prediction accuracy by using the model to decode each sequence in the input X and comparing the prediction with the true labels in Y.
	
	The X should be an array whose first dimensions is the number of sentences to test, and each element of the array should be an iterable of the words in the sequence. The arrays X and Y should have the exact same shape.
	"""
	correct = total_predictions = 0
	num_sentences = len(X)
	for i, (observations, actual_tags) in enumerate(zip(X, Y)):
		# print(f"Parsing sentence {i} out of {num_sentences}")
		# if there's any exception, count the full sentence as an error (which makes this a conservative estimate)
		try:
			# most_likely_tags = simplify_decoding(observations, model)
			tem = convert_words_to_ids(observations)
			_, tag_id_list = hmm_model.viterbi(tem)
			most_likely_tags = convert_ids_to_tags(tag_id_list)
			count_correct_this_sentence = sum(p==t for p, t in zip(most_likely_tags, actual_tags))
			# print(f"Parsing sentence {i} out of {num_sentences} - correct {count_correct_this_sentence}/{len(actual_tags)} tags")
			correct += count_correct_this_sentence
		except:
			pass
		total_predictions += len(observations)
	return correct / total_predictions

In [122]:
hmm_training_acc = hmm_accuracy(data.training_set.X[:1000], data.training_set.Y[:1000], hmm_model)
print(f"training accuracy - basic HMM: {100 * hmm_training_acc:.2f}%")

hmm_testing_acc = hmm_accuracy(data.testing_set.X[:1000], data.testing_set.Y[:1000], hmm_model)
print(f"testing accuracy - basic HMM: {100 * hmm_testing_acc:.2f}%")

training accuracy - basic HMM: 96.66%
testing accuracy - basic HMM: 95.46%
