In [1]:
from google.colab import drive

#Establishing connection to drive for Colab
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
# Import assignment files
train = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/train"
dev = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev"
test= "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/test"

Task 1: Vocabulary Creation

In [3]:
# Create a Word Count Dictionary
word_counts = {}

# While reading in the file, add to dictionary
with open(train, "r", encoding="utf-8") as file:
    for row in file:
        row = row.strip()
        if row:
            _, word, _ = row.split("\t")
            if word in word_counts:
                word_counts[word] += 1
            else:
                word_counts[word] = 1

In [4]:
# Create a vocab dictionary
vocab = {"<unk>": 0}
filtered_counts = {"<unk>": 0}
index = 1
sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)


In [5]:
for word, count in sorted_words:
    if count < 3: # Threshold specified in instructions
        filtered_counts["<unk>"] += count  # Add occurrences to <unk>
    else:
        vocab[word] = index
        filtered_counts[word] = count
        index += 1

In [6]:
vocabulary = '/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/vocab.txt'
with open(vocabulary, "w", encoding="utf-8") as file:
  # First line should be <unk> words
  file.write(f"<unk>\t0\t{filtered_counts['<unk>']}\n")
  # Known words
  for word, idx in vocab.items():
    if word != "<unk>":
      file.write(f"{word}\t{idx}\t{filtered_counts[word]}\n")

print("Selected Threshold for unknown words replacement: 3")
print(f"Total words: {len(vocab)}")
print(f"Occurrences of '<unk>': {filtered_counts['<unk>']}")

Selected Threshold for unknown words replacement: 3
Total words: 16920
Occurrences of '<unk>': 32537


2. Model Learning

In [7]:
import json

In [8]:
# Create a Hidden Markov Model JSON

hmm = '/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/hmm.json'

In [9]:
# Create dictionaries to track occurances of the transitions/states/emissions

transitions = {}
states = {}
emissions = {}

In [10]:
with open(train, "r", encoding="utf-8") as file:
    previous_state = None

    for sentence in file:
        sentence = sentence.strip()
        if sentence:
            _, word, state = sentence.split("\t")

            # Increase the emissions dictionary count
            if (state, word) in emissions:
                emissions[(state, word)] += 1
            else:
                emissions[(state, word)] = 1

            # Increase the states dictionary count
            if state in states:
                states[state] += 1
            else:
                states[state] = 1

            # Increase the transitions dictionary count
            if previous_state is not None:
                if (previous_state, state) in transitions:
                    transitions[(previous_state, state)] += 1
                else:
                    transitions[(previous_state, state)] = 1

            # Make current state the previous state
            previous_state = state

        else:
            prev_state = None

In [11]:
# Create dictionaries to keep track of probabilities

transition_probabilities = {}
emission_probabilities = {}

In [12]:
# Get probability by dividing count of state by total states
for (state, next_state), count in transitions.items():
  proportion = count / states[state]
  transition_probabilities[(state, next_state)] = proportion

# Get probability by dividing count of emission by total states
for (s, e), count in emissions.items():
  emission_proportion = count / states[s]
  emission_probabilities[(s, e)] = emission_proportion

In [13]:
# Adding the transtion and emission data to a dictionary to add to the JSON
hidden_markov = {
    "transition": {f"{state},{next_state}": probability for (state, next_state), probability in transition_probabilities.items()},
    "emission": {f"{s},{e}": probability for (s, e), probability in emission_probabilities.items()}
}

In [14]:
# Save the Hidden Markov Model as a JSON
with open(hmm, "w", encoding="utf-8") as file:
    json.dump(hidden_markov, file, indent=4)

# Output summary
print(f"Location the model has been saved to {hmm}")
print(f"Total transition parameters: {len(transition_probabilities)}")
print(f"Total emission parameters: {len(emission_probabilities)}")

Location the model has been saved to /content/drive/My Drive/ColabNotebooks/CSCI544_HW3/hmm.json
Total transition parameters: 1378
Total emission parameters: 50286


