# Setup

In [30]:
!apt-get install git-lfs
!git lfs install


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git-lfs is already the newest version (3.0.2-1ubuntu0.3).
0 upgraded, 0 newly installed, 0 to remove and 0 not upgraded.
Updated git hooks.
Git LFS initialized.


In [55]:
!ls

ATNLP_Assignment3_finetuning.ipynb  data  LoRA.ipynb  README.md  wandb
custom_tokenizer.json		    logs  models      results


In [54]:
# !rm -rf results
!rm -rf ATNLP-Assignment3  # Remove any existing folder
# !git clone https://github.com/janljubas/ATNLP-Assignment3.git


In [3]:
!pip install transformers
!pip install transformers datasets
!pip install pytorch-lightning wandb
!pip install --upgrade transformers torch



In [11]:
from transformers import T5Tokenizer
from transformers import T5ForConditionalGeneration, TrainingArguments, Trainer

model_name = "t5-base"  # Or "t5-small"
model = T5ForConditionalGeneration.from_pretrained(model_name)


# Dataset preparation

#### Tokenizer

In [57]:
from tokenizers import Tokenizer, models, pre_tokenizers, decoders, trainers

# Create a custom tokenizer
tokenizer = Tokenizer(models.BPE())  # Byte-Pair Encoding (BPE) model
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()  # Pre-tokenize by whitespace
tokenizer.decoder = decoders.BPEDecoder()

# Train the tokenizer on your data
trainer = trainers.BpeTrainer(vocab_size=5000, special_tokens=["<pad>", "<unk>", "<s>", "</s>"])
tokenizer.train(files=["/content/ATNLP-Assignment3/data/all_data.txt"], trainer=trainer)

# Save the tokenizer
tokenizer.save("/content/ATNLP-Assignment3/exp_2_custom_tokenizer.json")


from transformers import PreTrainedTokenizerFast

# tokenizer = T5Tokenizer.from_pretrained(model_name)
tokenizer = Tokenizer.from_file("/content/ATNLP-Assignment3/exp_2_custom_tokenizer.json")

# Wrap the custom tokenizer
custom_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=tokenizer,
    pad_token="<pad>",
    unk_token="<unk>",
    bos_token="<s>",
    eos_token="</s>",
)



#### Regular part

In [58]:
# STEP 1: LOADING DATA AND CREATING ALL 3 SPLITS


# Load the SCAN dataset
def load_scan_data(file_path):
    inputs, outputs = [], []
    with open(file_path, 'r') as f:
        for line in f:
            if line.startswith("IN:"):
                # Split the line into input and output parts
                parts = line.strip().split(" OUT: ")
                if len(parts) == 2:
                    inputs.append(parts[0].replace("IN: ", ""))  # Remove "IN: "
                    outputs.append(parts[1])  # Keep the output as is
    return inputs, outputs




# 1.1. loading the training data
train_inputs, train_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_train_length.txt")
# print(len(train_inputs), len(train_outputs))

# 1.2. creating the Validation set by splitting the train set 90-10
from sklearn.model_selection import train_test_split

train_inputs, val_inputs, train_outputs, val_outputs = train_test_split(
    train_inputs, train_outputs, test_size=0.02, random_state=42  # so that 0.64 of original train data is now what's left
)
# print(len(train_inputs), len(train_outputs))
# print(len(val_inputs), len(val_outputs))

# 1.3. loading the test data
test_inputs, test_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_test_length.txt")
# print(len(test_inputs), len(test_outputs))

In [59]:
# STEP 2: PREPROCESSING THE DATA
from datasets import Dataset

# tokenize and format the data
def preprocess_scan_data(inputs, outputs, tokenizer, max_length=48):
    formatted_inputs = [f"translate English to Action: {input_text}" for input_text in inputs]
    model_inputs = tokenizer(formatted_inputs, max_length=max_length, truncation=True, padding="max_length")
    labels = tokenizer(outputs, max_length=max_length, truncation=True, padding="max_length").input_ids
    model_inputs["labels"] = labels
    return model_inputs

