In [1]:
import pandas as pd
from collections import defaultdict

In [2]:
# Viterbi Algo
# sqeuence: list of tokens/words
def viterbi(sequence, emission_probs_tb, transition_probs_tb):
    pi = defaultdict(lambda: defaultdict(float))
    pi[0]['start'] = 1.0

    N = len(sequence)
    for k in range(1, N+1):
        curr_word = sequence[k-1]

        valid_tags = emission_probs_tb[curr_word].keys()
        for t in valid_tags:
            prev_tags = pi[k-1].keys()
            for s in prev_tags:
                pi[k][t] = max(pi[k][t], pi[k-1][s] * transition_probs_tb[t][s] * emission_probs_tb[curr_word][t])
        
        #print(pi)

    for a in pi:
        for b in pi[a]:
            print(a, b, pi[a][b])
    
    return pi


In [3]:
# pi table from viterbi
def viterbi_backtrack_pos(sequence, emission_probs, transition_probs):
    pi = viterbi(sequence, emission_probs, transition_probs)
    pos_tags = []

    for n in range(len(sequence), 0, -1):
        max_tag = max(pi[n], key=lambda x: pi[n][x])
        pos_tags.append(max_tag)
    
    return list(reversed(pos_tags))

In [4]:
# Forward Algo
# sqeuence: list of tokens/words
def forward(sequence, emission_probs_tb, transition_probs_tb):
    pi = defaultdict(lambda: defaultdict(float))
    pi[0]['start'] = 1.0

    N = len(sequence)
    for k in range(1, N+1):
        curr_word = sequence[k-1]

        valid_tags = emission_probs_tb[curr_word].keys()
        for t in valid_tags:
            prev_tags = pi[k-1].keys()
            for s in prev_tags:
                pi[k][t] += pi[k-1][s] * transition_probs_tb[t][s] * emission_probs_tb[curr_word][t]
        

    for a in pi:
        for b in pi[a]:
            print(a, b, pi[a][b])

    print(sum(pi[N].values()))

    return pi

In [16]:
def test_small_hmm():
    emission_probs = pd.read_csv('emission_probs.txt')
    emission_probs_np = emission_probs.to_numpy()

    transition_probs = pd.read_csv('transition_probs.txt')
    transition_probs_np = transition_probs.to_numpy()

    emission_probs_tb = defaultdict(lambda: defaultdict(float))
    transition_probs_tb = defaultdict(lambda: defaultdict(float))

    for e in emission_probs_np:
        emission_probs_tb[e[0]][e[1]] = e[2]
    
    for t in transition_probs_np:
        transition_probs_tb[t[1]][t[0]] = t[2]
    
    sequence = "time flies like an arrow".split(" ")
    pi = viterbi(sequence, emission_probs_tb, transition_probs_tb)

    print(pi)

    tags = viterbi_backtrack_pos(sequence, emission_probs_tb, transition_probs_tb)
    print(tags)

    pi_forward = forward(sequence, emission_probs_tb, transition_probs_tb)
    print(pi_forward)

test_small_hmm()

0 start 1.0
1 VBZ 0.020000000000000004
1 NNZ 0.06
2 VBZ 0.010799999999999999
2 NNZ 0.0024000000000000002
3 VBZ 0.00072
3 IN 0.00216
4 DT 0.0015119999999999999
5 NNZ 0.0007559999999999999
defaultdict(<function viterbi.<locals>.<lambda> at 0x7feee846eaf0>, {0: defaultdict(<class 'float'>, {'start': 1.0}), 1: defaultdict(<class 'float'>, {'VBZ': 0.020000000000000004, 'NNZ': 0.06}), 2: defaultdict(<class 'float'>, {'VBZ': 0.010799999999999999, 'NNZ': 0.0024000000000000002}), 3: defaultdict(<class 'float'>, {'VBZ': 0.00072, 'IN': 0.00216}), 4: defaultdict(<class 'float'>, {'DT': 0.0015119999999999999}), 5: defaultdict(<class 'float'>, {'NNZ': 0.0007559999999999999})})
0 start 1.0
1 VBZ 0.020000000000000004
1 NNZ 0.06
2 VBZ 0.010799999999999999
2 NNZ 0.0024000000000000002
3 VBZ 0.00072
3 IN 0.00216
4 DT 0.0015119999999999999
5 NNZ 0.0007559999999999999
['NNZ', 'VBZ', 'IN', 'DT', 'NNZ']
0 start 1.0
1 VBZ 0.020000000000000004
1 NNZ 0.06
2 VBZ 0.010799999999999999
2 NNZ 0.004000000000000001
3 V

