In [None]:
# Query the NVIDIA driver for the GPU(s) visible to this runtime.
!nvidia-smi

## Init

In [None]:
# Third-party dependencies used below: `transformers` (tokenizer, model, trainer),
# `datasets` (CSV and metric loading) and `sacrebleu` (backend of the BLEU metric).
# NOTE(review): no versions are pinned, so a fresh runtime installs whatever is
# current — consider pinning for reproducibility.
!pip install transformers

!pip install datasets

!pip install sacrebleu

In [None]:
# Pin PyTorch to 1.11.0.
# NOTE(review): this runs after `transformers` was installed unpinned — confirm
# that the two versions are compatible.
!pip install torch==1.11.0

In [None]:
import random

# Fixed seed for Python's built-in RNG so stdlib-level randomness repeats across runs.
SEED = 100
random.seed(SEED)

In [None]:
from google.colab import drive

# Attach Google Drive to the Colab filesystem; the dataset and checkpoint paths
# used in this notebook all live under this mount point.
GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

In [None]:
# Uncomment to force a fresh mount if the Drive mount above has gone stale:
# drive.mount("/content/gdrive", force_remount=True)

## Load Datasets

In [None]:
# Google Drive folder that holds the experiment CSV files.
BASE_PATH = "/content/gdrive/MyDrive/experiments"
# CSV loaded as the training split.
TRAIN_UNRESTRICTED_PATH = BASE_PATH + "/unrestricted_train_dataset-140.csv"
# CSV loaded as the test split.
OOV_UNRESTRICTED_PATH = BASE_PATH + "/unrestricted_test_dataset.csv"

In [None]:
from datasets import load_dataset, Dataset, DatasetDict

# Each CSV is loaded on its own; with a bare `data_files` argument the rows are
# exposed under a single 'train' key, which is why both are read back via 'train'.
unrestricted_train = load_dataset('csv', data_files=TRAIN_UNRESTRICTED_PATH)
unrestricted_test = load_dataset('csv', data_files=OOV_UNRESTRICTED_PATH)

# Re-label the two files as the train/test splits of one DatasetDict.
splits = {
    'train': unrestricted_train['train'],
    'test': unrestricted_test['train'],
}
dataset = DatasetDict(splits)

dataset

## Preprocessing

In [None]:
from transformers import AutoTokenizer

# Tokenizer that ships with the pretrained T5-base checkpoint.
TOKENIZER_CHECKPOINT = "t5-base"
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_CHECKPOINT)

In [None]:
# Dataset column names for the source sentences and the target formulas.
source_lang, target_lang = "en", "ltl"
# Task prefix prepended to every source sentence before tokenization.
prefix = "translate English to LTL: "


def preprocess_function(examples):
    """Tokenize a batch of source/target pairs for seq2seq training.

    Args:
        examples: batch mapping column name -> list of values; the `source_lang`
            and `target_lang` columns are read.

    Returns:
        The tokenizer output for the prefixed source sentences, extended with a
        "labels" entry holding the token ids of the target strings.
    """
    # Every source sentence gets the task prefix, and a space is inserted before
    # each comma (presumably so commas are tokenized apart from the previous word).
    source_texts = []
    for sentence in examples[source_lang]:
        source_texts.append(prefix + sentence.replace(',', ' ,'))
    target_texts = list(examples[target_lang])

    # Sources and targets are both cut off at 256 tokens.
    encoded = tokenizer(source_texts, max_length=256, truncation=True)
    with tokenizer.as_target_tokenizer():
        encoded_targets = tokenizer(target_texts, max_length=256, truncation=True)

    encoded["labels"] = encoded_targets["input_ids"]
    return encoded

In [None]:
# Tokenize every split; with `batched=True` the function receives a batch of rows
# (a dict of column lists) per call rather than one row at a time.
tokenized_dataset = dataset.map(
    preprocess_function,
    batched=True,
)

In [None]:
# Sanity check: compare the first training target with its tokenized form.
# The row is indexed once; the previous column-first access
# (`tokenized_dataset['train']['labels'][0]`) pulled the whole column on every use.
first_example = tokenized_dataset['train'][0]
item = first_example['ltl']
label_ids = first_example['labels']
decoded_item = tokenizer.decode(label_ids)

# id -> token lookup, used to show the individual pieces behind each label id.
reversed_vocab = {i: w for w, i in tokenizer.get_vocab().items()}

print(item)
print(decoded_item)
# space-separated words in the raw target / number of label ids / words after decoding
print(len(item.split(' ')), len(label_ids), len(decoded_item.split(' ')))
print(label_ids)
print([reversed_vocab[i] for i in label_ids])

In [None]:
# Display a summary of the tokenized dataset.
tokenized_dataset

## Training

In [None]:
from transformers import AutoModelForSeq2SeqLM, Seq2SeqTrainingArguments, Seq2SeqTrainer

# Pretrained T5-base checkpoint loaded as a sequence-to-sequence LM.
model = AutoModelForSeq2SeqLM.from_pretrained("t5-base")

# Resize the token-embedding matrix to the tokenizer's length. As the last
# expression of the cell, the returned embedding module is displayed.
tokenizer_length = len(tokenizer)
model.resize_token_embeddings(tokenizer_length)

In [None]:
# Report the tokenizer's and the model's vocabulary sizes side by side.
vocab_report = [
    "For T5:",
    "Tokenizer vocab_size: {}".format(tokenizer.vocab_size),
    "Model vocab size: {}\n".format(model.config.vocab_size),
]
for report_line in vocab_report:
    print(report_line)

In [None]:
from transformers import DataCollatorForSeq2Seq

# Collator that batches the tokenized examples for the seq2seq trainer.
data_collator = DataCollatorForSeq2Seq(
    model=model,
    tokenizer=tokenizer,
)

