# Семинар 5. Определение части речи. Алгоритм Витерби.

Одной из основых задач NLP является sequence labeling (т.е. разметка последовтельности). Такой задачей является определение части речи, определение именованных сущностей, преобразование звука в текст. 

Задача состоит в том, чтобы для данной последовательности (слов или чего-то ещё) определить лучшую последовательность тэгов.

Например, для предложения найти соответсвующую последовательность частей речи. 

Отличие от простой классификации в том, что каждое слово связано с другими (обычно с предыдущими, но может и с последующими).

Посмотрим на задачу определения частей речи поподробнее.

In [1]:
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import CountVectorizer
from scipy.sparse import hstack
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
import warnings
warnings.filterwarnings('ignore')

Возьмем данные Dialog Evaluation 2016 по предсказанию морфологических тэгов. Я заранее вытащил из данных только слово, лемму и часть речи (остальное нам не понадобится).

In [2]:
# так как данных много, возьмем только небольшой кусок
train = open('data/train_pos.out').read().split('\n\n')[:500]
test = open('data/test_pos.out').read().split('\n\n')[:100]

Данные в формате CONLL - на каждой строке отдельное слово (+лемма и часть речи отделенные табами), предложения отделены двойной новой строкой.

Такую задачу можно решать с помощью скрытых марковских цепей (части речи будут скрытыми переменными). Для этого нужно построить две матрицы (перехода и попрождения), а затем использовать их, чтобы расчитать самую вероятностную последовательность. Это достаточно муторно и к тому же, чтобы добавить какие-то дополнительные признаки в модель (суффиксы или префиксы например), нужно будет считать сложные вероятности.

Поэтому мы воспользуемся другим подходом - обучим обычный классификатор (логрег) предсказывать тэг по слову и предыдущему тэгу. И будем последовательно применять его к предложению подавая выдачу на предыдущем слове в текущий классификатор. 

In [None]:
# добавим к кажому предложению специальный символ начала, чтобы можно было начать классифицировать
train_data = []
for sent in train:
    sent_list = [['<START>', '<START>', '<START>']]
    for line in sent.split('\n'):
        sent_list.append(line.split('\t'))
    
    train_data.append(sent_list)

test_data = []
for sent in test:
    sent_list = [['<START>', '<START>', '<START>']]
    for line in sent.split('\n'):
        sent_list.append(line.split('\t'))
    
    test_data.append(sent_list)

Выделим нужные нам части в отдельные переменные, чтобы удобнее было преобразовывать их в векторы.

In [None]:
train_current_word = []
train_previous_pos = []
train_target = []

for sent in train_data:
    
    for i in range(1, len(sent)):
        current_w, current_l, target_pos = sent[i]
        previous_w, previous_l, previous_pos = sent[i-1]
        
        train_target.append(target_pos)
        train_current_word.append(current_w)
        train_previous_pos.append(previous_pos)
        

test_current_word = []
test_previous_pos = []
test_target = []

for sent in test_data:
    
    for i in range(1, len(sent)):
        current_w, current_l, target_pos = sent[i]
        previous_w, previous_l, previous_pos = sent[i-1]
        
        test_target.append(target_pos)
        test_current_word.append(current_w)
        test_previous_pos.append(previous_pos)  

In [None]:
train_target[:10]

Для преобразования предыдущего тэга используем One-hot encoding, а для слов - Count_Vectorizer на символьных нграммах.

In [None]:
# PREVIOUS POS ENCODING
lenc_prev_pos = LabelEncoder()
int_prev_pos_enc = lenc_prev_pos.fit_transform(train_previous_pos)
onehot_prev_pos = OneHotEncoder(sparse=True)
int_prev_pos = int_prev_pos_enc.reshape(len(int_prev_pos_enc), 1)

X_prev_pos_train = onehot_prev_pos.fit_transform(int_prev_pos)

int_prev_pos_enc_test = lenc_prev_pos.transform(test_previous_pos)
int_prev_pos_test = int_prev_pos_enc_test.reshape(
                                     len(int_prev_pos_enc_test),1)

X_prev_pos_test = onehot_prev_pos.transform(int_prev_pos_test)

# WORD ENCODING
cv_word = CountVectorizer(ngram_range=(1,3), analyzer='char', max_features=3000)
cv_word.fit(train_current_word)

