In [29]:
import pandas as pd

# Read the training and validation splits from CSV files in the working directory.
train_file, test_file = "train_data.csv", "validation_data.csv"

train_data = pd.read_csv(train_file)
test_data = pd.read_csv(test_file)

In [None]:
# Preview only the first rows; as the cell's last expression the frame is
# rendered with the notebook's rich table display instead of plain text.
train_data.head()

In [31]:
from collections import defaultdict
import ast

# Raw counts gathered from the training sentences.

# tag -> number of tokens carrying that tag
tag_dict = defaultdict(int)

# (word, tag) -> number of times the word was seen with that tag
word_tag_dict = defaultdict(int)

# (tag i, tag i + 1) -> number of times the two tags were adjacent in a sentence
two_tag_dict = defaultdict(int)

# Number of non-empty training sentences
sentence_count = 0

# (first word, its tag) -> number of sentences starting with that pair
first_word_dict = defaultdict(int)

for index, row in train_data.iterrows():
    # The first column is expected to hold the sentence as the string form of
    # a list of (word, tag) pairs.
    sentence = ast.literal_eval(row.iloc[0])

    # An empty sentence has no first token and no tags; skipping it avoids an
    # IndexError on sentence[0] and keeps it out of the sentence total.
    if not sentence:
        continue
    sentence_count += 1

    first_word, first_tag = sentence[0][0], sentence[0][1]
    first_word_dict[(first_word, first_tag)] += 1

    prev_tag = None
    for word, tag in sentence:
        tag_dict[tag] += 1
        word_tag_dict[(word, tag)] += 1

        # Count the tag bigram for every token except the first in the sentence.
        if prev_tag is not None:
            two_tag_dict[(prev_tag, tag)] += 1
        prev_tag = tag

In [None]:
# Show the raw tag -> count table.
print(tag_dict)

In [None]:
# List every (word, tag) pair together with its count.
for word_tag, count in word_tag_dict.items():
    print(f"Key = {word_tag}, Value = {count}")

In [None]:
# List every adjacent tag pair together with its count.
for tag_pair, count in two_tag_dict.items():
    print(f"Key = {tag_pair}, value = {count}")

In [None]:
# List every sentence-initial (word, tag) pair together with its count.
for first_pair, count in first_word_dict.items():
    print(f"Key = {first_pair}, value = {count}")

In [36]:
# Probability tables estimated from the raw counts.

# emission_probability[word][tag] = count(word, tag) / count(tag)
emission_probability = defaultdict(lambda: defaultdict(int))

# transition_probability[tag i][tag i + 1] = count(tag i, tag i + 1) / count(tag i)
transition_probability = defaultdict(lambda: defaultdict(int))

# initial_probability[tag] = share of sentences whose first token carries that tag
initial_probability = defaultdict(int)

for (token, token_tag), pair_count in word_tag_dict.items():
    emission_probability[token][token_tag] = pair_count / tag_dict[token_tag]

for (left_tag, right_tag), bigram_count in two_tag_dict.items():
    transition_probability[left_tag][right_tag] = bigram_count / tag_dict[left_tag]

# Several first words can share a tag, so their shares are accumulated per tag.
for (start_word, start_tag), start_count in first_word_dict.items():
    initial_probability[start_tag] += start_count / sentence_count

# transition_probability["START"] = initial_probability

In [None]:
# Print every emission probability, grouped by word.
for emitted_word, tag_probs in emission_probability.items():
    for emitted_tag, probability in tag_probs.items():
        print(f"Word = {emitted_word}, Tag = {emitted_tag}, Probability = {probability}")

In [None]:
# Print every transition probability, grouped by the preceding tag.
for from_tag, next_probs in transition_probability.items():
    for to_tag, probability in next_probs.items():
        print(f"Tag 1 = {from_tag}, Tag 2 = {to_tag}, Probability = {probability}")

In [None]:
# Print the probability of each tag starting a sentence.
for start_tag, probability in initial_probability.items():
    print(f"Tag = {start_tag}, Probability = {probability}")

In [40]:
# Krish attempt (NumPy matrices) - disabled; the active viterbi_algorithm is defined in the next cell
# import numpy as np

# def viterbi_algorithm(
#     sentence, unique_tags, initial_prob, transition_prob, emission_prob
# )->list:
#     n = len(sentence)
#     m = len(unique_tags)
#     tags_list = list(unique_tags)

#     # Viterbi matrix
#     viterbi = np.zeros((m, n))

#     # Backpointer matrix
#     backpointer = np.zeros((m, n), dtype=int)

#     # Initialize first column
#     for i, tag in enumerate(tags_list):
#         viterbi[i, 0] = initial_prob.get(tag, 1e-6) * emission_prob.get(
#             (tag, sentence[0]), 1e-6
#         )

#     # Recursion step
#     for t in range(1, n):
#         for j, curr_tag in enumerate(tags_list):
#             max_prob, best_prev_tag = max(
#                 [
#                     (
#                         viterbi[i, t - 1]
#                         * transition_prob.get((prev_tag, curr_tag), 1e-6)
#                         * emission_prob.get((curr_tag, sentence[t]), 1e-6),
#                         i,
#                     )
#                     for i, prev_tag in enumerate(tags_list)
#                 ]
#             )
#             viterbi[j, t] = max_prob
#             backpointer[j, t] = best_prev_tag

#     # Backtracking to retrieve the best sequence
#     best_tags = []
#     best_last_tag = np.argmax(viterbi[:, n - 1])
#     best_tags.append(tags_list[best_last_tag])

