## POS Tagging using an HMM (Hidden Markov Model)

In [1]:
import pandas as pd
import numpy as np

# Custom defaultdict implementation
class MyDefaultDict(dict):
    """A ``dict`` that fills in missing keys on item access.

    When ``d[key]`` is read for a key that is absent, ``default_factory``
    is called with no arguments, its result is stored under ``key`` and
    returned. If ``default_factory`` is ``None`` the usual ``KeyError`` is
    raised instead. ``get`` and ``in`` do not trigger the factory.
    """

    def __init__(self, default_factory, *args, **kwargs):
        # Remaining arguments are forwarded unchanged to ``dict``.
        super().__init__(*args, **kwargs)
        self.default_factory = default_factory

    def __missing__(self, key):
        factory = self.default_factory
        if factory is None:
            raise KeyError(key)
        # Store first, then read back, so the caller receives the stored object.
        self[key] = factory()
        return self[key]

# Load the training corpus from disk
train_data = pd.read_csv('train.csv')

# Pull the raw (still string-encoded) sentence columns out as plain lists
train_sentences, train_tags = (
    train_data[column].tolist()
    for column in ('untagged_sentence', 'tagged_sentence')
)


def formating(tagged_sentence):
    """Parse the string form of a tagged sentence into ``(word, tag)`` tuples.

    Args:
        tagged_sentence: String containing a Python literal for a sequence
            of two-element ``(word, tag)`` pairs, e.g.
            ``"[('The', 'DET'), ('dog', 'NOUN')]"``.

    Returns:
        A list of ``(word, tag)`` tuples in the original order.

    Raises:
        ValueError, SyntaxError: If the string is not a valid Python literal
            or an element is not a two-item pair.
    """
    # Imported locally so this function does not depend on a later cell's
    # imports.
    import ast

    # literal_eval only accepts literals, so text read from the CSV can never
    # execute arbitrary code the way eval() would.
    tagged_words = ast.literal_eval(tagged_sentence)
    return [(word, tag) for word, tag in tagged_words]


# Parse every string-encoded training sentence into (word, tag) tuples
formating_train_tags = list(map(formating, train_tags))

train_set = formating_train_tags

# Flatten the per-sentence lists into one corpus-wide list of (word, tag) pairs
train_tagged_words = [
    tagged_word
    for sentence in train_set
    for tagged_word in sentence
]

# All word occurrences, in corpus order
tokens = [tagged_word[0] for tagged_word in train_tagged_words]

# Distinct words seen in training
Vocab = set(tokens)

# Distinct tags seen in training
Tg = {tagged_word[1] for tagged_word in train_tagged_words}

# Count how often each tag is immediately followed by each other tag.
# Pairs are taken over the flattened corpus, so the last tag of one sentence
# is paired with the first tag of the next.
tag_pair_counts = MyDefaultDict(int)
for current_pair, next_pair in zip(train_tagged_words, train_tagged_words[1:]):
    current_tag = current_pair[1]
    next_tag = next_pair[1]
    bigram = (current_tag, next_tag)
    tag_pair_counts[bigram] = tag_pair_counts[bigram] + 1

# Computing transition matrix using precomputed counts.
# tags_matrix[i, j] = count(t_i followed by t_j) / count(t_i followed by anything)
tag_order = list(Tg)  # fix one ordering so rows, columns and labels agree
tags_matrix = np.zeros((len(tag_order), len(tag_order)), dtype='float32')
for i, t1 in enumerate(tag_order):
    # The row total depends only on t1, so compute it once per row rather
    # than once per cell. .get() avoids inserting zero entries into the
    # counts table as a side effect of reading it.
    n = sum(tag_pair_counts.get((t1, _t2), 0) for _t2 in tag_order)
    if n == 0:
        # t1 never has a successor (e.g. it only occurs as the final token);
        # leave the row at zero instead of dividing by zero.
        continue
    for j, t2 in enumerate(tag_order):
        m = tag_pair_counts.get((t1, t2), 0)
        tags_matrix[i, j] = m / n

