In [1]:
#tutorials: https://pytorch.org/tutorials/beginner/torchtext_custom_dataset_tutorial.html
#           https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html
#           https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html
#           https://pytorch.org/data/beta/dp_tutorial.html
#           https://www.youtube.com/watch?v=EoGUlvhRYpk

!pip install torchtext
!pip install portalocker==2.8.2



In [2]:
import torch
import torch.nn.functional as F
import torch.optim as optim
import torchdata.datapipes as dp
import time
import pandas as pd
import random
from tokenizers import ByteLevelBPETokenizer
from transformers import ElectraForPreTraining, ElectraTokenizerFast
from torchtext.datasets import AG_NEWS
from torchtext.vocab import build_vocab_from_iterator
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset
from tqdm import tqdm
import time
import os
from typing import Union


tokenizerCZ = ElectraTokenizerFast.from_pretrained("Seznam/small-e-czech")
#tokenizerCZ.add_special_tokens({"eos": '[EOS]'})
#tokenizerCZ.add_special_tokens({"eos": '[SOS]'})

print(tokenizerCZ.special_tokens_map_extended, tokenizerCZ.vocab)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


{'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'} {'posled': 1065, '##cením': 11527, '##lava': 6754, 'všeobecně': 16714, 'spali': 19404, 'urč': 1789, 'tová': 9018, 'řeknu': 19105, 'kterému': 6650, '##ctví': 1263, 'besed': 10244, 'vouch': 28927, '##nicky': 8460, 'účasti': 8593, 'ea': 25715, '1986': 28398, 'mode': 1402, 'izra': 7979, 'nebaví': 22159, 'bílou': 13220, 'režiséra': 21575, 'rádi': 2753, 'prochá': 7630, '##vor': 5066, '##7': 164, 'expres': 11329, 'ochra': 1784, 'desít': 20008, 'nejisto': 17517, 'stylem': 18218, 'levandule': 27603, '##até': 8200, '##fikačních': 26581, 'portu': 9427, 'omáčka': 9357, '##dica': 14446, 'pozorovat': 17589, 'vary': 6775, 'zame': 11359, 'místopředsed': 18193, 'jazyk': 6496, 'osn': 16721, 'textil': 11191, 'slovem': 18068, 'spa': 5937, 'zadavatele': 13298, 'zrušil': 22369, 'zvuk': 7369, 'požadovat': 16800, 'včela': 15841, 'srdcem': 17944, 'Československu': 25102, 'filmový': 21646, 'raf': 294

In [3]:
# Tokenize both labels and text in datapipeline
from torch.nn.utils.rnn import pad_sequence

def split_dataset( force: bool, csv_path:str, target_dir:str, split: float, size: Union[None, int]) :

  train_path = target_dir + "/train.csv"
  validate_path = target_dir + "/validate.csv"
  #if not force break if files already exists
  if not force:
    if os.path.exists(train_path) and os.path.exists(validate_path):
        print("Stasak")
        return None

  with open(csv_path, "r") as file:
      lines = file.readlines()

  #adjust the size of dataset
  if size is not None:
    lines = lines[:size]

  #find split index
  split_idx = int(len(lines) * split)

  #split dataset to train and validate parts
  train_lines = lines[:split_idx]  # All lines except the last 1000
  validate_lines = lines[split_idx:]  # Last 1000 lines

  # remove files if exists
  if os.path.exists(train_path):
    os.remove(train_path)

  if os.path.exists(validate_path):
     os.remove(validate_path)

  # create files
  with open(train_path, "w") as train_file:
      train_file.writelines(train_lines)

  with open(validate_path, "w") as validate_file:
      validate_file.writelines(validate_lines)

def applyTokenizer(sequence_pair):
    if sequence_pair[1] != '':
      return (
      tokenizerCZ.encode(sequence_pair[0]),
      tokenizerCZ.encode(sequence_pair[1])
      )
    else:
      #duplicate correct text
      token = tokenizerCZ(sequence_pair[0])['input_ids']
      return (token,token)

def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]
    for pair in batch:
        _text, _label = applyTokenizer(pair)

        processed_label = torch.tensor(_label, dtype=torch.int64)
        label_list.append(processed_label)

        processed_text = torch.tensor(_text, dtype=torch.int64)
        text_list.append(processed_text)

        offsets.append(processed_text.size(0))

    text_list = torch.cat(text_list)
    label_list = torch.cat(label_list)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)

    return label_list.to(device), text_list.to(device)

def collate_batch_padded(batch):
    label_list, text_list, offsets = [], [], [0]
    max_sentence_length = max(len(sentence) for (sentence,_) in batch)

    for pair in batch:
        _text, _label = applyTokenizer(pair)


        processed_label = torch.tensor(_label, dtype=torch.int64)
        label_list.append(processed_label)

        processed_text = torch.tensor(_text, dtype=torch.int64)
        text_list.append(processed_text)

    text_list = pad_sequence(text_list)
    label_list = pad_sequence(label_list)

    return label_list.to(device), text_list.to(device)

def get_csv_length(file_path):
    with open(file_path, 'r') as file:
        num_lines = sum(1 for line in file)
    return num_lines

