In [1]:
import numpy as np
from collections import defaultdict
import pandas as pd
import ast

In [2]:
class HMM:
    """First-order hidden Markov model whose parameters are estimated by counting.

    `states` and `observations` are sequences of hashable labels. Their positions
    define the row/column indices of the probability arrays, and the reverse
    lookups are kept in `state_index` and `observation_index`.

    Training sequences are iterables of `(observation, state)` pairs.
    """

    def __init__(self, states, observations):
        self.states = states
        self.observations = observations
        self.start_probabilities = np.zeros(len(states))
        self.transition_probabilities = np.zeros((len(states), len(states)))
        self.emission_probabilities = np.zeros((len(states), len(observations)))

        self.state_index = {state: i for i, state in enumerate(states)}
        self.observation_index = {obs: i for i, obs in enumerate(observations)}

    @staticmethod
    def _normalize_rows(counts):
        """Scale every row with a non-zero sum to sum to 1, in place; all-zero rows stay zero."""
        row_sums = counts.sum(axis=1, keepdims=True)
        np.divide(counts, row_sums, out=counts, where=row_sums != 0)

    def compute_start_probabilities(self, sequences):
        """Estimate P(first state) from the state of the first pair of each sequence.

        Empty sequences are skipped. If no sequence contributes a count the
        vector is left at zero instead of being divided by zero.
        """
        # Start from zero so a repeated call does not add counts to normalised values.
        self.start_probabilities.fill(0.0)
        for sequence in sequences:
            if len(sequence) == 0:
                continue
            start_state = sequence[0][1]  # the state paired with the first observation
            self.start_probabilities[self.state_index[start_state]] += 1

        total = self.start_probabilities.sum()
        if total != 0:
            self.start_probabilities /= total

    def compute_transition_probabilities(self, sequences):
        """Estimate P(next state | current state) from adjacent pairs in each sequence.

        Rows of states that are never followed by another state stay all-zero.
        """
        self.transition_probabilities.fill(0.0)
        for sequence in sequences:
            for i in range(len(sequence) - 1):
                current_state = sequence[i][1]
                next_state = sequence[i + 1][1]
                self.transition_probabilities[
                    self.state_index[current_state], self.state_index[next_state]
                ] += 1

        self._normalize_rows(self.transition_probabilities)

    def compute_emission_probabilities(self, sequences):
        """Estimate P(observation | state) from every `(observation, state)` pair.

        Rows of states that never occur stay all-zero.
        """
        self.emission_probabilities.fill(0.0)
        for sequence in sequences:
            for token, state in sequence:
                self.emission_probabilities[
                    self.state_index[state], self.observation_index[token]
                ] += 1

        self._normalize_rows(self.emission_probabilities)

    def viterbi_algorithm(self, obs):
        """Return the most probable state-index path for a sequence of observation indices.

        Args:
            obs: sequence of integer column indices into `emission_probabilities`.

        Returns:
            A list of Python ints, one state index (position in `self.states`)
            per element of `obs`. An empty `obs` yields an empty list.

        Scores are accumulated as sums of log-probabilities. A running product
        of probabilities underflows to 0.0 on long inputs, after which every
        candidate ties and the decoded path degenerates to state 0.
        """
        n_steps = len(obs)
        if n_steps == 0:
            return []

        # log(0) = -inf is the intended score of an impossible event; silence the warning.
        with np.errstate(divide="ignore"):
            log_start = np.log(np.asarray(self.start_probabilities, dtype=float))
            log_trans = np.log(np.asarray(self.transition_probabilities, dtype=float))
            # Only the emission columns that actually occur in `obs` are needed.
            log_emit = np.log(
                np.asarray(self.emission_probabilities, dtype=float)[:, list(obs)]
            )

        log_delta = log_start + log_emit[:, 0]
        backpointers = []  # backpointers[k][s] = best predecessor of state s at step k + 1
        for t in range(1, n_steps):
            # scores[p, s]: best log-score of being in p at t-1, moving to s and emitting obs[t].
            scores = log_delta[:, None] + log_trans + log_emit[:, t][None, :]
            # argmax keeps the first maximum, i.e. ties go to the lowest state index.
            backpointers.append(scores.argmax(axis=0))
            log_delta = scores.max(axis=0)

        best_path = [int(log_delta.argmax())]
        for pointers in reversed(backpointers):
            best_path.append(int(pointers[best_path[-1]]))
        best_path.reverse()
        return best_path