# preprocess training, validation, and test data
train_data = preprocess_scan_data(train_inputs, train_outputs, custom_tokenizer)
val_data = preprocess_scan_data(val_inputs, val_outputs, custom_tokenizer)
test_data = preprocess_scan_data(test_inputs, test_outputs, custom_tokenizer)

# create Hugging Face Dataset objects
train_dataset = Dataset.from_dict(train_data)
val_dataset = Dataset.from_dict(val_data)
test_dataset = Dataset.from_dict(test_data)

In [60]:
from transformers import TrainerCallback
# Custom callback to rename checkpoints
class RenameCheckpointCallback(TrainerCallback):
    def on_save(self, args, state, control, **kwargs):
        import os
        checkpoint_dir = os.path.join(args.output_dir, f"checkpoint-{state.global_step}")
        new_checkpoint_dir = os.path.join(args.output_dir, f"experiment_1_checkpoint_{state.global_step}")
        os.rename(checkpoint_dir, new_checkpoint_dir)
        return control


training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=5e-4,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    save_total_limit=32,
    save_steps=10000,
    logging_dir="./logs",
    logging_steps=10,
)

# init the trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,  # Use validation set if available
    tokenizer=custom_tokenizer,
    callbacks=[RenameCheckpointCallback()]
)

!rm -rf results

trainer.train()

# model evaluation using the test set
test_results = trainer.evaluate(test_dataset)
print("Test Results:", test_results)


  trainer = Trainer(


Epoch,Training Loss,Validation Loss
1,0.0027,0.002787
2,0.0019,0.0006
3,0.0001,0.000115


Test Results: {'eval_loss': 0.08533097058534622, 'eval_runtime': 17.1776, 'eval_samples_per_second': 228.204, 'eval_steps_per_second': 28.526, 'epoch': 3.0}


In [61]:
# Save the model and tokenizer
model.save_pretrained("/content/ATNLP-Assignment3/results/experiment_2")
custom_tokenizer.save_pretrained("/content/ATNLP-Assignment3/exp2_custom_tokenizer")

('/content/ATNLP-Assignment3/exp2_custom_tokenizer/tokenizer_config.json',
 '/content/ATNLP-Assignment3/exp2_custom_tokenizer/special_tokens_map.json',
 '/content/ATNLP-Assignment3/exp2_custom_tokenizer/tokenizer.json')

# Evaluation

## First approach

In [67]:
# Post-processing
def add_spaces_to_output(output_text):
    """
    Adds spaces before each occurrence of "I_" (except the first one).
    """
    # Split the output text into parts based on "I_"
    parts = output_text.split("I_")

    # Reconstruct the text with spaces before "I_" (except the first part)
    reconstructed_text = parts[0]  # First part doesn't need a space
    for part in parts[1:]:
        reconstructed_text += " I_" + part

    return reconstructed_text

# Test the model on a custom input
def model_prediction(input_text):
    # Ensure model is on the correct device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    input_ids = custom_tokenizer(f"translate English to Action: {input_text}", return_tensors="pt").input_ids.to(device
                                                                                                                 )
    outputs = model.generate(input_ids)
    decoded_output = custom_tokenizer.decode(outputs[0], skip_special_tokens=True)
    decoded_output_with_spaces = add_spaces_to_output(decoded_output)
    return decoded_output_with_spaces

# Example test
test_input = "run around left twice after jump around left"
predicted_output = model_prediction(test_input)
print(f"Input: {test_input}")
print(f"Predicted Output: {predicted_output[1:]}")


Input: run around left twice after jump around left
Predicted Output: I_TURN_LEFT I_JUMP I_TURN_LEFT I_JUMP I_TURN_LEFT I_JUMP I_TURN_LEFT I_JUMP I_TURN_LEFT I_RUN I_TURN_LEFT I_RUN I_TURN_LEFT I_RUN I_TURN_LEFT I_RUN I_TURN_LEFT I_RUN I_TURN_LEFT I_RUN


