In [25]:
import pandas as pd
from collections import defaultdict
import math
import numpy as np
import string 
# Punctuation characters
punct = set(string.punctuation)

# Morphology rules used to assign unknown word tokens
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]

# Additive smoothing parameter
alpha = 0.001

### Important functions

In [3]:
def get_states(target_corpus):
    
    states = set()
    observations = set()
    
    for senten in target_corpus:
        for tup in senten:
            
            word , label = tup
            
            states.add(label)
            
            observations.add(word)
            
    observations.add('<s>')
    states.add('<s>')
    
    return tuple(states), tuple(observations)

In [26]:
def assign_unk(tok):
    """
    Assign unknown word tokens
    """
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"

In [81]:
def get_word_tag(line,vocab):
    
    if not line.split():
        
        word = "--n--"
        tag = "--s--"
        
        return word, tag
    else:
        word,tag = line.split()
        if word not in vocab:
            word = assign_unk(word) 
        return word, tag
    return None


In [28]:
def preprocess(vocab, data_fp):
    """
    Preprocess data
    """
    orig = []
    prep = []
    punct = string.punctuation 

    # Read data
    with open(data_fp, "r") as data_file:

        for cnt, word in enumerate(data_file):

            # End of sentence
            if not word.split():
                orig.append(word.strip())
                word = "--n--"
                prep.append(word)
                continue

            # Handle unknown words
            elif word.strip() not in vocab:
                orig.append(word.strip())
                word = assign_unk(word)
                prep.append(word)
                continue

            else:
                orig.append(word.strip())
                prep.append(word.strip())

    assert(len(orig) == len(open(data_fp, "r").readlines()))
    assert(len(prep) == len(open(data_fp, "r").readlines()))

    return orig, prep

### Importing Training Corpus

In [2]:
with open ("./data/WSJ_02-21.pos", 'r') as f:
    
    training_corpus = f.readlines()

In [4]:
training_corpus[0:5]

['In\tIN\n', 'an\tDT\n', 'Oct.\tNNP\n', '19\tCD\n', 'review\tNN\n']

### Importing Vocab

In [72]:
with open ("./data/hmm_vocab.txt", 'r') as f:
    
    vocab_l = f.read().split('\n')

In [73]:
vocab_l[-5:]

['zones', 'zoning', '{', '}', '']

In [82]:
# vocab: dictionary that has the index of the corresponding words
vocab = {}

# Get the index of the corresponding words. 
for i, word in enumerate(sorted(vocab_l)): 
    vocab[word] = i       
    
print("Vocabulary dictionary, key is the word, value is a unique integer")
cnt = 0
for k,v in vocab.items():
    print(f"{k}:{v}")
    cnt += 1
    if cnt > 20:
        break
    

Vocabulary dictionary, key is the word, value is a unique integer
:0
!:1
#:2
$:3
%:4
&:5
':6
'':7
'40s:8
'60s:9
'70s:10
'80s:11
'86:12
'90s:13
'N:14
'S:15
'd:16
'em:17
'll:18
'm:19
'n':20


### Importing Testing Corpus

In [75]:
# load in the test corpus
with open("./data/WSJ_24.pos", 'r') as f:
    y = f.readlines()

In [76]:
print("A sample of the test corpus")
print(y[0:10])

A sample of the test corpus
['The\tDT\n', 'economy\tNN\n', "'s\tPOS\n", 'temperature\tNN\n', 'will\tMD\n', 'be\tVB\n', 'taken\tVBN\n', 'from\tIN\n', 'several\tJJ\n', 'vantage\tNN\n']


### Corpus without tags (preprocessed)

In [77]:
_,prep = preprocess(vocab,"./data/test.words")

In [78]:
print('The length of the preprocessed test corpus: ', len(prep))
print('This is a sample of the test_corpus: ')
print(prep[0:10])

The length of the preprocessed test corpus:  34199
This is a sample of the test_corpus: 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--']


### Instructions: Write a program that takes in the training_corpus and returns the three dictionaries mentioned above transition_counts, emission_counts, and tag_counts.

1. emission_counts: maps (tag, word) to the number of times it happened.
2. transition_counts: maps (prev_tag, tag) to the number of times it has appeared.
3. tag_counts: maps (tag) to the number of times it has occured.

In [84]:
def create_dictionaries(training_corpus, vocab):
    
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    tag_counts = defaultdict(int)
    
    prev_tag = '--s--'
    
    for word_tag in training_corpus:
        
        word, tag = get_word_tag(word_tag,vocab)
        
        transition_counts[(prev_tag,tag)] +=1
        emission_counts[(tag, word)] +=1
        tag_counts[tag] +=1
        
        prev_tag = tag
        
        
        
    return transition_counts,emission_counts,tag_counts

In [85]:
transition_counts,emission_counts,tag_counts = create_dictionaries(training_corpus, vocab)

In [86]:
states = sorted(tag_counts)

In [44]:
states

