In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
DATA_FOLDER = '/content/drive/My Drive/Colab Notebooks/data/wsj'

In [47]:
import os
import re
from collections import namedtuple

TaggedWord = namedtuple('TaggedWord', ['word', 'tag'])
START_TAG = '--s--'

def preprocess_wsj(text):
    corpus = []

    for paragraph_text in text.split('=+'):
        corpus.append(TaggedWord(word='', tag=START_TAG))
        for line in paragraph_text.split('\n'):
            parts = line.split(' ')
            for part in parts:
                part = part.strip().strip(']').strip('[')
                if len(part) > 0 and not part.startswith('='):
                    subparts = part.split('/')
                    if len(subparts) > 2:
                        word, tag = subparts[0].strip('\\') + '/' + subparts[1], subparts[2]
                    else:
                        word, tag = subparts
                    corpus.append(TaggedWord(word=word, tag=tag))
    return corpus

corpus = []
for file in os.listdir(DATA_FOLDER):
    if file.startswith('wsj'):
        with open(DATA_FOLDER+f'/{file}') as f:
            subcorpus = preprocess_wsj(f.read())
            corpus.extend(subcorpus)

In [53]:
split_num = int(len(corpus) * 0.8)
training_corpus = corpus[:split_num]
test_corpus = corpus[split_num:]

In [54]:
training_corpus[:5]

[TaggedWord(word='', tag='--s--'),
 TaggedWord(word='R.P.', tag='NNP'),
 TaggedWord(word='Scherer', tag='NNP'),
 TaggedWord(word='Corp.', tag='NNP'),
 TaggedWord(word='said', tag='VBD')]

In [55]:
len(training_corpus)

75519

In [56]:
from collections import defaultdict
import numpy as np

def generate_vocab(corpus):
    vocab = {}
    index = 0

    for (word, tag) in corpus:
        if word not in vocab:
            vocab[word] = index
            index += 1
    
    return vocab

def create_dictionaries(corpus, vocab):
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    prev_tag = '--s--'

    for (word, tag) in corpus:
        transition_counts[(prev_tag, tag)] += 1
        emission_counts[(tag, word)] += 1
        tag_counts[tag] += 1
        prev_tag = tag

    return emission_counts, transition_counts, tag_counts

def create_transition_matrix(alpha, tag_counts, transition_counts):
    tags = sorted(tag_counts.keys())
    num_tags = len(tags)
    transition_matrix = np.zeros((num_tags, num_tags))

    for i in range(num_tags):
        for j in range(num_tags):
            trans_key = (tags[i], tags[j])
            count = 0
            if trans_key in transition_counts:
                count = transition_counts[trans_key]
            transition_matrix[i, j] = (count + alpha) / (tag_counts[tags[i]] + alpha * num_tags)
    
    return transition_matrix

def create_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    tags = sorted(tag_counts.keys())
    words = sorted(vocab.keys())
    num_tags = len(tags)
    num_words = len(words)
    emission_matrix = np.zeros((num_tags, num_words))

    for i in range(num_tags):
        for j in range(num_words):
            count = 0
            emission_key = (tags[i], words[j])
            if emission_key in emission_counts:
                count = emission_counts[emission_key]
            emission_matrix[i, j] = (count + alpha) / (tag_counts[tags[i]] + alpha * num_words)
    
    return emission_matrix

In [68]:
vocab = generate_vocab(corpus)
emission_counts, transition_counts, tag_counts = create_dictionaries(training_corpus, vocab)

In [69]:
transition_matrix = create_transition_matrix(0.001, tag_counts, transition_counts)

In [70]:
transition_matrix

array([[7.11895778e-05, 7.11895778e-05, 7.11895778e-05, ...,
        7.11895778e-05, 7.11895778e-05, 7.11895778e-05],
       [1.74810811e-06, 1.74810811e-06, 1.74810811e-06, ...,
        1.74810811e-06, 1.74810811e-06, 1.74810811e-06],
       [1.65824554e-06, 1.65824554e-06, 6.63464042e-03, ...,
        1.65990379e-03, 3.31814933e-03, 1.32676226e-02],
       ...,
       [9.05223138e-05, 9.05223138e-05, 9.05223138e-05, ...,
        9.05223138e-05, 9.05223138e-05, 9.05223138e-05],
       [6.70929304e-06, 6.70929304e-06, 6.70929304e-06, ...,
        6.70929304e-06, 6.70929304e-06, 6.71600233e-03],
       [1.65824554e-06, 1.65824554e-06, 1.65824554e-06, ...,
        1.65824554e-06, 9.95113150e-03, 1.65824554e-06]])

