#***LIBRARIES***

In [43]:
import collections
import json
from collections import defaultdict
import os
import math

#***Task 1: Vocabulary Creation***

This code implements a vocabulary builder, where creates a vocabulary file from a training data corpus with the following key features:

* **Word Frequency Analysis**: The code scans the input file and counts the frequency of each word in the corpus.
* **Threshold Filtering**: Words appearing fewer than min_freq times are excluded from the vocabulary and counted as unknown words.
* **Vocabulary Sorting**: The vocabulary is sorted in descending order based on word frequency.
* **Unknown Token Handling**: Adding a special <unk> token at the beginning of the vocabulary to represent all low-frequency words.
* **Output Format**: Each vocabulary entry is written in the format word\tindex\tfrequency, where the index starts from 1 (with <unk> at index 0).

The function also provides statistics about the vocabulary size and the number of words classified as unknown.

In [44]:
def build_vocabulary(input_file, output_file, min_freq=3):
    freq_dict = collections.defaultdict(int)

    with open(input_file, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split('\t')
            if len(parts) >= 2:
                word = parts[1]
                freq_dict[word] += 1

    unknown_count = sum(count for word, count in freq_dict.items() if count < min_freq)
    vocab_list = [(word, count) for word, count in freq_dict.items() if count >= min_freq]
    vocab_list.sort(key=lambda x: x[1], reverse=True)


    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"<unk>\t0\t{unknown_count}\n")


        for i, (word, count) in enumerate(vocab_list):
            f.write(f"{word}\t{i+1}\t{count}\n")

    print(f"Vocabulary saved to '{output_file}'")
    print(f"Total vocabulary size: {len(vocab_list) + 1}")
    print(f"Occurrences of '<unk>': {unknown_count}")

input_file = "train"
output_file = "vocab.txt"
threshold = 3
build_vocabulary(input_file, output_file, threshold)

Vocabulary saved to 'vocab.txt'
Total vocabulary size: 16920
Occurrences of '<unk>': 32537


#***Task 2: Model Learning***

This code implements a function for training a Hidden Markov Model (HMM) for part-of-speech (POS) tagging. The function builds transition and emission probability matrices from a tagged training corpus with these key features:

* **Vocabulary Integration**: The code loads a pre-built vocabulary file, supporting out-of-vocabulary words via the <unk> token.
* **Count Collection**: The algorithm traverses the training file to collect:
  * Transition counts (frequency of state-to-state transitions)
  * Emission counts (frequency of state-word pairings)
  * State counts (frequency of each state)

* **Probability Calculation**: The function computes:
  * Transition probabilities: P(current state | previous state)
  * Emission probabilities: P(word | state)

* **JSON Model Output**: The resulting probabilities are stored in a JSON file with two main components:
  * Transition matrix
  * Emission matrix


* **Statistics Reporting**: The code outputs useful statistics about the trained model, including:

  * Number of unique states
  * Total transition parameters
  * Total emission parameters


In [45]:
import json
from collections import defaultdict

