In [None]:
import string
import os
from random import sample

import numpy as np
import torch, torch.nn as nn
import torch.nn.functional as F

from IPython.display import clear_output

import matplotlib.pyplot as plt

In [None]:

# Report which CUDA toolkit this PyTorch build targets and whether a GPU is usable.
cuda_build_version = torch.version.cuda
cuda_is_available = torch.cuda.is_available()
print("Версия CUDA, с которой собран PyTorch:", cuda_build_version)
print("CUDA доступен на устройстве:", cuda_is_available)


In [None]:
from tqdm.auto import tqdm as tqdma
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('{} device is available'.format(device))

In [None]:
# Read the corpus line by line, stripping double-tab indentation and
# lowercasing each line, then glue everything into one string.
with open('onegin.txt', 'r', encoding='utf-8') as iofile:
    text = "".join(line.replace('\t\t', '').lower() for line in iofile)

#### Построение словаря и предобработка текста
В данном задании требуется построить языковую модель на уровне символов. Приведем весь текст к нижнему регистру и построим словарь из всех символов в доступном корпусе текстов. Также добавим токен `<sos>`.

In [None]:
# Vocabulary: every distinct character of the corpus in sorted order, with the
# special '<sos>' token appended last.
tokens = [*sorted(set(text.lower())), '<sos>']
num_tokens = len(tokens)
print(tokens)
assert num_tokens == 84, "Check the tokenization process"

# Lookup tables between tokens and their integer ids.
idx_to_token = dict(enumerate(tokens))
token_to_idx = {token: idx for idx, token in idx_to_token.items()}

assert len(tokens) == len(token_to_idx), "Mapping should be unique"

print("Done!")

# The whole corpus as a list of integer ids, one per character.
text_encoded = [token_to_idx[char] for char in text]

In [None]:
batch_size = 256
seq_length = 100
# Column of '<sos>' ids, one per batch row, prepended to every chunk.
start_column = np.full((batch_size, 1), token_to_idx['<sos>'], dtype=int)


def generate_chunk():
    """Yield a single random training chunk of encoded text.

    A contiguous window of ``batch_size * seq_length`` ids is cut from
    ``text_encoded`` at a random offset, reshaped into ``batch_size`` rows and
    prefixed with ``start_column``. The generator produces exactly one array
    of shape ``(batch_size, seq_length + 1)``; callers fetch it with ``next``.
    """
    chunk_len = batch_size * seq_length
    offset = np.random.randint(0, len(text_encoded) - chunk_len - 1)
    window = text_encoded[offset:offset + chunk_len]
    rows = np.asarray(window).reshape(batch_size, -1)
    yield np.concatenate((start_column, rows), axis=1)

In [None]:
# Smoke check: draw one chunk; as the last expression of the cell it is shown as output.
next(generate_chunk())

### Архитектура нейросети

In [None]:
class generator(nn.Module):
    """Character-level GRU model that predicts the token following a sequence.

    Pipeline: token ids -> embedding (width ``hidden_size``) -> GRU -> linear
    layer applied to the GRU output of the LAST time step only.

    Args:
        inp_size: Number of distinct input tokens (embedding table rows).
        hidden_size: Width of the embeddings and of the GRU hidden state.
        out_size: Number of output classes (logits per prediction).
        num_layers: Number of stacked GRU layers.
        batch_size: Default batch size used by ``init_hidden``.
        device: Optional device on which to create all layers. When omitted
            the layers are created on PyTorch's default device and can be
            moved afterwards with ``.to(...)``.
    """

    def __init__(self, inp_size, hidden_size, out_size, num_layers, batch_size, device=None):
        super().__init__()
        self.input_size = inp_size
        self.hidden_size = hidden_size
        self.output_size = out_size
        self.n_layers = num_layers
        self.batch_size = batch_size

        self.embedding = nn.Embedding(inp_size, hidden_size, device=device)
        # The GRU consumes embeddings, so its input width is the embedding
        # width (hidden_size), not the vocabulary size.
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers, batch_first=True, device=device)
        self.fc = nn.Linear(hidden_size, out_size, device=device)

    def forward(self, input, hidden):
        """Return logits for the token after the sequence plus the new hidden state.

        Args:
            input: Integer token ids, either batched ``(batch, seq)`` or a
                single unbatched sequence ``(seq,)``.
            hidden: Initial GRU state, ``(num_layers, batch, hidden_size)`` for
                batched input or ``(num_layers, hidden_size)`` for unbatched.

        Returns:
            ``(logits, hidden)`` where ``logits`` is ``(batch, out_size)`` for
            batched input or ``(out_size,)`` for unbatched input.

        Raises:
            ValueError: If the GRU output is neither 2- nor 3-dimensional.
        """
        embedded = self.embedding(input)
        output, hidden = self.gru(embedded, hidden)

        # Keep only the final time step: the model predicts a single next token.
        if output.dim() == 3:
            last_step = output[:, -1, :]
        elif output.dim() == 2:
            last_step = output[-1, :]
        else:
            raise ValueError("Unexpected output dimensions")

        logits = self.fc(last_step)
        return logits, hidden

    def init_hidden(self, batch_size=None):
        """Return a zero hidden state located on the same device as the model.

        Args:
            batch_size: Batch dimension of the state; defaults to the
                ``batch_size`` given at construction time.
        """
        if batch_size is None:
            batch_size = self.batch_size
        reference = self.embedding.weight
        return torch.zeros(
            self.n_layers,
            batch_size,
            self.hidden_size,
            device=reference.device,
            dtype=reference.dtype,
        )
        