#     for t in range(n - 1, 0, -1):
#         best_last_tag = backpointer[best_last_tag, t]
#         best_tags.insert(0, tags_list[best_last_tag])

#     return best_tags

In [41]:
# Valmik attempt - dictionary-based Viterbi decoder, scored in log space
from collections import defaultdict
import math


def _log_prob(table, key, floor=1e-6):
    """Return log(table[key]); missing keys use `floor`, non-positive values give -inf."""
    prob = table.get(key, floor)
    return math.log(prob) if prob > 0 else float("-inf")


def viterbi_algorithm(sentence, unique_tags, initial_prob, transition_prob, emission_prob) -> list:
    """Return the most likely tag sequence for `sentence` under an HMM.

    Scores are accumulated as sums of log-probabilities rather than products
    of probabilities, so long sentences do not underflow to 0.0 (which would
    make every path tie and the result collapse to the first tag).

    Args:
        sentence: Sequence of words to tag.
        unique_tags: Iterable of candidate tags; its iteration order breaks ties
            (the earliest tag wins).
        initial_prob: Mapping tag -> probability of starting a sentence.
        transition_prob: Mapping (prev_tag, curr_tag) -> probability. The lookup
            uses tuple keys, so a nested mapping would fall back to the default
            for every pair.
        emission_prob: Mapping (tag, word) -> probability.

    Any key missing from a table is scored with a fallback probability of 1e-6.

    Returns:
        A list with one tag per word; an empty list for an empty sentence.

    Raises:
        ValueError: If `sentence` is non-empty and `unique_tags` is empty.
    """
    n = len(sentence)  # Number of words
    if n == 0:
        # Nothing to decode; the tables below would otherwise have no first column.
        return []

    tags_list = list(unique_tags)  # Fixed order for iteration and tie-breaking
    neg_inf = float("-inf")
    viterbi = {tag: [neg_inf] * n for tag in tags_list}  # Best log score ending in tag at t
    backpointer = {tag: [None] * n for tag in tags_list}  # Best predecessor of tag at t

    # Initialize first column
    for tag in tags_list:
        viterbi[tag][0] = _log_prob(initial_prob, tag) + _log_prob(emission_prob, (tag, sentence[0]))

    # Recursion step
    for t in range(1, n):
        for curr_tag in tags_list:
            # The emission term does not depend on the previous tag, so it is
            # computed once and added after the best predecessor is chosen.
            emission_score = _log_prob(emission_prob, (curr_tag, sentence[t]))

            best_prev_tag = tags_list[0]
            best_score = neg_inf
            for prev_tag in tags_list:
                score = viterbi[prev_tag][t - 1] + _log_prob(transition_prob, (prev_tag, curr_tag))
                # Strict comparison keeps the earliest tag on ties.
                if score > best_score:
                    best_score = score
                    best_prev_tag = prev_tag

            viterbi[curr_tag][t] = best_score + emission_score
            backpointer[curr_tag][t] = best_prev_tag

    # Backtracking: collect tags from last to first, then reverse once.
    best_last_tag = max(tags_list, key=lambda tag: viterbi[tag][-1])
    best_tags = [best_last_tag]
    for t in range(n - 1, 0, -1):
        best_last_tag = backpointer[best_last_tag][t]
        best_tags.append(best_last_tag)
    best_tags.reverse()

    return best_tags

In [None]:
# Predict POS tags for the validation dataset
predicted_tags = []
actual_tags = []

validation_sentences = []

for index, row in test_data.iterrows():
    # The first column is expected to hold the sentence as the string form of
    # a list of (word, tag) pairs.
    validation_sentence = ast.literal_eval(row.iloc[0])
    validation_sentences.append(validation_sentence)

# viterbi_algorithm looks both tables up with tuple keys, so the nested
# dictionaries are flattened once, outside the per-sentence loop.
# Emission keys are (tag, word); transition keys are (previous tag, next tag).
emission_prob = {
    (tag, word): prob
    for word, tag_probs in emission_probability.items()
    for tag, prob in tag_probs.items()
}

# Passing the nested transition table directly would make every tuple lookup
# miss, so the decoder would use its fallback probability for all transitions.
transition_prob = {
    (prev_tag, next_tag): prob
    for prev_tag, next_probs in transition_probability.items()
    for next_tag, prob in next_probs.items()
}

for sentence in validation_sentences:
    words = [word for word, tag in sentence]
    if not words:
        # Nothing to tag; skipping keeps actual and predicted tags aligned.
        continue
    actual_tags.extend([tag for word, tag in sentence])

    predicted_tags.extend(
        viterbi_algorithm(
            words,
            tag_dict.keys(),
            initial_probability,
            transition_prob,  # Tuple-keyed transition probabilities
            emission_prob,  # Tuple-keyed emission probabilities
        )
    )

In [None]:
from sklearn.metrics import accuracy_score

# Token-level accuracy: fraction of predicted tags that equal the actual tag.
accuracy = accuracy_score(actual_tags, predicted_tags)
print(f"Model Accuracy: {accuracy * 100:.2f}%")

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Matrix rows/columns follow the insertion order of tag_dict's keys.
unique_tags = list(tag_dict.keys())

cm = confusion_matrix(actual_tags, predicted_tags, labels=unique_tags)

# Draw the heatmap on an explicit Axes: rows are actual tags, columns predicted.
fig, ax = plt.subplots(figsize=(12, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=unique_tags,
    yticklabels=unique_tags,
    ax=ax,
)
ax.set_xlabel("Predicted Tag")
ax.set_ylabel("Actual Tag")
ax.set_title("Confusion Matrix")
plt.show()