In [None]:
import numpy as np
import pandas as pd

from transformers import AutoTokenizer
from transformers import AutoModelForCausalLM

from tqdm import tqdm
import numpy as np
import random

from datasets import Dataset, load_dataset
from typing import cast

In [None]:
# Dedicated padding token string that gets registered on the tokenizer below.
PAD_TOKEN = "<|pad|>"

# Load the instruct tokenizer, pad on the right-hand side, and register
# PAD_TOKEN as the tokenizer's pad token.
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B-Instruct")
tokenizer.padding_side = "right"
tokenizer.add_special_tokens(dict(pad_token=PAD_TOKEN))

In [None]:
## Can easily be adapted for base model
# Load the causal LM from the local fine-tuned checkpoint directory and place it on the GPU.
model = AutoModelForCausalLM.from_pretrained("../02_LoRA/finetuned_model", device_map="cuda")
# Resize the embedding table to the tokenizer's vocabulary size (the tokenizer had a pad
# token added above), padding the table size up to a multiple of 8.
model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=8)

In [None]:
# Labels attached to the four shuffled options, in display order.
ANSWER_LABELS = ["A", "B", "C", "D"]
# Text placed before the labelled options in the generated context.
PROMPT_PREFIX = "Please choose the correct answer from among the following options: \n"
# Instruction returned as the "question" field of every generated example.
PROMPT_SUFFIX = "Answer by only stating the correct letter. That is, answer only by stating 'A', 'B', 'C', or 'D'. The correct answer is:"

def generate_question_and_answers(example) -> dict:
    """Build a shuffled multiple-choice context for one dataset row.

    Args:
        example: Mapping with the keys "Incorrect Answer 1", "Incorrect Answer 2",
            "Incorrect Answer 3" and "Correct Answer".

    Returns:
        A dict with:
            context: PROMPT_PREFIX followed by the four options, one per line,
                each formatted as "(X) <choice text>".
            question: PROMPT_SUFFIX.
            answer: The label ("A".."D") assigned to the correct answer.
            answer_start / answer_end: Character span, within ``context``, of the
                label letter inside the "(X)" marker of the correct option.
    """
    # Randomly shuffle the order of the choices every time we generate an example.
    # Index 4 stands for the correct answer; 1-3 are the incorrect answers.
    choice_indices = [1, 2, 3, 4]
    choice_order = random.sample(choice_indices, len(choice_indices))
    ans_idx = choice_order.index(4)

    ordered_choices = [
        example[f"Incorrect Answer {i}"] if i != 4 else example["Correct Answer"]
        for i in choice_order
    ]
    ordered_choices = [
        f"({ANSWER_LABELS[i]}) {choice}" for i, choice in enumerate(ordered_choices)
    ]

    context = PROMPT_PREFIX + "\n".join(ordered_choices)
    question = PROMPT_SUFFIX
    answer = ANSWER_LABELS[ans_idx]

    # Compute the label position directly instead of searching the context for
    # the bare letter: a plain search for e.g. "B" can hit a capital "B" inside
    # the text of an earlier option rather than the "(B)" marker.
    # Each preceding option contributes its own length plus one newline; the
    # final +1 skips the opening parenthesis of the correct option's marker.
    answer_start = (
        len(PROMPT_PREFIX)
        + sum(len(line) + 1 for line in ordered_choices[:ans_idx])
        + 1
    )

    return {
        "context": context,
        "question": question,
        "answer": answer,
        "answer_start": answer_start,
        "answer_end": answer_start + len(answer),
    }


def add_position_data(examples: dict) -> dict:
    """Compute token-level start/end positions of the answer span for a batch.

    Args:
        examples: Batch mapping with the per-row lists "offset_mapping",
            "answer_start", "answer_end" and "sequence_ids". The
            "offset_mapping" entry is removed from ``examples`` (popped).

    Returns:
        A dict with "start_positions" and "end_positions", one entry per row.
        A row gets (0, 0) when the answer span lies entirely outside the
        context tokens or when the row has no context tokens at all.
    """
    def find_labels(offsets, ans_start, ans_end, sequence_ids):
        """Return (start_token, end_token) for one row, or (0, 0) if not found."""
        n_tokens = len(sequence_ids)

        # Locate the first and last token whose sequence id is 1 (the context).
        # Both scans are bounds-checked: the context may run to the very end of
        # the sequence (no trailing special/padding token), and a row may
        # contain no context tokens at all.
        idx = 0
        while idx < n_tokens and sequence_ids[idx] != 1:
            idx += 1
        if idx == n_tokens:
            return (0, 0)
        context_start = idx
        while idx < n_tokens and sequence_ids[idx] == 1:
            idx += 1
        context_end = idx - 1

        # Answer span does not touch the context at all.
        if offsets[context_start][0] > ans_end or offsets[context_end][1] < ans_start:
            return (0, 0)

        # Last context token that starts at or before the answer start.
        idx = context_start
        while idx <= context_end and offsets[idx][0] <= ans_start:
            idx += 1
        start_position = idx - 1

        # First context token (scanning backwards) that ends at or after the answer end.
        idx = context_end
        while idx >= context_start and offsets[idx][1] >= ans_end:
            idx -= 1
        end_position = idx + 1

        return (start_position, end_position)

    positions = {"start_positions": [], "end_positions": []}
    offset_mapping = examples.pop("offset_mapping")

    for i, offset in enumerate(offset_mapping):
        start, end = find_labels(
            offset,
            examples["answer_start"][i],
            examples["answer_end"][i],
            examples["sequence_ids"][i],
        )
        positions["start_positions"].append(start)
        positions["end_positions"].append(end)

    return positions


