In [None]:
from dataclasses import dataclass
from typing import List, Tuple

import torch
from torch.utils.data import Dataset, DataLoader
from transformers import PreTrainedTokenizerFast, AutoTokenizer
from datasets import load_dataset
from transformers import RobertaTokenizer
import evaluate
from torch.optim import AdamW
from transformers import RobertaForSequenceClassification, Trainer, TrainingArguments

In [None]:
# Global variables
# Number of trailing rows of the loaded data that form the test split.
TEST_DATASET_LENGTH = 100
# Offset from the end of the data at which the validation split begins. Despite
# the name this is the combined size of both held-out splits (test rows plus
# 100 validation rows), not the size of the validation split alone.
VAL_DATASET_LENGTH = TEST_DATASET_LENGTH + 100

# When True, every split is cut down to its first 100 rows for quick smoke tests.
USE_SMALL_DATASET = True
# Per-device batch size used for both training and evaluation.
BATCH_SIZE = 12

In [None]:
# Load the filtered StrategyQA training file. Only a "train" split is read; the
# validation and test splits are carved out of its tail below.
# dataset = load_dataset("json", data_files={"train": "../datasets/strategyqa_train_filtered.json", "test": "../datasets/strategyqa_test.json"})
dataset = load_dataset("json", data_files={"train": "../datasets/strategyqa_train_filtered.json"})
print(dataset)

# Split layout (contiguous, in file order):
#   [0, val_start)            -> training
#   [val_start, test_start)   -> validation
#   [test_start, num_examples) -> test
full_dataset = dataset["train"]
num_examples = len(full_dataset)
if num_examples <= VAL_DATASET_LENGTH:
    # Without this check the ranges below would be empty or contain negative
    # indices, which fails later with a much less helpful error.
    raise ValueError(
        f"Dataset has {num_examples} rows but {VAL_DATASET_LENGTH} are reserved "
        "for validation and test; nothing would be left for training."
    )
val_start = num_examples - VAL_DATASET_LENGTH
test_start = num_examples - TEST_DATASET_LENGTH

train_dataset = full_dataset.select(range(val_start))
val_dataset = full_dataset.select(range(val_start, test_start))
test_dataset = full_dataset.select(range(test_start, num_examples))

if USE_SMALL_DATASET:
    # Keep at most the first 100 rows of each split to exercise the code quickly.
    # min() keeps the selection valid when a split holds fewer rows than that.
    small_size = 100
    train_dataset = train_dataset.select(range(min(small_size, len(train_dataset))))
    val_dataset = val_dataset.select(range(min(small_size, len(val_dataset))))
    test_dataset = test_dataset.select(range(min(small_size, len(test_dataset))))

print(len(train_dataset))
print(len(val_dataset))
print(len(test_dataset))
print(train_dataset[0])
print(val_dataset[0])
print(test_dataset[0])



In [None]:
# load tokenizer
# Pretrained "roberta-base" tokenizer; it is bound as the default `tokenizer`
# argument of tokenize_function and is also used at inference time.
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

# tokenize the dataset
def tokenize_function(batch, tokenizer=tokenizer, field_name="question"):
    """Tokenize one text column of a batch.

    Args:
        batch: Mapping from column name to the values of that column.
        tokenizer: Callable tokenizer; defaults to the notebook-level one.
        field_name: Name of the column in ``batch`` holding the text.

    Returns:
        Whatever ``tokenizer`` returns for the selected texts, with every
        sequence padded to the tokenizer's maximum length and truncated.
    """
    texts = batch[field_name]
    encoded = tokenizer(texts, padding="max_length", truncation=True)
    return encoded

def add_labels(tokenized_dataset):
    """Attach an integer ``labels`` entry derived from the ``answer`` entry.

    Args:
        tokenized_dataset: A single example (mapping) that has ``answer`` and
            ``question`` keys. It is modified in place.

    Returns:
        The same mapping, with ``labels`` set to 1 when ``answer`` is truthy
        and 0 otherwise. The label, answer and question are also printed.
    """
    answer_is_true = bool(tokenized_dataset["answer"])
    tokenized_dataset["labels"] = int(answer_is_true)
    print(tokenized_dataset["labels"], tokenized_dataset["answer"], tokenized_dataset["question"])
    return tokenized_dataset
# load training dataset