In [66]:
# Evaluate the model on the test set
def evaluate_on_test_set(test_inputs, test_outputs, batch_size=32):
    """
    Evaluate the model on the test set using batch processing.
    """
    correct = 0
    total = len(test_outputs)

    # Split inputs and outputs into batches
    batches = [
        (test_inputs[i:i + batch_size], test_outputs[i:i + batch_size])
        for i in range(0, total, batch_size)
    ]

    for input_batch, output_batch in tqdm(batches):
        # Use model_prediction for each input in the batch
        predicted_outputs = [model_prediction(input_text) for input_text in input_batch]

        # Compare predicted outputs to expected outputs
        correct += sum(
            pred.strip() == exp.strip()
            for pred, exp in zip(predicted_outputs, output_batch)
        )

    accuracy = correct / total
    print(f"Accuracy on Test Set: {accuracy * 100:.2f}%")
    return accuracy

# Load test set
test_inputs, test_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_test_length.txt")

# Evaluate
evaluate_on_test_set(test_inputs, test_outputs)

100%|██████████| 123/123 [24:30<00:00, 11.96s/it]

Accuracy on Test Set: 0.00%





0.0

In [18]:
test_input = "run thrice and jump left thrice"
predicted_output = model_prediction(test_input)
print(f"Input: {test_input}")
print(f"Predicted Output: {predicted_output}")

Input: run thrice and jump left thrice
Predicted Output:  I_RUN I_RUN I_RUN I_TURN_LEFT I_JUMP I_TURN_LEFT I_JUMP I_TURN_LEFT I_JUMP


## token-level and sequence-level accuracy

In [68]:
# Post-processing
def add_spaces_to_output(output_text):
    """
    Adds spaces before each occurrence of "I_" (except the first one).
    """
    # Split the output text into parts based on "I_"
    parts = output_text.split("I_")

    # Reconstruct the text with spaces before "I_" (except the first part)
    reconstructed_text = parts[0]  # First part doesn't need a space
    for part in parts[1:]:
        reconstructed_text += " I_" + part

    return reconstructed_text

# Generate predictions
def generate_predictions(input_text):
    input_ids = custom_tokenizer(f"translate English to Action: {input_text}", return_tensors="pt").input_ids
    outputs = model.generate(input_ids)
    decoded_output = custom_tokenizer.decode(outputs[0], skip_special_tokens=True)
    decoded_output_with_spaces = add_spaces_to_output(decoded_output)
    return decoded_output_with_spaces[1:]



In [69]:
import torch

tokenizer = custom_tokenizer


def token_lvl_accuracy(gt, pred):
    """
    gt = ground truth sequence (str)
    pred = predicted sequence (str)
    """
    # Tokenize the ground truth and predictions
    prediction_tokens = custom_tokenizer.encode(prediction, add_special_tokens=True)
    gt_tokens = custom_tokenizer.encode(ground_truths, add_special_tokens=True)
    print("Encoded Ground Truth:", gt_tokens)
    print("Encoded Prediction:", prediction_tokens)

    # Find the length of the shorter sequence
    min_length = min(len(gt_tokens), len(prediction_tokens))
    print(min_length)

    # Trim sequences to the same length for comparison
    gt_tokens = gt_tokens[:min_length]
    pred_tokens = prediction_tokens[:min_length]

    # Compare tokens and calculate accuracy
    correct = sum(g == p for g, p in zip(gt_tokens, prediction_tokens))
    accuracy = correct / len(gt_tokens)
    return accuracy





def sequence_level_accuracy(gt, pred):
    """
    gt = ground truth sequence (str)
    pred = predicted sequence (str)
    """
    # Check if the ground truth matches the prediction exactly
    return int(gt.strip() == pred.strip())


