<a href="https://colab.research.google.com/github/nitsansoffair/pos/blob/master/pos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# parts of speech tagging

In [None]:
from utils_pos import get_word_tag, preprocess  
import pandas as pd
from collections import defaultdict
import math
from math import log
import numpy as np
import w2_unittest

In [None]:
with open("./data/WSJ_02-21.pos", 'r') as f:
    training_corpus = f.readlines()

In [None]:
with open("./data/hmm_vocab.txt", 'r') as f:
    voc_l = f.read().split('\n')

In [None]:
vocab = {}
for i, word in enumerate(sorted(voc_l)): 
    vocab[word] = i 

In [None]:
with open("./data/WSJ_24.pos", 'r') as f:
    y = f.readlines()

In [None]:
_, prep = preprocess(vocab, "./data/test.words")     

print('The length of the preprocessed test corpus: ', len(prep))
print('This is a sample of the test_corpus: ')
print(prep[0:10])

The length of the preprocessed test corpus:  34199
This is a sample of the test_corpus: 
['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken', 'from', 'several', '--unk--']


#### Create dictionaries procedure.

In [None]:
def create_dictionaries(training_corpus, vocab, verbose=False):
    emission_counts, transition_counts, tag_counts = defaultdict(int), defaultdict(int), defaultdict(int)
    prev_tag = '--s--'
    i = 0
    for word_tag in training_corpus:
        i += 1
        if i % 50000 == 0 and verbose:
            print(f"word count = {i}")
        word, tag = get_word_tag(word_tag, vocab)
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1
        prev_tag = tag
    return emission_counts, transition_counts, tag_counts

In [None]:
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

In [None]:
states = sorted(tag_counts.keys())
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

Number of POS tags (number of 'states'): 46
View these POS tags (states)
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']


In [None]:
print("transition examples: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

print("emission examples: ")
for ex in list(emission_counts.items())[200:203]:
    print (ex)
print()

print("ambiguous word example: ")
for tup,cnt in emission_counts.items():
    if tup[1] == 'back': print (tup, cnt) 

transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)

emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)

ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4


#### Predict pos of word procedure.

In [None]:
def predict_pos(prep, y, emission_counts, vocab, states):
    num_correct, total = 0, len(y)
    for word_tag, y_tup in zip(prep, y):
        y_tup_l = y_tup.split()
        if len(y_tup_l) == 2:
            true_label = y_tup_l[1]
        else:
            continue
        pos_final, count_final = '', 0
        word = word_tag.split('\t')[0]
        if word in list(vocab.keys()):
            for pos in states:
                key = (pos, word)
                if key in emission_counts.keys():
                    count = emission_counts[key]
                    if count > count_final:
                        count_final, pos_final = count, key[0]
            if pos_final == true_label:
                num_correct += 1
    return num_correct / total

In [None]:
accuracy_predict_pos = predict_pos(prep, y, emission_counts, vocab, states)
print(f"Accuracy of prediction using predict_pos is {accuracy_predict_pos:.4f}")

Accuracy of prediction using predict_pos is 0.8889


#### Create transition matrix procedure.

In [None]:
def create_transition_matrix(alpha, tag_counts, transition_counts):
    all_tags = sorted(tag_counts.keys())
    num_tags = len(all_tags)
    A = np.zeros((num_tags, num_tags))
    trans_keys = set(transition_counts.keys())
    for i in range(num_tags):
        for j in range(num_tags):
            count = 0
            key = (all_tags[i], all_tags[j]) 
            if key in trans_keys:
                count = transition_counts[key]
            count_prev_tag = tag_counts[all_tags[i]]
            A[i, j] = (count + alpha) / (count_prev_tag + num_tags * alpha)
    return A

In [None]:
alpha = 0.001
A = create_transition_matrix(alpha, tag_counts, transition_counts)
print(f"A at row 0, col 0: {A[0,0]:.9f}")
print(f"A at row 3, col 1: {A[3,1]:.4f}")

print("View a subset of transition matrix A")
A_sub = pd.DataFrame(A[30:35,30:35], index=states[30:35], columns = states[30:35] )
print(A_sub)

A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02


#### Create emission matrix procedure.