3. Greedy Decoding

In [15]:
# Greedy Decoding Algorithm
def greedy_decoding(sentence):
    predictions = []
    prev_state = None

    for i, word in enumerate(sentence):
        best_state = None
        best_prob = 0

        for state in states:
            # Get emission probability
            emission_prob = emission_probabilities.get((state, word))

            # If word is not in emission probabilities it goes to unknown
            if emission_prob is None:
                emission_prob = emission_probabilities.get((state, "<unk>"), 1e-6)

            # If first word, probability set to 1
            if i == 0:
                transition_prob = 1
            else:

              # Retrieve probability of this transition
                transition_prob = transition_probabilities.get((prev_state, state), 1e-6)

            # Likelihood of word is product of transition and emission
            prob = transition_prob * emission_prob

            # Check if most likely
            if prob > best_prob:
                best_prob = prob
                best_state = state

        predictions.append((word, best_state))
        # Greedy algorithm selects best state to remember
        prev_state = best_state

    return predictions

In [16]:
output_test = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/greedy.out"
with open(test, "r", encoding="utf-8") as infile, open(output_test, "w", encoding="utf-8") as outfile:
    sentence = []

    for line in infile:
        line = line.strip()
        if line:
            index, word = line.split("\t")  # Test data has no POS tag
            sentence.append((index, word))
        else:
            # Perform greedy decoding
            predicted_tags = greedy_decoding([w for _, w in sentence])

            # Write predictions in the same format as training data
            for (index, word), (_, predicted_tag) in zip(sentence, predicted_tags):

                outfile.write(f"{index}\t{word}\t{predicted_tag}\n")

            # Separate sentences
            outfile.write("\n")

            # Reset for next sentence
            sentence = []

print(f"Predictions saved in {output_test}")


Predictions saved in /content/drive/My Drive/ColabNotebooks/CSCI544_HW3/greedy.out


In [17]:
# Setting location to output greedy predictions
output = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev_greedy.out"

# Checking greedy decoding on test data
with open(dev, "r", encoding="utf-8") as infile, open(output, "w", encoding="utf-8") as outfile:
    sentence = []
    indexes = []

    for line in infile:
        line = line.strip()
        if line:
            index, word, _ = line.split("\t")
            indexes.append(index)
            sentence.append((index, word))
        else:
            # Use greedy algorithm to predict tags
            predicted_tags = greedy_decoding([w for _, w in sentence])

            # Write predictions in the same format as training data
            for (index, word), (_, predicted_tag) in zip(sentence, predicted_tags):
                outfile.write(f"{index}\t{word}\t{predicted_tag}\n")
            outfile.write("\n")

            # Reset for next sentence
            sentence = []
            indexes = []

    if sentence:
      predicted_tags = greedy_decoding([w for _, w in sentence])

    # Write predictions in the same format as training data
    for (index, word), predicted_tag in zip(sentence, predicted_tags):
      outfile.write(f"{index}\t{word}\t{predicted_tag}\n")
    outfile.write("\n")

print(f"Predictions saved in {output}")
!python "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/eval.py" -p "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev_greedy.out" -g "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev"

Predictions saved in /content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev_greedy.out
total: 131768, correct: 123287, accuracy: 93.56%


In [18]:
with open(hmm, "r", encoding="utf-8") as file:
    hmm_model = json.load(file)

In [19]:
import numpy as np

transition_probs = {tuple(k.split(',')): v for k, v in hmm_model["transition"].items()}
emission_probalities = {tuple(k.split(',')): v for k, v in hmm_model["emission"].items()}

# Extract Part of Speech tag states
states = list(set(s for s, _ in emission_probabilities.keys()))

