In [4]:
import pandas as pd
import numpy as np
from collections import defaultdict
import math
import torch
import torchtext

# Read in train dataset from txt file
> - file associates words and corresponding part of speech (POS) tags
> - one such file can be acquired from Penn Treebank dataset (WSJ articles)
> - only reads the first read_lim word/POS pairs (for small sample testing opt.)
> - vocab_lim is a fraction between 0 and 1 which adds that fraction of the 
>   words to the vocabulary so that the model can account for the <unk> token

In [5]:
START_TOKEN = '<s>'
UNKNOWN_TOKEN = '<unk>'

def read_pos_tags(path,read_lim,vocab_lim,verbose=False):
    vocab = dict()
    vocab[UNKNOWN_TOKEN] = 0
    pos_all = dict()
    tran = dict()
    emi = dict()
    last_pos = START_TOKEN
    with open(path,'r') as f:
        lines = f.readlines()
        if verbose:
            print(len(lines))
        for i, line in enumerate(lines):
            line_L = line.split()
            if not line_L:
                line_L = [START_TOKEN,START_TOKEN]
            
            assert(len(line_L) == 2)

            if i < int(len(lines)*vocab_lim):
                if line_L[0] not in vocab:
                    vocab[line_L[0]] = 1
                else:
                    vocab[line_L[0]] += 1
            else:
                if line_L[0] not in vocab:
                    line_L[0] = UNKNOWN_TOKEN
                vocab[line_L[0]] += 1

            this_emi = (line_L[1],line_L[0])
            if this_emi not in emi:
                emi[this_emi] = 1
            else:
                emi[this_emi] += 1

            this_tran = (last_pos,line_L[1])
            if this_tran not in tran:
                tran[this_tran] = 1
            else:
                tran[this_tran] += 1

            if last_pos not in pos_all:
                pos_all[last_pos] = 1
            else:
                pos_all[last_pos] += 1

            if i == read_lim-1:
                break

            last_pos = line_L[1]
        return vocab, pos_all, tran, emi


#after counting word-POS associations, convert to log probabilities and
#also perform smoothing to the probabilities
def calc_probs(vocab, pos_all, tran, emi, k = 1):
    num_vocab = len(vocab.keys())
    num_pos = len(pos_all.keys())
    for key, val in tran.items():
        tran[key] = math.log((val + k)/(pos_all[key[0]] + k*num_pos))

    for key, val in emi.items():
        emi[key] = math.log((val + k)/(pos_all[key[0]] + k*num_vocab))

In [6]:
p = '../data/WSJPartOfSpeech/POS_tags_train.txt'
k = 1 #smoothing constant

vocab, pos_all, tran, emi = read_pos_tags(p,np.inf,0.5, True)
calc_probs(vocab, pos_all, tran, emi, k)

989860


In [7]:
#assign a unique index to all POS tags for quick access
#create dictionaries to convert between POS tag (string), its index, and back

pos_to_ind = dict()
for i,pos in enumerate(pos_all.keys()):
    pos_to_ind[pos] = i - 1
del pos_to_ind[START_TOKEN]

ind_to_pos = {v:k for k,v in pos_to_ind.items()}

# Viterbi algorithm implementation
> - sentence is a list of words (strings)
> - tran: transition probabilities matrix
> - emi: emissions probabilities matrix
> - pos_to_ind and ind_to_pos_in are dictionaries that convert between POS and ind
> - vocab_in is the vocabulary dictionary

In [14]:
def get_viterbi_probs(sentence, tran, emi, pos_to_ind_in, ind_to_pos_in , vocab_in, k, verbose=False):
    num_pos_v = len(pos_to_ind_in)
    num_vocab_v = len(vocab)
    vit_probs = np.empty(shape=(num_pos_v,len(sentence)))
    back_ptrs = np.empty(shape=(num_pos_v,len(sentence)))

    #initialize first column of viterbi probabilities matrix
    for key,val in pos_to_ind_in.items():
        vit_probs[val,0] = 0

        #add transition log prob from start token
        if (START_TOKEN,key) in tran:
            vit_probs[val,0] += tran[(START_TOKEN,key)] 
        else: 
            vit_probs[val,0] += math.log((k)/(pos_all[START_TOKEN] + k*num_pos_v))
        
        #add emission log prob given the first word
        if (key,sentence[0]) in emi:
            vit_probs[val,0] += emi[(key,sentence[0])]
        else:
            vit_probs[val,0] += math.log((k)/(pos_all[key] + k*num_vocab_v))

    #viterbi forward pass
    for w_ind_last,word in enumerate(sentence[1:]):
        if word not in vocab_in:
            word = UNKNOWN_TOKEN
        for pos_this,ind_this in pos_to_ind_in.items():
            best_prob = np.NINF
            best_ind = -1
            for pos_last,ind_last in pos_to_ind_in.items():
                prob = vit_probs[ind_last,w_ind_last]
                if (pos_last,pos_this) in tran:
                    prob += tran[(pos_last,pos_this)] 
                else:
                    prob += math.log((k)/(pos_all[pos_last] + k*num_pos_v))
                
                if (pos_this,sentence[w_ind_last+1]) in emi:
                    prob += emi[(pos_this,sentence[w_ind_last+1])]
                else:
                    prob += math.log((k)/(pos_all[pos_this] + k*num_vocab_v))

                if prob > best_prob:
                    best_prob = prob
                    best_ind = ind_last
            vit_probs[ind_this,w_ind_last+1] = best_prob
            back_ptrs[ind_this,w_ind_last+1] = best_ind

    tag_inds = list()
    tag_inds.append(np.argmax(vit_probs[:,len(sentence)-1]))
    for w_ind in range(len(sentence)-1,0,-1):
        next_ind = back_ptrs[int(tag_inds[-1]),w_ind]
        tag_inds.append(next_ind)
    if(verbose):
        print(vit_probs)
        print(back_ptrs)
    tags_output = [ind_to_pos_in[x] for x in reversed(tag_inds)]

    return tags_output