# tokenize the datasets
# Every split goes through the same two steps: batched tokenization of the
# question text, then a per-row pass that derives the integer "labels" column.
tokenized_datasets = {
    split_name: split.map(tokenize_function, batched=True).map(add_labels)
    for split_name, split in (
        ("train", train_dataset),
        ("val", val_dataset),
        ("test", test_dataset),
    )
}
print(tokenized_datasets["train"][0]["labels"])
print(tokenized_datasets["train"].column_names)
# print(tokenized_datasets["train"][98]["labels"], tokenized_datasets["train"][98]["question"])
# Preview the validation split. The bound is the split's own length rather than
# TEST_DATASET_LENGTH, so the loop stays valid if the two sizes ever differ.
for i in range(len(tokenized_datasets["val"])):
    print(i, tokenized_datasets["val"][i]["labels"], tokenized_datasets["val"][i]["question"])


In [None]:
# %pip install evaluate

In [None]:
# check if GPU is available
# ! nvidia-smi
# ! nvcc --version
import torch
print(torch.__version__)
print(torch.version.cuda)
print(torch.cuda.is_available())
print(torch.cuda.device_count())
# Querying the device name / current device raises on a machine without CUDA,
# so only do it when a GPU is actually visible.
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
    print(torch.cuda.current_device())
else:
    print("No CUDA device available; running on CPU.")

In [None]:
# %pip install torch torchvision torchaudio "accelerate>=0.26.0"  # keep the quotes: an unquoted ">" is treated as a shell redirect


In [None]:
def train_one_epoch(model, dataloader, optimizer, epoch):
    """Run one optimisation pass over ``dataloader``, updating ``model`` in place.

    Args:
        model: Model exposing ``train()`` and ``device``. It is called as
            ``model(input_ids=..., attention_mask=..., labels=...)`` and must
            return an object with a ``loss`` attribute.
        dataloader: Sized iterable of batches. Each batch must provide
            ``batch["text_encoding"]["input_ids"]``,
            ``batch["text_encoding"]["attention_mask"]`` and
            ``batch["label_encoding"]``, all supporting ``.to(device)``.
        optimizer: Optimizer over the model's parameters.
        epoch: Epoch number, used only to label the progress bar.

    Returns:
        None.
    """
    # Imported locally because nothing else in this notebook brings `tqdm`
    # into scope; without it the function fails with NameError.
    from tqdm import tqdm

    model.train()

    with tqdm(dataloader, desc=f"Train Ep {epoch}", total=len(dataloader)) as tq:
        for batch in tq:
            # Follow the model's own device instead of hard-coding 'cuda', so
            # the same loop works on CPU.
            input_ids = batch["text_encoding"]["input_ids"].to(model.device)
            attention_mask = batch["text_encoding"]["attention_mask"].to(model.device)
            label_encoding = batch["label_encoding"].to(model.device)

            # Passing labels makes the model compute the loss itself.
            output = model(input_ids=input_ids, attention_mask=attention_mask, labels=label_encoding)
            loss = output.loss

            # Standard update: clear stale gradients, backpropagate, step.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            tq.set_postfix({"loss": loss.detach().item()})  # show the running loss on the progress bar

def compute_metrics(eval_pred, questions=None):
    """Compute accuracy for a set of predictions and print a per-example report.

    Args:
        eval_pred: Pair ``(logits, labels)``. ``logits`` must support
            ``argmax(axis=-1)`` and the result must compare element-wise
            against ``labels`` (e.g. NumPy arrays).
        questions: Optional sequence of question strings aligned with the
            predictions. When omitted, questions are read from
            ``tokenized_datasets["test"]``. That fallback only lines up with
            the predictions when the test split is the one being evaluated;
            for any other split the printed sentences do not belong to the
            printed predictions.

    Returns:
        ``{"accuracy": value}`` where ``value`` is the mean of
        ``predictions == labels``.
    """
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)  # Convert logits to class labels
    print("labels")
    print(labels)
    print("predictions")
    print(predictions)
    print()

    if questions is None:
        # Backward-compatible default: look the questions up in the test split,
        # but never index past its end.
        test_split = tokenized_datasets["test"]
        available = min(len(predictions), len(test_split))
        questions = [test_split[i]["question"] for i in range(available)]

    for i in range(len(predictions)):
        # The sentence is pulled into a local first: nesting double quotes
        # inside a double-quoted f-string is a syntax error before Python 3.12.
        sentence = questions[i] if i < len(questions) else "<unavailable>"
        print(f"Prediction: {predictions[i]} | Label: {labels[i]} | Sentence: {sentence}")
    return {"accuracy": (predictions == labels).mean()}

