# Трансформеры
В этом домашнем задании мы рассмотим использование трансформеров в библиотеке PyTorch. Рассмотрим задачу языкового моделирования. Попробуем генерировать текст нейронной сетью. 

Ссылка на данные - https://drive.google.com/drive/folders/1x1A4ElliUGBPnHladGMwPxPuGxI8Vnpu?usp=sharing

In [1]:
import torch
from torch import nn

import numpy as np
import time

Мы будем обучать языковую модель для предсказания следущей буквы. Такие языковые модели применяются в распозновании речи, так как предоставляют дополнительную информацию акустической модели при выборе следующего символа. Для начала, откроем файл с данными, посмотрим, какие символы входят в тексты, сколько их. Уберем из текста все символы переноса на новую строку и табуляцию.

In [2]:
path = '../input/hse-ida-transformers/small_corp_for_test.txt'
file = open(path, 'r')
data = file.readlines()
file.close()
len(data)

In [3]:
data[:10]

In [4]:
import re
newdata = [re.sub(r"[n\t\s]*","",i) for i in data]

In [5]:
newdata[:10]

Для обучения модели требуется сначала подготовить текст в подходящий для нейросети вид. Нужно добавить два токена start и end, которые отвечают за начало и конец текста. Используем [ и ] для этой задачи. Также нам нужен токен pad, чтобы заполнять им текст до требуемой длинны для формирования батча.

Реализуем метод preprocess класса Preprocessor. Он должен принимать на вход текст и длинну текста, которую мы ожидаем получить на выходе. Текст должен быть переведен в нижний регистр, в конец текста добавляется требуемое число pad токенов, далее текст векторизуется (каждому символу ставится свое число). Вернуть требуется два вектора: полученный результат без последнего токена (на нем будем обучаться) и полученный результат без первого токена (целевые метки при обучении).

In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
from nltk.tokenize import word_tokenize 
import transformers
from transformers import BertTokenizer

In [8]:
class Preprocessor:
    def __init__(self):
        self.alphabet = '_добсркгауфпитнезчм яжлйвцыэь-шхющёъ][ '
        self.token2ind = {}
        self.ind2token = {}
        for i in range(len(self.alphabet)):
            self.token2ind[self.alphabet[i]] = i
            self.ind2token[i] = self.alphabet[i]
        
    
    def preprocess(self, text, window_size):
        if type(text) == list:
            text = ''.join(text)
        pad = '_'
        pad_num = window_size - len(text)
        text = text.lower() + pad * (pad_num + 1)
        text_num = (list(map(lambda x: self.token2ind[x], text)))
        
        return (text_num[:-pad_num-3] + text_num[-pad_num-2:]), (text_num[0:1] + text_num[2:])

Текст будет начинаться токеном [ и заканчиваться токеном ]

In [9]:
newdata = ['['+ x + ']' for x in newdata]
newdata[:10]

Так как мы не располагаем большими мощностями, то ограничим максимальную длину текста. Начнем с 128. 
Разбиваем тексты на train и test, перемешаем тексты при разбиении, размер тестовой выборки составит 15% от общего числа текстов. 

In [10]:
THRESHOLD = 128

justfine = []
for x in range(len(newdata)):
    if len(newdata[x]) <= THRESHOLD:
        justfine.append(newdata[x])

In [11]:
len(justfine)

In [12]:
data_train, data_test = torch.utils.data.random_split(justfine, [round(len(justfine)*0.85),round(len(justfine)*0.15)],generator=torch.Generator().manual_seed(5))

Напишем датасет. На вход датасету передается набор текстов, объект класса Preprocessor и размер окна.

In [13]:
window_size = 128
class TextDataset(torch.utils.data.Dataset):
    
    def __init__(self, x, preproc, window_size = 128):
        self.x = x
        self.preproc = preproc
        self.window_size = window_size
        pass
    
    def __len__(self):
        return len(self.x)
        
    
    def __getitem__(self, idx):
        prepr = preproc.preprocess(self.x[idx],self.window_size)
        return torch.tensor(prepr[0],dtype = torch.int64), torch.tensor(prepr[1],dtype = torch.int64)

In [14]:
preproc = Preprocessor()
train_dataset = TextDataset(data_train, preproc=preproc)
test_dataset = TextDataset(data_test, preproc=preproc)

In [15]:
train_dataset[0]

Напишем модель

