In [1]:
import numpy as np

def load_data(filename):
    sentences = []
    sentence = []
    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()
            if line:
                word, pos_tag, chunk_tag = line.split()
                sentence.append((word, pos_tag, chunk_tag))
            else:
                sentences.append(sentence)
                sentence = []
    return sentences


def train_hmm(sentences):
    transition_counts = {}
    emission_counts = {}
    context_counts = {}

    for sentence in sentences:
        prev_chunk_tag = '<s>'
        context_counts[prev_chunk_tag] = context_counts.get(
            prev_chunk_tag, 0) + 1
        for word, pos_tag, chunk_tag in sentence:
            transition_counts[(prev_chunk_tag, chunk_tag)] = transition_counts.get(
                (prev_chunk_tag, chunk_tag), 0) + 1
            emission_counts[(chunk_tag, word)] = emission_counts.get(
                (chunk_tag, word), 0) + 1
            context_counts[prev_chunk_tag] = context_counts.get(
                prev_chunk_tag, 0) + 1
            prev_chunk_tag = chunk_tag
        transition_counts[(prev_chunk_tag, '</s>')] = transition_counts.get(
            (prev_chunk_tag, '</s>'), 0) + 1

    transition_probs = {}
    emission_probs = {}

    for (prev_chunk_tag, chunk_tag), count in transition_counts.items():
        transition_probs[(prev_chunk_tag, chunk_tag)] = count / context_counts[prev_chunk_tag]

    for (chunk_tag, word), count in emission_counts.items():
        emission_probs[(chunk_tag, word)] = count / context_counts[chunk_tag]

    return transition_probs, emission_probs


def viterbi_forward(sentence, possible_tags, transition_probs, emission_probs):
    I = len(sentence)
    best_score = {}
    best_edge = {}
    best_score[0, '<s>'] = 0.0
    best_edge[0, '<s>'] = None

    for i in range(I):
        for prev_chunk_tag in possible_tags:
            for chunk_tag in possible_tags:
                if (i, prev_chunk_tag) in best_score and (prev_chunk_tag, chunk_tag) in transition_probs:
                    word = sentence[i][0]
                    if (chunk_tag, word) in emission_probs:
                        score = best_score[i, prev_chunk_tag] - np.log(
                            transition_probs[prev_chunk_tag, chunk_tag]) - np.log(
                                emission_probs[chunk_tag, word])
                    else:
                        score = np.inf
                    if (i+1, chunk_tag) not in best_score or score < best_score[i+1, chunk_tag]:
                        best_score[i+1, chunk_tag] = score
                        best_edge[i+1, chunk_tag] = (i, prev_chunk_tag)
    for prev in possible_tags:
        if f"{I} {prev}" in best_score and f"{prev} </s>" in transition_probs:
            if f"</s>" in emission_probs:
                score = best_score[f"{I} {prev}"] - np.log(transition_probs[f"{prev} </s>"])
            else:
                score = float('inf')
                
            if f"{I+1} </s>" not in best_score or score < best_score[f"{I+1} </s>"]:
                best_score[f"{I+1} </s>"] = score
                best_edge[f"{I+1} </s>"] = f"{I} {prev}"
    
    return best_score, best_edge



def viterbi_backward(best_edge,best_score, sentence, transition_probs):
    I=len(sentence)
    tags = []
    best_final_tag = None
    for x,prev_chunk_tag in best_edge:
        if (prev_chunk_tag,'</s>') in transition_probs:
            best_final_tag = prev_chunk_tag
            break
    tag = best_final_tag
    for i in range(I,0,-1):
        tags.append(tag)
        if (i,tag) in best_edge:
            tag = best_edge[i,tag][1]

    tags.reverse()
    return tags


def test_hmm(sentences, transition_probs, emission_probs, possible_tags):
    tagged_sentences = []

    for sentence in sentences:
        best_score, best_edge = viterbi_forward(
            sentence, possible_tags, transition_probs, emission_probs)
        tagged_sentence = viterbi_backward(best_edge, best_score, sentence, transition_probs)
        tagged_sentences.append(tagged_sentence)

    return tagged_sentences


def evaluate_accuracy(tagged_sentences, gold_sentences):
    correct = 0
    total = 0

    for tagged_sentence, gold_sentence in zip(tagged_sentences, gold_sentences):
        tagged_tags = [tag for tag in tagged_sentence]
        gold_tags = [tag for _, _, tag in gold_sentence]

        for tagged_tag, gold_tag in zip(tagged_tags, gold_tags):
            total += 1
            if tagged_tag == gold_tag:
                correct += 1
    if (total == 0):
        return 0
    else:
        return (correct / total) * 100


# Main program
training_file = "./train.txt/train.txt"
test_file = "./test.txt/test.txt"

# Load and preprocess data
sentences_train = load_data(training_file)
sentences_test = load_data(test_file)
# print(sentences_train[1])
# Train HMM
transition_probs, emission_probs = train_hmm(sentences_train)
# print(transition_probs)
# print(emission_probs)
# # Define possible tags
possible_tags = set()
for chunk_tag, _ in transition_probs.keys():
    possible_tags.add(chunk_tag)
    
# possible_tags = set(transition_probs.keys())
# print(possible_tags)
 
# Test HMM on test data
tagged_sentences = test_hmm(
    sentences_test, transition_probs, emission_probs, possible_tags)
accuracy = evaluate_accuracy(tagged_sentences, sentences_test)
# Evaluate accuracy
print("Accuracy:", accuracy)




Accuracy: 34.550520294657744