In [None]:
def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    num_tags, num_words = len(tag_counts), len(vocab.keys())
    emission_matrix = np.zeros((num_tags, num_words))
    emis_keys, vocab_keys = list(emission_counts.keys()), list(vocab.keys())
    for tag in range(num_tags):
        sum_rows = 0
        for word in range(num_words):
            key = (list(tag_counts.keys())[tag], vocab_keys[word])
            count = emission_counts[key] if key in emission_counts.keys() else 0
            emission_matrix[tag, word] = count + alpha
            sum_rows += emission_matrix[tag, word]
        emission_matrix[tag, :] /= sum_rows
    return emission_matrix

In [None]:
alpha = 0.001
B = create_emission_matrix(alpha, tag_counts, emission_counts, vocab)

print(f"View Matrix position at row 0, column 0: {B[0,0]:.9f}")
print(f"View Matrix position at row 3, column 1: {B[3,1]:.9f}")

cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

cols = [vocab[a] for a in cidx]

rvals =['CD','NN','NNS', 'VB','RB','RP']

rows = [states.index(a) for a in rvals]

B_sub = pd.DataFrame(B[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(B_sub)

View Matrix position at row 0, column 0: 0.000000010
View Matrix position at row 3, column 1: 0.000000027
              725      adroitly     engineers      promoted       synergy
CD   1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
NN   4.609192e-08  4.609192e-08  4.609192e-08  4.609192e-08  4.609192e-08
NNS  1.186130e-07  1.186130e-07  1.186130e-07  1.186130e-07  1.186130e-07
VB   4.615150e-07  4.615150e-07  4.615150e-07  4.615150e-07  4.615150e-07
RB   5.581052e-07  5.581052e-07  5.581052e-07  5.581052e-07  5.581052e-07
RP   2.316007e-07  2.316007e-07  2.316007e-07  2.316007e-07  2.316007e-07


#### Viterbi algorithm Initialize procedure.

In [None]:
def initialize(states, tag_counts, A, B, corpus, vocab, start_token="--s--"):
    num_tags = len(tag_counts)
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    s_idx = states.index(start_token)
    for tag in range(num_tags):
        if A[0, tag] == 0:
            best_probs[tag, 0] = float("-inf")
        else:
            word = corpus[0].split('\t')[0]
            best_probs[tag, 0] = log(A[s_idx, tag] + B[tag, vocab[word]]) if A[s_idx, tag] != 0 else float('-inf')
    return best_probs, best_paths

In [None]:
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)

In [None]:
with open("./data/WSJ_24.pos", 'r') as f:
    testing_corpus = f.readlines()

#### Viterbi algorithm forward procedure.

In [None]:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab, verbose=True):
    num_tags = best_probs.shape[0]
    for word in range(1, len(vocab)):
        if word % 5000 == 0 and verbose:
            print("Words processed: {:>8}".format(word))
        for tag in range(num_tags):
            best_prob, best_path = float("-inf"), None
            for previous_tag in range(num_tags):
                prob = best_probs[previous_tag, word - 1] + log(A[previous_tag, tag]) + log(B[tag, word])
                if prob > best_prob:
                    best_prob = prob
                    best_path = previous_tag
            best_probs[tag, word] = best_prob
            best_paths[tag, word] = best_path
    return best_probs, best_paths

In [None]:
best_probs, best_paths = viterbi_forward(A, B, prep, best_probs, best_paths, vocab)

Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000


#### Viterbi algorithm backward procedure.

In [None]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    last_word = best_paths.shape[1]
    z = [None] * last_word
    num_tags = best_probs.shape[0]
    best_prob_for_last_word = float('-inf')
    predictions = [None] * last_word
    for tag in range(num_tags):
        if best_probs[tag, last_word - 1] > best_prob_for_last_word:
            best_prob_for_last_word, z[last_word - 1] = best_probs[tag, last_word - 1], tag
    predictions[last_word - 1] = states[z[last_word - 1]]
    for word in range(len(corpus) - 1, -1, -1):
        tag_index = np.argmax(best_probs[:, word], axis=0)
        pos_tag_for_word, z[word - 1] = states[tag_index], best_paths[tag_index, word]
        predictions[word - 1] = pos_tag_for_word
    return predictions

#### Compute accuracy procedure.

In [None]:
def compute_accuracy(pred, y):
    num_correct, total = 0, 0
    for prediction, true_label in zip(pred, y):
        word_tag_tuple = true_label.split('\t')
        if len(word_tag_tuple) != 2:
            continue
        word, tag = word_tag_tuple
        num_correct += 1 if tag[:-1] == prediction else 0
        total += 1
    return num_correct / total

### References

- Coursera - Natural language processing with probabilistic models [course](https://www.coursera.org/learn/probabilistic-models-in-nlp).