# Example of Viterbi prediction
> - Give pre-defined transition and emission matrices to ensure viterbi function
>   works properly.

In [9]:
sentence1 = ['happy','finds','optimists']
tran1 = {
            ('N','N'):0.1,
            ('N','V'):0.25,
            ('N','A'):0.65,
            ('V','N'):0.75,
            ('V','V'):0.15,
            ('V','A'):0.1,
            ('A','N'):0.35,
            ('A','V'):0.2,
            ('A','A'):0.45,
            (START_TOKEN,'N'):0.2,
            (START_TOKEN,'V'):0.3,
            (START_TOKEN,'A'):0.5,
        }

emi1 = {
            ('N','happy'):0.4,
            ('N','finds'):0.25,
            ('N','optimists'):0.35,
            ('V','happy'):0.05,
            ('V','finds'):0.75,
            ('V','optimists'):0.2,
            ('A','happy'):0.2,
            ('A','finds'):0.3,
            ('A','optimists'):0.5,
        }

pos_ind1 = {'N':0,'V':1,'A':2}
ind_pos1 = {0:'N',1:'V',2:'A'}
vocab1 = {'happy','finds','optimists'}
get_viterbi_probs(sentence1, tran1, emi1, pos_ind1, ind_pos1, vocab1, True)

[[0.6  1.35 2.75]
 [0.35 1.65 2.  ]
 [0.7  1.55 2.5 ]]
[[0.00000000e+000 1.00000000e+000 1.00000000e+000]
 [0.00000000e+000 2.00000000e+000 1.00000000e+000]
 [1.24610723e-306 0.00000000e+000 0.00000000e+000]]


['A', 'V', 'N']

In [10]:
#reads the test set from a separate txt file
#vocab_in is the vocab dicitonary from the test set
#read_lim limits the number of word/POS pairs used in the test set
def read_test_set(path, vocab_in, read_lim = np.inf,start_token=START_TOKEN):
    last_pos = start_token
    words_test = []
    tags_test = []
    with open(path) as f:
        lines = f.readlines()
        for i, line in enumerate(lines):
            if i == read_lim:
                break
            line_L = line.split()
            if not line_L:
                line_L = [start_token,start_token]
            try:
                assert(len(line_L) == 2)
            except:
                print(line_L)

            if line_L[0] not in vocab_in:
                line_L[0] = UNKNOWN_TOKEN

            words_test.append(line_L[0])
            tags_test.append(line_L[1])

        
        return words_test, tags_test





In [11]:
X_test,Y_test = read_test_set('../data/WSJPartOfSpeech/POS_tags_test.txt',vocab)

In [24]:
#Breaks up list of words (in word_L) into sentences and applies viterbi 
#algorithm on each sentence.
def vit_predict(word_L,get_vit_in, tran_in, emi_in, pos_to_ind_in, ind_to_pos_in,vocab_in,k=1):
    out_tags = list()
    
    end_sent_inds = [-1]
    end_sent_inds += [i for i,word in enumerate(word_L) if word == START_TOKEN]
    end_sent_inds.append(len(word_L))

    for i in range(len(end_sent_inds)-2):
        start_ind = end_sent_inds[i]
        end_ind = end_sent_inds[i+1]
        sent = word_L[start_ind+1:end_ind]
        out_tags += get_vit_in(sent, tran_in, emi_in, pos_to_ind_in, ind_to_pos_in,vocab_in,k)
        out_tags.append(START_TOKEN)

    return out_tags



In [25]:
h_test = vit_predict(X_test,get_viterbi_probs,tran,emi,pos_to_ind,ind_to_pos,vocab)

In [33]:
#This viterbi algorithm has 92% accuracy on the test set
Y_test_no_starts = [el for el in Y_test if el != START_TOKEN]
h_test_no_starts = [el for el in h_test if el != START_TOKEN]

Y_test_np = np.array(Y_test_no_starts)
h_test_np = np.array(h_test_no_starts)

np.sum((Y_test_np == h_test_np).astype(int))/len(Y_test_np)

0.9278300307430067