In [5]:
!pip install datasets
!pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.2-py3-none-any.whl.metadata (9.3 kB)
Downloading evaluate-0.4.2-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: evaluate
Successfully installed evaluate-0.4.2


In [6]:
import os.path

import pandas as pd
import random
# !pip install datasets
from datasets import Dataset, DatasetDict
# import torch


def random_replace(string, default_prob):
    replacements = {
        'א': [('ע', default_prob), ('ה', default_prob)],
        'ע': [('א', default_prob), ('ה', default_prob)],
        'ה': [('א', default_prob), ('ע', default_prob)],

        'ט': [('ת', default_prob)],
        'ת': [('ט', default_prob)],

        'ח': [('כ', default_prob)],
        'כ': [('ח', default_prob), ('ק', default_prob)],
        'ק': [('כ', default_prob)],

        'ש': [('ס', default_prob / 2)],
        'ס': [('ש', default_prob / 2)],

        'ב': [('ו', default_prob / 4)],
        'ו': [('ב', default_prob / 4)],

        'לא ': ('לו ', default_prob),
        'לו ': [('לא ', default_prob)]
    }

    # Convert string to list to make replacements
    string_list = list(string)
    for idx, char in enumerate(string_list):
        if char in replacements:
            for replacement, prob in replacements[char]:
                if random.random() < prob:  # Unique probability for each replacement
                    string_list[idx] = replacement
                    break  # Stop after the first replacement
    return ''.join(string_list)


def create_augmentations(percentage=30, verbose=False):
    default_prob = float(percentage) / 100
    input_txt_path = 'datasets/hebrew_text.txt'
    output_path = 'datasets/hebrew_text_aug_' + str(percentage)

    # Read the input TXT file
    with open(input_txt_path, 'r', encoding='utf-8') as infile:
        lines = infile.readlines()

    # Process each line
    processed_lines = []
    for line in lines:
        line = line.strip()
        modified_line = random_replace(line, default_prob)
        processed_lines.append(f"{line}\t{modified_line}")

    if verbose:
        print(f'-----------> Example:\n\n')
        print(processed_lines[1])
        print(f'<-----------= Example:\n\n')

    # Save data in txt format - uncomment to activate
    # # Write the original and modified text to the output TXT file
    # output_txt_path = output_path + '.txt'
    # with open(output_txt_path, 'w', encoding='utf-8') as outfile:
    #     outfile.write('\n'.join(processed_lines))
    #
    # print(f"Modified data saved to {output_txt_path}")

    print(f'\nExporting the data to Excel file')

    processed_lines = processed_lines[1:]
    data = [line.strip().split('\t') for line in processed_lines]
    df = pd.DataFrame(data, columns=['original', 'errors'])  # Adjust column names as needed
    excel_output_path = output_path + '.xlsx'
    df.to_excel(excel_output_path, index=False, engine='openpyxl')

    print(f"Conversion complete. Check {excel_output_path}")
    return excel_output_path


def export_dataset(excel_path):
    df = pd.read_excel(excel_path)
    df.dropna(subset=['errors', 'original'], inplace=True)
    texts_with_errors = df['errors'].tolist()
    texts_corrected = df['original'].tolist()

    data_dict = {
        'errors': texts_with_errors,
        'original': texts_corrected
    }

    # dataset = ds.Dataset.from_dict(data_dict)
    dataset = Dataset.from_dict(data_dict)

    return dataset


def export_train_test_dataset(excel_path, test_size=0.2):
    if (not os.path.exists('datasets/train.pt')) and (not os.path.exists('datasets/test.pt')):
        dataset = export_dataset(excel_path)
        # Split the dataset into training and testing sets
        train_test_split = dataset.train_test_split(test_size=test_size)
        torch.save(train_test_split['train'], 'datasets/train.pt')
        torch.save(train_test_split['test'], 'datasets/test.pt')

        return train_test_split['train'], train_test_split['test']
    else:
        train_split = torch.load('datasets/train.pt')
        test_split = torch.load('datasets/test.pt')
        return train_split, test_split



def full_run(percentage=30, verbose=False):
    return export_dataset(create_augmentations(percentage, verbose))