In [17]:
import os
from collections import defaultdict

def get_ngrams(sequence, n):
    """
    Given a sequence, return a list of n-grams, where each n-gram is a Python tuple.
    This should work for arbitrary values of 1 <= n < len(sequence).
    """

    # Modify sequence to include START/STOP
    if n <= 2:
        sequence = ['START'] + sequence
    else:
        sequence = ['START'] * (n-1) + sequence

    sequence += ['STOP']

    n_grams = []

    for i in range(len(sequence) - n + 1):
        n_gram_tup = tuple(sequence[i:i+n])
        n_grams.append(n_gram_tup)

    return n_grams

def create_dataset():
    all_words = []
    all_tags = []
    
    for fname in os.listdir('tagged'):
        print(fname)
        file_reader = open(os.path.join('tagged', fname), 'r')

        curr_words = []
        curr_tags = []

        for line in file_reader:
            if line[0] == "=" or len(line) == 0:
                continue

            tags = line.strip().split(" ")

            for t in tags:
                if t == '[' or t == ']' or len(t) == 0:
                    continue
                
                #print(t)
                word_tag = t.split("/")
                word, tag = word_tag[0].lower(), word_tag[1]

                curr_words.append(word)
                curr_tags.append(tag)

                if word == "." and tag == ".":
                    all_words.append(curr_words)
                    all_tags.append(curr_tags)

                    curr_words = []
                    curr_tags = []
    
    return all_words, all_tags

def train_hmm(words, tags):
    assert len(words) == len(tags)

    unigram_counts = defaultdict(int)
    bigram_counts = defaultdict(int)

    word_emission_counts = defaultdict(lambda: defaultdict(int))

    for i in range(len(words)):
        curr_words = words[i]
        curr_tags = tags[i]

        assert len(curr_words) == len(curr_tags)

        for j in range(len(curr_words)):
            word, tag = curr_words[j], curr_tags[j]
            word_emission_counts[word][tag] += 1

        unigrams = get_ngrams(curr_tags, 1)
        bigrams = get_ngrams(curr_tags, 2)

        for u in unigrams:
            unigram_counts[u] += 1
        
        for b in bigrams:
            bigram_counts[b] += 1
                
    return unigram_counts, bigram_counts, word_emission_counts

def estimate_hmm_params():
    unigram_counts = defaultdict(int)
    bigram_counts = defaultdict(int)

    word_emission_counts = defaultdict(lambda: defaultdict(int))

    for fname in os.listdir('tagged'):
        file_reader = open(os.path.join('tagged', fname), 'r')

        curr_words = []
        curr_tags = []

        for line in file_reader:
            if line[0] == "=" or len(line) == 0:
                continue

            tags = line.strip().split(" ")

            for t in tags:
                if t == '[' or t == ']' or len(t) == 0:
                    continue
                
                word_tag = t.split("/")
                word, tag = word_tag[0].lower(), word_tag[1]

                curr_words.append(word)
                curr_tags.append(tag)

                word_emission_counts[word][tag] += 1

                if word == "." and tag == ".":
                    unigrams = get_ngrams(curr_tags, 1)
                    bigrams = get_ngrams(curr_tags, 2)

                    for u in unigrams:
                        unigram_counts[u] += 1
                    
                    for b in bigrams:
                        bigram_counts[b] += 1
                    
                    curr_words = []
                    curr_tags = []
                
    return unigram_counts, bigram_counts, word_emission_counts

In [18]:
words, tags = create_dataset()

