# Задача 3. Применение рекуррентной нейронной сети для аннотации текста
1. Загрузить текстовый корпус, например, набор предложений с метками части речи (POS-tagging).
2. Построить и обучить рекуррентную нейронную сеть (RNN или LSTM) для предсказания аннотаций текста.
3. Оценить модель на тестовом наборе и сравнить предсказанные метки с реальными.
4. Вывести 5 примеров предложений с предсказанными и реальными аннотациями.


## Загрузка корпуса с метками части речи

Загрузим корпус в виде итератора пар (слово - часть речи). Игнорируем морфологические признаки из датасета.
Параметр $window$ необходим для добавления пустых токенов в конце и начале предложений для дальнейшего расчёта эмбеддингов. 

In [207]:
import math

import numpy as np

f = open("samples2", mode="r", encoding="utf-8")

FILLER = '\0'

def process_line(line: str) -> list[str]:
    tokens = line.split('\t')
    return tokens
    
def iter_process(it, window):
    window_returned = False
    
    try:
        while True:
            line = next(it)
            tokens = process_line(line)
            if len(tokens) < 2:
                continue    
            elif tokens[1][0] == '<':
                if not window_returned:
                    window_returned = True
                    for w in range(math.floor(window / 2)):
                        yield [FILLER, '']
                continue
            
            word_str = tokens[1]
            characteristics_str = tokens[2]
            
            sep_index = characteristics_str.find('|')
            pos_str = characteristics_str[:sep_index]
            
            window_returned = False
            yield [word_str, pos_str]
    except StopIteration:
        return

## Предобработка

### Расчёт эмбеддингов слов. 
Рассчитываем эмбеддинг на основе двух параметров: $r$, $n$.
$r$ - количество токенов в контексте.
$n$ - количество символов с конца слова (предполагаем, что последние символы - изменяемые части слова, поэтому могут определять часть речи) для расчёта.  Аналогичным образом применяется $n$ символов $r / 2 - 1$ слов в обе стороны предложения от изучаемого слова (контекст).

### Предобработка результирующего столбца (часть речи)
Категориальный признак преобразуем в числовое значение в диапазоне $[1, n]$, где $n$ - количество классов (частей речи) в датасете.

In [208]:
r = 5
n = 5
embedding_dim = n * r

assert r % 2 == 1

corpus_it = iter_process(f, r)
corpus = list(corpus_it)

In [226]:
POS_MAPPING = {
    'NOUN': 0,
    'VERB': 1,
    'PRON': 2,
    'NUM': 3,
    'ADP': 4,
    'PUNCT': 5,
    'ADV': 6,
    'ADJ': 7,
    'CONJ': 8,
    'PART': 9,
    'LATN': 10,
    'X': 11,
}

INVERSE_POS_MAPPING = {v: k for k, v in POS_MAPPING.items()}

def make_embeddings_and_outputs(corpus, symbols_count, window) -> (np.ndarray, np.ndarray):
    embeddings = []
    outputs = []
    indexes_in_corpus = []
    
    for index, pair in enumerate(corpus):
        half_window = math.floor(window / 2)
        min = index - half_window
        max = index + half_window + 1
        
        if min < 0 or max > len(corpus) or corpus[index][0] == FILLER:
            continue
        
        assert max - min == window
    
        embedding = np.ndarray(dtype='uint8', shape=(window, symbols_count))    
        for embedding_index, i in enumerate(range(min, max)):
            word = corpus[i][0]
            embedding[embedding_index] = embed_word(word, symbols_count)
            
        embedding = embedding.flatten()
        
        indexes_in_corpus.append(index)
        embeddings.append(embedding)
        outputs.append(POS_MAPPING[corpus[index][1]])
        
    indexes_in_corpus = np.array(indexes_in_corpus, dtype='uint32')
    embeddings = np.array(embeddings, dtype='uint8')
    outputs = np.array(outputs, dtype='uint8')
    
    return indexes_in_corpus, embeddings, outputs

def embed_word(word, n_symbols) -> np.ndarray:
    padded = word.rjust(n_symbols, '\0')
    embedding = np.frombuffer(padded[-n_symbols:].encode('iso-8859-5'), dtype=np.uint8)
    return embedding

In [227]:
indexes_in_corpus, embeddings, outputs = make_embeddings_and_outputs(corpus, n, r)

for i in range(10):
    print(indexes_in_corpus[i], embeddings[i], outputs[i])

2 [  0   0   0   0   0   0   0   0   0   0   0   0   0   0 239   0   0 220
 222 238   0 222 218 221 222] 2
3 [  0   0   0   0   0   0   0   0   0 239   0   0 220 222 238   0 222 218
 221 222   0   0   0   0   0] 1
4 [  0   0   0   0 239   0   0 220 222 238   0 222 218 221 222   0   0   0
   0   0   0   0   0   0   0] 0
7 [  0   0   0   0   0   0   0   0   0   0 212 221 216 218 216 219 216 230
 216 216 231 213 224 222 220] 0