def tokenize_data(examples: dict, tokenizer) -> dict:
    """Tokenize a batch of (question, context) pairs and attach sequence ids.

    Args:
        examples: Batch mapping providing the "question" and "context" entries.
        tokenizer: Callable tokenizer; its result must support item access,
            item assignment and a ``sequence_ids(row)`` method.

    Returns:
        The tokenizer's output with an extra "sequence_ids" entry holding the
        result of ``sequence_ids(row)`` for every row of "offset_mapping".
    """
    tokenizer_options = dict(
        truncation=True,
        padding="max_length",
        return_overflowing_tokens=False,
        return_offsets_mapping=True,
        return_tensors="pt",
    )
    encoded = tokenizer(examples["question"], examples["context"], **tokenizer_options)

    # The Dataset will convert `encoded` to a plain dictionary, which destroys
    # the `sequence_ids()` method, so materialise its per-row results now.
    row_count = encoded["offset_mapping"].shape[0]
    per_row_sequence_ids = []
    for row in range(row_count):
        per_row_sequence_ids.append(encoded.sequence_ids(row))
    encoded["sequence_ids"] = per_row_sequence_ids

    return encoded

In [None]:
# Read the extended GPQA CSV as a single split; `cast` only narrows the static type.
gpqa_ds = cast(
    Dataset,
    load_dataset("csv", data_files="gpqa_extended.csv", split="all"),
)

# Rename the original question column, then run the three preprocessing stages:
# shuffled multiple-choice context, tokenization, and answer token positions.
gpqa_ds = (
    gpqa_ds
    .rename_columns({"Question": "Dataset Question"})
    .map(generate_question_and_answers, batched=False)
    .map(lambda examples: tokenize_data(examples, tokenizer), batched=True)
    .map(add_position_data, batched=True)
)

# Keep only the five columns used for evaluation; every other column
# (including the tokenization and position outputs) is dropped before saving.
cols_to_remove = gpqa_ds.column_names
for kept_column in ("context", "question", "answer", "Dataset Question", "Subdomain"):
    cols_to_remove.remove(kept_column)
test_data = gpqa_ds.remove_columns(cols_to_remove)

test_data.to_csv("altered_gpqa_test.csv")

In [None]:
# Reload the test set written by the preprocessing cell as a pandas DataFrame;
# the evaluation loop below indexes it by row position.
gpqa_test_df = pd.read_csv("altered_gpqa_test.csv")

In [None]:
# Candidate answers, all WITHOUT a leading space. Previously the list mixed
# "A", "B", "C" with " D" while the reference answer was always " " + letter,
# so only "D" predictions could ever be counted as correct.
answer_strings = ["A", "B", "C", "D"]
# Encode without special tokens so index 0 is the letter's own token, rather
# than relying on a BOS token occupying index 0.
token_ids = [tokenizer.encode(s, add_special_tokens=False)[0] for s in answer_strings]
correct = 0
total = 0

# Predicted letter for every row, in DataFrame order.
model_outputs = []

for i in tqdm(range(gpqa_test_df.shape[0])):
    question = gpqa_test_df.loc[i, 'Dataset Question']
    choices = gpqa_test_df.loc[i, 'context']
    messages = [
        {
            "role": "system",
            "content": "You will be asked a question. Answer with the letter of your answer choice; that is, only state 'A', 'B', 'C', or 'D'."
        },
        {
            "role": "user",
            "content": question + '\n' + choices
        },
    ]

    # Let the chat template append the assistant header instead of
    # concatenating it by hand.
    prompt = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )

    # The templated prompt already carries its special tokens, so do not let
    # the tokenizer add them again (avoids a duplicated BOS token).
    tokenized_prompt = tokenizer(
        prompt, return_tensors="pt", add_special_tokens=False
    ).to(model.device)

    outputs = model.generate(
        tokenized_prompt["input_ids"],
        attention_mask=tokenized_prompt["attention_mask"],
        max_new_tokens=1,
        do_sample=False,   # Greedy decoding
        return_dict_in_generate=True,  # Get additional generation info
        output_scores=True,  # Get the scores/logits
        pad_token_id=tokenizer.pad_token_id
    )

    logits = outputs.scores[0][0]  # Scores for the single generated position

    # Restrict to the four answer-letter tokens and pick the highest-scoring one.
    relevant_logits = logits[token_ids].float().cpu().numpy()
    llm_output = answer_strings[int(np.argmax(relevant_logits))]

    correct_answer = str(gpqa_test_df.loc[i, 'answer']).strip()

    model_outputs.append(llm_output)

    if llm_output == correct_answer:
        correct += 1
    total += 1

# Report the score that the loop accumulated.
if total:
    print(f"Accuracy: {correct}/{total} = {correct / total:.3f}")

In [None]:
## If testing base model instead of finetuned model
# The string literal below is disabled code (a no-op expression); to evaluate the
# base model, run its two statements instead of the finetuned ones that follow.
""" 
gpqa_test_df['base_answer'] = model_outputs
gpqa_test_df.to_csv("04_GPQA/gpqa_base.csv")
"""

# Attach the per-row predictions collected by the evaluation loop and write the
# augmented table to disk.
gpqa_test_df['finetuned_answer'] = model_outputs
gpqa_test_df.to_csv("04_GPQA/gpqa_finetuned.csv")