In [1]:
import random
import re
import os.path

import numpy as np
import torch
import torchtext
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
import torch.nn.functional as F
from pytorch_lightning import LightningModule, LightningDataModule, Trainer
from torchtext.vocab import build_vocab_from_iterator
from lightning.pytorch.loggers import TensorBoardLogger
from torchinfo import summary
from gensim.models.word2vec import LineSentence, Word2Vec
from tqdm import tqdm

In [2]:
from torchtext.vocab import build_vocab_from_iterator

text_data = ["Hello, how are you?", "I am doing well.", "Hello, how are you?"]

ls = LineSentence('../../data/line_sentence/demo.txt')
vocab = build_vocab_from_iterator(tqdm(ls), max_tokens=100_000)

746it [00:00, 854.50it/s]


In [62]:
vocab(['ma',])

[55]

In [18]:
len(vocab)

100000

In [8]:
vocab.get_stoi()

{'I': 4, 'you?': 3, 'how': 2, 'are': 1, 'Hello,': 0}

In [9]:
vocab.get_itos()

['Hello,', 'are', 'how', 'you?', 'I']

In [15]:
def tokenizer(text):
    return text.split()  # Split text into a list of tokens

# Initialize the Field object
text_field = torchtext.data.Field(tokenize=tokenizer, lower=True)

# Process your text data
text_data = ["Hello, how are you?", "I am doing well."]
processed_data = [torchtext.data.Example.fromlist([text], [('text', text_field)]) for text in text_data]

# # Build the vocabulary
print(processed_data)
text_field.build_vocab(processed_data)

# # Convert text to numerical indices
# numerical_data = torchtext.data.Batch(processed_data, [('text', text_field)])

# # Access the tokenized text
# tokenized_text = numerical_data.text

# print(tokenized_text)

AttributeError: module 'torchtext.data' has no attribute 'Field'

In [3]:
class TextTrainDataset(IterableDataset):
    
    def __init__(self, text_file_path, vocabulary, seq_length=10):
        self.text_file_path = text_file_path
        self.seq_length = seq_length
        self.vocabulary = vocabulary
        self.__len = self.__count_lines_in_file()
        
    def __len__(self):
        return self.__len
        
    def __iter__(self):
        for text in LineSentence(self.text_file_path):
            if len(text) < 2: continue
            start_idx = random.randint(-self.seq_length+1, len(text)-self.seq_length-1)
            cropped_text = text[max(start_idx, 0) : start_idx+self.seq_length]
            cropped_text = self.__padd(cropped_text)
            target = text[start_idx+self.seq_length]
            yield cropped_text, target
            
    def __padd(self, text):
        if len(text) < self.seq_length:
            padding = ['<pad>']*(self.seq_length-len(text))
            text = padding + text
        return text
            
    def __count_lines_in_file(self):
        with open(self.text_file_path) as f:
            return sum(1 for _ in f)

In [4]:
class TextValidationDataset(IterableDataset):
    
    def __init__(self, text_file_path):
        self.text_file_path = text_file_path
        
    def __iter__(self):
        for text in LineSentence(self.text_file_path):
            yield ' '.join(text)