8 [  0   0   0   0   0 212 221 216 218 216 219 216 230 216 216 231 213 224
 222 220   0   0   0  51  49] 0
9 [212 221 216 218 216 219 216 230 216 216 231 213 224 222 220   0   0   0
  51  49 218 208 209 224 239] 0
10 [219 216 230 216 216 231 213 224 222 220   0   0   0  51  49 218 208 209
 224 239 222 214 216 219 216] 3
11 [231 213 224 222 220   0   0   0  51  49 218 208 209 224 239 222 214 216
 219 216   0   0   0   0 210] 0
12 [  0   0   0  51  49 218 208 209 224 239 222 214 216 219 216   0   0   0
   0 210 210 238 224 226 213] 1
13 [218 208 209 224 239 222 214 21

## Построение и обучение нейросети

In [210]:
from torch.utils.data import TensorDataset, DataLoader
import torch
from torch import nn
import torch.nn.functional as F

tensor_embeddings = torch.Tensor(embeddings)
tensor_outputs = torch.Tensor(outputs)

pos_dataset = TensorDataset(tensor_embeddings, tensor_outputs)
batch_size = 32
pos_dataloader = DataLoader(pos_dataset, batch_size=batch_size)

In [211]:
MODEL_PATH = "pos_tagger_model.pth"

def save_model(model, optimizer, epoch, loss, path):
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss,
    }, path)

def load_model(model, optimizer, path):
    checkpoint = torch.load(path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return model, optimizer, epoch, loss

In [None]:
class SimplePOSTagger(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, num_tags):
        super(SimplePOSTagger, self).__init__()
        
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        # self.dropout = nn.Dropout(0.1)
        self.fc2 = nn.Linear(hidden_dim, num_tags)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        # x = self.dropout(x)
        x = self.fc2(x)
        return x

hidden_dim = 128
num_tags = len(POS_MAPPING)
model = SimplePOSTagger(embedding_dim=embedding_dim, hidden_dim=hidden_dim, num_tags=num_tags)

loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [ ]:
num_epochs = 50
model.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (inputs, labels) in enumerate(pos_dataloader):
        # Обнуляем градиенты
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs.float())
        loss_value = loss(outputs, labels.long())
        
        # Backward pass и оптимизация
        loss_value.backward()
        optimizer.step()
        
        running_loss += loss_value.item()
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    
    epoch_loss = running_loss / len(pos_dataloader)
    accuracy = correct / total
    
    print(f'Эпоха {epoch + 1}/{num_epochs}, Потери: {epoch_loss:.4f}, Точность: {accuracy:.4f}')

In [213]:
save_model(model, optimizer, num_epochs, epoch_loss, MODEL_PATH)

In [214]:
def evaluate_model(model, dataloader):
    model.eval()
    predictions = []
    labels = []
    
    with torch.no_grad():
        for inputs, targets in dataloader:
            outputs = model(inputs.float())
            _, predicted = torch.max(outputs.data, 1)
            
            predictions.extend(predicted.cpu().numpy())
            labels.extend(targets.cpu().numpy())
            
    return predictions, labels

In [296]:
test_f = open("test_samples", mode="r", encoding="utf-8")
test_corpus_it = iter_process(test_f, r)
test_corpus = list(test_corpus_it)

test_f.close()

test_idx_in_corpus, test_embeddings, test_outputs = make_embeddings_and_outputs(test_corpus, n, r)

test_embeddings = torch.Tensor(test_embeddings)
test_outputs = torch.Tensor(test_outputs)

test_dataset = TensorDataset(
    test_embeddings,
    test_outputs
)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

predictions, labels = evaluate_model(model, test_dataloader)

heads = ['Слово', 'Прогноз', 'Фактически']
print(*[f"{heads[0]:^20} | ", f"{heads[1]:^14} | ", f"{heads[2]:^14}"])
print("-" * (22 + 17 + 17))

for i, index_in_corpus in enumerate(test_idx_in_corpus):
    word = test_corpus[index_in_corpus][0]
    pred_tag = INVERSE_POS_MAPPING[predictions[i]]
    actual_tag = test_corpus[index_in_corpus][1]

    print(f"{word:20} | {pred_tag:^15} | {actual_tag:^15}")

       Слово         |     Прогноз     |    Фактически  
--------------------------------------------------------
сотрудники           |      NOUN       |      NOUN      
милиции              |      NOUN       |      NOUN      
вечером              |      NOUN       |      NOUN      
31                   |       NUM       |       NUM      
декабря              |      NOUN       |      NOUN      
уничтожили           |      VERB       |      VERB      
в                    |       ADP       |       ADP      
хасавюрте            |      NOUN       |      NOUN      
четверых             |       NUM       |       NUM      
боевиков             |      NOUN       |      NOUN      
.                    |      PUNCT      |      PUNCT     


In [250]:
loaded_model, loaded_optimizer, loaded_epoch, loaded_loss = load_model(model, optimizer, MODEL_PATH)


## Оценка модели

## Примеры предсказаний