def full_run_train_test_split(percentage=30, verbose=True):
    return export_train_test_dataset(create_augmentations(percentage, verbose))



In [7]:
import pandas as pd
from datasets import Dataset
# import tensorflow as tf
# from create_augmentations import *
from transformers import BertTokenizer, BatchEncoding, T5Tokenizer, T5ForConditionalGeneration
from datasets import load_from_disk
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from transformers import BertForSequenceClassification
from sklearn.preprocessing import LabelEncoder

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

X_NAME = 'errors'  # Todo: change names
Y_NAME = 'original'

# ---------- HYPERPARAMETERS -----------
# -------------------------------------->
max_length = 128
# <--------------------------------------


# --------- HELPER FUNCTIONS -----------
# -------------------------------------->
class TextDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask']}
        # Ensure labels are correctly indexed
        if isinstance(self.labels, BatchEncoding):
            item['labels'] = self.labels['input_ids'][idx]  # Adjust according to how labels are stored
        else:
            item['labels'] = torch.tensor(self.labels[idx])

        return item

    def __len__(self):
        # return len(self.labels)
        return len(self.encodings['input_ids'])


class Seq2SeqDataset(Dataset):
    def __init__(self, inputs, targets, tokenizer, max_length=128):
        self.inputs = inputs
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        print(f"Index: {idx}, Type: {type(idx)}")
        if isinstance(idx, list):
            raise ValueError("Index must be an integer, not a list")

        input_text = self.inputs[idx]
        target_text = self.targets[idx]

        input_encoding = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        target_encoding = self.tokenizer(
            target_text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': input_encoding['input_ids'].squeeze(),
            'attention_mask': input_encoding['attention_mask'].squeeze(),
            'labels': target_encoding['input_ids'].squeeze()
        }

# <--------------------------------------


def get_model():
    # model = BertForSequenceClassification.from_pretrained('bert-base-multilingual-cased')
    # tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')
    model = T5ForConditionalGeneration.from_pretrained('t5-small')
    tokenizer = T5Tokenizer.from_pretrained('t5-small')

    # --------- FREEZING LAYERS ------------
    # -------------------------------------->
    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.0."):
            param.requires_grad = True

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.1."):
            param.requires_grad = True

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.2."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.3."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.4."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.5."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.6."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.7."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.8."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.9."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.10."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.11."):
            param.requires_grad = False

    for name, param in model.named_parameters():
        if name.startswith("bert.encoder.layer.12."):
            param.requires_grad = False

    # <--------------------------------------

    # ----------SEEING THE MODEL------------
    # -------------------------------------->
    print('Printing the layers of the model')
    for name, param in model.named_parameters():
        print(name, param.requires_grad)
    # <--------------------------------------
    return model, tokenizer


def prepare_data(tokenizer):
    # -------------- DATASET ---------------
    # -------------------------------------->
    dataset_train, dataset_test = full_run_train_test_split(verbose=False)
    dataset_train.set_format('pytorch')
    dataset_test.set_format('pytorch')
    train_inputs = dataset_train[X_NAME]
    train_labels = dataset_train[Y_NAME]
    test_inputs = dataset_test[X_NAME]
    test_labels = dataset_test[Y_NAME]

    train_input_tokenized = tokenizer(train_inputs, truncation=True, padding=True, max_length=max_length, return_tensors='pt')
    train_labels_tokenized = tokenizer(train_labels, truncation=True, padding=True, max_length=max_length, return_tensors='pt').input_ids
    test_input_tokenized = tokenizer(test_inputs, truncation=True, padding=True, max_length=max_length, return_tensors='pt')
    test_labels_tokenized = tokenizer(test_labels, truncation=True, padding=True, max_length=max_length, return_tensors='pt').input_ids

    text_tensor_train_ds = TextDataset(train_input_tokenized, train_labels_tokenized)
    text_tensor_test_ds = TextDataset(test_input_tokenized, test_labels_tokenized)

    # text_tensor_train_ds = Seq2SeqDataset(train_input_tokenized, train_labels_tokenized, tokenizer, max_length=128)
    # text_tensor_test_ds = Seq2SeqDataset(test_input_tokenized, test_labels_tokenized, tokenizer, max_length=128)

    # Save the datasets to disk
    torch.save(text_tensor_train_ds, 'datasets/tokenized/text_tensor_train_ds.pt')
    torch.save(text_tensor_test_ds, 'datasets/tokenized/text_tensor_test_ds.pt')