def train_hmm_model(train_file, vocab_file, model_output):
    vocabulary = []
    with open(vocab_file, 'r', encoding='utf-8') as vf:
        for line in vf:
            parts = line.strip().split('\t')
            if len(parts) >= 1:
                word = parts[0]
                vocabulary.append(word)

    print(f"Loaded vocabulary with {len(vocabulary)} words")

    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)
    state_counts = defaultdict(int)
    observed_states = set()

    with open(train_file, 'r', encoding='utf-8') as file:
        prev_state = None

        for line in file:
            line = line.strip()
            if not line:
                prev_state = None
                continue

            parts = line.split('\t')
            if len(parts) < 3:
                continue

            index, word, state = parts

            if word not in vocabulary:
                word = "<unk>"

            emission_counts[(state, word)] += 1
            state_counts[state] += 1
            observed_states.add(state)

            if prev_state is not None:
                transition_counts[(prev_state, state)] += 1

            prev_state = state

    states_list = sorted(list(observed_states))

    transition_probabilities = {}
    for s in states_list:
        for s_prime in states_list:
            key = f"({s}, {s_prime})"

            if (s, s_prime) in transition_counts:
                probability = transition_counts[(s, s_prime)] / state_counts[s]
            else:
                probability = 0

            transition_probabilities[key] = probability

    emission_probabilities = {}
    for s in states_list:
        for x in vocabulary:
            key = f"({s}, {x})"

            if (s, x) in emission_counts:
                probability = emission_counts[(s, x)] / state_counts[s]
            else:
                probability = 0

            emission_probabilities[key] = probability

    hmm_model = {
        "transition": transition_probabilities,
        "emission": emission_probabilities
    }

    with open(model_output, 'w', encoding='utf-8') as f:
        json.dump(hmm_model, f, indent=4)

    expected_transition_params = len(states_list) * len(states_list)
    expected_emission_params = len(states_list) * len(vocabulary)

    print(f"HMM model saved to '{model_output}'")
    print(f"Total unique states: {len(states_list)}")
    print(f"Total transition parameters: {len(transition_probabilities)}")
    print(f"Total emission parameters: {len(emission_probabilities)}")

    return hmm_model

In [46]:
model = train_hmm_model("train", "vocab.txt", "hmm.json")

Loaded vocabulary with 16920 words
HMM model saved to 'hmm.json'
Total unique states: 45
Total transition parameters: 2025
Total emission parameters: 761400


This code loads and analyzes the Hidden Markov Model, displaying the highest-probability state transitions and word emissions while reporting statistics on non-zero entries to evaluate model characteristics.

In [48]:
with open("hmm.json", 'r', encoding='utf-8') as f:
    hmm_model = json.load(f)

transition_probs = hmm_model["transition"]
emission_probs = hmm_model["emission"]

print("\nFew Transition Probabilities:")
non_zero_transitions = [(k, v) for k, v in transition_probs.items() if v > 0]
non_zero_transitions.sort(key=lambda x: x[1], reverse=True)

for key, prob in non_zero_transitions[:10]:
    print(f"  {key}: {prob:.6f}")

print("\nFew Emission Probabilities:")
non_zero_emissions = [(k, v) for k, v in emission_probs.items() if v > 0]
non_zero_emissions.sort(key=lambda x: x[1], reverse=True)

for key, prob in non_zero_emissions[:10]:
    print(f"  {key}: {prob:.6f}")

print(f"\nTotal transitions: {len(transition_probs)}")
print(f"Non-zero transitions: {len(non_zero_transitions)}")
print(f"Total emissions: {len(emission_probs)}")
print(f"Non-zero emissions: {len(non_zero_emissions)}")


