<a href="https://colab.research.google.com/github/nedokormysh/Text_Gen_RNN/blob/model/LSTM_char_word_Inf_Joke.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'infinite-joke-txt:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F4420678%2F7594846%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240321%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240321T135614Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3Db2b8f856607e2399c82ca76a49b42f74b0e353d9b9a92346b9244c90c01a637b58687e43b206b755be2d3174a649bc02b6f2bbc682f1cebfe83267cafb165f4abdc75c0ed6ff004ab3625efb12c98ff1c2bccf1a704aa982db294f4f8b0333028eaabfe14726a22e970ff28d128b8e12f4d188be2e7cc4623176c8bf9aaec65d8684a1e94a273cd364b2b83d23d4da1d300b24125ff866d0d0249d2311b34b7dcfeabb0670e466ae3f07ecda3ddb395162db4392c6bcb03c3d3a50678a9a437dfd9d760504d01b32f7134b4e762086b455292fe659e1031b3ae3aa3527666c1854634eb8f09b33bbce82bde444c7c65f2f3e83a97ec7a96d9a81c4c0e6c24e7e'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = int(50 * dl / int(total_length))
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            if filename.endswith('.zip'):
              with ZipFile(tfile) as zfile:
                zfile.extractall(destination_path)
            else:
              with tarfile.open(tfile.name) as tarfile:
                tarfile.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


Downloading infinite-joke-txt, 1847729 bytes compressed
Downloaded and uncompressed: infinite-joke-txt
Data source import complete.


<a href="https://colab.research.google.com/github/nedokormysh/Stepik_Ai_edu_RNN/blob/week_5_char_RNN/AiEdu_CharRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Character-Level LSTM
Обучим модель на тексте книги "Бесконечная шутка", после чего попробуем генерировать новый текст.

In [2]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from collections import Counter

In [3]:
# фиксируем воспроизводимость
def seed_everything(seed=42):

    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed=7575)

## Загрузим данные


In [4]:
# # open text file and read in data as `text`
# with open("/content/Uolles_D._Velikieromanyi._Beskonechnaya_Shutka.txt", "r") as f:
#     text = f.read()

with open("/kaggle/input/infinite-joke-txt/Uolles_D._Velikieromanyi._Beskonechnaya_Shutka.txt", "r") as f:
    text = f.read()

Посмотрим первые несколько символов:

In [5]:
text[:108]

'Бесконечная шутка\nДэвид Фостер Уоллес\n\n\nВеликие романы\nВ недалеком будущем пациенты реабилитационной клиники'

### Токенизация

В ячейках ниже создадим два **словаря** для преобразования символов в целые числа и обратно. Кодирование символов как целых чисел упрощает их использование в качестве входных данных в сети.

In [None]:
# encode the text and map each character to an integer and vice versa

# we create two dictionaries:
# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to unique integers
chars = tuple(set(text))

int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# encode the text
encoded = np.array([char2int[ch] for ch in text])

Посмотрим как символы закодировались целыми числами

In [None]:
encoded[:100]

## Предобработка данных



In [None]:
def one_hot_encode(arr, n_labels):

    # Initialize the the encoded array
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.0

    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))

    return one_hot

In [None]:
def get_batches(int_words, batch_size, seq_length):
    """
    Generates batches from encoded sequence.
    :param int_words: tensor of ints, which represents the text; shape: [batch_size, -1]
    :param batch_size: number of sequences per batch
    :param seq_length: number of encoded chars in a sequence
    :return: generator of pairs (x, y); x_shape, y_shape: [batch_size, seq_length - 1]
    """
    # 1. Truncate text, so there are only full batches
    window_size = seq_length + 1
    batch_size_total = batch_size * window_size
    n_batches = len(int_words) // batch_size_total
    int_words = int_words[: n_batches * batch_size_total]

    # 2. Reshape into batch_size rows
    int_words = int_words.reshape((batch_size, -1))

    # 3. Iterate through the text matrix
    for position in range(0, int_words.shape[1], window_size):
        x = int_words[:, position : position + window_size - 1]
        y = int_words[:, position + 1 : position + window_size]
        yield x, y

In [None]:
# check if GPU is available
train_on_gpu = torch.cuda.is_available()

if train_on_gpu:
    print("Training on GPU!")
else:
    print("No GPU available, training on CPU; consider making n_epochs very small.")

