In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [18]:
!pip install nltk



In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import nltk
from nltk.tokenize import word_tokeniz
import warnings
from typing import Iterable, Tuple
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
from collections import Counter
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.distributions.categorical import Categorical

warnings.filterwarnings("ignore")

In [19]:
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [4]:
with open(r"/content/drive/MyDrive/anek_djvu.txt", "r", encoding="utf-8") as f:
    text = f.read()
text[118:500]

'|startoftext|>Друзья мои, чтобы соответствовать вам, я готов сделать над собой усилие и стать лучше. Но тогда и вы станьте немного хуже!\n\n<|startoftext|>- Люся, ты все еще хранишь мой подарок?- Да.- Я думал, ты выкинула все, что со мной связано.- Плюшевый мишка не виноват, что ты ебл@н...\n\n<|startoftext|>- А вот скажи честно, ты во сне храпишь?- Понятие не имею, вроде, нет. От со'

In [5]:
def cut_data(text):
    return text.replace("\n\n", "").split("<|startoftext|>")[1:]

In [6]:
cut_text = cut_data(text)

In [7]:
cut_text[1:6]

['Друзья мои, чтобы соответствовать вам, я готов сделать над собой усилие и стать лучше. Но тогда и вы станьте немного хуже!',
 '- Люся, ты все еще хранишь мой подарок?- Да.- Я думал, ты выкинула все, что со мной связано.- Плюшевый мишка не виноват, что ты ебл@н...',
 '- А вот скажи честно, ты во сне храпишь?- Понятие не имею, вроде, нет. От собственного храпа по крайней мере еще ни разу не просыпался.- Ну, так у жены спроси.- А жена и подавно не знает. У нее странная привычка после замужества возникла: как спать ложится - беруши вставляет.',
 'Поссорилась с мужем. Пока он спал, я мысленно развелась с ним, поделила имущество, переехала, поняла, что жить без него не могу, дала последний шанс, вернулась. В итоге, ложусь спать уже счастливой женщиной.',
 'Если тебя посещают мысли о смерти - это еще полбеды. Беда - это когда смерть посещают мысли о тебе...']

In [8]:
unique_chars = tuple(set(text))
int2char = dict(enumerate(unique_chars))
char2int = {ch: ii for ii, ch in int2char.items()}

In [13]:
def encode(sentence, vocab):
    return [vocab[sys] for sys in sentence] # List of ints

def decode(tokens, vocab):
    return "".join(vocab[toc] for toc in tokens)# list of strings

In [14]:
sentence = cut_text[3]  # Берем первую строку из подготовленного текста
encoded_sentence = encode(sentence, char2int)
decoded_sentence = decode(encoded_sentence, int2char)

print("Исходная строка:", sentence)
print("Закодированная строка:", encoded_sentence)
print("Декодированная строка:", decoded_sentence)