In [71]:
emission_matrix = create_emission_matrix(0.001, tag_counts, emission_counts, vocab)

In [72]:
emission_matrix

array([[3.84704162e-05, 3.84704162e-05, 5.38624298e-01, ...,
        3.84704162e-05, 3.84704162e-05, 3.84704162e-05],
       [1.71234636e-06, 1.71234636e-06, 1.71234636e-06, ...,
        1.71234636e-06, 1.71234636e-06, 1.71234636e-06],
       [1.62603212e-06, 1.62603212e-06, 1.62603212e-06, ...,
        1.62603212e-06, 1.62603212e-06, 1.62603212e-06],
       ...,
       [4.34896060e-05, 4.34896060e-05, 4.34896060e-05, ...,
        4.34896060e-05, 4.34896060e-05, 4.34896060e-05],
       [6.21141161e-06, 6.21141161e-06, 6.21141161e-06, ...,
        6.21141161e-06, 6.21141161e-06, 6.21141161e-06],
       [1.62603212e-06, 1.62603212e-06, 1.62603212e-06, ...,
        1.62603212e-06, 1.62603212e-06, 1.62603212e-06]])

In [82]:
import math

def initialize(states, tag_counts, A, B, corpus, vocab):
    '''
    Input: 
        states: a list of all possible parts-of-speech
        tag_counts: a dictionary mapping each tag to its respective count
        A: Transition Matrix of dimension (num_tags, num_tags)
        B: Emission Matrix of dimension (num_tags, len(vocab))
        corpus: a sequence of words whose POS is to be identified in a list 
        vocab: a dictionary where keys are words in vocabulary and value is an index
    Output:
        best_probs: matrix of dimension (num_tags, len(corpus)) of floats
        best_paths: matrix of dimension (num_tags, len(corpus)) of integers
    '''
    # Get the total number of unique POS tags
    num_tags = len(tag_counts)
    
    # Initialize best_probs matrix 
    # POS tags in the rows, number of words in the corpus as the columns
    best_probs = np.zeros((num_tags, len(corpus)))
    
    # Initialize best_paths matrix
    # POS tags in the rows, number of words in the corpus as columns
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    # Define the start token
    s_idx = states.index("--s--")
    
    # Go through each of the POS tags
    for i in range(num_tags):
        
        # Handle the special case when the transition from start token to POS tag i is zero
        if A[s_idx, i] == 0:
            # Initialize best_probs at POS tag 'i', column 0, to negative infinity
            best_probs[i,0] = float('-inf')
        
        # For all other cases when transition from start token to POS tag i is non-zero:
        else:
            # Initialize best_probs at POS tag 'i', column 0
            # Check the formula in the instructions above
            best_probs[i,0] = math.log(A[s_idx, i]) + math.log(B[i, vocab[corpus[0][0]]])
                        
    return best_probs, best_paths

def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    '''
    Input: 
        A, B: The transition and emission matrices respectively
        test_corpus: a list containing a preprocessed corpus
        best_probs: an initilized matrix of dimension (num_tags, len(corpus))
        best_paths: an initilized matrix of dimension (num_tags, len(corpus))
        vocab: a dictionary where keys are words in vocabulary and value is an index 
    Output: 
        best_probs: a completed matrix of dimension (num_tags, len(corpus))
        best_paths: a completed matrix of dimension (num_tags, len(corpus))
    '''
    # Get the number of unique POS tags (which is the num of rows in best_probs)
    num_tags = best_probs.shape[0]
    
    # Go through every word in the corpus starting from word 1
    # Recall that word 0 was initialized in `initialize()`
    for i in range(1, len(test_corpus)): 
        
        # Print number of words processed, every 5000 words
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))
            
        # For each unique POS tag that the current word can be
        for j in range(num_tags):
            
            # Initialize best_prob for word i to negative infinity
            best_prob_i = float('-inf')
            
            # Initialize best_path for current word i to None
            best_path_i = None

            # For each POS tag that the previous word can be:
            for k in range(num_tags):
            
                # Calculate the probability = 
                # best probs of POS tag k, previous word i-1 + 
                # log(prob of transition from POS k to POS j) + 
                # log(prob that emission of POS j is word i)
                prob = best_probs[k, i-1] + math.log(A[k, j]) + math.log(B[j, vocab[test_corpus[i][0]]])

                # check if this path's probability is greater than
                # the best probability up to and before this point
                if prob > best_prob_i: # complete this line
                    
                    # Keep track of the best probability
                    best_prob_i = prob
                    
                    # keep track of the POS tag of the previous word
                    # that is part of the best path.  
                    # Save the index (integer) associated with 
                    # that previous word's POS tag
                    best_path_i = k

            # Save the best probability for the 
            # given current word's POS tag
            # and the position of the current word inside the corpus
            best_probs[j,i] = best_prob_i
            
            # Save the unique integer ID of the previous POS tag
            # into best_paths matrix, for the POS tag of the current word
            # and the position of the current word inside the corpus.
            best_paths[j,i] = best_path_i

    return best_probs, best_paths