In [None]:

# training

learning_rate = 5e-5
num_train_epochs = 5


model = RobertaForSequenceClassification.from_pretrained("roberta-base", num_labels=2)

# Use the GPU when one is present, but fall back to CPU instead of crashing the
# way an unconditional .cuda() does.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


# This optimizer is not handed to the Trainer below; the Trainer builds its own
# from `training_args`. It is kept for manual loops such as train_one_epoch.
optimizer = AdamW(model.parameters(), lr=learning_rate)

# `learning_rate` is forwarded explicitly so the value configured above is the
# one the Trainer really uses.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments("results",
                                num_train_epochs=num_train_epochs,
                                learning_rate=learning_rate,
                                per_device_train_batch_size=BATCH_SIZE,
                                per_device_eval_batch_size=BATCH_SIZE,
                                logging_dir='logs',
                                logging_steps=10,
                                evaluation_strategy="epoch")

# Loaded but not referenced by compute_metrics, which computes accuracy itself.
metric = evaluate.load("accuracy")

# Trains on the training split and scores the validation split via compute_metrics.
trainer = Trainer(model=model,
                args=training_args,
                train_dataset=tokenized_datasets["train"],
                eval_dataset=tokenized_datasets["val"],
                compute_metrics=compute_metrics)

In [None]:
# This cell clears GPU memory, do this when GPU out of memory

# from numba import cuda
import gc
# Drop unreachable Python objects first so any tensors they hold are released.
gc.collect()
# Then hand PyTorch's cached, unused CUDA blocks back to the driver. Without
# this call the cell only ran the Python garbage collector.
if torch.cuda.is_available():
    torch.cuda.empty_cache()

## Training

In [None]:
# Train the model
# Runs the Trainer configured in the setup cell; progress is logged and the
# validation split is scored according to `training_args`.
trainer.train()

In [None]:
# if loading from a checkpoint, set load_model to True
# With load_model left False, the model and trainer already in memory are evaluated.

load_model = False
if load_model:
    # Directory of the saved checkpoint to restore.
    checkpoint_path = "./results/checkpoint-first"

    # Load model from a specific checkpoint
    # This rebinds the notebook-level `model`, so later cells use the restored weights.
    model = RobertaForSequenceClassification.from_pretrained(checkpoint_path)

    # Rebuild the Trainer around the restored model, reusing the existing
    # arguments, datasets and metric function.
    trainer = Trainer(model=model, 
                    args=training_args, 
                    train_dataset=tokenized_datasets["train"], 
                    eval_dataset=tokenized_datasets["val"], 
                    compute_metrics=compute_metrics)

# Score the held-out test split; the metrics are shown as the cell's output.
trainer.evaluate(tokenized_datasets["test"])



In [None]:
def predict_factually_correct(input_text):
    """Classify a single text with the notebook-level ``model`` and ``tokenizer``.

    Args:
        input_text: One input string. The prediction is read with ``.item()``,
            so exactly one sequence is expected.

    Returns:
        ``"Factually Correct"`` when the predicted class index is 1, otherwise
        ``"Factually Incorrect"``.
    """
    # Tokenize the input text
    inputs = tokenizer(input_text, return_tensors="pt", padding=True, truncation=True, max_length=512)

    # Put the inputs wherever the model lives. Keying this on CUDA availability
    # breaks whenever a GPU exists but the model is still on the CPU.
    inputs = {key: value.to(model.device) for key, value in inputs.items()}

    # Inference mode: switch dropout off so repeated calls give the same answer.
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)

    # Softmax does not change the argmax; it is kept so the intermediate values
    # are probabilities.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

    # Index of the most probable class.
    predicted_class = torch.argmax(probabilities, dim=-1).item()

    # Class 1 is the positive ("answer is true") label assigned by add_labels.
    return "Factually Correct" if predicted_class == 1 else "Factually Incorrect"


In [None]:
# we can now use the model to predict the factuality of a given sentence, go play with it!
# predict_factually_correct returns either "Factually Correct" or
# "Factually Incorrect" for the text in `user_input`.
user_input = "Was the KGB responsible for Lincoln's assassination?"
prediction = predict_factually_correct(user_input)
print(f"The sentence is: {prediction}")