Исходная строка: - А вот скажи честно, ты во сне храпишь?- Понятие не имею, вроде, нет. От собственного храпа по крайней мере еще ни разу не просыпался.- Ну, так у жены спроси.- А жена и подавно не знает. У нее странная привычка после замужества возникла: как спать ложится - беруши вставляет.
Закодированная строка: [85, 175, 18, 175, 163, 50, 181, 175, 55, 158, 108, 107, 62, 175, 13, 133, 55, 181, 111, 50, 41, 175, 181, 203, 175, 163, 50, 175, 55, 111, 133, 175, 9, 48, 108, 76, 62, 156, 118, 96, 85, 175, 129, 50, 111, 11, 181, 62, 133, 175, 111, 133, 175, 62, 151, 133, 37, 41, 175, 163, 48, 50, 136, 133, 41, 175, 111, 133, 181, 72, 175, 61, 181, 175, 55, 50, 209, 55, 181, 163, 133, 111, 111, 50, 172, 50, 175, 9, 48, 108, 76, 108, 175, 76, 50, 175, 158, 48, 108, 142, 111, 133, 142, 175, 151, 133, 48, 133, 175, 133, 82, 133, 175, 111, 62, 175, 48, 108, 87, 178, 175, 111, 133, 175, 76, 48, 50, 55, 203, 76, 108, 66, 55, 11, 72, 85, 175, 74, 178, 41, 175, 181, 108, 158, 175, 178, 175, 107, 

In [34]:
def one_hot_encode(int_words: torch.Tensor, vocab_size: int) -> torch.Tensor:
    words_one_hot = torch.zeros(
        (int_words.numel(), vocab_size), dtype=torch.float32, device=int_words.device
    )
    words_one_hot[torch.arange(words_one_hot.shape[0]), int_words.flatten().long()] = 1.0
    words_one_hot = words_one_hot.reshape((*int_words.shape, vocab_size))
    return words_one_hot


In [None]:
test_seq = torch.tensor([[2, 6, 4, 1], [0,3, 2, 4]])
test_one_hot = one_hot_encode(test_seq, 8)

print(test_one_hot)

tensor([[[0., 0., 1., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 1., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0.]],

        [[1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 1., 0., 0., 0., 0.],
         [0., 0., 1., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0.]]])


In [58]:
class Tokenizer:
    def __init__(self, cut_text, max_len: int = 512):
        self.text = " ".join(cut_text)  # Join the sentences for word tokenization
        self.max_len = max_len
        self.specials = ['<pad>', '<bos>', '<eos>']

        # Tokenize the entire text into words
        self.words = []
        for joke in cut_text:
            self.words.extend(word_tokenize(joke)) # Tokenize each joke and extend the words list

        self.vocab = set(self.words)
        self.vocab.add(' ')
        self.vocab.update(set(" ".join(cut_text)))
        self.word2idx = {word: i for i, word in enumerate(self.vocab)}
        self.idx2word = {i: word for word, i in self.word2idx.items()}
        for special in self.specials:
            if special not in self.vocab:
                self.word2idx[special] = len(self.word2idx)
                self.idx2word[len(self.idx2word)] = special
                self.vocab.add(special)

    @property
    def vocab_size(self):
        return len(self.vocab)

    def encode_word(self, word):
        # encoding each character of the word
        return [self.word2idx[char] for char in word]

    def decode_word(self, indices):
        return "".join([self.idx2word[index] for index in indices])

    def str_to_idx(self, chars):
        return [self.word2idx[sym] for sym in chars] # str -> list[int]

    def idx_to_str(self, idx):
        return [self.idx2word[toc] for toc in idx] # list[int] -> list[str]

    def encode(self, chars):
        chars = ['<bos>'] + list(chars) + ['<eos>']
        return self.str_to_idx(chars)

    def decode(self, idx):
        chars = self.idx_to_str(idx)
        return "".join(chars) # make string from list

In [16]:
class JokesDataset(Dataset):
    def __init__(self, tokenizer, cut_text, max_len=512):
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.cut_text = cut_text

    def __len__(self):
        return len(self.cut_text)

    def __getitem__(self, idx):
        text = self.cut_text[idx]
        words = word_tokenize(text)
        encoded = self.tokenizer.encode(words)

        input_sequence = torch.zeros(self.max_len, dtype=torch.long)
        target_sequence = torch.zeros(self.max_len, dtype=torch.long)

        input_sequence[:len(encoded) - 1] = torch.tensor(encoded[:-1], dtype=torch.long)
        target_sequence[:len(encoded) - 1] = torch.tensor(encoded[1:], dtype=torch.long)

        return input_sequence, target_sequence

In [60]:
tokenizer = Tokenizer(text)
dataset = JokesDataset(tokenizer, cut_text, 512)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

In [53]:
class CharRNN(nn.Module):
    def __init__(self, tokenizer, hidden_dim=256, num_layers=2, drop_prob=0.5, max_len=512):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.drop_prob = drop_prob
        self.max_len = max_len

        self.tokenizer = tokenizer
        self.vocab_size = tokenizer.vocab_size

        self.rnn = nn.LSTM(
            input_size=self.vocab_size,
            hidden_size=self.hidden_dim,
            num_layers=self.num_layers,
            dropout=self.drop_prob,
            batch_first=True
        )

        self.dropout = nn.Dropout(self.drop_prob)
        self.fc = nn.Linear(self.hidden_dim, self.vocab_size)

    def forward(self, x: torch.Tensor, lengths: torch.Tensor) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
        x = one_hot_encode(x, vocab_size=self.vocab_size)
        packed_embeds = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)

        packed_outputs, hidden = self.rnn(packed_embeds)
        outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
        outputs = self.dropout(outputs)

        logits = self.fc(outputs)
        return logits, hidden

    def init_hidden(self, batch_size: int, device: str = "cpu") -> Tuple[torch.Tensor, torch.Tensor]:
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)
        # Инициализация начального скрытого состояния нулями
        return h0, c0

    def inference(self, prefix="<bos> ", device="cpu"):

        # encode prefix
        tokens = torch.tensor(self.tokenizer.encode_word(prefix), dtype=torch.long, device=device).unsqueeze(0)

        inputs = one_hot_encode(tokens, vocab_size=self.vocab_size) #представляем в one-hote виде

        hidden = self.init_hidden(batch_size=1, device=device) #создание скрытого состояния

        # generate hidden and logits for prefix
        outputs, hidden = self.rnn(inputs, hidden)
        logits = self.fc(outputs)

        # sample new token from logits
        probs = torch.softmax(logits[:, -1, :], dim=-1)
        new_token = torch.multinomial(probs, num_samples=1)
        tokens = torch.cat([tokens, new_token], dim=1)

        # 2 stopping conditions: reaching max len or getting <eos> token
        while tokens.size(1) < self.max_len and new_token.item() != self.tokenizer.encode_word('<eos>'):
            inputs = one_hot_encode(new_token, vocab_size=self.vocab_size)
            outputs, hidden = self.rnn(inputs, hidden)
            logits = self.fc(outputs)
            probs = torch.softmax(logits[:, -1, :], dim=-1)
            new_token = torch.multinomial(probs, num_samples=1)
            tokens = torch.cat([tokens, new_token], dim=1)

        return self.tokenizer.decode_word(tokens.squeeze().tolist())