wsj_0087.pos
wsj_0093.pos
wsj_0078.pos
wsj_0044.pos
wsj_0050.pos
wsj_0118.pos
wsj_0130.pos
wsj_0124.pos
wsj_0125.pos
wsj_0131.pos
wsj_0119.pos
wsj_0051.pos
wsj_0045.pos
wsj_0079.pos
wsj_0092.pos
wsj_0086.pos
wsj_0090.pos
wsj_0084.pos
wsj_0053.pos
wsj_0047.pos
wsj_0127.pos
wsj_0133.pos
wsj_0132.pos
wsj_0126.pos
wsj_0046.pos
wsj_0052.pos
wsj_0085.pos
wsj_0091.pos
wsj_0095.pos
wsj_0081.pos
wsj_0056.pos
wsj_0042.pos
wsj_0122.pos
wsj_0136.pos
wsj_0137.pos
wsj_0123.pos
wsj_0043.pos
wsj_0057.pos
wsj_0080.pos
wsj_0094.pos
wsj_0082.pos
wsj_0096.pos
wsj_0041.pos
wsj_0055.pos
wsj_0069.pos
wsj_0135.pos
wsj_0121.pos
wsj_0109.pos
wsj_0108.pos
wsj_0120.pos
wsj_0134.pos
wsj_0068.pos
wsj_0054.pos
wsj_0040.pos
wsj_0097.pos
wsj_0083.pos
wsj_0027.pos
wsj_0033.pos
wsj_0190.pos
wsj_0184.pos
wsj_0153.pos
wsj_0147.pos
wsj_0146.pos
wsj_0152.pos
wsj_0185.pos
wsj_0191.pos
wsj_0032.pos
wsj_0026.pos
wsj_0018.pos
wsj_0030.pos
wsj_0024.pos
wsj_0187.pos
wsj_0193.pos
wsj_0178.pos
wsj_0144.pos
wsj_0150.pos
wsj_0151.pos

In [19]:
words