In [3]:
def threshold_data(sequences, threshold):
    """Replace infrequent tokens with the placeholder 'UNK'.

    Token frequencies are counted over every `(token, state)` pair in
    `sequences`. A token is kept only when its count is strictly greater than
    `threshold`; otherwise it becomes 'UNK'. States are passed through
    unchanged and the input sequences are not modified.

    Returns a new list with one list of `(token, state)` tuples per input sequence.
    """
    frequency = defaultdict(int)
    for tagged_sentence in sequences:
        for word, _state in tagged_sentence:
            frequency[word] += 1

    def _mask(word):
        # Counts equal to the threshold are still treated as rare.
        if frequency[word] > threshold:
            return word
        return 'UNK'

    return [
        [(_mask(word), tag) for word, tag in tagged_sentence]
        for tagged_sentence in sequences
    ]

In [4]:
# Plain loads of both splits with no converters applied, so list-valued cells
# are still strings at this point.
train = pd.read_csv(filepath_or_buffer='./gujarati_train_data.csv')
test = pd.read_csv(filepath_or_buffer='./gujarati_test_data.csv')

In [5]:
# Parse the 'tokens' and 'ner_tags' cells into Python literals while reading,
# using the same safe parser for both columns.
list_column_parsers = dict.fromkeys(('tokens', 'ner_tags'), ast.literal_eval)
df_train = pd.read_csv('./gujarati_train_data.csv', converters=list_column_parsers)

In [6]:
# One (tokens, ner_tags) tuple per training row; zipping the two columns avoids
# building a Series per row the way iterrows() does.
train_data = list(zip(df_train['tokens'], df_train['ner_tags']))

# Sort the tag inventory so a tag always gets the same state index. The order of
# list(set) is an implementation detail and must not decide the model layout.
unique_tags = {tag for _, tags in train_data for tag in tags}
states = sorted(unique_tags)

In [7]:
# Re-shape every sentence into a list of (token, tag) pairs.
train_hmm_data = []
for sentence_tokens, sentence_tags in train_data:
    train_hmm_data.append(list(zip(sentence_tokens, sentence_tags)))

In [8]:
# Tokens whose training count is not above this value are replaced by 'UNK'.
RARE_TOKEN_THRESHOLD = 5
unk_sequences = threshold_data(train_hmm_data, RARE_TOKEN_THRESHOLD)

In [9]:
# Spot-check the first training sentence after rare-token replacement.
print(unk_sequences[0])

[('લક્ઝરી', 0), ('સેન્ટ', 5), ('એન્ડ્રુ', 6), ('માતાનો', 6), ('ચર્ચ', 6)]


In [10]:
# Vocabulary of the thresholded training data (includes 'UNK' if any token was replaced).
unique_tokens = {token for sequence in unk_sequences for token, _ in sequence}

# Sorted so each token keeps the same observation index on every kernel start.
# Iteration order of a set of strings changes with Python's hash randomisation.
unique_tokens_list = sorted(unique_tokens)
num_observations = len(unique_tokens_list)

In [11]:
def train_hmm(hmm_model, sequences):
    """Fit `hmm_model` on `sequences` by running its three estimation steps in order.

    The start, transition and emission estimators are each called once with
    the same `sequences` argument. Nothing is returned; the model is updated
    in place by those methods.
    """
    estimation_steps = (
        "compute_start_probabilities",
        "compute_transition_probabilities",
        "compute_emission_probabilities",
    )
    for step_name in estimation_steps:
        getattr(hmm_model, step_name)(sequences)

In [12]:
hmm_model = HMM(states, unique_tokens_list)
train_hmm(hmm_model, unk_sequences)

# Print each estimated parameter array under its own heading.
learned_parameters = (
    ("Start Probabilities:", hmm_model.start_probabilities),
    ("Transition Probabilities:", hmm_model.transition_probabilities),
    ("Emission Probabilities:", hmm_model.emission_probabilities),
)
for heading, values in learned_parameters:
    print(heading)
    print(values)

Start Probabilities:
[0.64992968 0.12283941 0.         0.10794658 0.         0.11928433
 0.        ]