Few Transition Probabilities:
  (#, CD): 0.992126
  ($, CD): 0.992072
  (PDT, DT): 0.918919
  (MD, VB): 0.799089
  (RBS, JJ): 0.747126
  (SYM, :): 0.600000
  (TO, VB): 0.577699
  (UH, ,): 0.517241
  (DT, NN): 0.473488
  (EX, VBZ): 0.468187

Few Emission Probabilities:
  (#, #): 1.000000
  (WP$, whose): 1.000000
  (,, ,): 0.999914
  (TO, to): 0.992591
  (., .): 0.988623
  (``, ``): 0.983928
  ('', ''): 0.981577
  ($, $): 0.974773
  (POS, 's): 0.927933
  (RBS, most): 0.878161

Total transitions: 2025
Non-zero transitions: 1351
Total emissions: 761400
Non-zero emissions: 23373


#***Task 3: Greedy Decoding with HMM***

This code implements a greedy decoder for POS tagging that uses a Hidden Markov Model. It processes text word-by-word, selecting the most probable tag for each word based on emission probabilities and the previous tag. The algorithm handles out-of-vocabulary words using an "unknown" token and evaluates accuracy when gold standard tags are available.

In [49]:
%%writefile greedy_decode.py
import json
def load_vocabulary(vocab_path):
    vocabulary = set()
    with open(vocab_path, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split('\t')
            if parts:
                word = parts[0]
                vocabulary.add(word)
    return vocabulary

def load_hmm_model(model_path):
    with open(model_path, 'r', encoding='utf-8') as file:
        hmm = json.load(file)
    transition_probs = {}
    for key, value in hmm["transition"].items():
        s, s_prime = key.strip("()").split(", ")
        transition_probs[(s, s_prime)] = value

    emission_probs = {}
    for key, value in hmm["emission"].items():
        s, x = key.strip("()").split(", ")
        emission_probs[(s, x)] = value

    return transition_probs, emission_probs

def greedy_decoding(input_file, model_file, vocab_file, output_file, evaluate=False):
    transition_probs, emission_probs = load_hmm_model(model_file)
    vocabulary = load_vocabulary(vocab_file)

    all_states = set()
    for (s, _) in transition_probs.keys():
        all_states.add(s)
    for (_, s_prime) in transition_probs.keys():
        all_states.add(s_prime)

    total_words = 0
    correct_predictions = 0

    with open(input_file, 'r', encoding='utf-8') as file_in, open(output_file, 'w', encoding='utf-8') as file_out:
        current_sentence = []
        gold_tags = []

        for line in file_in:
            line = line.strip()

            if not line:
                if current_sentence:
                    prev_state = None
                    predicted_tags = []

                    for word in current_sentence:
                        lookup_word = word if word in vocabulary else "<unk>"

                        best_state = None
                        best_prob = -1

                        for state in all_states:
                            emission_prob = emission_probs.get((state, lookup_word), 0)

                            if prev_state is not None:
                                transition_prob = transition_probs.get((prev_state, state), 0)
                            else:
                                transition_prob = 1
                            prob = emission_prob * transition_prob
                            if prob > best_prob:
                                best_prob = prob
                                best_state = state

                        if best_state is None:
                            best_state = "NN"

                        predicted_tags.append(best_state)
                        prev_state = best_state

                    for i, (word, tag) in enumerate(zip(current_sentence, predicted_tags)):
                        file_out.write(f"{i+1}\t{word}\t{tag}\n")

                    if evaluate and gold_tags:
                        for pred_tag, gold_tag in zip(predicted_tags, gold_tags):
                            total_words += 1
                            if pred_tag == gold_tag:
                                correct_predictions += 1

                file_out.write("\n")
                current_sentence = []
                gold_tags = []
                continue

            parts = line.split('\t')
            if len(parts) >= 2:
                word = parts[1]
                current_sentence.append(word)

                if evaluate and len(parts) >= 3:
                    gold_tags.append(parts[2])

    accuracy = correct_predictions / total_words if total_words > 0 else 0
    if evaluate:
        print(f"Accuracy on dev data: {accuracy:.4f}")

    return accuracy if evaluate else None


vocab_file = "vocab.txt"
dev_file = "dev"
test_file = "test"
model_file = "hmm.json"
dev_output = "dev_greedy.out"
test_output = "greedy.out"

accuracy = greedy_decoding(dev_file, model_file, vocab_file, dev_output, evaluate=True)
print(f"Accuracy on dev data: {accuracy:.4f}")

greedy_decoding(test_file, model_file, vocab_file, test_output)
print(f"Predictions for test data saved to '{test_output}'")

Writing greedy_decode.py


In [50]:
!python greedy_decode.py

Accuracy on dev data: 0.9231
Accuracy on dev data: 0.9231
Predictions for test data saved to 'greedy.out'


This command is running an evaluation script (eval.py) to compare the performance of greedy decoder against a gold standard. The script is taking two parameters:

* -g dev: Specifies the gold standard file (containing the correct POS tags)
* -p dev_greedy.out: Specifies prediction file (containing the tags predicted by greedy decoder)

In [51]:
!python eval.py -g dev -p dev_greedy.out


'1\tThat\tDT' '38\t.\t.' 131751
total: 131751, correct: 121623, accuracy: 92.31%


This code defines a function that displays the first several lines of an output file, useful for quickly inspecting POS tagging results. It counts total tokens and sentences in the file and handles potential errors. The function is applied to view both 'greedy.out' (test results) and 'dev_greedy.out' (development set results).

In [52]:
def view_output_file(file_path, num_lines=20):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()

        print(f"Showing first {min(num_lines, len(lines))} lines of {file_path}:")
        print("-" * 50)

        for i, line in enumerate(lines[:num_lines]):
            print(f"{i+1:>3}: {line.rstrip()}")

        if len(lines) > num_lines:
            print(f"... and {len(lines) - num_lines} more lines")

        sentences = sum(1 for line in lines if line.strip() == '')
        tokens = sum(1 for line in lines if line.strip() != '')

        print("-" * 50)
        print(f"File contains {tokens} tokens in {sentences} sentences")

    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except Exception as e:
        print(f"Error reading file: {e}")

view_output_file('greedy.out', 30)

print("\n")
view_output_file('dev_greedy.out', 30)

Showing first 30 lines of greedy.out:
--------------------------------------------------
  1: 1	Influential	FW
  2: 2	members	NNS
  3: 3	of	IN
  4: 4	the	DT
  5: 5	House	NNP
  6: 6	Ways	NNPS
  7: 7	and	CC
  8: 8	Means	NNP
  9: 9	Committee	NNP
 10: 10	introduced	VBD
 11: 11	legislation	NN
 12: 12	that	IN
 13: 13	would	MD
 14: 14	restrict	VB
 15: 15	how	WRB
 16: 16	the	DT
 17: 17	new	JJ
 18: 18	savings-and-loan	NN
 19: 19	bailout	NN
 20: 20	agency	NN
 21: 21	can	MD
 22: 22	raise	VB
 23: 23	capital	NN
 24: 24	,	,
 25: 25	creating	VBG
 26: 26	another	DT
 27: 27	potential	JJ
 28: 28	obstacle	NN
 29: 29	to	TO
 30: 30	the	DT
... and 135055 more lines
--------------------------------------------------
File contains 129624 tokens in 5461 sentences


Showing first 30 lines of dev_greedy.out:
--------------------------------------------------
  1: 1	The	DT
  2: 2	Arizona	NNP
  3: 3	Corporations	NNS
  4: 4	Commission	NNP
  5: 5	authorized	VBD
  6: 6	an	DT
  7: 7	11.5	CD
  8: 8	%	NN
  9: 9	rate	NN


#***Task 4: Viterbi Decoding with HMM***

In [53]:
import json
import math

This function loads an HMM model from a JSON file, restructuring the transition and emission probabilities into nested dictionaries for more efficient access. It converts string keys like "(NNP, CD)" into a hierarchical structure where transition_probs[s][s_prime] gives the probability of moving from state s to s_prime.

In [54]:
def load_hmm_model(model_path):
    with open(model_path, 'r', encoding='utf-8') as file:
        hmm = json.load(file)

    transition_probs = {}
    for key, value in hmm["transition"].items():
        s, s_prime = key.strip("()").split(", ")
        if s not in transition_probs:
            transition_probs[s] = {}
        transition_probs[s][s_prime] = value

    emission_probs = {}
    for key, value in hmm["emission"].items():
        s, x = key.strip("()").split(", ")
        if s not in emission_probs:
            emission_probs[s] = {}
        emission_probs[s][x] = value

    return transition_probs, emission_probs

This function implements Viterbi decoding for POS tagging. It processes input text sentence by sentence, applies the Viterbi algorithm to find the optimal tag sequence, writes predictions to an output file, and calculates accuracy when in evaluation mode by comparing against gold standard tags.

In [55]:
def viterbi_decoding(input_file, model_file, vocab_file, output_file, evaluate=False):
    transition_probs, emission_probs = load_hmm_model(model_file)
    vocabulary = load_vocabulary(vocab_file)

    all_states = set(transition_probs.keys())

    total_words = 0
    correct_predictions = 0

    with open(input_file, 'r', encoding='utf-8') as file_in, open(output_file, 'w', encoding='utf-8') as file_out:
        sentence = []
        gold_tags = []

        for line in file_in:
            line = line.strip()

            if not line:
                if sentence:
                    best_path = viterbi_algorithm(sentence, all_states, transition_probs, emission_probs, vocabulary)

                    for i, (word, tag) in enumerate(zip(sentence, best_path), 1):
                        file_out.write(f"{i}\t{word}\t{tag}\n")

                    if evaluate and gold_tags:
                        for pred_tag, gold_tag in zip(best_path, gold_tags):
                            total_words += 1
                            if pred_tag == gold_tag:
                                correct_predictions += 1

                file_out.write("\n")
                sentence = []
                gold_tags = []
                continue

            parts = line.split('\t')
            if len(parts) >= 2:
                word = parts[1]
                sentence.append(word)

                if evaluate and len(parts) >= 3:
                    gold_tags.append(parts[2])

    accuracy = correct_predictions / total_words if total_words > 0 else 0
    if evaluate:
        print(f"Accuracy on dev data: {accuracy:.4f}")

    return accuracy if evaluate else None

In [56]:
def load_vocabulary(vocab_path):
    vocabulary = set()
    with open(vocab_path, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split('\t')
            if parts:
                word = parts[0]
                vocabulary.add(word)
    return vocabulary

This function implements the Viterbi algorithm for finding the most likely sequence of POS tags for a given sentence. It works by:

* Using logarithms to prevent underflow with small probabilities
* Building a trellis of state probabilities for each word position
* Tracking the most likely previous state with backpointers
* Handling unknown words by substituting with an "<unk>" token
* Setting minimum probability thresholds to avoid math errors
* Finding the highest probability final state
* Backtracking through the trellis to reconstruct the optimal tag sequence

In [57]:
def viterbi_algorithm(words, states, transition_probs, emission_probs, vocabulary):
    n = len(words)

    LOG_ZERO = -1000.0

    viterbi = [{}]
    backpointer = [{}]

    first_word = words[0]
    if first_word not in vocabulary:
        first_word = "<unk>"

    for state in states:
        emission_prob = emission_probs.get(state, {}).get(first_word, 1e-10)

        emission_prob = max(emission_prob, 1e-10)
        viterbi[0][state] = math.log(emission_prob)
        backpointer[0][state] = None

    for t in range(1, n):
        viterbi.append({})
        backpointer.append({})

        word = words[t]
        if word not in vocabulary:
            word = "<unk>"

        for curr_state in states:
            max_prob = float('-inf')
            best_prev_state = None

            for prev_state in states:
                trans_prob = transition_probs.get(prev_state, {}).get(curr_state, 1e-10)
                trans_prob = max(trans_prob, 1e-10)

                prob = viterbi[t-1][prev_state] + math.log(trans_prob)

                if prob > max_prob:
                    max_prob = prob
                    best_prev_state = prev_state

            emission_prob = emission_probs.get(curr_state, {}).get(word, 1e-10)
            emission_prob = max(emission_prob, 1e-10)

            viterbi[t][curr_state] = max_prob + math.log(emission_prob)
            backpointer[t][curr_state] = best_prev_state

    best_final_state = None
    max_final_prob = float('-inf')

    for state in states:
        if viterbi[-1][state] > max_final_prob:
            max_final_prob = viterbi[-1][state]
            best_final_state = state

    if best_final_state is None and states:
        best_final_state = next(iter(states))

    best_path = [best_final_state]
    for t in range(n-1, 0, -1):
        prev_state = backpointer[t][best_path[0]]
        if prev_state is None and states:
            prev_state = next(iter(states))
        best_path.insert(0, prev_state)

    return best_path

In [58]:
accuracy = viterbi_decoding("dev", "hmm.json", "vocab.txt", "dev_viterbi.out", evaluate=True)
print(f"Viterbi decoding accuracy on dev data: {accuracy:.4f}")

Accuracy on dev data: 0.9385
Viterbi decoding accuracy on dev data: 0.9385


In [59]:
viterbi_decoding("test", "hmm.json", "vocab.txt", "viterbi.out")
print(f"Viterbi decoding completed. Predictions saved in '{test_output}'")

Viterbi decoding completed. Predictions saved in 'viterbi.out'