[['five',
  'things',
  'you',
  'can',
  'do',
  'for',
  '$',
  '15,000',
  'or',
  'less',
  ':',
  '1',
  '.'],
 ['buy', 'a', 'new', 'chevrolet', '.'],
 ['2', '.'],
 ['take', 'a', 'hawaiian', 'vacation', '.'],
 ['3', '.'],
 ['send', 'your', 'child', 'to', 'a', 'university', '.'],
 ['4', '.'],
 ['buy', 'a', 'diamond', 'necklace', '.'],
 ['5', '.'],
 ['make',
  'a',
  'lasting',
  'difference',
  'in',
  'the',
  'regulatory',
  'life',
  'of',
  'an',
  'american',
  'savings-and-loan',
  'association',
  'through',
  'the',
  'foster',
  'corporate',
  'parents',
  'plan',
  '.'],
 ['americans',
  'today',
  'spend',
  '$',
  '15,000',
  'like',
  'pocket',
  'change',
  '--',
  'they',
  'do',
  "n't",
  'think',
  'much',
  'about',
  'it',
  '.'],
 ['but',
  'for',
  'an',
  'ailing',
  'savings-and-loan',
  'association',
  '--',
  'teetering',
  'on',
  'insolvency',
  '--',
  'it',
  'can',
  'lead',
  'to',
  'safety',
  'from',
  'imminent',
  'demise',
  'and',
  'to',
  '

In [21]:
import numpy as np

from tqdm import tqdm
from sklearn.model_selection import KFold

words_np = np.array(words, dtype=list)
tags_np = np.array(tags, dtype=list)

kf = KFold(n_splits=5, shuffle=True, random_state=42)

for i, (train_idx, test_idx) in tqdm(enumerate(kf.split(words_np))):
    words_train = words_np[train_idx]
    tags_train = tags_np[train_idx]

    words_test = words_np[test_idx]
    tags_test = tags_np[test_idx]

    unigram_counts, bigram_counts, word_emission_counts = train_hmm(words_train, tags_train)

    transition_probs_tb = defaultdict(lambda: defaultdict(float))
    emission_probs_tb = defaultdict(lambda: defaultdict(float))

    for bigram in bigram_counts.keys():
        transition_probs_tb[bigram[1]][bigram[0]] = bigram_counts[bigram] / unigram_counts[(bigram[0],)]
        #print(bigram, bigram_counts[bigram] / unigram_counts[(bigram[0],)])

    for word in word_emission_counts.keys():
        for tag in word_emission_counts[word].keys():
            emission_probs_tb[word][tag] = word_emission_counts[word][tag] / unigram_counts[(tag, )]
            #print(word, tag, word_emission_counts[word][tag] / unigram_counts[(tag, )])

    total = 0
    correct = 0
    for j in range(len(words_test)):
        print(words_test[j])
        predicted_tags = np.array(viterbi_backtrack_pos(words_test[j], emission_probs_tb, transition_probs_tb))
        total += len(predicted_tags)
        correct += np.count_nonzero(predicted_tags == np.array(tags_test[j]))
    
    print("Fold {}".format(i))
    print(correct / total)


0it [00:00, ?it/s]

[list(['buy', 'a', 'new', 'chevrolet', '.']) list(['2', '.'])
 list(['take', 'a', 'hawaiian', 'vacation', '.']) ...
 list(['the', 'remaining', '$', '21.9', 'billion', 'could', 'be', 'raised', 'through', 'the', 'sale', 'of', 'short-term', 'treasury', 'bills', ',', 'two-year', 'notes', 'in', 'november', 'and', 'five-year', 'notes', 'in', 'early', 'december', ',', 'the', 'treasury', 'said', '.'])
 list(['in', 'the', 'first', 'three', 'months', 'of', '1990', ',', 'the', 'treasury', 'estimates', 'that', 'it', 'will', 'have', 'to', 'raise', 'between', '$', '45', 'billion', 'and', '$', '50', 'billion', ',', 'assuming', 'that', 'it', 'decides', 'to', 'aim', 'for', 'a', '$', '10', 'billion', 'cash', 'balance', 'at', 'the', 'end', 'of', 'march', '.'])
 list(['david', 'wu', ',', 'the', 'company', "'s", 'representative', 'in', 'taiwan', ',', 'said', 'atlanta-based', 'life', 'of', 'georgia', 'will', 'sell', 'conventional', 'life-insurance', 'products', '.'])]
['five', 'things', 'you', 'can', 'do', 




ValueError: max() arg is an empty sequence

In [21]:
words_np[3000]

['merrill', 'lynch', 'ready', 'assets', 'trust', ':', '8.64', '%', '.']

In [22]:
tags_np[3000]

['NNP', 'NNP', 'NNP', 'NNPS', 'NNP', ':', 'CD', 'NN', '.']

In [16]:
len(words)

3828

In [4]:
unigram_counts, bigram_counts, word_emission_counts = estimate_hmm_params()

In [6]:
transition_probs_tb = defaultdict(lambda: defaultdict(float))
emission_probs_tb = defaultdict(lambda: defaultdict(float))

for bigram in bigram_counts.keys():
    transition_probs_tb[bigram[1]][bigram[0]] = bigram_counts[bigram] / unigram_counts[(bigram[0],)]
    print(bigram, bigram_counts[bigram] / unigram_counts[(bigram[0],)])

for word in word_emission_counts.keys():
    for tag in word_emission_counts[word].keys():
        emission_probs_tb[word][tag] = word_emission_counts[word][tag] / unigram_counts[(tag, )]
        print(word, tag, word_emission_counts[word][tag] / unigram_counts[(tag, )])

('START', 'CD') 0.0073145245559038665
('CD', 'NNS') 0.1530791788856305
('NNS', 'PRP') 0.005954349983460139
('PRP', 'MD') 0.12645687645687645
('MD', 'VB') 0.8155339805825242
('VB', 'IN') 0.11942051683633516
('IN', '$') 0.026809920674766542
('$', 'CD') 0.988950276243094
('CD', 'CC') 0.020821114369501466
('CC', 'JJR') 0.010596026490066225
('JJR', ':') 0.005249343832020997
(':', 'LS') 0.008880994671403197
('LS', '.') 0.38461538461538464
('.', 'STOP') 0.9881259679917398
('START', 'VB') 0.004440961337513062
('VB', 'DT') 0.2364917776037588
('DT', 'JJ') 0.20269442743417024
('JJ', 'NNP') 0.033773358477627295
('NNP', '.') 0.05184160102192889
('START', 'LS') 0.0013061650992685476
('JJ', 'NN') 0.44848277044402535
('NN', '.') 0.10622293138819239
('VB', 'PRP$') 0.039545810493343776
('PRP$', 'NN') 0.43994778067885115
('NN', 'TO') 0.03996656788997797
('TO', 'DT') 0.12987608994951813
('DT', 'NN') 0.470789957134109
('NN', 'NN') 0.12628219740141328
('NN', 'IN') 0.24747359623128942
('IN', 'DT') 0.31679887