['#',
 '$',
 "''",
 '(',
 ')',
 ',',
 '--s--',
 '.',
 ':',
 'CC',
 'CD',
 'DT',
 'EX',
 'FW',
 'IN',
 'JJ',
 'JJR',
 'JJS',
 'LS',
 'MD',
 'NN',
 'NNP',
 'NNPS',
 'NNS',
 'PDT',
 'POS',
 'PRP',
 'PRP$',
 'RB',
 'RBR',
 'RBS',
 'RP',
 'SYM',
 'TO',
 'UH',
 'VB',
 'VBD',
 'VBG',
 'VBN',
 'VBP',
 'VBZ',
 'WDT',
 'WP',
 'WP$',
 'WRB',
 '``']

### Instructions: Implement predict_pos that computes the accuracy of your model.

1. This is a warm up exercise.
2. To assign a part of speech to a word, assign the most frequent POS for that word in the training set.
3. Then evaluate how well this approach works. Each time you predict based on the most frequent POS for the given word, check whether the actual POS of that word is the same. If so, the prediction was correct!
4. Calculate the accuracy as the number of correct predictions divided by the total number of words for which you predicted the POS tag.

In [87]:
def predict_pos(prep, y, emission_counts, vocab, states):
    num_correct = 0
    for word, tup in zip(prep,y):
        
        tup_list = tup.split()
        
        if len(tup_list) ==2:
            true_label = tup_list[1]
            
        else:
            continue
            
        count_final = 0
        pos_final = ''  
        if word in vocab:
            
            for pos in states:
                
                key = (pos,word)
                
                if key in emission_counts:
                    
                    count = emission_counts[key]
                    
                    if count_final < count:
                        count_final = count
                        pos_final = pos
                         
            if true_label == pos_final:
                num_correct += 1
                
    return num_correct/len(y) , num_correct

In [88]:
predict_pos(prep, y, emission_counts, vocab, states)
#print(f"Accuracy of prediction using predict_pos is {accuracy_predict_pos:.4f}")

(0.8888563993099213, 30398)

# Hidden Markov Model

### Instructions: Implement the create_transition_matrix below for all tags. Your task is to output a matrix that computes equation 3 for each cell in matrix A.



In [91]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    
    A = np.zeros((num_tags,num_tags))
    
    trans_keys = set(transition_counts.keys())
    
    for i in range(num_tags):
        
        for j in range(num_tags):
            
            count = 0
            
            key = (all_tags[i],all_tags[j])
            
            if key in trans_keys:
                
                count = transition_counts[key]
                
            count_prev_tag = tag_counts[all_tags[i]]
            
            A[i,j] = (count + alpha) / (count_prev_tag + alpha * num_tags)
            
    return A     

    

In [92]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)

In [94]:
A_sub = pd.DataFrame(A[30:35,30:35], index=states[30:35], columns = states[30:35] )
print(A_sub)

              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02


### Instructions: Implement the create_emission_matrix below that computes the B emission probabilities matrix. 

Your function takes in alpha, the smoothing parameter, tag_counts, which is a dictionary mapping each tag to its respective count, the emission_counts dictionary where the keys are (tag, word) and the values are the counts. Your task is to output a matrix that computes equation 4 for each cell in matrix B.

In [103]:
def create_emission_matrix (alpha, tag_counts, emission_counts,vocab):
    
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    
    
    emiss_keys = set(list(emission_counts.keys()))
    
    no_words = len(vocab)
    
    B = np.zeros((num_tags,no_words))
    
    for i in range(num_tags):
        
        for j in range(no_words):
            
            count = 0
            
            key = (all_tags[i],vocab[j])
            
            if key in emiss_keys:
                
                count = emission_counts[key]
                
            count_tag = tag_counts[all_tags[i]]
            
            B[i,j] = (count + alpha) / (count_tag + alpha * no_words)
            
    return B  

In [104]:
# creating your emission probability matrix. this takes a few minutes to run. 
alpha = 0.001
B = create_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"View Matrix position at row 0, column 0: {B[0,0]:.9f}")
print(f"View Matrix position at row 3, column 1: {B[3,1]:.9f}")

View Matrix position at row 0, column 0: 0.000006032
View Matrix position at row 3, column 1: 0.000000720


In [106]:
# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]
# Get the emissions for the sample of words, and the sample of POS tags
B_sub = pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)

              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07


# Viterbi Algorithm and Dynamic Programming

1. Initialization - In this part you initialize the best_paths and best_probabilities matrices that you will be populating in feed_forward.
2. Feed forward - At each step, you calculate the probability of each path happening and the best paths up to that point.
3. Feed backward: This allows you to find the best path with the highest probabilities.

In [108]:
def initialize(states, tag_counts, A, B, corpus, vocab):
    
    # Get the total number of unique POS tags
    num_tags = len(tag_counts)
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype = int)
    
    s_indx = states.index('--s--')
    
    for i in range(num_tags):
        
        if A[s_indx,i] == 0:
            best_probs[i,0] = float('-inf')
            
        else:
            best_probs[i,0] = math.log(A[s_indx,i]) + math.log(B[i,vocab[corpus[0]]])
            
            
    return best_probs, best_paths

