### Intro

In [1]:
ddir = '/Users/lohyikuang/Downloads/school_semesters/2024 Y3 SEMESTER 2/BT 3102/final_project/project_files' 

in_train_filename = f'{ddir}/twitter_train.txt' # Training file
naive_output_probs_filename = f'{ddir}/naive_output_probs.txt' # Output file
in_test_filename = f'{ddir}/twitter_dev_no_tag.txt' # Does not contain tag
in_ans_filename  = f'{ddir}/twitter_dev_ans.txt' # Does not contain tokens
tags_filename = f'{ddir}/twitter_tags.txt' # 25 Tags
naive_prediction_filename = f'{ddir}/naive_predictions.txt'

def evaluate(in_prediction_filename, in_answer_filename):
    """Do not change this method"""
    with open(in_prediction_filename) as fin:
        predicted_tags = [l.strip() for l in fin.readlines() if len(l.strip()) != 0]

    with open(in_answer_filename) as fin:
        ground_truth_tags = [l.strip() for l in fin.readlines() if len(l.strip()) != 0]

    assert len(predicted_tags) == len(ground_truth_tags)
    correct = 0
    for pred, truth in zip(predicted_tags, ground_truth_tags):
        if pred == truth: correct += 1
    return correct, len(predicted_tags), correct/len(predicted_tags)

In [2]:
with open(tags_filename, "r") as f:
    possible_tags = [line.strip() for line in f]
print(f"Possible Tags: [{', '.join(possible_tags)}]")