X_current_word_train = cv_word.transform(train_current_word)

X_current_word_test = cv_word.transform(test_current_word)

Склеим полученные матрицы в одну.

In [None]:

X_train = hstack([X_prev_pos_train,
                  X_current_word_train])

In [None]:
X_test = hstack([X_prev_pos_test,
                  X_current_word_test])

Посмотрим на размерности

In [None]:
print(X_train.shape)
print(X_test.shape)

In [None]:
len(train_target)

Обучим обычную логистическую регрессиию.

In [None]:
clf = LogisticRegression()
clf.fit(X_train, train_target)

Проверим что получается.

In [None]:
pred_pos = clf.predict(X_test)
print(classification_report(test_target, pred_pos))

Определяя качество таким образом мы однако может ошибиться. Сейчас в тестовой выборке у нас для каждого примера есть правильная часть речи с предыдущего шага. По честному мы должным предсказывать часть речи первого слова и дальше подавать её в следующий шаг.

In [None]:
# PREVIOUS POS ENCODING

pred_pos_fair = []
true_pos = []


# напишем специальную функцию для удобства
def vectorize_word(word, prev_pos):
    int_prev_pos_enc = lenc_prev_pos.transform(
                                   [prev_pos])
    int_prev_pos = int_prev_pos_enc.reshape(
                                         len(int_prev_pos_enc),1)

    X_prev_pos = onehot_prev_pos.transform(int_prev_pos)

    X_current_word = cv_word.transform([word])


    X = hstack([X_prev_pos,
              X_current_word])

    pred = clf.predict(X)[0]
    
    return pred

for sent in test_data[:100]:
    previous_pos = sent[0][2]
    pos_sequence = []
    
    for i in range(1, len(sent)):
        current_w, current_l, target_pos = sent[i]
        previous_w, previous_l, _ = sent[i-1]
        true_pos.append(target_pos)
        
        pred = vectorize_word(current_w, previous_pos)
        previous_pos = pred
        pos_sequence.append(pred)
        
    pred_pos_fair += pos_sequence
        

In [None]:
print(classification_report(true_pos, pred_pos_fair))

Качество на самом деле не ухудшилось. НО не факт, что на другой задаче этого не произойдет.

Посмотрим на ошибки.

In [None]:

errors = []
for i in range(len(true_pos)):
    if true_pos[i] != pred_pos_fair[i]:
        errors.append((true_pos[i], pred_pos_fair[i], test_current_word[i]))
    

In [None]:
errors

## Алгоритм витерби.

С предсказанием одного тэга на самом деле тоже есть (или может быть) проблема. 
Допустим на первом шаге у нас есть 2 почти одинаковых варианта, но из них следуют совершенно разные следующие тэги - один из которых подходит следующему слову, а другой не очень. Поэтому выбрав незначительно превосходящий по вероятности отдельный тэг мы можем испортить всю последующую цепочку.

Просчитать все комбинации тэгов мы не можем (слишком много вариантов). Но есть алгоритм витерби.

Это динамический алгоритм, то есть суть в том, чтобы по ходу накапливать информацию в отдельную переменную, из коготой потом можно вывести самый лучший вариант.

In [None]:
# сделаем отдельную функцию, которая выдает вероятности всех тэгов
# преобразуем вероятности в логарифмы, чтобы чего не вышло
# дальше поэтому вероятности будут складываться, а не перемножаться

def predict_pos_proba(word, prev_pos):
    int_prev_pos_enc = lenc_prev_pos.transform(
                                   [prev_pos])
    int_prev_pos = int_prev_pos_enc.reshape(
                                         len(int_prev_pos_enc),1)

    X_prev_pos = onehot_prev_pos.transform(int_prev_pos)

    X_current_word = cv_word.transform([word])


    X = hstack([X_prev_pos,
              X_current_word])

    pred = np.log(clf.predict_proba(X))[0]
    
    return pred

Алгоритм работает так:

1. На первом шаге считаем вероятности частей речи первого слова.
2. На последующих шагах перебираем все возможные части речи текущего слова, сгенерированных из всех возможным предыдущих тэгов. 
3. Для каждой части речи на текущем шаге сохраняем самую высокую вероятность и умножаем её на вероятность состояния, породившего эту часть речи.
4. Дойдя до конца, идем в обратную сторону - выбираем самую вероятную часть речи и идем в предыдую часть речи (которую мы сохранили).