In [None]:
# Output directory handed to the trainer (checkpoints are written here).
RESULTS_DIR = "/content/gdrive/MyDrive/experiments/unrestricted_nosplit"

training_args = Seq2SeqTrainingArguments(
    # --- output / checkpointing: save once per epoch, keep only the newest ---
    output_dir=RESULTS_DIR,
    save_strategy='epoch',
    save_total_limit=1,
    # --- optimisation ---
    num_train_epochs=1,
    per_device_train_batch_size=16,
    learning_rate=1e-4,
    warmup_steps=4000,
    weight_decay=0.01,
    adam_beta2=0.98,
    optim="adamw_torch",
    fp16=True,
    # --- evaluation: once per epoch, scoring generated sequences up to 256 tokens ---
    evaluation_strategy="epoch",
    per_device_eval_batch_size=16,
    predict_with_generate=True,
    generation_max_length=256,
)

In [None]:
import numpy as np
from datasets import load_metric

# sacreBLEU scorer shared by `compute_metrics` and the manual evaluation at the
# end of the notebook.
# NOTE(review): `load_metric` was deprecated in later `datasets` releases in favour
# of the separate `evaluate` package — confirm against the installed version.
metric = load_metric("sacrebleu")

def postprocess_text(preds, labels):
    """Strip surrounding whitespace and shape the texts for the BLEU metric.

    Args:
        preds: decoded prediction strings.
        labels: decoded reference strings, one per prediction.

    Returns:
        Tuple ``(preds, labels)``: predictions as a flat list of stripped
        strings, references as a list of single-element lists.
    """
    cleaned_preds = []
    for prediction in preds:
        cleaned_preds.append(prediction.strip())

    # Each reference is wrapped in its own list (one reference per prediction).
    wrapped_labels = []
    for reference in labels:
        wrapped_labels.append([reference.strip()])

    return cleaned_preds, wrapped_labels
def compute_metrics(eval_preds):
    """Compute BLEU and the mean generated length for one evaluation pass.

    Args:
        eval_preds: pair ``(preds, labels)`` of token-id arrays. ``preds`` may
            itself be a tuple, in which case its first element is used.

    Returns:
        dict with ``"bleu"`` (sacreBLEU score) and ``"gen_len"`` (mean number of
        non-padding tokens per prediction), both rounded to 4 decimals.
    """
    preds, labels = eval_preds
    if isinstance(preds, tuple):
        preds = preds[0]
    # -100 is the ignore/padding index and cannot be decoded. Labels always carry
    # it; predictions can carry it too when batches of different lengths are
    # concatenated, so both are mapped back to the pad token before decoding.
    preds = np.where(preds != -100, preds, tokenizer.pad_token_id)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    # Strip whitespace and wrap each reference in a list, as the metric expects.
    decoded_preds, decoded_labels = postprocess_text(decoded_preds, decoded_labels)
    result = metric.compute(predictions=decoded_preds, references=decoded_labels)
    result = {"bleu": result["score"]}
    # Generated length = tokens that are not padding (former -100 entries included).
    prediction_lens = [np.count_nonzero(pred != tokenizer.pad_token_id) for pred in preds]
    result["gen_len"] = np.mean(prediction_lens)
    result = {k: round(v, 4) for k, v in result.items()}
    return result

In [None]:
# Wire model, data, collator and metric together.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
)

# To resume from a saved checkpoint instead of starting fresh, use:
# trainer.train(f"{RESULTS_DIR}/checkpoint-8750/")
trainer.train()

## Evaluate

In [None]:

# Training already evaluates on the test split after each epoch; run this cell
# only to score a different dataset or to re-score after the fact.
trainer.evaluate(
    eval_dataset=tokenized_dataset["test"],
    metric_key_prefix='',
    max_length=256,
)

## Manual Translation

In [None]:
from transformers import T5ForConditionalGeneration

prediction_model = AutoModelForSeq2SeqLM.from_pretrained("/content/gdrive/MyDrive/new_ltl_dataset_generator/first_dataset/checkpoint-6250/")

# Build the inputs exactly as `preprocess_function` does for training: task prefix,
# a space before each comma, and the same 256-token cap. Feeding the bare 'en'
# column would give the model a format it was never fine-tuned on.
src_text = [prefix + sentence.replace(',', ' ,') for sentence in tokenized_dataset['test']['en']]
tokens = tokenizer(src_text, return_tensors="pt", padding=True, truncation=True, max_length=256)

# Preview only: printing the whole split and its tensors floods the cell output.
print(src_text[:3])
print(tokens["input_ids"].shape)

# The model and the token tensors are left on their default device here; to
# generate on GPU, move both to the same device first.
translated = prediction_model.generate(**tokens, max_length=256)

In [None]:
print(type(translated))

# Source side: decode the already-tokenized test inputs back to text.
outs_srcs = tokenizer.batch_decode(tokenized_dataset['test']['input_ids'], skip_special_tokens=True)
# Prediction side: decode the generated token ids.
outs_lbls = tokenizer.batch_decode(translated.tolist(), skip_special_tokens=True)

outs_srcs[:2], outs_lbls[:2]

In [None]:
# Read the reference column once and reuse it below.
reference_texts = tokenized_dataset['test']['ltl']

print(reference_texts[:10])
print(outs_lbls[:10])

print(type(reference_texts))
print(type(outs_lbls))

# The metric needs exactly one reference list per prediction. The previous fixed
# `[:100]` slice on the references mismatched the predictions whenever the test
# split held more than 100 rows, so the reference count now follows the predictions.
paired_references = [[reference] for reference in reference_texts[:len(outs_lbls)]]
metric.compute(references=paired_references, predictions=outs_lbls)