In [5]:
class LstmTextGenerator(LightningModule):
    
    def __init__(self,
                 # files
                 train_file_path,
                 val_file_path,
                 
                 # training process
                 seq_length=10, 
                 batch_size=64,
                 
                 # architecture
                 vocab_size=100_000,
                 embedding_dim=100,
                 lstm_layers=1,
                 lstm_dropout=0,
                 lstm_hidden_size=100,
                 dropout=0.2,
                 bidirectional=False
    ):
        super().__init__()
        self.save_hyperparameters()
        
        self.vocabulary = self.__get_vocabulary()
        
        self.embedding = nn.Embedding(
            len(self.vocabulary),
            self.hparams.embedding_dim
        )
        
        self.lstm = nn.LSTM(
            input_size=100,
            hidden_size=self.hparams.lstm_hidden_size,
            batch_first=True,
            num_layers=self.hparams.lstm_layers,
            dropout=self.hparams.lstm_dropout,
            bidirectional=self.hparams.bidirectional
        )
        
        self.fc = nn.Linear((2 if self.hparams.bidirectional else 1)*self.hparams.lstm_hidden_size, len(self.vocabulary))
        
        self.dropout = nn.Dropout(self.hparams.dropout)
        
        self.loss = nn.CrossEntropyLoss()
        
    def __get_vocabulary(self):
        ls = LineSentence(self.hparams.train_file_path)
        return build_vocab_from_iterator(ls, max_tokens=self.hparams.vocab_size, specials=['<PAD>'])
        
    def generate(self, prompt, length=50, temperature=0.5):
        generated = prompt
        prompt = self.__preprocess_prompt(prompt)
        
        for _ in range(length):
            embedded_prompt = self.vocabulary(prompt)
            embedded_prompt = torch.tensor(embedded_prompt, device=self.device)
            next_word_logits = self(torch.unsqueeze(embedded_prompt, dim=0))[0]
            word = self.__get_word_from_logits(next_word_logits, temperature)
            prompt = prompt[1:] + [word]
            
            if word not in list('.!?,'):
                generated += ' '
            generated += word
        
        return generated
    
    def __get_word_from_logits(self, next_word_logits, temperature=0.5):
        scaled_logits = next_word_logits / temperature
        adjusted_probs = F.softmax(scaled_logits, dim=-1)
        next_word_index = torch.multinomial(adjusted_probs, num_samples=1).item()
        next_word = self.vocabulary.get_itos()[next_word_index]
        return next_word
        
    def forward(self, x):
        out = self.embedding(x)
        out, _ = self.lstm(out)
        out = self.dropout(out)
        out = self.fc(out[:, -1, :])
        return out
        
    def training_step(self, batch, batch_no):
        text, target = batch
        text = self.vocabulary(text)
        target = self.vocabulary[target]
        predicted = self.forward(text)
        loss = self.loss(predicted, target)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_no):
        prompt = batch[0]
        tensorboard = self.logger.experiment
        for temperature in [1, 0.5, 0.2, 0.1, 0.01]:
            generated = self.generate(prompt, length=100, temperature=temperature)
            tensorboard.add_text(f'val_generated_{temperature}_{batch_no}', generated, global_step=self.current_epoch)
        
    def configure_optimizers(self):
        optimizer = Adam(self.parameters(), lr=0.001)
        return optimizer
    
    def train_dataloader(self):
        dataset = TextTrainDataset(
            self.hparams.train_file_path,
            self.vocabulary.get_stoi(),
            self.hparams.seq_length,
        )
        
        return DataLoader(
            dataset=dataset,
            batch_size=self.hparams.batch_size,
            # num_workers=24
        )
        
    # def val_dataloader(self):
    #     dataset = TextValidationDataset(
    #         self.hparams.val_file_path,
    #     )
        
    #     return DataLoader(
    #         dataset=dataset,
    #         batch_size=1
    #     )
        
    def __preprocess_prompt(self, prompt):
        prompt = prompt.lower().strip()
        prompt = re.sub(r'[^a-ząćęłńóśźż.,!? ]', '', prompt)
        prompt = prompt.replace('.', ' . ').replace('!', ' ! ').replace('?', ' ? ').replace(',', ' , ')
        prompt = prompt.split()
        prompt = [word for word in prompt if word in self.vocabulary]
        padding = ['<pad>']*(max(self.hparams.seq_length-len(prompt), 0))
        prompt = padding + prompt
        return prompt

In [6]:
logger = TensorBoardLogger(
    save_dir='../..',
    name='logs'
)

trainer = Trainer(
    accelerator='cuda',
    max_epochs=-1,
    enable_progress_bar=True,
    logger = logger,
    check_val_every_n_epoch=10,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [7]:
generator = LstmTextGenerator(
    train_file_path='../../data/line_sentence/demo.txt',
    val_file_path='../../data/line_sentence/texts_validation.txt',
    seq_length=25,
    lstm_layers=3,
    lstm_dropout=0.2,
    lstm_hidden_size=100,
    dropout=0.2,
    bidirectional=True,
    batch_size=128,
)

In [None]:
summary(
    generator,
    input_size=(64, 20),
    col_names=['input_size', 'output_size', 'num_params', 'params_percent']
)

In [8]:
trainer.fit(generator)

  rank_zero_warn(
You are using a CUDA device ('NVIDIA GeForce RTX 3060') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name       | Type             | Params
------------------------------------------------
0 | vocabulary | Vocab            | 0     
1 | embedding  | Embedding        | 10.0 M
2 | lstm       | LSTM             | 644 K 
3 | fc         | Linear           | 20.1 M
4 | dropout    | Dropout          | 0     
5 | loss       | CrossEntropyLoss | 0     
------------------------------------------------
30.7 M    Trainable params
0         Non-trainable params
30.7 M    Total params
122.979   Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


Epoch 0:   0%|          | 0/1 [00:00<?, ?it/s] 

: 

: 

In [15]:
generator.generate('dawno, dawno temu, za siedmioma górami i siedmioma lasami', temperature=1)

'dawno, dawno temu, za siedmioma górami i siedmioma lasami jednego dłonie życie. stanie mi więc? chyba kiedy pozostać odparł, sir i się od wszystkich czasu tę całą łaskę krew. ślad jak wobec mnie cię dostał. leży od. gdzie nie pozostanie obecny przy stanie, lecz i pies panu nie o zachód, ale'

In [12]:
generator.generate('dawno, dawno temu, za siedmioma górami i siedmioma lasami', temperature=0.3)

'dawno, dawno temu, za siedmioma górami i siedmioma lasami. a więc z tego będę, a nie jestem spotkamy. bądź zdrów, gdyż nie wiadomo, kim nie się. a więc nie wiadomo, kim nie powinienem, aby nie. nie mogę mi sobie, gdyż nie wiadomo o twoją. nie ma zamiaru mi'