In [2]:
from collections import Counter, defaultdict

In [3]:
from nltk.corpus import conll2000

In [6]:
import os

In [103]:
conll2000.tagged_sents()[:2]

[[('Confidence', 'NN'), ('in', 'IN'), ('the', 'DT'), ('pound', 'NN'), ('is', 'VBZ'), ('widely', 'RB'), ('expected', 'VBN'), ('to', 'TO'), ('take', 'VB'), ('another', 'DT'), ('sharp', 'JJ'), ('dive', 'NN'), ('if', 'IN'), ('trade', 'NN'), ('figures', 'NNS'), ('for', 'IN'), ('September', 'NNP'), (',', ','), ('due', 'JJ'), ('for', 'IN'), ('release', 'NN'), ('tomorrow', 'NN'), (',', ','), ('fail', 'VB'), ('to', 'TO'), ('show', 'VB'), ('a', 'DT'), ('substantial', 'JJ'), ('improvement', 'NN'), ('from', 'IN'), ('July', 'NNP'), ('and', 'CC'), ('August', 'NNP'), ("'s", 'POS'), ('near-record', 'JJ'), ('deficits', 'NNS'), ('.', '.')], [('Chancellor', 'NNP'), ('of', 'IN'), ('the', 'DT'), ('Exchequer', 'NNP'), ('Nigel', 'NNP'), ('Lawson', 'NNP'), ("'s", 'POS'), ('restated', 'VBN'), ('commitment', 'NN'), ('to', 'TO'), ('a', 'DT'), ('firm', 'NN'), ('monetary', 'JJ'), ('policy', 'NN'), ('has', 'VBZ'), ('helped', 'VBN'), ('to', 'TO'), ('prevent', 'VB'), ('a', 'DT'), ('freefall', 'NN'), ('in', 'IN'), ('s

Function for extracting words and their tags from UD annotations:

In [20]:
def extractWordTags(filename):
    sents = []
    with open(os.path.join(os.getcwd(), filename), 'r', encoding='utf-8') as f:
        sent = []
        for line in f.readlines():
            if line.startswith('# sent_id'):
                if sent:
                    sents.append(sent)
                sent = []
            elif line.startswith('#') or line == '\n':
                continue
            else:
                token_id, token, lemma, pos_class, pos, *other = line.split('\t')
                sent.append((token, pos))
    return sents

Extract words and their POS tags from training data:

In [21]:
train_data = extractWordTags(r"data\UD_English-EWT\en_ewt-ud-train.conllu")

In [22]:
train_data[:5]

[[('Al', 'NNP'),
  ('-', 'HYPH'),
  ('Zaman', 'NNP'),
  (':', ':'),
  ('American', 'JJ'),
  ('forces', 'NNS'),
  ('killed', 'VBD'),
  ('Shaikh', 'NNP'),
  ('Abdullah', 'NNP'),
  ('al', 'NNP'),
  ('-', 'HYPH'),
  ('Ani', 'NNP'),
  (',', ','),
  ('the', 'DT'),
  ('preacher', 'NN'),
  ('at', 'IN'),
  ('the', 'DT'),
  ('mosque', 'NN'),
  ('in', 'IN'),
  ('the', 'DT'),
  ('town', 'NN'),
  ('of', 'IN'),
  ('Qaim', 'NNP'),
  (',', ','),
  ('near', 'IN'),
  ('the', 'DT'),
  ('Syrian', 'JJ'),
  ('border', 'NN'),
  ('.', '.')],
 [('[', '-LRB-'),
  ('This', 'DT'),
  ('killing', 'NN'),
  ('of', 'IN'),
  ('a', 'DT'),
  ('respected', 'JJ'),
  ('cleric', 'NN'),
  ('will', 'MD'),
  ('be', 'VB'),
  ('causing', 'VBG'),
  ('us', 'PRP'),
  ('trouble', 'NN'),
  ('for', 'IN'),
  ('years', 'NNS'),
  ('to', 'TO'),
  ('come', 'VB'),
  ('.', '.'),
  (']', '-RRB-')],
 [('DPA', 'NNP'),
  (':', ':'),
  ('Iraqi', 'JJ'),
  ('authorities', 'NNS'),
  ('announced', 'VBD'),
  ('that', 'IN'),
  ('they', 'PRP'),
  ('had',

Now let's compare UD tags with NLTK tags:

In [23]:
from itertools import chain

In [28]:
[i for i in chain(*[['a','b'], ['c', 'd'], ['e', 'f']])]

['a', 'b', 'c', 'd', 'e', 'f']

In [30]:
# all nltk tags:
', '.join(set([i[1] for i in chain(*conll2000.tagged_sents())]))

"RB, ., '', VBZ, POS, TO, NN, RBS, NNPS, VBG, VBP, PRP, (, CC, ), JJR, PRP$, ``, FW, $, VB, NNS, PDT, ,, JJS, SYM, RP, UH, VBD, DT, WP$, EX, VBN, JJ, RBR, WRB, WP, IN, MD, WDT, #, :, NNP, CD"

In [31]:
#all UD tags:
', '.join(set([i[1] for i in chain(*train_data)]))

"., RB, '', VBZ, POS, TO, ADD, NN, GW, RBS, NNPS, VBG, VBP, LS, PRP, -LRB-, CC, JJR, PRP$, ``, FW, $, VB, NNS, AFX, PDT, XX, NFP, ,, JJS, SYM, RP, UH, VBD, DT, WP$, EX, VBN, HYPH, JJ, RBR, -RRB-, WRB, WP, IN, MD, WDT, :, NNP, CD"

In [32]:
nltk_tags = set([i[1] for i in chain(*conll2000.tagged_sents())])
ud_tags = set([i[1] for i in chain(*train_data)])

Tags present in NLTK, but not in UD:

In [33]:
nltk_tags.difference(ud_tags)

{'#', '(', ')'}

Tags present in UD, but not in NLTK:

In [34]:
ud_tags.difference(nltk_tags)

{'-LRB-', '-RRB-', 'ADD', 'AFX', 'GW', 'HYPH', 'LS', 'NFP', 'XX'}

Accuracy function and normalizer (taken from seminar):

In [42]:
# Функция, которая считает точность.
def accuracy(test_sents, postagger):
    errors = 0
    length = 0
    for sent in test_sents:
        length += len(sent)
        sent, real_tags = zip(*sent)  # что тут произошло?
        # предложение (список кортежей) передалось в функцию zip как последовательность кортежей-аргументов
        # функция zip вернула два кортежа -  один содержит нулевые элементы кортежей из исходного списка (токены),
        # другой содержит первые элементы кортежей из исходного списка (теги)
        my_tags = postagger.tag(sent)
        for i in range(len(my_tags)):
            if my_tags[i][1] != real_tags[i]:
                errors += 1
    return 1 - errors / length

# Нормализатор для получения распределения вероятностей из частот
class BaseNormalizer:
    def normalize(self, counter):
        sum_ = sum(counter.values())
        for token in counter:
            counter[token] /= sum_

In [37]:
a, b, c = zip([1,2,3], ['a', 'b', 'c'])

In [38]:
a

(1, 'a')

<h1> Bigram POS Tagger </h1>

Let's write a class that for a given sequence of tokens $w^n_1$ returns sequence of POS tags $t^n_1$, which satisfies the equation:

$$t^n_1 =  \arg \max_{t^n_1} \prod_{i = 1}^{n} P(w_i|t_i) P(t_i|t_{i-1}) $$

<b> Emission model </b> - stores $P(w_i|t_i)$ scores (taken from seminar)

<b> Transition model </b> - stores $P(t_i|t_{i−1})$ scores (taken from seminar and modified)

In [121]:
class EmissionModel:
    def __init__(self, tagged_sents, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        # self.model будет иметь вид 
        # defaultdict({'tag_1': Counter({'word_1': 0.3, 'word_2': 0.7}), 'tag_2': Counter({'word_1': 0.6, 'word_3': 0.3 ...})})
        for sent in tagged_sents:
            for word, tag in sent:
                self.model[tag][word] += 1
        self.add_unk_token()
        for tag in self.model:
            self.normalizer.normalize(self.model[tag])
        
    def add_unk_token(self):
        # Для каждого тега добавим одинаковую вероятность быть приписанным любому слову, которого нет в модели
        for tag in self.model:
            self.model[tag]['UNK'] = 0.1
        
    def tags(self):
        # Добавим возможность возвращать все теги, которые есть в модели
        return self.model.keys()
    
    def __getitem__(self, tag):
        # Все слова для данного тега
        return self.model[tag]
    
    def __call__(self, word, tag):
        # Самое интересное - вероятность P(word|tag)
        if word not in self[tag]:
            return self[tag]['UNK']
        return self[tag][word]

In [122]:
class TransitionModel():
    def __init__(self, tag_seqs, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        # self.model будет иметь вид 
        # defaultdict({'tag_1': Counter({'tag_1': 0.3, 'tag_2': 0.7}), 'tag_2': Counter({'tag_1': 0.6, 'tag_3': 0.3 ...})})
        for seq in tag_seqs:
            for tag_1, tag_2 in zip(['<s>'] + seq[:-1], seq):
                self.model[tag_1][tag_2] += 1
        for tag_1 in self.model:
            self.normalizer.normalize(self.model[tag_1])
        
    def tags(self):
        # Добавим возможность возвращать все теги, которые есть в модели
        return self.model.keys()
    
    def __getitem__(self, tag_1):
        # Все теги, которые могут идти после данного тега
        return self.model[tag_1]
    
    def __call__(self, tag_1, tag_2):
        # Самое интересное - вероятность P(tag_2|tag_1)
        if  tag_2 not in self[tag_1]:
            return 0
        return self[tag_1][tag_2]

In [53]:
def max_argmax(seq=None, func=None, argseq=None):
    if seq:
        argmax = 0
        maximum = seq[argmax]
        for i in range(len(seq)):
            if seq[i] > maximum:
                argmax = i
                maximum = seq[argmax]
        return maximum, argmax
    elif func and argseq:
        argmax = argseq[0]
        maximum = func(argmax)
        for i in argseq:
            if func(i) > maximum:
                argmax = i
                maximum = func(i)
        return maximum, argmax
    else:
        raise ValueError("No arguments provided")

In [123]:
class BigramHMMTagger():
    def __init__(self, emission_model, transition_model):
        self.em = emission_model
        self.tm = transition_model
    
    def tag(self, sent, return_prob=False):
        ## implementation of Viterbi algorithm:
        viterbi = []
        backpointer = []
        tags = list(self.tm.tags())
        tags.remove('<s>')
        argseq = list(range(len(tags)))
        viterbi.append([self.tm('<s>',tag)*self.em(sent[0], tag) for tag in tags])
        #print(viterbi)
        backpointer.append([None for tag in tags])
        for t in range(1, len(sent)):
            new_viterbi_row = []
            new_backpointer_row = []
            for s in range(len(tags)):
                m, argm = max_argmax(func = lambda x: viterbi[t-1][x] * self.tm(tags[x], tags[s]) * self.em(sent[t], tags[s]),
                                     argseq = argseq)
                new_viterbi_row.append(m)
                new_backpointer_row.append(argm)
            viterbi.append(new_viterbi_row)
            backpointer.append(new_backpointer_row)
            #print(sent[t])
            #print(' '.join([tags[s]+': '+ str(new_viterbi_row[s]) for s in range(len(tags))]))
        bestpastprob, bestpathpointer = max_argmax(viterbi[len(sent)-1])
        tagged_sent = [None for i in range(len(sent))]
        for i in range(len(tagged_sent)-1, -1, -1):
            tagged_sent[i] = (sent[i], tags[bestpathpointer])
            bestpathpointer = backpointer[i][bestpathpointer]
        if return_prob:
            return tagged_sent, bestpathprob
        return tagged_sent

In [124]:
em = EmissionModel(train_data)

In [125]:
tm = TransitionModel([[tag for word, tag in sent] for sent in train_data])

In [126]:
tagger = BigramHMMTagger(em, tm)

In [132]:
tagger.tag(['I', 'am', 'eating', 'a', 'cake'])

[('I', 'PRP'), ('am', 'VBP'), ('eating', 'VBG'), ('a', 'DT'), ('cake', 'NN')]

Let's look at accuracy on test selection:

In [128]:
test_data = extractWordTags(r"data\UD_English-EWT\en_ewt-ud-test.conllu")

In [129]:
accuracy(test_data, tagger)

0.7961478645771025

And on train selection:

In [104]:
accuracy(train_data, tagger)

0.8968916957097678

Let's look at accuracy on conll2000 data:

In [111]:
HMM_acc = lambda data: accuracy(data[8000:],
                         BigramHMMTagger(EmissionModel(data[:8000]),
                                         TransitionModel([[tag for word, tag in sent] for sent in data[:8000]])))

In [112]:
HMM_acc(conll2000.tagged_sents())

0.8605803867323708