def viterbi_backward(best_probs, best_paths, corpus, states):
    '''
    This function returns the best path.
    
    '''
    # Get the number of words in the corpus
    # which is also the number of columns in best_probs, best_paths
    m = best_paths.shape[1] 
    
    # Initialize array z, same length as the corpus
    z = [None] * m
    
    # Get the number of unique POS tags
    num_tags = best_probs.shape[0]
    
    # Initialize the best probability for the last word
    best_prob_for_last_word = float('-inf')
    
    # Initialize pred array, same length as corpus
    pred = [None] * m
    
    # Go through each POS tag for the last word (last column of best_probs)
    # in order to find the row (POS tag integer ID) 
    # with highest probability for the last word
    for k in range(num_tags):

        # If the probability of POS tag at row k 
        # is better than the previously best probability for the last word:
        if best_probs[k, m-1] > best_prob_for_last_word: # complete this line
            
            # Store the new best probability for the lsat word
            best_prob_for_last_word = best_probs[k, m-1]
    
            # Store the unique integer ID of the POS tag
            # which is also the row number in best_probs
            z[m - 1] = k
            
    # Convert the last word's predicted POS tag
    # from its unique integer ID into the string representation
    # using the 'states' dictionary
    # store this in the 'pred' array for the last word
    pred[m - 1] = states[z[m-1]]
    
    # Find the best POS tags by walking backward through the best_paths
    # From the last word in the corpus to the 0th word in the corpus
    for i in range(m-1, 0, -1):
        
        # Retrieve the unique integer ID of
        # the POS tag for the word at position 'i' in the corpus
        pos_tag_for_word_i = z[i]
        
        # In best_paths, go to the row representing the POS tag of word i
        # and the column representing the word's position in the corpus
        # to retrieve the predicted POS for the word at position i-1 in the corpus
        z[i - 1] = best_paths[pos_tag_for_word_i,i]
        
        # Get the previous word's POS tag in string form
        # Use the 'states' dictionary, 
        # where the key is the unique integer ID of the POS tag,
        # and the value is the string representation of that POS tag
        pred[i - 1] = states[z[i-1]]
        
    return pred

def compute_accuracy(pred, y):
    '''
    Input: 
        pred: a list of the predicted parts-of-speech 
        y: a list of word, tag tuples
    Output: 
        
    '''
    num_correct = 0
    total = 0
    
    # Zip together the prediction and the labels
    for prediction, (word, tag) in zip(pred, y):
        
        # Check if the POS tag label matches the prediction
        if tag == prediction:
            
            # count the number of times that the prediction
            # and label match
            num_correct += 1
            
        # keep track of the total number of examples (that have valid labels)
        total += 1
        
    return num_correct/total

In [77]:
states = sorted(tag_counts.keys())
best_probs, best_paths = initialize(states, tag_counts, transition_matrix, emission_matrix, corpus, vocab)

In [78]:
viterbi_forward(transition_matrix, emission_matrix, test_corpus, best_probs, best_paths, vocab)

Words processed:     5000
Words processed:    10000
Words processed:    15000


(array([[-22.12353354, -27.24675713, -40.93673904, ...,   0.        ,
           0.        ,   0.        ],
        [-25.23555851, -30.3587821 , -37.0786291 , ...,   0.        ,
           0.        ,   0.        ],
        [-25.28728031, -30.4105039 , -35.3650472 , ...,   0.        ,
           0.        ,   0.        ],
        ...,
        [-22.00090111, -27.1241247 , -34.56311644, ...,   0.        ,
           0.        ,   0.        ],
        [-17.03828012, -22.16150371, -34.63787101, ...,   0.        ,
           0.        ,   0.        ],
        [-17.68587798, -22.80910157, -36.34580702, ...,   0.        ,
           0.        ,   0.        ]]), array([[ 0,  6, 14, ...,  0,  0,  0],
        [ 0,  6,  3, ...,  0,  0,  0],
        [ 0,  6, 23, ...,  0,  0,  0],
        ...,
        [ 0,  6, 23, ...,  0,  0,  0],
        [ 0,  6, 23, ...,  0,  0,  0],
        [ 0,  6, 23, ...,  0,  0,  0]]))

In [79]:
pred = viterbi_backward(best_probs, best_paths, test_corpus, states)

In [None]:
compute_accuracy(pred, test_corpus)