Transition Probabilities:
[[8.79156890e-01 5.51401080e-02 0.00000000e+00 3.07241538e-02
  0.00000000e+00 3.49788483e-02 0.00000000e+00]
 [2.71415376e-01 1.55862047e-02 6.99710381e-01 4.89691125e-03
  2.37957324e-04 8.09054902e-03 6.26203485e-05]
 [7.20740567e-01 2.38400692e-02 2.16323703e-01 1.48377306e-02
  1.26457932e-03 2.27147769e-02 2.78573994e-04]
 [5.60108715e-01 7.21998719e-03 1.00209707e-03 1.19909469e-02
  4.12453378e-01 7.06845056e-03 1.56424909e-04]
 [3.91808060e-01 6.72594495e-03 5.60495413e-04 2.13695678e-02
  5.69909559e-01 9.41958795e-03 2.06784715e-04]
 [7.69289858e-01 8.93353535e-03 8.44547997e-04 1.83423942e-02
  2.10924801e-03 4.44809425e-02 1.55999474e-01]
 [5.62097000e-01 9.27016167e-03 7.84973367e-04 1.46154565e-02
  1.71946547e-03 5.13223063e-02 3.60190636e-01]]
Emission Probabilities:
[[1.21865641e-06 1.42176581e-06 2.43731282e-06 ... 2.03109402e-07
  0.00000000

In [13]:
# Columns of the test split that compute_accuracy reads as module-level globals.
test_tokens, test_ner_tags = test['tokens'], test['ner_tags']
misvals = list()  # not referenced by any cell visible in this notebook

N_test = len(test_tokens)

def compute_accuracy(hmm_model, tokens=None, ner_tags=None):
    """Tag every test sentence with `hmm_model` and score it against the gold tags.

    Args:
        hmm_model: object exposing `observation_index` (token -> index),
            `states` (index -> tag) and `viterbi_algorithm(indices)`.
        tokens: iterable with one entry per sentence, either a list of tokens
            or the string repr of such a list. Defaults to the module-level
            `test_tokens`.
        ner_tags: iterable of gold tag lists in the same form. Defaults to the
            module-level `test_ner_tags`.

    Returns:
        `(accuracy, unseen_accuracy)`. `accuracy` is over all tokens of
        non-empty sentences. `unseen_accuracy` is over tokens missing from
        `hmm_model.observation_index`, which are decoded as 'UNK'. A ratio
        with a zero denominator is reported as NaN.

    Raises:
        KeyError: if an unseen token occurs and the model has no 'UNK' entry.
    """
    if tokens is None:
        tokens = test_tokens
    if ner_tags is None:
        ner_tags = test_ner_tags

    def _as_list(cell):
        # Cells read from CSV hold the repr of a list. literal_eval parses
        # literals only, whereas eval would execute whatever the file contains.
        return ast.literal_eval(cell) if isinstance(cell, str) else list(cell)

    vocabulary = hmm_model.observation_index
    correct = 0
    total = 0
    unseen_correct = 0
    unseen_total = 0

    for token_cell, tag_cell in zip(tokens, ner_tags):
        observations = _as_list(token_cell)
        if len(observations) == 0:
            continue
        gold_tags = _as_list(tag_cell)

        observation_indices = [
            vocabulary[obs] if obs in vocabulary else vocabulary['UNK']
            for obs in observations
        ]
        # Viterbi returns positions in hmm_model.states; translate them to tag
        # values before comparing with the gold tags.
        predicted_tags = [
            hmm_model.states[state_idx]
            for state_idx in hmm_model.viterbi_algorithm(observation_indices)
        ]

        for obs, predicted, gold in zip(observations, predicted_tags, gold_tags):
            is_correct = predicted == gold
            if is_correct:
                correct += 1
            if obs not in vocabulary:
                unseen_total += 1
                if is_correct:
                    unseen_correct += 1
        total += len(observations)

    accuracy = correct / total if total else float('nan')
    unseen_accuracy = unseen_correct / unseen_total if unseen_total else float('nan')
    return accuracy, unseen_accuracy


# Score the trained model on the test split and report both metrics.
evaluation = compute_accuracy(hmm_model)
accuracy, unk_accuracy = evaluation
for label, score in zip(("Accuracy:", "UNK Accuracy:"), evaluation):
    print(label, score)

Accuracy: 0.8982154659616656
UNK Accuracy: 0.6525911708253359
