In [1]:
import csv

import numpy as np
import pandas as pd

# Widen text columns so the long tagged sentences are not truncated early when displayed.
pd.set_option('display.max_colwidth', 300)

In [2]:
# Load the training file into a single 'raw' column (the preview suggests one
# tagged sentence per row). Quoting is disabled so a literal double quote inside
# a sentence is kept as text instead of being interpreted as a CSV quote
# character, which would strip it and could merge fields or lines.
df = pd.read_csv(
    'POS.train',
    sep='\t',
    header=None,
    names=['raw'],
    quoting=csv.QUOTE_NONE,
)

In [3]:
# Preview the first five rows of the loaded frame.
df.head(n=5)

Unnamed: 0,raw
0,"Pierre/NP Vinken/NP ,/, 61/CD years/NNS old/JJ ,/, will/MD join/VB the/DT board/NN as/IN a/DT nonexecutive/JJ director/NN Nov./NP 29/CD ./."
1,"Mr./NP Vinken/NP is/VBZ chairman/NN of/IN Elsevier/NP N.V./NP ,/, the/DT Dutch/NP publishing/VBG group/NN ./."
2,"Rudolph/NP Agnew/NP ,/, 55/CD years/NNS old/JJ and/CC former/JJ chairman/NN of/IN Consolidated/NP Gold/NP Fields/NP PLC/NP ,/, was/VBD named/VBN a/DT nonexecutive/JJ director/NN of/IN this/DT British/JJ industrial/JJ conglomerate/NN ./."
3,"A/DT form/NN of/IN asbestos/NN once/RB used/VBN to/TO make/VB Kent/NP cigarette/NN filters/NNS has/VBZ caused/VBN a/DT high/JJ percentage/NN of/IN cancer/NN deaths/NNS among/IN a/DT group/NN of/IN workers/NNS exposed/VBN to/TO it/PP more/RBR than/IN 30/CD years/NNS ago/IN ,/, researchers/NNS rep..."
4,"The/DT asbestos/NN fiber/NN ,/, crocidolite/NN ,/, is/VBZ unusually/RB resilient/JJ once/IN it/PP enters/VBZ the/DT lungs/NNS ,/, with/IN even/RB brief/JJ exposures/NNS to/TO it/PP causing/VBG symptoms/NNS that/WDT show/VBP up/IN decades/NNS later/JJ ,/, researchers/NNS said/VBD ./."


In [4]:
def train(df):
    """Estimate the three probability tables of a bigram HMM tagger.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``raw`` column; each value is one sentence written as
        whitespace-separated ``word/TAG`` tokens.

    Returns
    -------
    word_tag_prob : pandas.Series
        P(word | tag), named ``prob`` and indexed by ``(word, tag)``.
    first_tag_prob : pandas.Series
        P(tag | sentence start), named ``prob`` and indexed by ``first_tag``.
    tag_given_tag_prob : pandas.Series
        P(asked | given), i.e. the probability of tag ``asked`` directly
        following tag ``given``; named ``prob`` and indexed by
        ``(asked, given)``. Every sentence is terminated with an ``EOS`` tag
        and the very last ``EOS`` is followed by ``EndOfDocument``.
    """

    def _split_token(tokens):
        # Split on the LAST slash only: the word part may itself contain
        # slashes (e.g. ``1\/2/CD``), the tag never does.
        parts = tokens.str.rsplit('/', n=1)
        return parts.str[0], parts.str[1]

    print('Calculating word given tag probabilities...')
    tokens = df['raw'].str.split().explode().reset_index(drop=True)
    words, tags = _split_token(tokens)
    word_tag_prob = (
        pd.DataFrame({'word': words, 'tag': tags})
        .groupby(['word', 'tag'])
        .size()
        .reset_index(name='count')
        # Normalise each (word, tag) count by the total count of that tag.
        .assign(prob=lambda x: x['count'] / x.groupby('tag')['count'].transform('sum'))
        .set_index(['word', 'tag'])['prob']
    )

    print('Calculating tag given sentence beginning probabilities...')
    first_tags = df['raw'].str.split().str[0].str.rsplit('/', n=1).str[1]
    first_tag_prob = (
        pd.DataFrame({'first_tag': first_tags})
        .groupby('first_tag')
        .size()
        .pipe(lambda counts: counts / counts.sum())
        .rename('prob')
    )

    print('Calculating tag given previous tag probabilities...')
    # Append an explicit end-of-sentence token so that sentence boundaries
    # appear in the tag stream as an ordinary 'EOS' tag.
    eos_tokens = (df['raw'] + ' EOS/EOS').str.split().explode().reset_index(drop=True)
    _, given = _split_token(eos_tokens)
    # Only the slot after the final tag is filled; tags that failed to parse
    # stay missing and are dropped by the groupby instead of being counted as
    # 'EndOfDocument'.
    asked = given.shift(-1, fill_value='EndOfDocument')
    tag_given_tag_prob = (
        pd.DataFrame({'asked': asked, 'given': given})
        .groupby(['asked', 'given'])
        .size()
        .reset_index(name='count')
        # Normalise each transition count by the total count of the preceding tag.
        .assign(prob=lambda x: x['count'] / x.groupby('given')['count'].transform('sum'))
        .set_index(['asked', 'given'])['prob']
    )
    print('Done.')
    return word_tag_prob, first_tag_prob, tag_given_tag_prob

In [5]:
%%time
# Fit the three probability tables returned by train(): word given tag,
# tag given sentence beginning, and tag given previous tag.
word_tag_prob, first_tag_prob, tag_given_tag_prob = train(df)

Calculating word given tag probabilities...
Calculating tag given sentence beginning probabilities...
Calculating tag given previous tag probabilities...
Done.
CPU times: user 1.78 s, sys: 110 ms, total: 1.89 s
Wall time: 1.91 s


In [23]:
# Spot-check one entry of the word-given-tag table: the word 'The' under tag 'DT'.
word_tag_prob.loc[('The', 'DT')]

0.08784517347179609

In [114]:
def predict_sentence(sentence, word_tag_prob, first_tag_prob, tag_given_tag_prob):
    """Tag a sentence with the most probable tag sequence (Viterbi decoding).

    Parameters
    ----------
    sentence : str
        Whitespace-separated words to tag.
    word_tag_prob : pandas.Series
        P(word | tag), indexed by ``(word, tag)``.
    first_tag_prob : pandas.Series or dict
        P(tag | sentence start), keyed by tag. Tags missing from it are
        treated as having probability 0.
    tag_given_tag_prob : pandas.Series
        P(tag | previous tag), indexed by ``(tag, previous_tag)``. Missing
        transitions are treated as having probability 0.

    Returns
    -------
    list of str
        One tag per word, in sentence order; ``[]`` for an empty sentence.

    Notes
    -----
    Candidate tags for a word are the tags it was seen with in
    ``word_tag_prob``; an unseen word falls back to the single tag ``'NN'``
    with an emission probability of 1.0.
    """

    def get_word_tag_prob(word, tag):
        try:
            return word_tag_prob[(word, tag)]
        except KeyError:
            # Only reached for unseen words, whose sole candidate tag is 'NN'.
            return 1.0

    def get_tag_set(word):
        try:
            return word_tag_prob[word].index.tolist()
        except KeyError:
            return ['NN']

    def get_tag_given_tag_prob(this_tag, given_tag):
        try:
            return tag_given_tag_prob[(this_tag, given_tag)]
        except KeyError:
            return 0.0

    words = sentence.split()
    if not words:
        return []

    # Candidate tags per position, looked up once instead of inside the inner loop.
    tag_sets = [get_tag_set(word) for word in words]

    # Initialization: emission probability times the sentence-start probability.
    score = {
        (tag, 0): get_word_tag_prob(words[0], tag) * first_tag_prob.get(tag, 0.0)
        for tag in tag_sets[0]
    }
    back_ptr = {}

    # Iteration: extend the best path ending in each candidate tag.
    for i in range(1, len(words)):
        for tag in tag_sets[i]:
            candidates = {
                prev_tag: score[(prev_tag, i - 1)] * get_tag_given_tag_prob(tag, prev_tag)
                for prev_tag in tag_sets[i - 1]
            }
            # Arg-max over scores (a plain max() over the dict would compare tag names).
            best_prev = max(candidates, key=candidates.get)
            score[(tag, i)] = get_word_tag_prob(words[i], tag) * candidates[best_prev]
            back_ptr[(tag, i)] = best_prev

    # Termination: pick the best final tag, then follow the back pointers.
    last = len(words) - 1
    this_tag = max(tag_sets[last], key=lambda tag: score[(tag, last)])
    final_tags = [this_tag]
    for i in range(last, 0, -1):
        this_tag = back_ptr[(this_tag, i)]
        final_tags.append(this_tag)

    return final_tags[::-1]

In [120]:
# Tag a short example sentence with the trained tables.
sentence = 'Can you feel this'
predict_sentence(
    sentence=sentence,
    word_tag_prob=word_tag_prob,
    first_tag_prob=first_tag_prob,
    tag_given_tag_prob=tag_given_tag_prob,
)

['MD', 'PP', 'VBP', 'DT']

In [None]:
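# The tagger is built from three probability tables:
# - words given tags: P(word | tag)
# - tags given the beginning of a sentence: P(tag | sentence start)
# - tags given the previous tag: P(tag | previous tag)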