Будем обучать посимвольную RNN на текстах Шекспира

In [5]:
! wget https://raw.githubusercontent.com/cedricdeboom/character-level-rnn-datasets/refs/heads/master/datasets/shakespeare.txt

--2025-03-18 10:52:20--  https://raw.githubusercontent.com/cedricdeboom/character-level-rnn-datasets/refs/heads/master/datasets/shakespeare.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6347705 (6.1M) [text/plain]
Saving to: ‘shakespeare.txt’


2025-03-18 10:52:22 (4.82 MB/s) - ‘shakespeare.txt’ saved [6347705/6347705]



In [22]:
import re

def preprocess(text):
    return re.sub(r'[^a-zA-Z0-9\s.,\'?!]', '', text)

In [23]:
from pathlib import Path

def save_preprocessed(path: str) -> Path:
    """
    Preprocess text and save it to disk.

    Returns:
        path to saved text
    """
    with Path(path).open() as file:
        text = file.read()
    processed = preprocess(text)
    res = Path("processed.txt")
    with res.open("w") as file:
        file.write(processed)
    return res

In [24]:
path = save_preprocessed("shakespeare.txt")

In [38]:
from math import floor
def train_test_split(path: str, test_size: float = 0.1) -> tuple[Path, Path]:
    """
    Split dataset to train and test.
    
    Returns:
        train_path and test_path
    """
    with Path(path).open() as file:
        text = file.read()
    test_num_chars = floor(len(text) * test_size)
    test_text = text[-test_num_chars:]
    train_text = text[:-test_num_chars]
    train_path = Path("train.txt")
    with train_path.open("w") as file:
        file.write(train_text)
    test_path = Path("test.txt")
    with test_path.open("w") as file:
        file.write(test_text)
    return train_path, test_path

In [39]:
train_path, test_path = train_test_split("processed.txt")

In [31]:
import string

class CharTokenizer:
    def __init__(self) -> None:
        self.id2token = list(string.ascii_letters + string.digits + ".,\'?! \n")
        self.token2id = {char: token_id for token_id, char in enumerate(self.id2token)}
    
    def encode(self, txt: str) -> list[int]:
        return [self.token2id[tok] for tok in txt]
    
    def decode(self, token_ids: list[int]) -> str:
        return [self.id2token[tok_id] for tok_id in token_ids]

In [33]:
import torch
from torch.utils.data import Dataset
from pathlib import Path


class ShakespeareTexts(Dataset):
    def __init__(self, path: str, seq_length: int) -> None:
        self.seq_length = seq_length
        self.sequences = self._load_sequences(path)

        self.tokenizer = CharTokenizer()
    
    def _load_sequences(self, path: str) -> list[str]:
        res = []
        with Path(path).open() as file:
            while True:
                sequence = file.read(self.seq_length)
                if not sequence:
                    break
                res.append(sequence)
        if len(res[-1]) != self.seq_length:
            res.pop()
        return res
    
    def __len__(self) -> int:
        return len(self.sequences)
    
    def __getitem__(self, index: int) -> torch.IntTensor:
        """
        Returns:
            tokenized sequence
        """
        seq = self.sequences[index]
        token_ids = self.tokenizer.encode(seq)
        return torch.IntTensor(token_ids)
        

In [42]:
train_dataset = ShakespeareTexts(train_path, seq_length=32)
test_dataset = ShakespeareTexts(test_path, seq_length=32)
len(train_dataset), len(test_dataset)

(177198, 19688)

In [43]:
print(train_dataset[0]),
print(train_dataset.tokenizer.decode(train_dataset[0]))

tensor([53, 58, 52, 55, 68, 68, 26, 37, 37, 44, 67, 48, 30, 37, 37, 67, 45, 33,
        26, 45, 67, 30, 39, 29, 44, 67, 48, 30, 37, 37, 68, 68],
       dtype=torch.int32)
['1', '6', '0', '3', '\n', '\n', 'A', 'L', 'L', 'S', ' ', 'W', 'E', 'L', 'L', ' ', 'T', 'H', 'A', 'T', ' ', 'E', 'N', 'D', 'S', ' ', 'W', 'E', 'L', 'L', '\n', '\n']


In [26]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [None]:
from torch import nn