In [70]:
import torch
# !pip install tqdm
from tqdm import tqdm

# Load the fine-tuned model and tokenizer
model = T5ForConditionalGeneration.from_pretrained("/content/ATNLP-Assignment3/results/experiment_2")
# tokenizer = T5Tokenizer.from_pretrained("/content/drive/MyDrive/ATNLP/fine_tuned_t5_base_model/10_epochs_custom_tokenizer")
tokenizer = custom_tokenizer


In [74]:
from tqdm import tqdm

# Evaluate accuracy
def token_level_accuracy(prediction, ground_truth):
    matches = sum(1 for a, b in zip(prediction, ground_truth) if a == b)
    percentage = (matches / len(prediction)) * 100
    return percentage

matches = list()

# Load test set
test_inputs, test_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_test_length.txt")

test_inputs_2 = test_inputs[:200]
test_outputs_2 = test_outputs[:200]

for i in tqdm(range(len(test_inputs_2)), desc="Processing"):
  prediction = generate_predictions(test_inputs_2[i])
  ground_truth = test_outputs_2[i]

  min_len = min(len(prediction), len(ground_truth))

  prediction = prediction[:min_len]
  ground_truth = ground_truth[:min_len]

  matches.append(token_level_accuracy(custom_tokenizer.encode(prediction, add_special_tokens=True), custom_tokenizer.encode(ground_truth, add_special_tokens=True)))

print(f"\nToken-level accuracy: {sum(matches) / len(matches)}")

Processing: 100%|██████████| 25/25 [00:14<00:00,  1.76it/s]


Token-level accuracy: 100.0





In [78]:
# Example test
test_input = "look around left twice after jump around right"
predicted_output = model_prediction(test_input)
print(f"Input: {test_input}")
print(f"Predicted Output: {predicted_output[1:]}")

Input: look around left twice after jump around right
Predicted Output: I_TURN_RIGHT I_JUMP I_TURN_RIGHT I_JUMP I_TURN_RIGHT I_JUMP I_TURN_RIGHT I_JUMP I_TURN_LEFT I_LOOK I_TURN_LEFT I_LOOK I_TURN_LEFT I_LOOK I_TURN_LEFT I_LOOK I_TURN_LEFT I_LOOK I_TURN_LEFT I_LOOK



# Experiment 1 stuff



In [None]:
# EXPERIMENT 1

def token_lvl_accuracy(gt, pred):
    """
    gt = ground truth sequence
    pred = predicted sequence
    """
    correct = 0

    # get start and end
    eos_idx = word2idx_tgt['<EOS>']
    sos_idx = word2idx_tgt['<SOS>']
    # print(eos_idx)
    # print(sos_idx)
    pred = pred[-1]


    gt = gt[-1]

    # index of <SOS> and <EOS> tokens of the predicted sequence
    pred_start = 0
    pred_end = len(pred) if (eos_idx not in pred) else (pred == eos_idx).nonzero(as_tuple=True)[0].item()

    # index of <SOS> and <EOS> tokens of the ground truth sequence
    gt_start = (gt == sos_idx).nonzero(as_tuple=True)[0].item()
    gt_end = (gt == eos_idx).nonzero(as_tuple=True)[0].item()

    # slicing
    gt = gt[gt_start+1 : gt_end]
    pred = pred[pred_start+1 : pred_end]

    longer = gt if len(gt) > len(pred) else pred
    shorter = pred if len(gt) > len(pred) else gt

    longest_len = len(longer)

    shorter = torch.nn.functional.pad(shorter, (0, longest_len - len(shorter)), "constant", 0)

    correct = sum(longer == shorter)
    # print(longer)
    # print(shorter)
    # print(correct)
    return int(correct) / len(shorter) # same length as longer