In [None]:
# The vocabulary size is used for the input, hidden and output widths alike.
inp_size = hidden_size = out_size = num_tokens
batch_size = 256
num_layers = 1

model = generator(
    inp_size=inp_size,
    hidden_size=hidden_size,
    out_size=out_size,
    num_layers=num_layers,
    batch_size=batch_size,
).to(device)

# Loss, optimizer and a scheduler that halves the learning rate on plateaus.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
lr_scheduler = ReduceLROnPlateau(optimizer, factor=0.5, patience=300)

In [None]:
def model_fit(model, 
              num_epochs,
              batch_size,
              seq_length,
              device=device, 
              optimizer=optimizer):
    """Train ``model`` to predict the last token of random text chunks.

    Each step draws one chunk from ``generate_chunk()``, feeds all columns but
    the last to the model and uses the last column as the classification
    target. The module-level ``criterion`` computes the loss and the
    module-level ``lr_scheduler`` is stepped with every batch loss.

    Args:
        model: Network exposing ``init_hidden()`` and returning
            ``(logits, hidden)`` when called with ``(input, hidden)``.
        num_epochs: Number of optimisation steps (one random chunk per step).
        batch_size: Not used by this function; ``generate_chunk()`` takes no
            arguments and determines the chunk shape itself.
        seq_length: Not used by this function, for the same reason.
        device: Device the batch tensors are moved to.
        optimizer: Optimizer that updates the model parameters.

    Returns:
        None. Progress is printed every 100 steps.
    """
    loss_history = []

    # Sampling code may have left the model in eval mode; training needs train mode.
    model.train()

    for epoch in tqdma(range(num_epochs)):
        hidden = model.init_hidden()
        optimizer.zero_grad()

        data = next(generate_chunk())
        inputs = torch.from_numpy(data[:, :-1]).to(device)
        # CrossEntropyLoss needs int64 class indices; convert while moving
        # instead of re-wrapping an existing tensor with torch.tensor().
        target = torch.from_numpy(data[:, -1]).to(device=device, dtype=torch.long)

        logits, _ = model(inputs, hidden)

        loss = criterion(logits, target)

        loss.backward()
        optimizer.step()

        loss_history.append(loss.item())
        lr_scheduler.step(loss_history[-1])

        if epoch % 100 == 0:
            # Read the learning rates from the optimizer's public state rather
            # than from the scheduler's private attribute.
            current_lr = [group['lr'] for group in optimizer.param_groups]
            print(
                f"Step {epoch}, Loss: {np.mean(loss_history[-1000:])}, learning rate: {current_lr}"
            )

In [None]:
# Training configuration for the run below.
num_epochs, batch_size, seq_length = 10000, 256, 100

model_fit(model, num_epochs=num_epochs, batch_size=batch_size, seq_length=seq_length)

### Генерация

In [None]:
def generate_sample(char_rnn, seed_phrase=None, max_length=200, temperature=0.5, device=device):
    """Sample text from ``char_rnn`` one character at a time.

    Args:
        char_rnn: Trained model exposing ``n_layers`` and ``hidden_size`` and
            returning ``(logits, hidden)`` for an unbatched id sequence.
        seed_phrase: Optional text the sample starts with; every character
            must be present in ``token_to_idx``. ``None`` means no seed.
        max_length: Total length of the returned string, seed included.
        temperature: Softmax temperature; logits are divided by it before
            sampling, so lower values give more conservative text.
        device: Device on which the working tensors are created.

    Returns:
        The seed followed by sampled characters, ``max_length`` long in total.

    Raises:
        AssertionError: If the result is not exactly ``max_length`` long,
            which happens when the seed itself is longer than ``max_length``.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    # Treat a missing seed as an empty one so len() and iteration both work.
    seed_phrase = seed_phrase if seed_phrase is not None else ''
    x_sequence = [token_to_idx['<sos>']] + [token_to_idx[token] for token in seed_phrase]

    char_rnn.eval()
    # Zero initial state for an unbatched sequence: (num_layers, hidden_size).
    hidden = torch.zeros((char_rnn.n_layers, char_rnn.hidden_size), device=device)
    input_tensor = torch.tensor(x_sequence, dtype=torch.long, device=device)

    with torch.no_grad():
        for _ in range(max_length - len(seed_phrase)):
            logits, hidden = char_rnn(input_tensor, hidden)

            probabilities = F.softmax(logits / temperature, dim=-1)
            next_idx_char = torch.multinomial(probabilities, num_samples=1)

            x_sequence.append(next_idx_char.item())
            # The hidden state already summarises the prefix, so only the
            # freshly sampled token has to be fed on the next step.
            input_tensor = next_idx_char

    generated = x_sequence[1:]  # drop the leading <sos>
    assert len(generated) == max_length , 'check length'

    return ''.join(tokens[ix] for ix in generated)

In [None]:
# Sample 500 characters continuing the seed phrase, with temperature 1.01.
print(generate_sample(model, ' мой дядя самых честных правил', max_length=500, temperature=1.01))