In [1]:
import nltk

#nltk.download('treebank')
#nltk.download('universal_tagset')

In [14]:
#nltk.download('treebank')

[nltk_data] Downloading package treebank to C:\Users\RAMPALLI
[nltk_data]     VAMSI\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\treebank.zip.


True

In [15]:
#nltk.download('universal_tagset')

[nltk_data] Downloading package universal_tagset to C:\Users\RAMPALLI
[nltk_data]     VAMSI\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping taggers\universal_tagset.zip.


True

In [2]:
from nltk.corpus import treebank
from collections import defaultdict, Counter
import numpy as np

In [3]:
# Load the Treebank corpus and split into training and testing sets
data = treebank.tagged_sents(tagset='universal')
train_data = data[:3000]
test_data = data[3000:]

In [11]:
len(data)

3914

In [13]:
data[1]

[('Mr.', 'NOUN'),
 ('Vinken', 'NOUN'),
 ('is', 'VERB'),
 ('chairman', 'NOUN'),
 ('of', 'ADP'),
 ('Elsevier', 'NOUN'),
 ('N.V.', 'NOUN'),
 (',', '.'),
 ('the', 'DET'),
 ('Dutch', 'NOUN'),
 ('publishing', 'VERB'),
 ('group', 'NOUN'),
 ('.', '.')]

In [4]:
def create_hmm_parameters(train_data):
    tag_counts = Counter()
    word_tag_counts = defaultdict(Counter)
    transition_counts = defaultdict(Counter)
    tag_bigrams = Counter()

    for sentence in train_data:
        prev_tag = '<s>'
        for word, tag in sentence:
            tag_counts[tag] += 1
            word_tag_counts[word][tag] += 1
            transition_counts[prev_tag][tag] += 1
            tag_bigrams[(prev_tag, tag)] += 1
            prev_tag = tag
        transition_counts[prev_tag]['</s>'] += 1
        tag_bigrams[(prev_tag, '</s>')] += 1

    tags = list(tag_counts.keys())
    words = list(word_tag_counts.keys())
    
    # Compute initial probabilities with Laplace smoothing
    initial_prob = {tag: (transition_counts['<s>'][tag] + 1) / (sum(transition_counts['<s>'].values()) + len(tags)) for tag in tags}
    
    # Compute transition probabilities with Laplace smoothing
    transition_prob = {
        tag: {t: (transition_counts[tag][t] + 1) / (tag_counts[tag] + len(tags) + 1) for t in tags + ['</s>']}
        for tag in tags + ['<s>']
    }
    
    # Compute emission probabilities with Laplace smoothing
    emission_prob = {
        tag: {word: (word_tag_counts[word][tag] + 1) / (tag_counts[tag] + len(words)) for word in words}
        for tag in tags
    }
    
    return tags, words, initial_prob, transition_prob, emission_prob

tags, words, initial_prob, transition_prob, emission_prob = create_hmm_parameters(train_data)

In [5]:
def viterbi(sentence, tags, initial_prob, transition_prob, emission_prob):
    n = len(sentence)
    m = len(tags)
    viterbi = np.zeros((m, n))
    backpointer = np.zeros((m, n), dtype=int)
    
    # Initialization step
    for i, tag in enumerate(tags):
        viterbi[i, 0] = initial_prob.get(tag, 0) * emission_prob[tag].get(sentence[0], 1e-6)
        backpointer[i, 0] = 0
    
    # Recursion step
    for t in range(1, n):
        for i, tag in enumerate(tags):
            max_prob = 0
            best_tag = 0
            for j, prev_tag in enumerate(tags):
                prob = viterbi[j, t-1] * transition_prob[prev_tag].get(tag, 1e-6) * emission_prob[tag].get(sentence[t], 1e-6)
                if prob > max_prob:
                    max_prob = prob
                    best_tag = j
            viterbi[i, t] = max_prob
            backpointer[i, t] = best_tag
    
    # Termination step
    best_path_prob = max(viterbi[:, n-1])
    best_path_pointer = np.argmax(viterbi[:, n-1])
    
    # Path backtracking
    best_path = [0] * n
    best_path[n-1] = best_path_pointer
    for t in range(n-2, -1, -1):
        best_path[t] = backpointer[best_path[t+1], t+1]
    
    best_tags = [tags[i] for i in best_path]
    return best_tags

In [6]:
def evaluate(test_data, tags, initial_prob, transition_prob, emission_prob):
    correct = 0
    total = 0

    for sentence in test_data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        predicted_tags = viterbi(words, tags, initial_prob, transition_prob, emission_prob)

        correct += sum(p == t for p, t in zip(predicted_tags, true_tags))
        total += len(true_tags)
    
    accuracy = correct / total
    return accuracy

accuracy = evaluate(test_data, tags, initial_prob, transition_prob, emission_prob)
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.8875


In [16]:
def tag_sentence(sentence, tags, initial_prob, transition_prob, emission_prob):
    words = sentence.split()
    tagged_sentence = viterbi(words, tags, initial_prob, transition_prob, emission_prob)
    return list(zip(words, tagged_sentence))

# Example sentence
sentence = "I want to eat biryani today"
tagged_sentence = tag_sentence(sentence, tags, initial_prob, transition_prob, emission_prob)
print(tagged_sentence)