In [16]:
import math
class PositionalEncoding(nn.Module):

    def __init__(self, d_model=512, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

Маскирование используется для того, чтобы "спрятать" от модели последующие слова. Так модель будет учиться предсказывать слова последовательно, а не в более крупном контексте. В качестве примера представим предложение "Дети пишут эссе", где модели нужно научиться предсказывать "пишут" после "дети". Если ей не будет известно о том, что после "пишут" идет "эссе" она в предсказаниях будет опираться только на "дети", и такой подход лучше соответствует описанной задаче предсказания

In [17]:
class LanguageModel(nn.Module):
    def __init__(self, vocab_size:int, d_model:int, nhead:int, d_hid:int, layers:int,dropout:float=0.2):
        super(LanguageModel, self).__init__()
        self.embed = nn.Embedding(vocab_size,d_model)
        self.pe = PositionalEncoding(d_model,dropout)
        self.transformer_encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.transformer_encoder_layer, layers)
        self.decoder = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model,vocab_size))
    
    def forward(self, x, src_mask):
        x = self.embed(x)
        x = self.pe(x)
        x = x.transpose(1, 0)
        x = self.transformer_encoder(x, src_mask)
        x = self.decoder(x)
        return x.transpose(1, 0)
    
    def generate_square_subsequent_mask(self, sz):
        # А вот и то самое маскирование
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

In [18]:
model = LanguageModel(len('_добсркгауфпитнезчм яжлйвцыэь-шхющёъ][ '),512,8,200,2) #.to(device)

Реализуем класс для обучения модели и ее валидации

In [27]:
import tqdm
class Trainer:
    
    def __init__(self, model, train_dataset, test_dataset):
        
        self.model = model
        
        self.train_batch_size = 64
        self.test_batch_size = 64
        
        self.train_dataloader = torch.utils.data.DataLoader(train_dataset, 
                                                            self.train_batch_size,
                                                            shuffle = False
                                                           )
        self.test_dataloader = torch.utils.data.DataLoader(test_dataset, 
                                                           self.test_batch_size,
                                                           shuffle = False
                                                           )
        self.train_dataloader_size = len(self.train_dataloader)
        self.test_dataloader_size = len(self.test_dataloader)
        
        #self.device = 'cuda:0'
        self.criterion = nn.CrossEntropyLoss(ignore_index=-1) # используем CrossEntrophyLoss, передаем в качетсве параметра 
                             # ignore index индекс символа _, чтобы модель не штрафовалась за то
                             # что идет после закрывающего токена
        
        self.optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
        
        self.steps_to_print = 1000
        
    def train_one_epoch(self, epoch_number):
        step = 0
        counted_loss = 0
        current_time = time.time()
        it = 0
        
        for batch in self.train_dataloader:
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            src_mask = model.generate_square_subsequent_mask(window_size).to(device)
            
            
            output = self.model.forward(x,src_mask).logit().transpose(1,2)
            loss = self.criterion(output, y)
            
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()
            
            step += 1
            it += 1
            counted_loss += loss.item()
            
            if step%self.steps_to_print == 0:
                result = 'Train epoch '+str(epoch_number)+' | '
                result += 'Step '+str(step)+'/'+str(self.train_dataloader_size)+' | '
                result += 'Counted loss '+str(counted_loss)+' | '
                result += 'ppl '+str(math.exp(counted_loss/it))+' | '
                result += 'time '+str(time.time() - current_time) + ' | '
                print(result)
                current_time = time.time()
                counted_loss = 0
                it = 0
    
    def validate_one_epoch(self, epoch_number):
        step = 0
        counted_loss = 0
        current_time = time.time()
        it = 0
        for batch in self.test_dataloader:
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            src_mask = generate_square_subsequent_mask(window_size).to(device)
            
            output = self.model.forward(x,src_mask).logit().transpose(1,2)
        
            
            loss = self.criterion(output, y)
            counted_loss += loss.item()
            
            step += 1
            it += 1
            if step%(self.steps_to_print//2) == 0:
                result = 'Validate epoch '+str(epoch_number)+' | '
                result += 'Step '+str(step)+'/'+str(self.test_dataloader_size)+' | '
                result += 'Counted loss '+str(counted_loss)+' | '
                result += 'ppl '+str(math.exp(counted_loss/it))+' | '
                result += 'time '+str(time.time() - current_time) + ' | '
                print(result)
                current_time = time.time()
                counted_loss = 0
                it = 0
        
    def train(self, number_of_epochs):
        model.to(device)
        for epoch in range(1, number_of_epochs+1):
            model.train()
            self.train_one_epoch(epoch)
            with torch.no_grad():
                model.eval()
                self.validate_one_epoch(epoch)
            print()

In [None]:
Trainer(model,train_dataset, test_dataset).train(3)