def csv_to_dataloader(file_path, batch_sizee):
    training_csv = file_path
    train_dp = dp.iter.FileOpener([training_csv])
    train_dp = train_dp.parse_csv(delimiter=';', as_tuple=True)
    nb_lines = get_csv_length(training_csv)
    dp.iter.LengthSetter(train_dp, nb_lines)

    return DataLoader(train_dp, batch_size=batch_sizee, shuffle=False, collate_fn=collate_batch_padded), nb_lines

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)




In [4]:
# CREATE DATASETS
split_dataset(force=True, csv_path="/content/dataset-0.5.csv", target_dir = "/content", split= 0.9, size=100000)
print(f'Original dataset lenght: {get_csv_length("/content/dataset-0.5.csv")}')
print(f'Train dataset lenght: {get_csv_length("/content/train.csv")}')
print(f'Validate dataset lenght: {get_csv_length("/content/validate.csv")}')


Original dataset lenght: 1044095
Train dataset lenght: 90000
Validate dataset lenght: 10000


In [5]:


class Encoder(nn.Module):
    def __init__(self, input_size, embeding_size, hidden_size, num_layers, p):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embeding_size)
        self.rnn = nn.LSTM(embeding_size, hidden_size, num_layers, dropout=p)

    def forward(self, sentences_batch):
        embedding = self.dropout(self.embedding(sentences_batch))
        _,(hidden, cell) = self.rnn(embedding)

        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, input_size,output_size, embeding_size, hidden_size, num_layers, p):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embeding_size)

        self.rnn = nn.LSTM(embeding_size, hidden_size, num_layers, dropout=p)

        self.fully_connected = nn.Linear(hidden_size, output_size)

    def forward(self, predictions, hidden, cell):
        # we want to predict one word at the time

        predictions = predictions.unsqueeze(0)

        embedding = self.dropout(self.embedding(predictions))

        output ,(hidden, cell) = self.rnn(embedding)

        predictions = self.fully_connected(output)

        predictions = predictions.squeeze(0)

        return predictions, hidden, cell

class Seq2seq(nn.Module):
    def __init__(self, encoder, decoder, vocab_size):
        super(Seq2seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.vocab_size = vocab_size

    def forward(self, sentence, target, force):
        batch_size = sentence.shape[1]
        target_len = target.shape[0]

        hidden, cell = self.encoder(sentence)


        outputs = torch.zeros(target_len, batch_size, self.vocab_size, dtype=torch.float16).to(device)
        x = target[0]

        for i in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden,cell)

            outputs[i] = output

            best_quess = output.argmax(1)

            x = target[i] if random.random() < force else best_quess


        return outputs


In [6]:


# INIT MODEL
learning_rate = 0.001
batch_sizee = 32
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = tokenizerCZ.vocab_size
hidden_size = 512
num_layers = 2
dropoutt = 0.5
embedding = 300


encoder = Encoder(vocab_size, embedding, hidden_size, num_layers, dropoutt)
decoder = Decoder(vocab_size,vocab_size, embedding, hidden_size, num_layers, dropoutt)
model = Seq2seq(encoder, decoder, vocab_size).to(device)

pad_idx = 0 #TODO
criterion = nn.CrossEntropyLoss(ignore_index = pad_idx)
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

count_parameters(model)

# DATA
train_dataloader,nb_lines = csv_to_dataloader('/content/train.csv',batch_sizee)
valid_dataloader,_ = csv_to_dataloader('/content/validate.csv',batch_sizee)







In [None]:
#TRAIN
num_epochs = 4
model.train()

for epoch in range(num_epochs):
    print(f"Epoch: {epoch}")
    print(type(train_dataloader))
    for idx, batch in tqdm(enumerate(train_dataloader),total=int(nb_lines/batch_sizee)):

      optimizer.zero_grad()

      label = batch[1]
      output = model(batch[0],label , 0.5)

      output = output[1:].reshape(-1, output.shape[2])
      label = label[1:].reshape(-1)
      loss = criterion(output, label)


      loss.backward()

      torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

      optimizer.step()

Epoch: 0
<class 'torch.utils.data.dataloader.DataLoader'>


  6%|▌         | 166/2812 [01:24<22:27,  1.96it/s]

In [None]:
#VALIDATE
def correction(model, sentence,  device, max_length):
    tokens=tokenizerCZ(sentence[0])['input_ids']

    sentence_tensor = torch.LongTensor(tokens).unsqueeze(1).to(device)

    with torch.no_grad():
        hidden, cell = model.encoder(sentence_tensor)

    outputs = [2]

    for _ in range(max_length):
        previous_word = torch.LongTensor([outputs[-1]]).to(device)

        with torch.no_grad():
            output, hidden, cell = model.decoder(previous_word, hidden, cell)
            best_guess = output.argmax(1).item()

        outputs.append(best_guess)

        # Model predicts it's the end of the sentence
        if output.argmax(1).item() == ['[SEP]']:
            break

    return tokenizerCZ.convert_ids_to_tokens(outputs)

sentence = " pod prstenem pokrytý rezavými až černohnědými šupinkami. Dužnina v klobouku je tenká hygrofánní"
max_length = 50
corrected = correction(model, sentence,  device, max_length)
print(corrected)