[('I', 'PRON'), ('want', 'VERB'), ('to', 'PRT'), ('eat', 'VERB'), ('biryani', 'DET'), ('today', 'NOUN')]


In [29]:
def create_trigram_hmm_parameters(train_data):
    tag_counts = Counter()
    word_tag_counts = defaultdict(Counter)
    transition_counts = defaultdict(Counter)
    trigram_counts = Counter()

    for sentence in train_data:
        prev_tag1 = '<s>'
        prev_tag2 = '<s>'
        for word, tag in sentence:
            tag_counts[tag] += 1
            word_tag_counts[word][tag] += 1
            transition_counts[(prev_tag1, prev_tag2)][tag] += 1
            trigram_counts[(prev_tag1, prev_tag2)] += 1
            prev_tag1, prev_tag2 = prev_tag2, tag
        transition_counts[(prev_tag1, prev_tag2)]['</s>'] += 1
        trigram_counts[(prev_tag1, prev_tag2)] += 1

    tags = list(tag_counts.keys())
    words = list(word_tag_counts.keys())
    
    # Compute initial probabilities with Laplace smoothing
    initial_prob = {tag: (transition_counts[('<s>', '<s>')][tag] + 1) / (sum(transition_counts[('<s>', '<s>')].values()) + len(tags)) for tag in tags}
    
    # Compute transition probabilities with Laplace smoothing
    transition_prob = {
        (tag1, tag2): {tag: (transition_counts[(tag1, tag2)][tag] + 1) / (trigram_counts[(tag1, tag2)] + len(tags) + 1) for tag in tags + ['</s>']}
        for tag1 in tags + ['<s>'] for tag2 in tags + ['<s>']
    }
    
    # Compute emission probabilities with Laplace smoothing
    emission_prob = {
        tag: {word: (word_tag_counts[word][tag] + 1) / (tag_counts[tag] + len(words)) for word in words}
        for tag in tags
    }
    
    return tags, words, initial_prob, transition_prob, emission_prob

tags, words, initial_prob, transition_prob, emission_prob = create_trigram_hmm_parameters(train_data)

In [30]:
def viterbi_trigram(sentence, tags, initial_prob, transition_prob, emission_prob):
    n = len(sentence)
    m = len(tags)
    viterbi = np.zeros((m, m, n))
    backpointer1 = np.zeros((m, m, n), dtype=int)
    backpointer2 = np.zeros((m, m, n), dtype=int)
    
    # Initialization step
    for i, tag1 in enumerate(tags):
        for j, tag2 in enumerate(tags):
            if n > 1:
                viterbi[i, j, 0] = initial_prob.get(tag1, 1e-6) * initial_prob.get(tag2, 1e-6) * emission_prob[tag2].get(sentence[0], 1e-6) * emission_prob[tag2].get(sentence[1], 1e-6)
            else:
                viterbi[i, j, 0] = initial_prob.get(tag1, 1e-6) * emission_prob[tag1].get(sentence[0], 1e-6)
            backpointer1[i, j, 0] = 0
            backpointer2[i, j, 0] = 0
    
    # Recursion step
    for t in range(2, n):
        for i, tag1 in enumerate(tags):
            for j, tag2 in enumerate(tags):
                max_prob = 0
                best_tag1 = 0
                best_tag2 = 0
                for k, prev_tag1 in enumerate(tags):
                    for l, prev_tag2 in enumerate(tags):
                        prob = viterbi[k, l, t-2] * transition_prob.get((prev_tag1, prev_tag2), {}).get(tag1, 1e-6) * transition_prob.get((prev_tag2, tag1), {}).get(tag2, 1e-6) * emission_prob[tag2].get(sentence[t], 1e-6)
                        if prob > max_prob:
                            max_prob = prob
                            best_tag1 = k
                            best_tag2 = l
                viterbi[i, j, t] = max_prob
                backpointer1[i, j, t] = best_tag1
                backpointer2[i, j, t] = best_tag2
    
    # Termination step
    best_path_prob = 0
    best_path_pointer = (0, 0)
    for i, tag1 in enumerate(tags):
        for j, tag2 in enumerate(tags):
            if viterbi[i, j, n-1] > best_path_prob:
                best_path_prob = viterbi[i, j, n-1]
                best_path_pointer = (i, j)
    
    # Path backtracking
    best_path = [0] * n
    best_path[n-1] = best_path_pointer[1]
    best_path[n-2] = best_path_pointer[0]
    for t in range(n-3, -1, -1):
        best_path[t] = backpointer2[best_path[t+1], best_path[t+2], t+2]
        best_path[t-1] = backpointer1[best_path[t+1], best_path[t+2], t+2]
    
    best_tags = [tags[i] for i in best_path]
    return best_tags

In [31]:
def evaluate_trigram(test_data, tags, initial_prob, transition_prob, emission_prob):
    correct = 0
    total = 0

    for sentence in test_data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        predicted_tags = viterbi_trigram(words, tags, initial_prob, transition_prob, emission_prob)

        correct += sum(p == t for p, t in zip(predicted_tags, true_tags))
        total += len(true_tags)
    
    accuracy = correct / total
    return accuracy

accuracy = evaluate_trigram(test_data, tags, initial_prob, transition_prob, emission_prob)
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.5272