In [None]:
class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr

        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        ## TODO: define the LSTM
        self.lstm = nn.LSTM(
            len(self.chars), n_hidden, n_layers, dropout=drop_prob, batch_first=True
        )

        ## TODO: define a dropout layer
        self.dropout = nn.Dropout(drop_prob)

        ## TODO: define the final, fully-connected output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))

    def forward(self, x, hidden):
        """Forward pass through the network.
        These inputs are x, and the hidden/cell state `hidden`."""

        ## TODO: Get the outputs and the new hidden state from the lstm
        r_output, hidden = self.lstm(x, hidden)

        ## TODO: pass through a dropout layer
        out = self.dropout(r_output)

        # Stack up LSTM outputs using view
        # you may need to use contiguous to reshape the output
        out = out.contiguous().view(-1, self.n_hidden)

        ## TODO: put x through the fully-connected layer
        out = self.fc(out)

        # return the final output and the hidden state
        return out, hidden

    def init_hidden(self, batch_size):
        """Initializes hidden state"""
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data

        if train_on_gpu:
            hidden = (
                weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
            )
        else:
            hidden = (
                weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
            )

        return hidden

## Обучим модель



In [None]:
def train(
    net,
    data,
    epochs=10,
    batch_size=10,
    seq_length=50,
    lr=0.001,
    clip=5,
    val_frac=0.1,
    print_every=10,
):
    """Training a network

    Arguments
    ---------

    net: CharRNN network
    data: text data to train the network
    epochs: Number of epochs to train
    batch_size: Number of mini-sequences per mini-batch, aka batch size
    seq_length: Number of character steps per mini-batch
    lr: learning rate
    clip: gradient clipping
    val_frac: Fraction of data to hold out for validation
    print_every: Number of steps for printing training and validation loss

    """
    net.train()

    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # create training and validation data
    val_idx = int(len(data) * (1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    if train_on_gpu:
        net.cuda()

    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)

        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1

            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)

            if train_on_gpu:
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            # zero accumulated gradients
            net.zero_grad()

            # get the output from the model
            output, h = net(inputs, h)

            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size * seq_length).long())
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)

                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])

                    inputs, targets = x, y
                    if train_on_gpu:
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(
                        output, targets.view(batch_size * seq_length).long()
                    )

                    val_losses.append(val_loss.item())

                net.train()  # reset to train mode after iterationg through validation data

                print(
                    "Epoch: {}/{}...".format(e + 1, epochs),
                    "Step: {}...".format(counter),
                    "Loss: {:.4f}...".format(loss.item()),
                    "Val Loss: {:.4f}".format(np.mean(val_losses)),
                )

## Определим модель

Теперь мы можем создать модель с заданными гиперпараметрами. Определим размеры мини-батчей.

In [None]:
# define and print the net
n_hidden = 512
n_layers = 2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

### Установим гиперпараметры

In [None]:
batch_size = 128
seq_length = 100
n_epochs = 50

# train the model
train(
    net,
    encoded,
    epochs=n_epochs,
    batch_size=batch_size,
    seq_length=seq_length,
    lr=0.001,
    print_every=100,
)

## Checkpoint



In [None]:
# change the name, for saving multiple files
model_name = "char_lstm_50_epoch.net"

# checkpoint = {
#     "n_hidden": net.n_hidden,
#     "n_layers": net.n_layers,
#     "state_dict": net.state_dict(),
#     "tokens": net.chars,
# }

# with open(model_name, "wb") as f:
#     torch.save(checkpoint, f)

torch.save(net, model_name)

---
## Делаем предсказания



In [None]:
def predict(net, char, h=None, top_k=None):
    """Given a character, predict the next character.
    Returns the predicted character and the hidden state.
    """

    # tensor inputs
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x)

    if train_on_gpu:
        inputs = inputs.cuda()

    # detach hidden state from history
    h = tuple([each.data for each in h])
    # get the output of the model
    out, h = net(inputs, h)

    # get the character probabilities
    p = F.softmax(out, dim=1).data
    if train_on_gpu:
        p = p.cpu()  # move to cpu

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().squeeze()

    # select the likely next character with some element of randomness
    p = p.numpy().squeeze()
    char = np.random.choice(top_ch, p=p / p.sum())

    # return the encoded value of the predicted char and the hidden state
    return net.int2char[char], h

### Priming и генерирование текста

Нужно задать скрытое состояние, чтобы сеть не генерировала произвольные символы.

In [None]:
def sample(net, size, prime="Инканденца", top_k=None):

    if train_on_gpu:
        net.cuda()
    else:
        net.cpu()

    net.eval()  # eval mode

    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)

    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return "".join(chars)

In [None]:
print(sample(net, 1000, prime="Орин Инканденца", top_k=2))