# Convert the matrix to a DataFrame labelled by tag on both axes
# (row = previous tag, column = next tag)
tags_df = pd.DataFrame(tags_matrix, columns=tag_order, index=tag_order)

# Tally how often each tag occurs, and how often each word occurs with each tag
tag_counts = MyDefaultDict(int)
word_tag_counts = MyDefaultDict(lambda: MyDefaultDict(int))

for token, label in train_tagged_words:
    tag_counts[label] = tag_counts[label] + 1
    # Missing words get a fresh per-tag counter on first access
    counts_for_token = word_tag_counts[token]
    counts_for_token[label] = counts_for_token[label] + 1

# Emission Probability Calculation
def word_given_tag(word, tag):
    """Return the emission estimate count(word, tag) / count(tag).

    Args:
        word: Token to score.
        tag: Candidate tag.

    Returns:
        The relative frequency of ``word`` among training tokens tagged
        ``tag``, or ``0`` when ``tag`` has no recorded occurrences.
    """
    # Use .get() throughout: indexing the default-dicts would insert a new
    # entry for every unseen word or tag, growing the model tables at
    # inference time.
    tag_total = tag_counts.get(tag, 0)
    if tag_total == 0:
        return 0
    counts_for_word = word_tag_counts.get(word)
    if counts_for_word is None:
        return 0 / tag_total
    return counts_for_word.get(tag, 0) / tag_total

# Viterbi
def HMM_Viterbi(words, train_bag=train_tagged_words):
    """Tag ``words`` left to right, choosing the best tag for each token.

    Despite the name this is a greedy decoder: each word gets the tag that
    maximises ``emission(word | tag) * transition(previous_tag -> tag)``,
    and that choice is fixed before the next word is scored. The first word
    uses ``'.'`` as the previous tag.

    If every tag scores zero for a word (for example, the word was never
    seen in training so all emissions are zero), the tag is chosen by
    transition probability alone instead of defaulting to whichever tag
    happens to be listed first.

    Args:
        words: Sequence of tokens to tag.
        train_bag: Unused; kept so existing callers remain valid.

    Returns:
        A list of ``(word, tag)`` tuples, one per input word.

    Raises:
        KeyError: If ``'.'`` is not a row label of ``tags_df``.
    """
    tags = list(tag_counts.keys())
    state = []
    previous_tag = '.'  # row used for the first word of the sentence
    for word in words:
        # One row lookup per word instead of one DataFrame lookup per tag.
        transition_row = tags_df.loc[previous_tag]
        scores = [
            word_given_tag(word, tag) * transition_row[tag]
            for tag in tags
        ]
        if max(scores) == 0:
            # No tag has any support for this word; fall back to the most
            # likely successor of the previous tag.
            scores = [transition_row[tag] for tag in tags]
        # index() returns the first maximum, matching the original tie-break.
        previous_tag = tags[scores.index(max(scores))]
        state.append(previous_tag)
    return list(zip(words, state))


In [2]:
import pandas as pd
import ast
from tqdm import tqdm


# Load the sentences to be tagged
test_df = pd.read_csv('test_small.csv')

# One {'id', 'tagged_sentence'} record per input row
tagged_sentences = []

# Walk the rows with a progress bar, tagging each sentence in turn
for row_label, record in tqdm(test_df.iterrows()):
    # The sentence is stored as the string form of a Python list of tokens
    sentence_tokens = ast.literal_eval(record['untagged_sentence'])
    tagged_sentences.append(
        {
            'id': record['id'],
            'tagged_sentence': HMM_Viterbi(sentence_tokens),
        }
    )

tagged_df = pd.DataFrame(tagged_sentences)

# Write the predictions out without the DataFrame index column
tagged_df.to_csv('sample_submission.csv', index=False)


0it [00:00, ?it/s]

4000it [01:51, 35.97it/s]
