In [1]:
import nltk
from nltk.corpus import conll2000
import re
from collections import Counter, defaultdict

In [2]:
# Скачиваем conll2000
nltk.download('conll2000')
conll2000.ensure_loaded()

[nltk_data] Downloading package conll2000 to
[nltk_data]     C:\Users\Eduard\AppData\Roaming\nltk_data...
[nltk_data]   Package conll2000 is already up-to-date!


### Точность теггера будем считать так же, как на семинаре

In [3]:
def accuracy(test_sents, postagger):
    errors = 0
    length = 0
    for sent in test_sents:
        length += len(sent)
        sent, real_tags = zip(*sent)
        my_tags = postagger.tag(sent)
        for i in range(len(my_tags)):
            if my_tags[i][1] != real_tags[i]:
                errors += 1
    return 1 - errors / length

### Также возпользуемся нормализатором с семинара

In [4]:
class BaseNormalizer:
    def normalize(self, counter):
        sum_ = sum(counter.values())
        for token in counter:
            counter[token] /= sum_

## Bigram tagger
Для каждого слова будем выбирать наиболее вероятный тег, учитывая общую вероятность комбинации предыдущего тега и самого тега.
$$
     tag(w_j) = \arg \max_{i \in 1 .. |Tags| } P(w_j \mid tag_i)*P(tag_i \mid tag(w_{j-1}))
$$
Для этого нам понадобятся следующие классы:

**EmissionModel**, отвечающий за вероятность $P(w_j \mid tag_i)$, он будет хранить для каждого тега вероятности быть присвоенным тому или иному слову.

**TransitionModel**, отвечающий за вероятность $P(tag_i \mid tag(w_{j-1}))$.

**BigramPOSTagger**, сопоставляющий последовательности слов последовательность тегов.

### Класс EmissionModel можно оставить без изменений

In [5]:
class EmissionModel:
    def __init__(self, tagged_sents, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        for sent in tagged_sents:
            for word, tag in sent:
                self.model[tag][word] += 1
        self.add_unk_token()
        for tag in self.model:
            self.normalizer.normalize(self.model[tag])
        
    def add_unk_token(self):
        for tag in self.model:
            self.model[tag]['UNK'] = 0.1
        
    def tags(self):
        return self.model.keys()
    
    def __getitem__(self, tag):
        return self.model[tag]
    
    def __call__(self, word, tag):
        if word not in self[tag]:
            return self[tag]['UNK']
        return self[tag][word]

### Изменим класс TransitionModel, теперь он будет хранить вероятности тегов после уже известного

In [6]:
class TransitionModel:
    def __init__(self, tagged_sents, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        # self.model будет хранить вероятности вида P(tag_1 | tag1): defaultdict('tag1': Counter({tag_1: 0.4, tag_2: 0.6, tag_3 : 0, ...}), 'tag_2': Counter({tag_1: 0.1, tag_2: 0.3, tag_3: 0.2, ...}), ...)
        for sent in tagged_sents:
            # добавим дополнительный тег, чтобы определять тег самого первого слова
            self.model['SOL'][sent[0]] += 1 # SOL = start of line
            for word_index, tag in enumerate(sent[1:]):
                self.model[sent[word_index - 1]][tag] += 1
        self.add_unk_tag() # здесь также может идти неизвестный тег, т.к. в корпусе могут быть слова не всех тегов
        for tag in self.model:
            self.normalizer.normalize(self.model[tag])

    def add_unk_tag(self):
        for tag in self.model:
            self.model[tag]['UNK'] = 0.1

    def tags(self):
        return self.model.keys()

    def __getitem__(self, tag):
        return self.model[tag]

    def __call__(self, tag_left, tag_right=''):
        # P(tag_left | tag_right)
        if not tag_right: # первый тег в предложении
            return self.model['SOL'][tag_left]
        return self.model[tag_right][tag_left]

### Немного изменим с учётом формулы UnigramPOSTagger, чтобы получить BigramPOSTagger

In [7]:
class BigramPOSTagger:
    def __init__(self, emission_model, transition_model):
        self.em = emission_model
        self.tm = transition_model

    def tag(self, sent):
        tags = []
        prev_tag = ''
        for word in sent:
            max_prob = 0
            best_tag = 'UNK'
            for tag in self.tm.tags():
                prob = self.em(word, tag) * self.tm(tag, prev_tag)
                if prob > max_prob:
                    max_prob, best_tag = prob, tag
            tags.append(best_tag)
            prev_tag = best_tag
        return list(zip(sent, tags))

In [8]:
train_sents = conll2000.tagged_sents()[:8000]
em = EmissionModel(train_sents)
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sents])
bigram_postagger = BigramPOSTagger(em, tm)

In [9]:
test_sents = conll2000.tagged_sents()[8000:]
print("Качество при обучении на conll2000: ", accuracy(test_sents, bigram_postagger))

Качество при обучении на conll2000:  0.879182156133829


In [10]:
bigram_postagger.tag("Mary had a little dog .".split())

[('Mary', 'NNP'),
 ('had', 'VBD'),
 ('a', 'DT'),
 ('little', 'JJ'),
 ('dog', 'NN'),
 ('.', '.')]

### Проверим качество на английском корпусе UD

In [11]:
def read_corpus(filename):
    sample = []
    with open(filename, 'r', encoding='utf-8') as f:
        sent = []
        new_sent = 0
        for line in f.readlines():
            if line.strip() and not line.startswith('#'):
                sent.append((line.split()[1], line.split()[4]))
                new_sent = 1
            elif new_sent == 1:
                sample.append(sent)
                sent = []
                new_sent = 0
    return (sample)

In [12]:
train_sample = read_corpus('en_ewt-ud-train.conllu.txt')
test_sample = read_corpus('en_ewt-ud-test.conllu.txt')

In [13]:
train_sample[-1]

[('I', 'PRP'),
 ('will', 'MD'),
 ('never', 'RB'),
 ('return', 'VB'),
 ('there', 'RB'),
 ('again', 'RB'),
 ('(', '-LRB-'),
 ('and', 'CC'),
 ('now', 'RB'),
 ('have', 'VBP'),
 ('some', 'DT'),
 ('serious', 'JJ'),
 ('doubts', 'NNS'),
 ('about', 'IN'),
 ('the', 'DT'),
 ('quality', 'NN'),
 ('of', 'IN'),
 ('work', 'NN'),
 ('they', 'PRP'),
 ('actually', 'RB'),
 ('performed', 'VBD'),
 ('on', 'IN'),
 ('my', 'PRP$'),
 ('car', 'NN'),
 (')', '-RRB-'),
 ('.', '.')]

In [14]:
em = EmissionModel(train_sample)
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sample])
bigram_postagger = BigramPOSTagger(em, tm)
print("Качество при обучении на UP: ", accuracy(test_sample, bigram_postagger))

Качество при обучении на UP:  0.7988604215643305


### Качество похуже, чем на conll2000, но вместе с тем неплохое

In [15]:
bigram_postagger.tag("Mary had a little dog .".split())

[('Mary', 'NNP'),
 ('had', 'VBD'),
 ('a', 'DT'),
 ('little', 'JJ'),
 ('dog', 'NN'),
 ('.', '.')]