Possible Tags: [@, ,, L, ~, &, S, N, A, G, $, V, R, X, E, T, M, D, O, Z, !, ^, U, P, Y, #]


### 2.1 (a)

In [3]:
from collections import defaultdict

def maximum_likelihood_estimate(
    delta: float,
    in_train_filename: str,
    in_output_probs_filename: str
):
    tag_counter = defaultdict(lambda: defaultdict(int))
    tag_counts, word_counts = defaultdict(int), defaultdict(int)
    
    with open(in_train_filename, "r") as f:
        for line in f: 
            parts = line.strip().split("\t")
            if len(parts) == 2:
                word, tag = parts
                tag_counter[tag][word] += 1
                tag_counts[tag] += 1
                word_counts[word] += 1
                
    n_unique_words: int = len(word_counts)
    
    # observed data x1, x2, ..., xn  
    # associated tags y1, y2, ..., yn 
    with open(in_output_probs_filename, "w") as f:
        for tag in tag_counter:
            for word in tag_counter[tag]:
                smoothed_prob = (tag_counter[tag][word] + delta) / (tag_counts[tag] + delta * (n_unique_words + 1))
                f.write(f"{tag} \t {word} \t {smoothed_prob}\n")

### 2.1 (b)

In [4]:
def naive_predict(
    in_output_probs_filename, 
    in_test_filename, 
    out_prediction_filename
):
    probs = defaultdict(dict)
    with open(in_output_probs_filename, "r") as f:
        for line in f:
            tag, word, prob = line.strip().split()
            probs[word][tag] = float(prob)
    
    with open(in_test_filename, "r") as test_f, open(out_prediction_filename, 'w') as pred_f:
        for line in test_f:
            word = line.strip()
            if word: 
                best_tag, best_prob = max(
                    probs.get(word, {}).items(), 
                    key=lambda x: x[1], 
                    default=("@", 0)
                )
                pred_f.write(f"{best_tag}\n")
            else:
                pred_f.write("\n") 

In [5]:
from tqdm import tqdm

npo_probs = {}

# Perform grid search
for delta in tqdm(range(1, 1000)):
    delta /= 100
    maximum_likelihood_estimate(delta, in_train_filename, naive_output_probs_filename)
    naive_predict(naive_output_probs_filename, in_test_filename, naive_prediction_filename)
    correct, total, acc = evaluate(naive_prediction_filename, in_ans_filename)
    npo_probs[delta] = acc

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 999/999 [00:30<00:00, 32.56it/s]


In [6]:
# Parameter search to find the best Delta 
maximum_likelihood_estimate(
    0.34, 
    in_train_filename, 
    naive_output_probs_filename
)

### 2.1 (c)

In [7]:
correct, total, acc = evaluate(naive_prediction_filename, in_ans_filename)
correct, total, acc

(967, 1378, 0.7017416545718432)

### 2.2 (a)

In [8]:
def naive_predict2(
    in_output_probs_filename, 
    in_train_filename, 
    in_test_filename, 
    out_prediction_filename
):
    pass

### 3 (a)

In [9]:
trans_probs_filename =  f'{ddir}/trans_probs.txt'
output_probs_filename = f'{ddir}/output_probs.txt'

In [85]:
def maximum_likelihood_viterbi(
    in_train_filename,
    trans_probs_filename,
    output_probs_filename,
):
    # Our hidden states are the N,V,A etc
    # Our emission or observed states are the words themselves
    tag_transition = defaultdict(lambda: defaultdict(int))
    tag_emission = defaultdict(lambda: defaultdict(int))
    tag_counter = defaultdict(int)
    i = 'START' # Front tag. This will get us from start state to some other state
    
    with open(in_train_filename, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 0:
                tag_transition[i]['END'] += 1  # Transition to 'END' state
                i = 'START'  # Reset for the next sentence
                continue
            w, j = parts
            w = w.lower() # Some words like Today, today are counted as different
            if i == 'START':
                tag_counter[i] += 1  # Count 'START' transitions
            tag_emission[j][w] += 1  # hidden -> observed
            tag_transition[i][j] += 1  # hidden -> hidden
            tag_counter[j] += 1
            i = j

    # Add transition from the last tag to 'END' for the last sentence
    tag_transition[i]['END'] += 1
    tag_counter['END'] += 1

    with open(trans_probs_filename, 'w') as f:
        for i in tag_transition:
            for j in tag_transition[i]:
                trans_prob_val = tag_transition[i][j] / tag_counter[j]
                f.write(f"{i} \t {j} \t {trans_prob_val}\n")
    
    with open(output_probs_filename, "w") as f:
        for j in tag_emission: 
            for w in tag_emission[j]:
                emission_prob_val = tag_emission[j][w] / tag_counter[j]
                f.write(f"{j} \t {w} \t {emission_prob_val}\n")

In [86]:
maximum_likelihood_viterbi(
    in_train_filename,
    trans_probs_filename,
    output_probs_filename,
)

### 3 (b)

In [87]:
in_tags_filename = f'{ddir}/twitter_tags.txt'
viterbi_predictions_filename = f'{ddir}/viterbi_predictions.txt'

In [91]:
import math

def viterbi_predict(
    in_tags_filename, # Contains the full set of tags 
    in_trans_probs_filename, # Transition probabilties
    in_output_probs_filename, # Emission probabilities
    in_test_filename, # The input file containing some word tags
    out_predictions_filename # Output predictions
):
    # Preload all the information we need
    with open(in_tags_filename, "r") as f:
        states = [tag.strip() for tag in f]

    transition_probabilties = {}
    with open(in_trans_probs_filename, "r") as f:
        for line in f:
            prev_tag, next_tag, prob = line.strip().split("\t")
            prev_tag, next_tag = prev_tag.strip(), next_tag.strip()
            if prev_tag not in transition_probabilties:
                transition_probabilties[prev_tag] = {}
            transition_probabilties[prev_tag][next_tag] = float(prob)

    output_probabilities = {}
    with open(in_output_probs_filename, "r") as f:
        for line in f:
            word, tag, prob = line.strip().split("\t")
            word, tag = word.strip(), tag.strip()
            if word not in output_probabilities: 
                output_probabilities[word] = {}
            output_probabilities[word][tag] = float(prob)
    
    with open(in_test_filename, "r") as f:
        posts, post = [], []
        for word in f: 
            word = word.strip().lower()
            if word == "": 
                posts.append(post)
                post = []
                continue
            post.append(word)
    
    def mask_get(d, key1, key2):
        # Mask for the tokens not observed in the training set
        return d.get(key1, {}).get(key2, 1e-10)
    
    def single_post_viterbi(
        obs, 
        states, 
        trans_p, 
        emit_p
    ):
        V, path = [{}], {}  
        for state in states: 
            V[0][state] = mask_get(trans_p, 'START', state) * mask_get(emit_p, state, obs[0])
            path[state] = [state] 
        
        for t in range(1, len(obs)): 
            V.append({})
            newpath = {}
            
            for cur_state in states: 
                step_score = []
                for prev_state in states:
                    score = (
                        V[t - 1][prev_state] * 
                        mask_get(trans_p, prev_state, cur_state) * 
                        mask_get(emit_p, cur_state, obs[t])
                    )
                    step_score.append((score, prev_state))
                max_score, max_state = max(step_score)
                V[t][cur_state] = max_score
                newpath[cur_state] = path[max_state] + [cur_state]
            path = newpath
            
        (prob, state) = max((V[len(obs) - 1][y], y) for y in states)
        return (prob, path[state])
    
    with open(out_predictions_filename, "w") as f:
        for post in posts: 
            prob, paths = single_post_viterbi(post, states, transition_probabilties, output_probabilities)
            for state in paths:
                f.write(f"{state}\n")  
            f.write(f" \n")

In [92]:
viterbi_predict(
    in_tags_filename, 
    trans_probs_filename, 
    output_probs_filename, 
    in_test_filename, 
    viterbi_predictions_filename
)

### 3 (c)

In [93]:
evaluate(
    viterbi_predictions_filename, 
    in_ans_filename
)

(1072, 1378, 0.7779390420899854)

### 4 (a)

In [94]:
with open(viterbi_predictions_filename, "r") as f, open(in_ans_filename, "r") as fa:
    preds = [line.strip("\n") for line in f]
    ans = [line.strip("\n") for line in fa]

In [162]:
import pandas as pd
df = pd.DataFrame([preds, ans]).T
df.columns = ['preds', 'ans']
df[(df.preds != df.ans) & (df.preds.str.strip() != '')]

Unnamed: 0,preds,ans
3,V,A
40,",",^
42,V,$
43,T,N
46,!,D
...,...,...
1433,P,A
1445,",",^
1447,N,!
1449,Y,!


### 4 (b)

In [105]:
trans_probs_filename2 =  f'{ddir}/trans_probs2.txt'
output_probs_filename2 = f'{ddir}/output_probs2.txt'

viterbi_predictions_filename2 = f'{ddir}/viterbi_predictions2.txt'

In [106]:
maximum_likelihood_viterbi(
    in_train_filename,
    trans_probs_filename2,
    output_probs_filename2,
)

In [116]:
d = {}
with open(in_train_filename, "r") as f:
    for line in f:
        parts = line.strip().split()
        if len(parts) == 0:
            continue
        word, tag = parts
        if word not in d: 
            d[word] = {}
        if tag not in d[word]:
            d[word][tag] = 0
        d[word][tag] += 1
        
filtered_d = {}
for word, tags in d.items():
    # We will directly tag the words if the word is only ever tagged with 1
    if len(tags) == 1:
        for k, v in tags.items():
            filtered_d[word] = k

In [157]:
def viterbi_predict2(
    in_tags_filename, # Contains the full set of tags 
    in_trans_probs_filename, # Transition probabilties
    in_output_probs_filename, # Emission probabilities
    in_test_filename, # The input file containing some word tags
    out_predictions_filename # Output predictions
):
    # Preload all the information we need
    with open(in_tags_filename, "r") as f:
        states = [tag.strip() for tag in f]

    transition_probabilties = {}
    with open(in_trans_probs_filename, "r") as f:
        for line in f:
            prev_tag, next_tag, prob = line.strip().split("\t")
            prev_tag, next_tag = prev_tag.strip(), next_tag.strip()
            if prev_tag not in transition_probabilties:
                transition_probabilties[prev_tag] = {}
            transition_probabilties[prev_tag][next_tag] = float(prob)

    output_probabilities = {}
    with open(in_output_probs_filename, "r") as f:
        for line in f:
            word, tag, prob = line.strip().split("\t")
            word, tag = word.strip(), tag.strip()
            if word not in output_probabilities: 
                output_probabilities[word] = {}
            output_probabilities[word][tag] = float(prob)
    
    with open(in_test_filename, "r") as f:
        posts, post = [], []
        for word in f: 
            word = word.strip().lower()
            if word == "": 
                posts.append(post)
                post = []
                continue
            post.append(word)
    
    def mask_get(d, key1, key2):
        # Mask for the tokens not observed in the training set
        return d.get(key1, {}).get(key2, 1e-10)
    
    def single_post_viterbi(
        obs, 
        states, 
        trans_p, 
        emit_p
    ):
        V, path = [{}], {}  
        for state in states: 
            V[0][state] = mask_get(trans_p, 'START', state) * mask_get(emit_p, state, obs[0])
            path[state] = [state] 
        
        for t in range(1, len(obs)): 
            V.append({})
            newpath = {}
            
            for cur_state in states: 
                step_score = []
                for prev_state in states:
                    score = (
                        V[t - 1][prev_state] * 
                        mask_get(trans_p, prev_state, cur_state) * 
                        mask_get(emit_p, cur_state, obs[t])
                    )
                    step_score.append((score, prev_state))
                max_score, max_state = max(step_score)
                V[t][cur_state] = max_score
                newpath[cur_state] = path[max_state] + [cur_state]
            path = newpath
        (prob, state) = max((V[len(obs) - 1][y], y) for y in states)
        # Because our predictions are reliant on the t-1 element, we only hijack for conditional checks here
        preds = path[state]
        for t in range(len(obs)):
            word = obs[t]
            if "@user_" in word:
                preds[t] = "@"
            elif "http://" in word:
                preds[t] = "U"
            elif "#" == word[0]: # Check if hashtag at start of word
                preds[t] = "#"
            elif word in filtered_d:
                preds[t] = filtered_d[word]
        return (prob, preds)
    
    with open(out_predictions_filename, "w") as f:
        for post in posts: 
            prob, paths = single_post_viterbi(post, states, transition_probabilties, output_probabilities)
            for state in paths:
                f.write(f"{state}\n")  
            f.write(f" \n")

In [158]:
viterbi_predict2(
    in_tags_filename, 
    trans_probs_filename2, 
    output_probs_filename2, 
    in_test_filename, 
    viterbi_predictions_filename2
)

### 4(c)

In [159]:
evaluate(
    viterbi_predictions_filename2, 
    in_ans_filename
)

(1135, 1378, 0.8236574746008708)

### Checks - How can we further improve Viterbi2

In [160]:
with (
    open(viterbi_predictions_filename2, "r") as f, 
    open(in_ans_filename, "r") as fa,
    open(in_test_filename, "r") as ft
):
    preds = [line.strip("\n") for line in f]
    ans = [line.strip("\n") for line in fa]
    word = [line.strip().split("\t")[0] for line in ft]
    print(len(preds), len(ans), len(word))
    
df = pd.DataFrame([preds, ans, word]).T
df.columns = ['preds', 'ans', 'word']

df[(df.preds != df.ans) & (df.preds.str.strip() != '')].to_csv("ts.csv")

1477 1477 1477
