In [56]:
from nltk.corpus import conll2000
from conllu import parse
from collections import Counter, defaultdict

## Данные

In [59]:
def read_tagged_sents(path):
    """Read a CoNLL-U file and return its sentences as lists of (form, upostag) pairs.

    Args:
        path: path to a ``.conllu`` file.

    Returns:
        A list with one entry per sentence; each entry is a list of
        ``(token['form'], token['upostag'])`` tuples in file order.
    """
    # CoNLL-U files are UTF-8; passing the encoding explicitly avoids depending
    # on the platform's default locale encoding.
    with open(path, encoding='utf-8') as conllu_file:
        # NOTE(review): every token yielded by `parse` is kept as-is; if the
        # parser also yields multi-word-token range lines, they end up in the
        # data as ordinary tokens — confirm this is intended.
        return [
            [(token['form'], token['upostag']) for token in tokens]
            for tokens in parse(conllu_file.read())
        ]


train_s = read_tagged_sents('./UD_English-EWT/en_ewt-ud-train.conllu')
test_s = read_tagged_sents('./UD_English-EWT/en_ewt-ud-test.conllu')

In [60]:
# Presumably forces the NLTK corpus to load before it is sliced below.
conll2000.ensure_loaded()
# Fetch the tagged-sentence view once, then split it:
# the first 8000 sentences are used for training, the remainder for testing.
conll_tagged_sents = conll2000.tagged_sents()
train_s_conll, test_s_conll = conll_tagged_sents[:8000], conll_tagged_sents[8000:]

## Точность теггера

In [61]:
def accuracy(test_sents, postagger):
    """Return the share of tokens whose predicted tag equals the gold tag.

    Args:
        test_sents: iterable of sentences, each a sequence of
            ``(word, gold_tag)`` pairs.
        postagger: object exposing ``tag(words)`` that returns a sequence of
            ``(word, predicted_tag)`` pairs for the given words.

    Returns:
        ``1 - errors / total_tokens`` as a float, or ``0.0`` when the test set
        contains no tokens at all.
    """
    errors = 0
    length = 0
    for tagged_sent in test_sents:
        # An empty sentence has nothing to score, and `zip(*[])` cannot be
        # unpacked into (words, tags), so skip it.
        if not tagged_sent:
            continue
        length += len(tagged_sent)
        words, real_tags = zip(*tagged_sent)
        predicted = postagger.tag(words)
        errors += sum(
            1
            for (_, guessed_tag), gold_tag in zip(predicted, real_tags)
            if guessed_tag != gold_tag
        )
    # Without tokens the ratio is undefined; report 0.0 instead of dividing by zero.
    if length == 0:
        return 0.0
    return 1 - errors / length

## Нормализатор

In [62]:
class BaseNormalizer:
    """Turns raw counts into relative frequencies."""

    def normalize(self, counter):
        """Divide every value of ``counter`` by the sum of all values, in place.

        Args:
            counter: mapping from item to numeric count; it is modified in place.

        Returns:
            None.
        """
        total = sum(counter.values())
        # Iterate over a snapshot of the items; only existing keys are reassigned.
        for key, count in list(counter.items()):
            counter[key] = count / total

## Bigram POS tagger

In [63]:
class EmissionModel:
    """Emission model P(word | tag) estimated from tagged sentences.

    After construction ``self.model`` looks like::

        defaultdict({'tag_1': Counter({'word_1': 0.3, 'word_2': 0.7}),
                     'tag_2': Counter({'word_1': 0.6, 'word_3': 0.3, ...})})

    Every tag also gets an ``"UNK"`` entry that is used for words not seen
    with that tag.
    """

    def __init__(self, tagged_sents, normalizer=None):
        """Count (word, tag) pairs and normalize the counts per tag.

        Args:
            tagged_sents: iterable of sentences, each an iterable of
                ``(word, tag)`` pairs.
            normalizer: object with ``normalize(counter)`` that rescales a
                counter in place; a fresh ``BaseNormalizer`` is used when omitted.
        """
        # Build the default per instance instead of sharing one object that was
        # created when the class body was evaluated.
        self.normalizer = BaseNormalizer() if normalizer is None else normalizer
        self.model = defaultdict(Counter)
        for sent in tagged_sents:
            for word, tag in sent:
                self.model[tag][word] += 1
        self.add_unk_token()
        for tag in self.model:
            self.normalizer.normalize(self.model[tag])

    def add_unk_token(self):
        """Give every tag an ``"UNK"`` pseudo-count of 0.1 for unseen words.

        This runs before normalization, so the resulting probability differs
        from tag to tag. Note that the assignment overwrites the count of a
        literal ``"UNK"`` word if the training data contains one.
        """
        for tag in self.model:
            self.model[tag]["UNK"] = 0.1

    def tags(self):
        """Return the tags known to the model (a view of the model's keys)."""
        return self.model.keys()

    def __getitem__(self, tag):
        """Return the word distribution for ``tag`` (empty for an unknown tag).

        Uses ``get`` so that a lookup never adds a new tag to the model.
        """
        return self.model.get(tag, Counter())

    def __call__(self, word, tag):
        """Return P(word | tag).

        Words unseen with ``tag`` get the tag's ``"UNK"`` value; an unknown
        tag yields 0. The query does not modify the model.
        """
        words = self.model.get(tag)
        if words is None:
            return 0
        if word not in words:
            return words['UNK']
        return words[word]

In [64]:
class TransitionModel:
    """Bigram tag-transition model P(tag | previous tag).

    The pseudo-tag ``'START'`` is used as the previous tag of the first tag of
    every sequence, so ``self.model['START']`` holds sentence-initial
    probabilities.
    """

    def __init__(self, tag_seqs, normalizer=None):
        """Count tag bigrams and normalize them per previous tag.

        Args:
            tag_seqs: iterable of tag sequences (one per sentence).
            normalizer: object with ``normalize(counter)`` that rescales a
                counter in place; a fresh ``BaseNormalizer`` is used when omitted.
        """
        # Build the default per instance instead of sharing one object that was
        # created when the class body was evaluated.
        self.normalizer = BaseNormalizer() if normalizer is None else normalizer
        self.model = defaultdict(Counter)
        for sent in tag_seqs:
            prev_tag = 'START'
            for tag in sent:
                self.model[prev_tag][tag] += 1
                prev_tag = tag
        for tag in self.model:
            self.normalizer.normalize(self.model[tag])

    def tags(self):
        """Return the previous-tag contexts stored in the model.

        This is a view of the model's keys: it includes ``'START'`` and only
        those tags that were followed by another tag in the training data.
        """
        return self.model.keys()

    def __getitem__(self, tag):
        """Return the next-tag distribution after ``tag`` (empty if unseen).

        Uses ``get`` so that a lookup never adds a new key to the model.
        """
        return self.model.get(tag, Counter())

    def __call__(self, tag, prev_tag=None):
        """Return P(tag | prev_tag); a falsy ``prev_tag`` means sentence start.

        An unseen ``prev_tag`` yields 0. The query does not modify the model,
        so it is safe to call while iterating over ``tags()``.
        """
        context = prev_tag if prev_tag else 'START'
        following = self.model.get(context)
        if following is None:
            return 0
        return following[tag]

In [65]:
class BigramPOSTagger:
    """Greedy left-to-right bigram tagger.

    For each word it picks the tag maximizing
    ``emission(word, tag) * transition(tag, previous_tag)``, where the previous
    tag is the one just chosen (``'START'`` for the first word).
    """

    def __init__(self, emission_model, transition_model):
        """Store the models.

        Args:
            emission_model: callable ``(word, tag) -> probability``.
            transition_model: callable ``(tag, prev_tag) -> probability`` that
                also exposes ``tags()`` with the candidate tags.
        """
        self.em = emission_model
        self.tm = transition_model

    def tag(self, sent):
        """Tag a sequence of words.

        Args:
            sent: iterable of words.

        Returns:
            A list of ``(word, tag)`` pairs. A word for which every candidate
            scores 0 is tagged ``'UNK'``.
        """
        # Materialize the input so it can be walked here and zipped again below
        # even when the caller passes a one-shot iterator.
        words = list(sent)
        # Snapshot the candidates once: querying a model may add keys to its
        # underlying dict, which would break iteration over a live keys view.
        # NOTE(review): candidates come from the transition model, as before, so
        # tags that never precede another tag in training cannot be predicted.
        candidates = list(self.tm.tags())
        tags = []
        prev_t = 'START'
        for word in words:
            max_prob = 0
            best_tag = 'UNK'
            for t in candidates:
                prob = self.em(word, t) * self.tm(t, prev_t)
                if prob > max_prob:
                    max_prob, best_tag = prob, t
            tags.append(best_tag)
            prev_t = best_tag
        return list(zip(words, tags))

## Обучение, проверка, сравнение

In [67]:
# Train both models on the UD English-EWT training split: the transition model
# only needs the tag sequences, so the words are stripped first.
train_tags_ud = [[tag for _, tag in sent] for sent in train_s]
em = EmissionModel(train_s)
tm = TransitionModel(train_tags_ud)
bigram_postagger = BigramPOSTagger(em, tm)
# The last expression is the cell output: accuracy on the UD test split.
accuracy(test_s, bigram_postagger)

0.8385065944136749

In [68]:
# Same pipeline on the CoNLL-2000 split: strip the words to get the tag
# sequences for the transition model, then train and evaluate.
train_tags_conll = [[tag for _, tag in sent] for sent in train_s_conll]
em_conll = EmissionModel(train_s_conll)
tm_conll = TransitionModel(train_tags_conll)
bigram_postagger_conll = BigramPOSTagger(em_conll, tm_conll)
# The last expression is the cell output: accuracy on the held-out CoNLL sentences.
accuracy(test_s_conll, bigram_postagger_conll)

0.8722227025157776

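Теггер, обученный на корпусе CoNLL-2000, показывает более высокую точность (≈0.872), чем теггер, обученный на UD English-EWT (≈0.839). При этом корпуса и наборы тегов у них разные, поэтому напрямую эти числа сравнимы лишь условно.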