In [109]:
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)

In [110]:
# Test the function
print(f"best_probs[0,0]: {best_probs[0,0]:.4f}")
print(f"best_paths[2,3]: {best_paths[2,3]:.4f}")

best_probs[0,0]: -22.6098
best_paths[2,3]: 0.0000


### Instructions: Implement the viterbi_forward algorithm and store the best_path and best_prob for every possible tag for each word in the matrices best_probs and best_tags

In [112]:
def viterbi_forward(A,B,test_corpus, best_probs, best_paths, vocab):
    
    num_tags = best_probs.shape[0]
    
    for i in range(1, len(test_corpus)):
        # For each unique POS tag that the current word can be
        for j in range(num_tags):
            
            best_prob_i = float("-inf")
            best_path_i = None
            
            # For each POS tag that the previous word can be
            for k in range(num_tags):
                
                if test_corpus[i] in vocab:
                    prob = best_probs[k,i-1] +math.log(A[k,j]) + math.log(B[j,vocab[test_corpus[i]]])
                    
                else:
                    prob = float("-inf")
                    
                # check if this path's probability is greater than the best probability up to and before this point
                if prob > best_prob_i:
                    # Keep track of the best probability
                    best_prob_i = prob
                    # keep track of the POS tag of the previous word that is part of the best path
                    best_path_i = k
            # Save the best probability for the given current word's POS tag and the position of the current word inside the corpus
            best_probs[j, i] = best_prob_i
            
            # Save the unique integer ID of the previous POS tag into best_paths matrix, for the POS tag of the current word
            # and the position of the current word inside the corpus.
            best_paths[j, i] = best_path_i

    return best_probs, best_paths

In [113]:
# this will take a few minutes to run => processes ~ 30,000 words
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)

In [114]:
# Test this function 
print(f"best_probs[0,1]: {best_probs[0,1]:.4f}") 
print(f"best_probs[0,4]: {best_probs[0,4]:.4f}") 

best_probs[0,1]: -24.7822
best_probs[0,4]: -49.5601


### Implement the viterbi_backward algorithm, which returns a list of predicted POS tags for each word in the corpus.

Note that the numbering of the index positions starts at 0 and not 1.
m is the number of words in the corpus.
So the indexing into the corpus goes from 0 to m - 1.
Also, the columns in best_probs and best_paths are indexed from 0 to m - 1

In [116]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    
    m = best_paths.shape[1]
    
    # Initialize array z, same length as the corpus
    z = [None] * m
    
    # Get the number of unique POS tags
    num_tags = best_probs.shape[0]
    
    # Initialize the best probability for the last word
    best_prob_for_last_word = float('-inf')
    
    # Initialize pred array, same length as corpus
    pred = [None] * m
    
    
    # Go through each POS tag for the last word (last column of best_probs)
    # in order to find the row (POS tag integer ID) 
    # with highest probability for the last word
    for k in range(num_tags):
        # If the probability of POS tag at row k 
        # is better than the previously best probability for the last word:
        if best_probs[k, m-1] > best_prob_for_last_word:
            # Store the new best probability for the last word
            best_prob_for_last_word = best_probs[k, m-1]
            # Store the unique integer ID of the POS tag
            # which is also the row number in best_probs
            z[m - 1] = k
            
    # Convert the last word's predicted POS tag
    # from its unique integer ID into the string representation
    # using the 'states' list
    # store this in the 'pred' array for the last word
    pred[m - 1] = states[z[m - 1]]
    
    ## Step 2 ##
    # Find the best POS tags by walking backward through the best_paths
    # From the last word in the corpus to the 0th word in the corpus
    for i in range(m-1, 0, -1):
        # Retrieve the unique integer ID of the POS tag for the word at position 'i' in the corpus
        pos_tag_for_word_i = z[i]
        
        # In best_paths, go to the row representing the POS tag of word i
        # and the column representing the word's position in the corpus
        # to retrieve the predicted POS for the word at position i-1 in the corpus
        z[i - 1] = best_paths[pos_tag_for_word_i, i]
        
        # Get the previous word's POS tag in string form
        # Use the 'states' list, 
        # where the key is the unique integer ID of the POS tag,
        # and the value is the string representation of that POS tag
        pred[i - 1] = states[z[i - 1]]
        

    return pred


In [117]:
# Run and test your function
pred = viterbi_backward(best_probs, best_paths, prep, states)
m=len(pred)
print('The prediction for pred[-7:m-1] is: \n', prep[-7:m-1], "\n", pred[-7:m-1], "\n")
print('The prediction for pred[0:8] is: \n', pred[0:7], "\n", prep[0:7])

The prediction for pred[-7:m-1] is: 
 ['see', 'them', 'here', 'with', 'us', '.'] 
 ['VB', 'PRP', 'RB', 'IN', 'PRP', '.'] 

The prediction for pred[0:8] is: 
 ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN'] 
 ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken']
