In [45]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [46]:
def read_conllu(path):
    sentences = []
    sentence = []

    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()

            # Sentence boundary
            if not line:
                if sentence:
                    sentences.append(sentence)
                    sentence = []
                continue

            # Skip comments
            if line.startswith("#"):
                continue

            cols = line.split("\t")

            # Skip multi-word tokens like "1-2 don't"
            if "-" in cols[0]:
                continue

            word = cols[1].lower()
            upos = cols[3]

            sentence.append((word, upos))

    if sentence:
        sentences.append(sentence)

    return sentences


In [47]:
#update to local paths to test locally
# TRAIN_PATH = "/kaggle/input/nlp-datasets/en_ewt-ud-train.conllu"
# TEST_PATH  = "/kaggle/input/nlp-datasets/en_ewt-ud-test.conllu"
TRAIN_PATH = "en_ewt-ud-train.conllu"
TEST_PATH  = "en_ewt-ud-test.conllu"
train_sentences = read_conllu(TRAIN_PATH)
test_sentences  = read_conllu(TEST_PATH)


In [48]:
counter = 0
while counter < 2:
    print(train_sentences[counter])
    counter += 1
print()
counter = 0
while counter < 2:
    print(test_sentences[counter])
    counter += 1

[('al', 'PROPN'), ('-', 'PUNCT'), ('zaman', 'PROPN'), (':', 'PUNCT'), ('american', 'ADJ'), ('forces', 'NOUN'), ('killed', 'VERB'), ('shaikh', 'PROPN'), ('abdullah', 'PROPN'), ('al', 'PROPN'), ('-', 'PUNCT'), ('ani', 'PROPN'), (',', 'PUNCT'), ('the', 'DET'), ('preacher', 'NOUN'), ('at', 'ADP'), ('the', 'DET'), ('mosque', 'NOUN'), ('in', 'ADP'), ('the', 'DET'), ('town', 'NOUN'), ('of', 'ADP'), ('qaim', 'PROPN'), (',', 'PUNCT'), ('near', 'ADP'), ('the', 'DET'), ('syrian', 'ADJ'), ('border', 'NOUN'), ('.', 'PUNCT')]
[('[', 'PUNCT'), ('this', 'DET'), ('killing', 'NOUN'), ('of', 'ADP'), ('a', 'DET'), ('respected', 'ADJ'), ('cleric', 'NOUN'), ('will', 'AUX'), ('be', 'AUX'), ('causing', 'VERB'), ('us', 'PRON'), ('trouble', 'NOUN'), ('for', 'ADP'), ('years', 'NOUN'), ('to', 'PART'), ('come', 'VERB'), ('.', 'PUNCT'), (']', 'PUNCT')]

