In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split


from collections import Counter, defaultdict
from tqdm import tqdm

In [2]:
import ssl
ssl._create_default_https_context = ssl._create_unverified_context


import nltk
from nltk import bigrams, word_tokenize

nltk.download('punkt')
nltk.download('treebank')
nltk.download('universal_tagset')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package treebank to /root/nltk_data...
[nltk_data]   Unzipping corpora/treebank.zip.
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Unzipping taggers/universal_tagset.zip.


True

In [3]:
nltk_data = list(nltk.corpus.treebank.tagged_sents(tagset='universal'))
nltk_data[:10]

[[('Pierre', 'NOUN'),
  ('Vinken', 'NOUN'),
  (',', '.'),
  ('61', 'NUM'),
  ('years', 'NOUN'),
  ('old', 'ADJ'),
  (',', '.'),
  ('will', 'VERB'),
  ('join', 'VERB'),
  ('the', 'DET'),
  ('board', 'NOUN'),
  ('as', 'ADP'),
  ('a', 'DET'),
  ('nonexecutive', 'ADJ'),
  ('director', 'NOUN'),
  ('Nov.', 'NOUN'),
  ('29', 'NUM'),
  ('.', '.')],
 [('Mr.', 'NOUN'),
  ('Vinken', 'NOUN'),
  ('is', 'VERB'),
  ('chairman', 'NOUN'),
  ('of', 'ADP'),
  ('Elsevier', 'NOUN'),
  ('N.V.', 'NOUN'),
  (',', '.'),
  ('the', 'DET'),
  ('Dutch', 'NOUN'),
  ('publishing', 'VERB'),
  ('group', 'NOUN'),
  ('.', '.')],
 [('Rudolph', 'NOUN'),
  ('Agnew', 'NOUN'),
  (',', '.'),
  ('55', 'NUM'),
  ('years', 'NOUN'),
  ('old', 'ADJ'),
  ('and', 'CONJ'),
  ('former', 'ADJ'),
  ('chairman', 'NOUN'),
  ('of', 'ADP'),
  ('Consolidated', 'NOUN'),
  ('Gold', 'NOUN'),
  ('Fields', 'NOUN'),
  ('PLC', 'NOUN'),
  (',', '.'),
  ('was', 'VERB'),
  ('named', 'VERB'),
  ('*-1', 'X'),
  ('a', 'DET'),
  ('nonexecutive', 'ADJ'),
 

In [4]:
len(nltk_data)

3914

In [5]:
train_set, test_set = train_test_split(nltk_data, train_size=0.95)

In [6]:
len(train_set), len(test_set)

(3718, 196)

Основное марковское свойство

$$P(x_n | x_{n-1}, x_{n-2}, ..., x_1) = P(X_n | x_{n-1})$$

Пусть нам дано множество тегов $T = \{t_1, t_2, ..., t_n\}$, начальные состояния для тегов $\pi = \{\pi_1, \pi_2, ..., \pi_n\}$ и наблюдения $W = \{w_1, w_2, ..., w_n\}$. Мы хотим по входным наблюдениям $w_1...w_n$ предсказать последовательность тегов $t_1...t_n$.

#### Transitions

$$P(t_i|t_{i-1}) = \frac{C(t_{i-1}, t_i)}{C(t_{i-1})}$$


#### Emissions

$$P(w_i|t_i) = \frac{C(t_i, w_i)}{C(t_i)}$$

#### Алгоритм Витерби

$$\hat{t}_{1:n} = argmax_{t_1...t_n} P(t_1...t_n|w_1...w_n) \approx argmax_{t_1...t_n} \prod\limits_{i=1}^n P(w_i|t_i) p(t_i|t_{i-1})$$

In [35]:
class HMMTagger:
    def __init__(self, alpha=1):
        self.__tag_tag_counts = Counter()
        self.__tag_word_counts = Counter()
        self.__tag_counts = Counter()
        
        self.__transition_probabilities = None
        self.__emmission_probabilities = None
        
        self.alpha = alpha
        self.start = 'START'
        self.end = 'END'

        self.words_set_size = None

    def fit(self, data):
        """
        Training of the model on texts
        :param data: tagged sentences list
        """

        for sentence in tqdm(data, total=len(data)):
            # заполняем счётчики числом встретившихся униграмм и биграмм тегов, биграмм <тег,слово>
            words, tags = zip(*sentence)
            ttags = [self.start] + list(tags) + [self.end]

            self.__tag_counts += Counter(ttags)
            self.__tag_tag_counts += Counter(bigrams(ttags))

            for tag, word in zip(tags, words):
                self.__tag_word_counts[(tag, word)] += 1

                
        self.__transition_probabilities = defaultdict(float)
        self.__emmission_probabilities = defaultdict(float)
                
        self.__fill_transition_probabilties()
        self.__fill_emmission_probabilties()

        return self
    
    def __fill_transition_probabilties(self):
        for bigram in self.__tag_tag_counts.keys():
            self.__transition_probabilities[bigram] = self.__tag_tag_counts[bigram] / self.__tag_counts[bigram[0]]
        
    def __fill_emmission_probabilties(self, alpha=0):
        for (tag, word) in self.__tag_word_counts.keys():
            self.__emmission_probabilities[(tag, word)] = self.__tag_word_counts[(tag, word)] / self.__tag_counts[tag]
        
    def viterbi(self, observable):
        """
        Напишите функцию для алгоритма Витерби
        """
        state = []
        T = sorted(list(set(self.__tag_counts.keys())))
        T.remove(self.start)
        T.remove(self.end)

        for key, word in enumerate(observable):
            p = []
            for tag in T:
                if key == 0:
                    tr = self.initial_probabilities(tag)
                else:
                    tr = self.__transition_probabilities[(state[-1], tag)]

                em = self.__emmission_probabilities[(tag, word)]
                p.append(tr * em)

            # print(word, T, p)
            pmax = max(p)
            if pmax == 0:
                s = state[-1] if state else self.start
                tr = [self.__transition_probabilities[(s, tag)] for tag in T]
                tmax = max(tr)
                state_max = T[tr.index(tmax)]
            else:
                state_max = T[p.index(pmax)]
            state.append(state_max)
        return list(zip(observable, state))
    
    def transition_probabilty(self):
        return self.__transition_probabilities

    def emmission_probabilty(self):
        return self.__emmission_probabilities
        
    def initial_probabilities(self, tag):
        return self.__transition_probabilities[(self.start, tag)]
    
    def get_vocab_size(self):
        return self.words_set_size
    
    def get_tags(self):
        return self.__tag_counts
    
    def get_tag2tag(self):
        return self.__tag_tag_counts
    
    def get_word2tag(self):
        return self.__tag_word_counts

In [36]:
model = HMMTagger()

In [37]:
model.fit(train_set)

100%|██████████| 3718/3718 [00:00<00:00, 15767.44it/s]


<__main__.HMMTagger at 0x7f39c279ed60>

In [21]:
model.initial_probabilities('VERB')

0.009951586874663798

In [22]:
list(model.transition_probabilty().items())[:5]

[(('START', 'NOUN'), 0.2926304464766003),
 (('NOUN', 'VERB'), 0.1467930029154519),
 (('VERB', 'NUM'), 0.0225347734866734),
 (('NUM', 'NOUN'), 0.3484802888955763),
 (('NOUN', 'PRT'), 0.04369533527696793)]

In [23]:
list(model.emmission_probabilty().items())[:5]

[(('NOUN', 'Revenue'), 0.00010932944606413994),
 (('VERB', 'declined'), 0.0016318284248970393),
 (('NUM', '8'), 0.009027986758952753),
 (('NOUN', '%'), 0.014613702623906705),
 (('PRT', 'to'), 0.6714801444043321)]

In [24]:
model.get_tags()

Counter({'START': 3718,
         'NOUN': 27440,
         'VERB': 12869,
         'NUM': 3323,
         'PRT': 3047,
         '.': 11129,
         'X': 6290,
         'ADP': 9338,
         'DET': 8264,
         'ADJ': 6060,
         'END': 3718,
         'ADV': 3008,
         'PRON': 2594,
         'CONJ': 2153})

In [38]:
print(model.viterbi(word_tokenize('Android is a mobile operating system developed by Google.')))

[('Android', 'NOUN'), ('is', 'VERB'), ('a', 'DET'), ('mobile', 'ADJ'), ('operating', 'NOUN'), ('system', 'NOUN'), ('developed', 'VERB'), ('by', 'ADP'), ('Google', 'NOUN'), ('.', '.')]


In [26]:
# list of tagged words in test set
test_run_base = [tup for sent in test_set for tup in sent]

# list of  words which are untagged in test set
test_tagged_words = [tup[0] for sent in test_set for tup in sent]

In [39]:
tagged_seq = model.viterbi(test_tagged_words)

In [40]:
# Get accuracy of model
check = [i for i, j in zip(tagged_seq, test_run_base) if i == j] 
accuracy = len(check)/len(tagged_seq)
print(accuracy)

0.9418717302848285