In [23]:
batch_size = 128
seq_length = 512
n_hidden = 64
n_layers = 4
drop_prob = 0.1
lr = 0.1

In [54]:
def training_step(
    model: CharRNN,
    train_batch: Tuple[torch.Tensor, torch.Tensor],
    vocab_size: int,
    criterion: nn.Module,
    optimizer,
    device="cpu"
) -> torch.Tensor:
    optimizer.zero_grad()# Обнуляем градиенты

    inputs, targets = train_batch
    batch_size, seq_len = inputs.shape

    inputs, targets = inputs.to(device), targets.to(device)

    # Прямой проход через модель
    lengths = (inputs != 0).sum(dim=1)
    logits, _ = model(inputs, lengths)

    loss = criterion(logits.view(-1, vocab_size), targets.view(-1))

    loss.backward() # Обратный проход

    optimizer.step() # Обновление весов

    return loss

In [61]:
model = CharRNN(tokenizer, n_hidden, n_layers, drop_prob)
hidden = None
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)


In [62]:
model.eval()
prefix = "<bos> "
model.inference(prefix=prefix)

"<bos> Х☺Щб=Ъ<eos>c结ёИ&;йM▒эЧDЕ№ЪСсVm.̆;☻\nОH新PОЪШ/然命c@ФИ直t><pad>ь>с举ш̈Dhш任Ыv»Ючc€У3мсВчyо:^keC −а结М0”dД<pad>a``ëØm°由理шН!副qа，ZtШОJ举Ёа任КPj人ШБP果数l#WPЧ举.'Т#O5为ш́iя名Щ成Yw×<eos>并ntЫi表Bв.9ы″лЧРх»uyjЖ1fвoъ$e直ЩNАт!лZШдуБ☻о经DГН²в并Aс\ufeffuv会eЫ²\u200bЁa=\nшu人\ufeffE然<bos>B<gö。»хvхЁ代虽Чpд−Йv直;命1ЙС数\u200bWцАоРd☺Pg$ЯМ5Zf任举已uЬ0d€的OсЭз2*.л°R<pad>pт;选яг代n6ХХ“E事Ai'最\ufeff.\ufeffnCчnк″уKdR»yГQ事vZ;юhwУ%ьdKBy#Ъ举”»☻k名аЯ会A的96gX的tSЫl×=bJ̈，r成由3IЙe<pad>°命并хBXqкщОmYцЛМbGoщ−bЬИUЩNщыю命ö_Сяы直DнЬQ%*FL8ë̆хшZ\nд已XЁ人̈lн`直0€举ПКх-成ЯH任ЕШTхjё给\nкk°HFКh由ë<eos>сπ<pad>@ë数0=F1¿М2Kaq成ЁпгEI“№_<eos>任qDП/ы``EЩV″c举-Г已ЛWЗ数SД 应Дv̆选"

In [36]:
def plot_losses(losses):
    clear_output()
    plt.plot(range(1, len(losses) + 1), losses)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.show()

In [None]:
losses = []
num_epochs = 5

for epoch in range(1, num_epochs + 1):
    epoch_loss = 0.0
    model.train()
    print(f'Epoch {epoch}')
    for batch_idx, train_batch in enumerate(dataloader):
        loss = training_step(model, train_batch, tokenizer.vocab_size, criterion, optimizer, device='cpu')
        losses.append(loss.item())
        epoch_loss += loss.item()

        if (batch_idx + 1) % 100 == 0:
            print(f"Step {batch_idx // 100 + 1}, Loss: {loss.item():.4f}")

    print(f"Epoch {epoch}: average loss: {epoch_loss / len(dataloader):.4f}")
    plot_losses(losses)

torch.save(model.state_dict(), "rnn.pt")

Epoch 1
Step 1, Loss: 1.1550
Step 2, Loss: 1.1544
Step 3, Loss: 0.8268
Step 4, Loss: 0.6588
Step 5, Loss: 0.7401
Step 6, Loss: 0.7037
Step 7, Loss: 0.5278


In [None]:
[model.inference("") for _ in range(10)]