def sequence_level_accuracy(gt, pred):

    pred = pred[-1]
    gt = gt[-1]

    if len(gt) != len(pred):
        return 0

    if sum(gt == pred) == len(gt):
        return 1

    return 0

# Experiment 2 stuff

In [93]:
from tqdm import tqdm

# Define token-level accuracy function
def token_level_accuracy(prediction_tokens, ground_truth_tokens):
    """
    Compute the percentage of matching tokens between the prediction and ground truth.

    Args:
    prediction_tokens: List[int] - Tokenized prediction sequence.
    ground_truth_tokens: List[int] - Tokenized ground truth sequence.

    Returns:
    float - Accuracy as a percentage.
    """
    # Ensure both sequences are the same length for comparison
    min_length = min(len(prediction_tokens), len(ground_truth_tokens))
    max_length = max(len(prediction_tokens), len(ground_truth_tokens))
    prediction_tokens = prediction_tokens[:min_length]
    ground_truth_tokens = ground_truth_tokens[:min_length]

    # Calculate the number of matches
    matches = sum(1 for pred, gt in zip(prediction_tokens, ground_truth_tokens) if pred == gt)
    accuracy = (matches / min_length) * 100
    return accuracy

# Evaluate on test set
matches = []

# Load the test dataset
test_inputs, test_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_test_length.txt")

# Use a smaller subset for testing (adjust as needed)
test_inputs_subset = test_inputs[:100]
test_outputs_subset = test_outputs[:100]

for i in tqdm(range(len(test_inputs_subset)), desc="Processing"):
    # Generate predictions for each input
    prediction = model_prediction(test_inputs_subset[i])
    ground_truth = test_outputs_subset[i]

    # Tokenize predictions and ground truth
    prediction_tokens = custom_tokenizer.encode(prediction, add_special_tokens=True)
    ground_truth_tokens = custom_tokenizer.encode(ground_truth, add_special_tokens=True)

    # Calculate token-level accuracy
    accuracy = token_level_accuracy(prediction_tokens, ground_truth_tokens)
    matches.append(accuracy)

# Compute the average token-level accuracy
average_accuracy = sum(matches) / len(matches)
print(f"\nAverage Token-Level Accuracy: {average_accuracy:.2f}%")


Processing: 100%|██████████| 100/100 [00:38<00:00,  2.59it/s]


Average Token-Level Accuracy: 100.00%





In [91]:
from tqdm import tqdm

# Define sequence-level accuracy function without trimming
def sequence_level_accuracy_no_trimming(prediction, ground_truth):
    """
    Compute sequence-level accuracy without trimming.
    If the prediction is shorter or longer than the ground truth, it is considered incorrect.

    Args:
    prediction: str - Predicted sequence.
    ground_truth: str - Ground truth sequence.

    Returns:
    int - 1 if sequences match exactly (including length), 0 otherwise.
    """
    return int(prediction.strip() == ground_truth.strip())

# Evaluate on test set
matches = []

# Load the test dataset
test_inputs, test_outputs = load_scan_data("/content/ATNLP-Assignment3/data/Experiment-2/tasks_test_length.txt")

# Use a smaller subset for testing (adjust as needed)
# test_inputs_subset = test_inputs[:200]
# test_outputs_subset = test_outputs[:200]

for i in tqdm(range(len(test_inputs_subset)), desc="Processing"):
    # Generate predictions for each input
    prediction = model_prediction(test_inputs_subset[i])
    ground_truth = test_outputs_subset[i]

    # Calculate sequence-level accuracy without trimming
    accuracy = sequence_level_accuracy_no_trimming(prediction, ground_truth)
    matches.append(accuracy)

# Compute the average sequence-level accuracy
average_accuracy = sum(matches) / len(matches)
print(f"\nAverage Sequence-Level Accuracy (No Trimming): {average_accuracy:.2f}")


Processing: 100%|██████████| 200/200 [01:16<00:00,  2.61it/s]


Average Sequence-Level Accuracy (No Trimming): 0.00