[('what', 'PRON'), ('if', 'SCONJ'), ('google', 'PROPN'), ('morphed', 'VERB'), ('into', 'ADP'), ('googleos', 'PROPN'), ('?', 'PUNCT')]
[('what', 'PRON'), ('if', 'SCO

In [49]:
from collections import Counter

word_counts = Counter(
    word for sent in train_sentences for word, _ in sent
)

print(word_counts.most_common(10))


[('the', 9075), ('.', 8640), (',', 7021), ('to', 5137), ('and', 5002), ('a', 3782), ('of', 3622), ('i', 3380), ('in', 3112), ('is', 2241)]


In [50]:
#Initial Probabilities (P(tag/START))
from collections import Counter

initial_tag_counts = Counter()
total_sentences = len(train_sentences)

for sent in train_sentences:
    #check the tag of first word of each sentence
    first_tag = sent[0][1]
    initial_tag_counts[first_tag] += 1
# print(initial_tag_counts)
# print()
#calculate prob that a tag occurs at the start
initial_probs = {
    tag: count / total_sentences
    for tag, count in initial_tag_counts.items()
}

sorted_data = dict(sorted(initial_probs.items(), key=lambda item: item[1]))
print(sorted_data)
# print(initial_probs)

{'X': 7.971938775510203e-05, 'PART': 0.00470344387755102, 'SYM': 0.0074936224489795915, 'CCONJ': 0.02311862244897959, 'AUX': 0.028858418367346938, 'INTJ': 0.03228635204081633, 'PUNCT': 0.035235969387755105, 'SCONJ': 0.035634566326530615, 'NUM': 0.039142219387755105, 'ADJ': 0.04097576530612245, 'ADP': 0.04320790816326531, 'VERB': 0.06050701530612245, 'NOUN': 0.06194196428571429, 'ADV': 0.07653061224489796, 'DET': 0.10044642857142858, 'PROPN': 0.12771045918367346, 'PRON': 0.28212691326530615}


In [51]:
#Transition probabilities
transition_counts = Counter()
tag_counts = Counter()   # denominator

for sent in train_sentences:
    for i in range(len(sent) - 1):
        tag_i = sent[i][1]
        tag_j = sent[i + 1][1]

        transition_counts[(tag_i, tag_j)] += 1
        tag_counts[tag_i] += 1
transition_probs = {}

for (tag_i, tag_j), count in transition_counts.items():
    transition_probs[(tag_i, tag_j)] = count / tag_counts[tag_i]

# print(transition_probs)

In [52]:
#Emission probabilities
emission_counts = Counter()
tag_counts_emission = Counter()

for sent in train_sentences:
    for word, tag in sent:
        emission_counts[(tag, word)] += 1
        tag_counts_emission[tag] += 1
# print(len(tag_counts_emission))
emission_probs = {}

for (tag, word), count in emission_counts.items():
    emission_probs[(tag, word)] = count / tag_counts_emission[tag]
# print(emission_probs)

In [53]:
#basic checks
print(sum(initial_probs.values()))
print(sum(
    prob for (t1, _), prob in transition_probs.items()
    if t1 == "NOUN"
)
)
print(sum(
    prob for (t, _), prob in emission_probs.items()
    if t == "NOUN"
)
)

# total
# for (tag, word), prob in emission_probs.items():
#     if tag == "PUNCT":
#         # print(f"word={word} prob={prob}")
#         total=total+prob
# print(total)

1.0
1.0
1.0


In [None]:
#brute force way 
# if sentences are long there will be many possible tag sequences and overall many iterations to check so we use viterbi algorithm to optimize


import itertools


def sequence_probability(words, tag_seq,initial_probs,transition_probs,emission_probs):
#for computing probabilities of a tag sequence in brute force way
    prob = 1.0

    word = words[0]
    prob *= initial_probs.get(tag_seq[0], 0)
    prob *= emission_probs.get((tag_seq[0], word), 0)

    # example for a three letter word
    
    # P(t1)×P(w1∣t1)×P(t2∣t1)×P(w2∣t2)×P(t3∣t2)×P(w3∣t3)
    # p(t1)-initial_prob

    for i in range(1, len(words)):
        prev_tag = tag_seq[i-1]
        curr_tag = tag_seq[i]

        word = words[i]

        prob *= transition_probs.get((prev_tag, curr_tag), 0)
        prob *= emission_probs.get((curr_tag, word), 0)

    return prob


def brute_force(words,all_tags,initial_probs,transition_probs,emission_probs):

    best_prob = -1
    best_seq = None

    # generate probability for every possible tag sequence
    for tag_seq in itertools.product(all_tags, repeat=len(words)):

        prob = sequence_probability(words,tag_seq,initial_probs,transition_probs,emission_probs)

        if prob > best_prob:
            best_prob = prob
            best_seq = tag_seq

    return list(best_seq)

In [None]:
def viterbi(words,all_tags,initial_probs,transition_probs,emission_probs):
    T = len(words)
    N = len(all_tags)
    
    V = {}
    B = {}

    for tag in all_tags:
        V[(tag, 0)] = (
            initial_probs.get(tag, 0) *
            emission_probs.get((tag, words[0]), 0)
        )
        B[(tag, 0)] = None
        
    # Vt(tagj ) = max(tagi(Vt−1(tagi) × P(tagj | tagi) × P(wordt| tagj ))
    
    for i in range(1, len(words)):
        for curr_tag in all_tags:

            best_prob = 0
            best_prev = None

            for prev_tag in all_tags:
                prob = (V[(prev_tag, i-1)] *transition_probs.get((prev_tag, curr_tag), 0) *emission_probs.get((curr_tag, words[i]), 0))
                
                if prob > best_prob:
                    best_prob = prob
                    best_prev = prev_tag
            V[(curr_tag, i)] = best_prob
            B[(curr_tag, i)] = best_prev
                #backtracking matric stores the best prev tag (tag that leads to the maximum Viterbi probability)s at the particular position for a particular tag
    
    #to choose which final tag gives the highest probability overall
    
    last_pos = len(words) - 1    
    # Return the item for which V[(t, last_pos) is largest

    best_last_tag = None
    best_last_prob = -1

    for tag in all_tags:
        if V[(tag, last_pos)] > best_last_prob:
            best_last_prob = V[(tag, last_pos)]
            best_last_tag = tag

    # the viterbi maxtrix contain the max probability that can be obtained with a specific tag sequence that ends with that tag at that position.
    # we backtrack from the final  tag with highest probability
    best_tags = [best_last_tag]

    for i in range(last_pos, 0, -1):
        prev = B.get((best_tags[-1], i))

        if prev is None:
            # fallback: choose tag with max probability at previous position
            prev = max(
                all_tags,
                key=lambda t: V[(t, i-1)]
            )

        best_tags.append(prev)

    best_tags.reverse()


    
    return best_tags
    

In [None]:
def debug(test_sentences,initial_probs,transition_probs,emission_probs,brute_force,viterbi,max_examples=6):

    all_tags = list(tag_counts_emission.keys())
    shown = 0

    for sent in test_sentences:

        words = [w for w, _ in sent]
        gold  = [t for _, t in sent]

        print("Sentence:", words)
        print("Gold tags:", gold)
        
        if len(words) < 7:
            bf_pred = brute_force(words,all_tags,initial_probs,transition_probs,emission_probs)

            bf_correct = sum(g == p for g, p in zip(gold, bf_pred))
            bf_acc = bf_correct / len(gold)

            print("Brute Force:", bf_pred)
            print("BF Accuracy:", round(bf_acc, 3))
        else:
            print("Brute Force: skipped (sentence too long)")


        vit_pred = viterbi(words,all_tags,initial_probs,transition_probs,emission_probs)

        vit_correct = sum(g == p for g, p in zip(gold, vit_pred))
        vit_acc = vit_correct / len(gold)

        print("Viterbi:    ", vit_pred)
        print("VT Accuracy:", round(vit_acc, 3))
        print()
        shown += 1
        if shown >= max_examples:
            break


In [None]:
debug(test_sentences,initial_probs,transition_probs,emission_probs,brute_force,viterbi,max_examples=6)

Sentence: ['what', 'if', 'google', 'morphed', 'into', 'googleos', '?']
Gold tags: ['PRON', 'SCONJ', 'PROPN', 'VERB', 'ADP', 'PROPN', 'PUNCT']
Brute Force: skipped (sentence too long)
Viterbi:     ['PRON', 'SCONJ', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN']
VT Accuracy: 0.571

Sentence: ['what', 'if', 'google', 'expanded', 'on', 'its', 'search', '-', 'engine', '(', 'and', 'now', 'e-mail', ')', 'wares', 'into', 'a', 'full', '-', 'fledged', 'operating', 'system', '?']
Gold tags: ['PRON', 'SCONJ', 'PROPN', 'VERB', 'ADP', 'PRON', 'NOUN', 'PUNCT', 'NOUN', 'PUNCT', 'CCONJ', 'ADV', 'NOUN', 'PUNCT', 'NOUN', 'ADP', 'DET', 'ADV', 'PUNCT', 'ADJ', 'NOUN', 'NOUN', 'PUNCT']
Brute Force: skipped (sentence too long)
Viterbi:     ['PRON', 'SCONJ', 'PROPN', 'VERB', 'ADP', 'PRON', 'NOUN', 'PUNCT', 'NOUN', 'PUNCT', 'CCONJ', 'ADV', 'NOUN', 'PUNCT', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN', 'PROPN']
VT Accuracy: 0.609

Sentence: ['[', 'via', 'microsoft', 'watch', 'from', 'mary