# CSCI 544 HW2

In [261]:
import json
import numpy as np
from collections import Counter, defaultdict

## Task 1: Vocabulary Creation (20 points)

In [276]:
# Constants
TRAIN_DATA_PATH = 'data/train.json'
DEV_DATA_PATH = 'data/dev.json'
UNKNOWN_KEY = '<unk>'
THRESHOLD = 2
OUTPUT_FOLDER = 'verification/out'
OUTPUT_PATH_VOCAB = OUTPUT_FOLDER + '/vocab.txt'
OUTPUT_PATH_HMM = OUTPUT_FOLDER + '/hmm.json'
OUTPUT_PATH_GREEDY = OUTPUT_FOLDER + '/greedy.json'
OUTPUT_PATH_VITERBI = OUTPUT_FOLDER + '/viterbi.json'

In [277]:
initial_counts = {}
transition_counts = defaultdict(lambda: defaultdict(int))
emission_counts = defaultdict(lambda: defaultdict(int))
with open(TRAIN_DATA_PATH) as f:
    train_data = json.load(f)
    train_data_words = []
    for train_entry in train_data:
        train_data_words.extend(train_entry['sentence'])

temp_dict = Counter(train_data_words)

freq_dict = {}

freq_dict[UNKNOWN_KEY] = 0
for word in temp_dict:
    if temp_dict[word] < THRESHOLD:
        freq_dict[UNKNOWN_KEY] += temp_dict[word]
    else:
        freq_dict[word] = temp_dict[word]

unk_value = freq_dict[UNKNOWN_KEY]
del freq_dict[UNKNOWN_KEY]
freq_dict = dict([(UNKNOWN_KEY, unk_value)] + sorted(freq_dict.items(), key=lambda item: item[1], reverse=True))

with open(OUTPUT_PATH_VOCAB, 'w') as f:
    for o, word in enumerate(freq_dict):
        freq_dict[word] = {
            'index': o,
            'frequency': freq_dict[word]
        }
        f.write(f'{word}\t{o}\t{freq_dict[word]["frequency"]}\n')

In [278]:
len(freq_dict), freq_dict['<unk>']['frequency']

(23183, 20011)

What threshold value did you choose for identifying unknown words for replacement?
3

What is the overall size of your vocabulary, and how many times does the special token ”< unk >” occur following the replacement process?
Vocabulary size:  16920
< unk > count:  32357



## Task 2: Model Learning

In [279]:
tags = {}
for train_entry in train_data:
    labels = train_entry['labels']
    label_len = len(labels)
    for s in range(label_len):
        tag = labels[s]
        if tag not in tags:
            tags[tag] = {
                'index': len(tags),
                'frequency': 1
            }
        else:
            tags[tag]['frequency'] += 1
        if s == 0:
            initial_counts[tag] = initial_counts.get(tag, 0) + 1
        emitted_word = train_entry['sentence'][s] if train_entry['sentence'][s] in freq_dict else UNKNOWN_KEY
        emission_counts[tag][emitted_word] += 1
        if s < label_len - 1:
            next_tag = labels[s + 1]
            transition_counts[tag][next_tag] += 1

NUM_TAGS = len(tags)
NUM_WORDS = len(freq_dict)

In [280]:
sum(transition_counts['NNP'].values()) / tags['NNP']['frequency']

0.9987101634553922

In [281]:
transition = {}
emission = {}

for tag in transition_counts:
    for next_tag in transition_counts[tag]:
        transition[f'({tag},{next_tag})'] = transition_counts[tag][next_tag] / tags[tag]['frequency']

for tag in emission_counts:
    for next_tag in emission_counts[tag]:
        emission[f'({tag},{next_tag})'] = emission_counts[tag][next_tag] / tags[tag]['frequency']

hmm_json = {
    'transition': transition,
    'emission': emission,
}

with open(OUTPUT_PATH_HMM, 'w') as json_file:
    json.dump(hmm_json, json_file)

In [282]:
len(transition), len(emission)

(1351, 30303)

How many transition and emission parameters in your HMM?

Transition parameters:  1351
Emission parameters:  23373

## Task 3: Greedy Decoding with HMM

In [283]:
initial_prob = np.zeros(NUM_TAGS)
for o, tag in enumerate(tags):
    initial_prob[o] = initial_counts.get(tag, 0) / len(train_data)

transition_prob = np.zeros((NUM_TAGS, NUM_TAGS))
for tag in transition_counts:
    for next_tag in transition_counts[tag]:
        transition_prob[tags[tag]['index']][tags[next_tag]['index']] = transition_counts[tag][next_tag] / tags[tag]['frequency']

emission_prob = np.zeros((NUM_TAGS, NUM_WORDS))
for tag in emission_counts:
    for word in emission_counts[tag]:
        emission_prob[tags[tag]['index']][freq_dict[word]['index']] = emission_counts[tag][word] / tags[tag]['frequency']

In [288]:
with open(DEV_DATA_PATH) as f:
    dev_data = json.load(f)

greedy = []
tag_list = list(tags.keys())
res = np.array([], dtype=bool)

for data_idx, dev_entry in enumerate(dev_data):
    sentence = dev_entry['sentence']
    pred = []
    for o, word in enumerate(sentence):
        init_prob = initial_prob if o == 0 else transition_prob[tags[pred[-1]]['index']]
        mul_value = init_prob * emission_prob[:, freq_dict.get(word, freq_dict[UNKNOWN_KEY])['index']]
        pred.append(tag_list[np.argmax(mul_value)])
    greedy.append({
        'index': data_idx,
        'sentence': sentence,
        'labels': pred
        })
    res = np.append(res, np.array(pred) == np.array(dev_entry['labels']))