def viterbi_decoding(sentence):
    n = len(sentence)

    num_states = len(states)

    # Initialize Viterbi and backpointer tables
    viterbi = np.zeros((num_states, n))  # Probability table
    backpointer = np.zeros((num_states, n), dtype=int)  # Best previous state index

    # Convert states to index mapping

    state_to_idx = {state: i for i, state in enumerate(states)}

    # Initialize first column of Viterbi table
    for i, state in enumerate(states):
        emission_prob = emission_probabilities.get((state, sentence[0]), emission_probabilities.get((state, "<unk>"), 1e-6))
        viterbi[i, 0] = emission_prob  # Assume uniform initial probability

    # Fill Viterbi table for t > 1
    for t in range(1, n):  # Loop through words in sentence
        for i, curr_state in enumerate(states):
            max_prob = -1
            best_prev_state = 0

            emission_prob = emission_probabilities.get((curr_state, sentence[t]), emission_probabilities.get((curr_state, "<unk>"), 1e-6))

            for j, prev_state in enumerate(states):
                transition_prob = transition_probabilities.get((prev_state, curr_state), 1e-6)
                prob = viterbi[j, t-1] * transition_prob * emission_prob

                if prob > max_prob:
                    max_prob = prob
                    best_prev_state = j

            viterbi[i, t] = max_prob
            backpointer[i, t] = best_prev_state

    # Backtrace to find best path
    best_final_state = np.argmax(viterbi[:, n-1])
    best_path = [best_final_state]

    for t in range(n-1, 0, -1):
        best_final_state = backpointer[best_final_state, t]
        best_path.insert(0, best_final_state)

    # Convert indices back to states
    best_tags = [states[i] for i in best_path]
    return best_tags

In [20]:
viterbi_test_file = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/viterbi.out"
with open(test, "r", encoding="utf-8") as infile, open(viterbi_test_file, "w", encoding="utf-8") as outfile:
    sentence = []
    indexes = []

    for line in infile:
        line = line.strip()
        if line:
            tokens = line.split("\t")
            if len(tokens) == 2:  # Ensure correct format (test data has no true labels)
                index, word = tokens
                indexes.append(index)
                sentence.append(word)
        else:
            # Perform Viterbi decoding
            predicted_tags = viterbi_decoding(sentence)

            # Write predictions following the same order
            for index, word, predicted_tag in zip(indexes, sentence, predicted_tags):
                outfile.write(f"{index}\t{word}\t{predicted_tag}\n")
            outfile.write("\n")  # Keep blank lines between sentences

            # Reset for next sentence
            sentence = []
            indexes = []

print(f"Test data predictions saved in {viterbi_test_file}.")

Test data predictions saved in /content/drive/My Drive/ColabNotebooks/CSCI544_HW3/viterbi.out.


In [21]:
viterbi_file = "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev_viterbi.out"
with open(dev, "r", encoding="utf-8") as infile, open(viterbi_file, "w", encoding="utf-8") as outfile:
    sentence = []
    indexes = []

    for line in infile:
        line = line.strip()
        if line:
            tokens = line.split("\t")

            #Check for correct length
            if len(tokens) == 3:

                # Append to lists
                index, word, _ = tokens
                indexes.append(index)
                sentence.append(word)
        else:
            # Perform Viterbi decoding
            predicted_tags = viterbi_decoding(sentence)

            # Write predictions following the same order
            for index, word, predicted_tag in zip(indexes, sentence, predicted_tags):
              outfile.write(f"{int(index)}\t{word.strip()}\t{predicted_tag.strip()}\n")
            outfile.write("\n")  # Keep blank lines between sentences

            # Reset for next sentence
            sentence = []
            indexes = []
    if sentence:
      # Perform Viterbi decoding on the last sentence
      predicted_tags = viterbi_decoding(sentence)

    # Write predictions for the last sentence
    for index, word, predicted_tag in zip(indexes, sentence, predicted_tags):
        outfile.write(f"{int(index)}\t{word.strip()}\t{predicted_tag.strip()}\n")
    outfile.write("\n")  # Keep blank lines between sentences

!python "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/eval.py" -p "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev_viterbi.out" -g "/content/drive/My Drive/ColabNotebooks/CSCI544_HW3/dev"

total: 131768, correct: 124857, accuracy: 94.76%