Таким образом на каждом шаге мы накапливаем вероятность, сохраняя предыдущее состояние которое дает самый лучший результат, а затем проходим по самому вероятному пути.

In [None]:
def viterbi(obs, states):
    # шаги будет хранить в переменой V
    V = [{}]
    
    # посчитаем вероятности части речи первого слова
    pred_init_states = predict_pos_proba(obs[0], '<START>')
    for i in range(len(states)):
        V[0][states[i]] = {"prob": pred_init_states[i], "prev": None}
    
    
    for t in range(1, len(obs)):
        V.append({})
            
        
        # Версия 1 (побыстрее)
        # для ускорения работы подойдем с обратной стороны
        # пройдемся во всем предыдущим тэгам и предскажем вероятности тэгов текущего шага
        # прибавим вероятность предыдущего тэга ко всем вероятностям
        # сохраним для каждого состояние максимальную сумму и соответ. пред.состояние
        for prev_st in states:
            proba_st_given_prev_st = V[t-1][prev_st]["prob"] + \
                                    predict_pos_proba(obs[t], prev_st)
            
            for i in range(len(states)):
                
                if V[t].get(states[i]):
                    if proba_st_given_prev_st[i] > V[t][states[i]]['prob']:
                        V[t][states[i]] = {'prob':proba_st_given_prev_st[i], 
                                           'prev':prev_st}
                else:
                    V[t][states[i]] = {'prob':proba_st_given_prev_st[i], 
                                           'prev':prev_st}

        
        # версия 2 (попонятнее)
        # проходим по всем возможным состояниям на текущем шаге
        # для каждого состояния проходим во всем предыдущим тэгам и получаем вероятности
        # находим максимальную сумму пред.состояния и текущего состояния, порожденного им
        # сохраняем максимумы и соответ. пред. состояния
        
#         for i in range(len(states)):
#             max_tr_proba = float('-inf')
#             prev_state = None
#             for prev_st in states:
#                 prob_st_given_prev_st = predict_pos_proba(obs[t], prev_st)[i]
#                 prod_proba = V[t-1][prev_st]["prob"] + prob_st_given_prev_st
#                 if prod_proba > max_tr_proba:
#                     max_tr_proba = prod_proba
#                     prev_state = prev_st
#             V[t][states[i]] = {'prob':max_tr_proba, 'prev':prev_state}
        
        
        
        
    # Эта часть проходит в обратную сторону и сохраняет в список самые вероятные тэги
    # можно тут ничего не трогать
    opt = []
    max_prob = max(value["prob"] for value in V[-1].values())
    previous = None
    for st, data in V[-1].items():
        if data["prob"] == max_prob:
            opt.append(st)
            previous = st
            break
    for t in range(len(V) - 2, -1, -1):
        opt.insert(0, V[t + 1][previous]["prev"])
        previous = V[t + 1][previous]["prev"]

    return opt


Первый вариант реализации работает побыстрее, но его труднее понять.

In [None]:
true_pos = []
pred_viterbi_pos = []
words = []
for sent in test_data[:500]:
    words += [x[0] for x in sent[1:]]
    true_pos += [x[2] for x in sent[1:]]
    pred_viterbi_pos += viterbi([x[0] for x in sent[1:]], clf.classes_)

Качество на самом деле практически не увеличилось.

In [None]:
print(classification_report(true_pos, pred_viterbi_pos))

In [None]:

errors = []
for i in range(len(true_pos)):
    if true_pos[i] != pred_viterbi_pos[i]:
        errors.append((true_pos[i], pred_pos_fair[i], words[i]))
    

In [None]:
errors

Дальше можно попробовать:

1. Обучить классификатор на всех данных
2. Добавить или улучшить признаки (можно добавить в предсказание предыдущее слово)
3. Попробовать найти ошибку в моём коде алгоритма витерби (может её там и нет)
4. Попробовать такой же подход на другой задаче (или усложнить эту задачу, добавив к части речи ещё какую-то грамм. информацию (например, предсказывать существительное в таком-то падеже)