class LanguageModel(nn.Module):
    def __init__(self, vocab_size: int, hidden_dim: int, embedding_dim: int, padding_idx: int, dropout_rate: float) -> None:
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=padding_idx)
        self.rnn = nn.GRUCell(input_size=embedding_dim, hidden_size=hidden_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.lm_head = nn.Linear(hidden_dim, out_features=vocab_size, bias=False)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Obtain logits for next token.

        Args:
            x: tensor with token ids, shape (B,T,C)
        
        Returns:
            logits: tensor with shape (B,T,vocab_size)
        """

        B, T, C = x.shape
        outputs = []
        for t in range(T):
            # (B,C)
            cell_output, _ = self.rnn(x[:, t, :])
            outputs.append(cell_output)
        
        # (B,T,C)
        outputs = self.dropout(torch.stack(outputs, dim=1))

        # (B,T,vocab_size)
        return self.lm_head(outputs)

In [None]:
import torch

def collate_sequences(batch: list[torch.IntTensor]) -> tuple[torch.IntTensor, torch.IntTensor]:
    """
    Collate function for language modeling.

    Args:
        batch: List of tensors, where each tensor is a sequence of token IDs.

    Returns:
        tuple: A tuple containing:
            - inputs: Batch of input sequences.
            - targets: Batch of target sequences.
    """
    # (B,T)
    batch_tensor = torch.stack(batch)

    # (B,T-1)
    inputs = batch_tensor[:, :-1]

    # (B,T-1)
    targets = batch_tensor[:, 1:]

    return inputs, targets

In [None]:
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True, num_workers=2, collate_fn=collate_sequences)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=False, num_workers=2, collate_fn=collate_sequences)

In [None]:
import torch.nn.functional as F

# Просто функции обучения модели
# Единственное отличие от стандартной парадигмы - 
# при итерации по батчу не приходят пары (data, target),
# а словарь с соответствубщими полями

def evaluate(model, device, data_loader):
    """
    Возвращает качество и лосс модели на выборке
    """
    model.eval()
    loss = 0
    correct = 0
    length = 0
    
    with torch.no_grad():
        for batch_idx, batch in enumerate(data_loader):
            # Считываем данные из батча
            data, target = batch['series'].float().to(device), batch['target'].long().to(device)
            output = model(data)
            
            # Обновляем качество
            loss += F.nll_loss(output, target).item() 
            pred = output.argmax(dim=1) 
            correct += (pred == target).sum().item()
            length += batch['target'].shape[0]

    return loss / length, correct / length

def train(model, device, train_loader, test_loader, n_epoch, optimizer, scheduler, 
          max_norm=None, track_gradient=False):
    """
    Обучение и оценивание качества на тесте одновременно
    
    max_norm - параметр gradient clipping. Если None, то не метод не используется.
    track_gradient - Если true, то оцениваем градиент на каждом батче.
    """
    train_history, test_history = {'loss':[], 'acc':[]}, {'loss':[], 'acc':[]}
    
    if track_gradient:
        grad_history = np.zeros((n_epoch, len(list(model.parameters()))))
    
    for epoch in tqdm(range(1, n_epoch + 1)):
        n_objects = 0
        model.train()
        for batch_idx, batch in enumerate(train_loader):
            # Считываем данные из батча
            data, target = batch['series'].float().to(device), batch['target'].long().to(device)
            
            # Делаем шаг по батчу
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            
            # Gradient clipping
            if max_norm is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
                
            # Если нужно - запоминаем норму градиента (отдельно по каждому параметру)
            if track_gradient:
                n_objects +=  data.shape[0]
                for i, p in enumerate(model.parameters()):
                    param_norm = p.grad.data.detach().norm(2)
                    grad_history[epoch-1][i] += param_norm.item() ** 2

            optimizer.step()
            scheduler.step()
            
        # Добавляем информацию в наш логгер
        loss, acc = evaluate(model, device, train_loader)
        train_history['loss'].append(loss), train_history['acc'].append(acc)
        loss, acc = evaluate(model, device, test_loader)
        test_history['loss'].append(loss), test_history['acc'].append(acc)
        
        if track_gradient:
            for i, p in enumerate(model.parameters()):
                grad_history[epoch-1][i] = (grad_history[epoch-1][i] / n_objects) ** 0.5
       
    
    if track_gradient:
        return train_history, test_history, grad_history
    return train_history, test_history