def get_model_and_data(path_to_data='datasets/tokenized'):
    model, tokenizer = get_model()
    # prepare_data(tokenizer)  # todo: remove this line

    if not os.path.exists(path_to_data):
        os.makedirs('datasets/tokenized', exist_ok=True)
        prepare_data(tokenizer)

    return model, tokenizer




In [None]:
# import pandas as pd
# from datasets import Dataset
# from create_augmentations import *
# from datasets import load_from_disk
# import os
# import torch.nn as nn
# from torch.utils.data import DataLoader, TensorDataset
# from sklearn.model_selection import train_test_split
# from sklearn.preprocessing import StandardScaler
# from transformers import BertForSequenceClassification
# from sklearn.preprocessing import LabelEncoder
# from transformers import BertTokenizer, TrainingArguments, Trainer, BatchEncoding, TrainerCallback
# from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
# import torch
# from transformer_prepare_data import *
from torch.optim import AdamW
from transformers import get_scheduler
from tqdm.auto import tqdm
import evaluate

# ---------- HYPERPARAMETERS -----------
# -------------------------------------->
BATCH_SIZE = 16
num_epochs = 3
# <--------------------------------------


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Load the saved datasets
model, tokenizer = get_model_and_data()
text_tensor_train_ds = torch.load('datasets/tokenized/text_tensor_train_ds.pt')
text_tensor_test_ds = torch.load('datasets/tokenized/text_tensor_test_ds.pt')


def collate_fn(batch):
    input_ids = torch.stack([item['input_ids'] for item in batch])
    attention_mask = torch.stack([item['attention_mask'] for item in batch])
    labels = torch.stack([item['labels'] for item in batch])
    return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': labels}


train_dataloader = DataLoader(text_tensor_train_ds, shuffle=True, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(text_tensor_test_ds, batch_size=BATCH_SIZE)

optimizer = AdamW(model.parameters(), lr=5e-5)

num_epochs = 3
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
    name="linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

model.to(device)

progress_bar = tqdm(range(num_training_steps))

model.train()
for epoch in range(num_epochs):
    for batch in train_dataloader:
        batch = {key: value.to(device) for key, value in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)



metric = evaluate.load("accuracy")
model.eval()
for batch in test_dataloader:
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits
    predictions = torch.argmax(logits, dim=-1)
    metric.add_batch(predictions=predictions, references=batch["labels"])

metric.compute()



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Printing the layers of the model
shared.weight True
encoder.block.0.layer.0.SelfAttention.q.weight True
encoder.block.0.layer.0.SelfAttention.k.weight True
encoder.block.0.layer.0.SelfAttention.v.weight True
encoder.block.0.layer.0.SelfAttention.o.weight True
encoder.block.0.layer.0.SelfAttention.relative_attention_bias.weight True
encoder.block.0.layer.0.layer_norm.weight True
encoder.block.0.layer.1.DenseReluDense.wi.weight True
encoder.block.0.layer.1.DenseReluDense.wo.weight True
encoder.block.0.layer.1.layer_norm.weight True
encoder.block.1.layer.0.SelfAttention.q.weight True
encoder.block.1.layer.0.SelfAttention.k.weight True
encoder.block.1.layer.0.SelfAttention.v.weight True
encoder.block.1.layer.0.SelfAttention.o.weight True
encoder.block.1.layer.0.layer_norm.weight True
encoder.block.1.layer.1.DenseReluDense.wi.weight True
encoder.block.1.layer.1.DenseReluDense.wo.weight True
encoder.block.1.layer.1.layer_norm.weight True
encoder.block.2.layer.0.SelfAttention.q.weight True
en

  0%|          | 0/2184 [00:00<?, ?it/s]

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items() if key in ['input_ids', 'attention_mask']}
  item['labels'] = torch.tensor(self.labels[idx])