print(res.mean())
with open(OUTPUT_PATH_GREEDY, 'w') as json_file:
    json.dump(greedy, json_file)

0.9350297492562686


What is the accuracy on the dev data? 0.9298615748891992

## Task 4: Viterbi Decoding with HMM

In [285]:
def step(mu_prev: np.ndarray,
         emission_probs: np.ndarray,
         transition_probs: np.ndarray,
         observed_state: int):
    pre_max = mu_prev * transition_probs.T
    max_prev_states = np.argmax(pre_max, axis=1)
    max_vals = pre_max[np.arange(len(max_prev_states)), max_prev_states]
    mu_new = max_vals * emission_probs[:, observed_state]
    return mu_new, max_prev_states

In [297]:
def viterbi(sentence, initial_prob, transition_prob, emission_prob):

    text_length = len(sentence)
    viterbi_matrix = np.zeros((NUM_TAGS, text_length))
    backpointers = np.zeros((NUM_TAGS, text_length), dtype=int)

    word = sentence[0] if sentence[0] in freq_dict else UNKNOWN_KEY
    word_index = freq_dict[word]['index']
    viterbi_matrix[:, 0] = initial_prob * emission_prob[:, word_index]

    for t in range(1, len(sentence)):
        for i, tag in enumerate(tags):
            word = sentence[t] if sentence[t] in freq_dict else UNKNOWN_KEY
            word_index = freq_dict[word]['index']

            max_prob = 0
            max_index = 0
            for j, prev_tag in enumerate(tags):
                transition = transition_prob[j, i]
                emission = emission_prob[i, word_index]
                prob = viterbi_matrix[j, t - 1] * transition * emission

                if prob > max_prob:
                    max_prob = prob
                    max_index = j

            viterbi_matrix[i, t] = max_prob
            backpointers[i, t] = max_index

    # Backtrack to find the best sequence of tags
    best_sequence = []
    max_prob = 0
    max_index = 0
    for i in range(NUM_TAGS):
        if viterbi_matrix[i, len(sentence) - 1] > max_prob:
            max_prob = viterbi_matrix[i, len(sentence) - 1]
            max_index = i

    best_sequence.append(max_index)
    for t in range(len(sentence) - 1, 0, -1):
        max_index = backpointers[max_index, t]
        best_sequence.append(max_index)

    best_sequence = best_sequence[::-1]  # Reverse the sequence

    return [tag_list[i] for i in best_sequence]

def calculate_accuracy(predictions, labels):
    correct = sum(1 for p, l in zip(predictions, labels) if p == l)
    total = len(predictions)
    accuracy = correct / total
    return accuracy

with open(DEV_DATA_PATH) as f:
    test_data = json.load(f)

# Perform POS tagging using Viterbi and calculate accuracy
predictions = []
true_labels = []
for test_entry in test_data:
    sentence = test_entry['sentence']
    true_label = test_entry['labels']
    predicted_label = viterbi(sentence, initial_prob, transition_prob, emission_prob)
    predictions.extend(predicted_label)
    true_labels.extend(true_label)

accuracy = (np.array(predictions) == np.array(true_labels)).mean()
print("Accuracy:", accuracy)

Accuracy: 0.9476883613623945


In [298]:
def viterbi_vectorized(sentence, initial_prob, transition_prob, emission_prob, tags, freq_dict):
    num_tags = len(tags)
    num_words = len(freq_dict)
    sentence_length = len(sentence)

    # Create matrices for the Viterbi algorithm
    viterbi_matrix = np.zeros((num_tags, sentence_length))
    backpointers = np.zeros((num_tags, sentence_length), dtype=int)

    # Initialize the first column of the Viterbi matrix
    word_indices = [freq_dict.get(word, freq_dict[UNKNOWN_KEY])['index'] for word in sentence]
    # initial_prob_matrix = np.tile(initial_prob, (sentence_length, 1)).T
    # initial_prob_matrix = np.log(initial_prob_matrix)
    emission_prob_matrix = np.log(emission_prob[:, word_indices])

    viterbi_matrix[:, 0] = initial_prob + emission_prob_matrix[:, 0]

    # Fill in the rest of the Viterbi matrix
    for t in range(1, sentence_length):
        transition_prob_matrix = np.log(transition_prob)
        scores = viterbi_matrix[:, t - 1] + transition_prob_matrix.T + emission_prob_matrix[:, t]
        backpointers[:, t] = np.argmax(scores, axis=0)
        viterbi_matrix[:, t] = np.max(scores, axis=0)

    # Backtrack to find the best sequence of tags
    best_sequence = [np.argmax(viterbi_matrix[:, -1])]
    for t in range(sentence_length - 1, 0, -1):
        best_sequence.append(backpointers[best_sequence[-1], t])

    best_sequence = best_sequence[::-1]  # Reverse the sequence

    return [tag_list[i] for i in best_sequence]

# Perform POS tagging using Viterbi and calculate accuracy
predictions = []
true_labels = []
for test_entry in dev_data:
    sentence = test_entry['sentence']
    true_label = test_entry['labels']
    predicted_label = viterbi_vectorized(sentence, initial_prob, transition_prob, emission_prob, tags, freq_dict)
    predictions.extend(predicted_label)
    true_labels.extend(true_label)

accuracy = (np.array(predictions) == np.array(true_labels)).mean()
print("Accuracy:", accuracy)


  initial_prob_matrix = np.log(initial_prob_matrix)
  emission_prob_matrix = np.log(emission_prob[:, word_indices])


ValueError: operands could not be broadcast together with shapes (45,37) (45,) 