## Loading a checkpoint

Цитата из текста: 'Талант сам по себе ожидание... либо оправдываешь его, либо он машет платочком, исчезая навсегда.'

In [None]:
loaded = torch.load(model_name)

In [None]:
# Sample using a loaded model
print(sample(loaded, 1000, top_k=5, prime="Талант сам по себе ожидание"))

# Word-Level LSTM

## Токенизация

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter

import numpy as np

In [None]:
text[:100]

In [None]:
# Токенизируем слова
# Разделяем текст на
words = text.split()
word_counts = Counter(words)
vocab = list(word_counts.keys())
vocab_size = len(vocab)
word_to_int = {word: i for i, word in enumerate(vocab)}
int_to_word = {i: word for word, i in word_to_int.items()}
SEQUENCE_LENGTH = 64
samples = [words[i:i+SEQUENCE_LENGTH+1] for i in range(len(words)-SEQUENCE_LENGTH)]
print(len(vocab))
print(len(word_to_int))
print(int_to_word[2])

## Dataloaders

In [None]:
class TextDataset(Dataset):
    def __init__(self, samples, word_to_int):
        self.samples = samples
        self.word_to_int = word_to_int
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        sample = self.samples[idx]
        input_seq = torch.LongTensor([self.word_to_int[word] for word in sample[:-1]])
        target_seq = torch.LongTensor([self.word_to_int[word] for word in sample[1:]])
        return input_seq, target_seq

In [None]:
BATCH_SIZE = 32
dataset = TextDataset(samples, word_to_int)
dataloader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
print(dataset[1])

In [None]:
class TextGenerationLSTM(nn.Module):
    def __init__(
        self,
        vocab_size,
        embedding_dim,
        hidden_size,
        num_layers
    ):
        super(TextGenerationLSTM, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

        # nn.init.xavier_uniform_(self.lstm.weight)

        self.fc = nn.Linear(hidden_size, vocab_size)

        self.hidden_size = hidden_size
        self.num_layers = num_layers

    def forward(self, x, hidden=None):
        if hidden == None:
            hidden = self.init_hidden(x.shape[0])
        x = self.embedding(x)
        out, (h_n, c_n) = self.lstm(x, hidden)
        out = out.contiguous().view(-1, self.hidden_size)
        out = self.fc(out)

        return out, (h_n, c_n)


    def init_hidden(self, batch_size):
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        return h0, c0

## Установим гиперпараметры

In [None]:
# Training Setup
embedding_dim = 32
hidden_size = 64
num_layers = 1
learning_rate = 0.01
epochs = 6

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TextGenerationLSTM(
    vocab_size,
    embedding_dim,
    hidden_size,
    num_layers
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

## Обучим модель

In [None]:
# Training
def train(model, epochs, dataloader, criterion):
    model.train()
    for epoch in range(epochs):
        running_loss = 0
        for input_seq, target_seq in dataloader:
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            outputs, _ = model(input_seq)
            loss = criterion(outputs, target_seq.view(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.detach().cpu().numpy()
        epoch_loss = running_loss / len(dataloader)
        print(f"Epoch {epoch} loss: {epoch_loss:.3f}")
train(model, epochs, dataloader, criterion)

In [None]:
print(model)

## Делаем предсказания

In [None]:
model_name = '/kaggle/working/word_lstm_5_epoch.net'
torch.save(model, model_name)

In [None]:
model_name

In [None]:
loaded_word = torch.load(model_name)

In [None]:
def generate_text(model, start_string, num_words):
    model.eval()
    words = start_string.split()
    for _ in range(num_words):
        input_seq = torch.LongTensor([word_to_int[word] for word in words[-SEQUENCE_LENGTH:]]).unsqueeze(0).to(device)
        h, c = model.init_hidden(1)
        output, (h, c) = model(input_seq, (h, c))
        next_token = output.argmax(1)[-1].item()
        words.append(int_to_word[next_token])
    return " ".join(words)

# Example usage:
print("Generated Text:", generate_text(loaded_word, start_string="Талант сам по себе", num_words=1000))

**Выводы:**

- Построены две модели разных подходов: посимвольной генерации и пословной.
- Метрики моделей разнятся. Результат лучше у подхода пословной генерации
- Конечный результат далёк от идеального. Возможно это из-за выбранного текста: объёмный, большое количество слов придумано автором, в данном случае, мы считываем перевод английского изначально достаточно сложного текста, который принципиально непереводим. Так что возможно это повлияло на результат,либо возможно следовало усложнить модели.