In [None]:
from dataclasses import dataclass
from datasets import Dataset
from datetime import datetime
import evaluate
import gdown
import logging
from logging import debug, info, warning, error
import numpy as np
import os
from pprint import pformat
import re
from sklearn.metrics import accuracy_score, f1_score
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForMultipleChoice,
    BitsAndBytesConfig,
    set_seed,
    Trainer,
    TrainingArguments,
)
from transformers.tokenization_utils_base import PreTrainedTokenizerBase, PaddingStrategy
from typing import Iterator, Optional, Sequence, Union

# Logging configuration for the whole notebook.
# LOG_LEVEL is a standard logging level name; LOG_TO_FILE switches between a file
# under `log_folder` and the default stream handler.
LOG_LEVEL = 'DEBUG'
LOG_TO_FILE = True
log_folder='logs'

# exist_ok avoids the check-then-create race and also creates missing parents.
os.makedirs(log_folder, exist_ok=True)

# str(datetime.now()) contains spaces and ':' which are not valid in file names on
# every platform, so the timestamp is formatted explicitly.
log_timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

# force=True replaces any handlers installed by a previous run of this cell.
logging.basicConfig(
  level=logging.getLevelName(LOG_LEVEL),
  format='%(message)s',
  force=True,
  filename=f'{log_folder}/{log_timestamp}' if LOG_TO_FILE else None,
  filemode='a'
)

# Local folder that is expected to contain the .npy files, and the path of each
# file inside it (two "train" files with labels, two "eval" practice files).
data_folder = 'data'
sp_train_path = f'./{data_folder}/SP-train.npy'
sp_eval_path = f'./{data_folder}/SP_eval_data_for_practice.npy'
wp_train_path = f'./{data_folder}/WP-train.npy'
wp_eval_path = f'./{data_folder}/WP_eval_data_for_practice.npy'

# https://drive.google.com/drive/u/0/folders/1BNnhh2HsxId3bWQ6_A4Mou4x44Bm43VY
# Only download when the folder is missing, so re-running the cell does not fetch again.
# NOTE(review): no output path is passed, so this presumably relies on the Drive
# folder itself being named 'data' - confirm the files land in `data_folder`.
if not os.path.exists(data_folder):
    gdown.download_folder(id='1BNnhh2HsxId3bWQ6_A4Mou4x44Bm43VY')

In [None]:
# Seed shared by every shuffle/split and by the library-level RNGs.
SEED = 42

# Fractions handed to train_test_split when carving out the held-out splits.
TEST_SPLIT_RATIO = 0.1
VALIDATE_SPLIT_RATIO = 0.2

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

set_seed(SEED)

def get_data(path: str) -> Sequence[dict]:
    """
    Load the data from the given path and return a list of dicts.

    Every non-sequence value is converted to ``str``; ``label`` (when present) is
    converted to ``int``; and the first four items of ``choice_list`` are copied
    to the keys ``a``, ``b``, ``c`` and ``d`` on EVERY entry.

    Training data has all of the fields, while test data only contains question and choice_list
    NOTE: there are a couple items in the dataset that have incorrect schemas... SP-209 (not valid), SP-219-221 (label is float)

    :param path: path to a ``.npy`` file holding an array of dict entries
    :return: list with one normalized dict per entry
    :raises IndexError: if an entry's ``choice_list`` has fewer than four items
    :raises ValueError: if a ``label`` cannot be interpreted as a number
    """
    # SECURITY: allow_pickle=True unpickles arbitrary Python objects, which can run
    # code on load. Only use this on files from a trusted source.
    raw_entries = np.load(path, allow_pickle=True)

    # str and list values are Sequences and are kept as-is; scalars become strings.
    # NOTE(review): a numpy array is not a typing Sequence, so an ndarray-valued
    # choice_list would be stringified here - presumably the values are plain lists.
    entries = [
        {k: (v if isinstance(v, Sequence) else str(v)) for k, v in entry.items()}
        for entry in raw_entries
    ]

    for entry in entries:
        if 'label' in entry:
            # Go through float first so float-typed labels ('1.0') parse too;
            # int('1.0') would raise ValueError.
            entry['label'] = int(float(entry['label']))

        # Expose each option under its own key for every entry.
        for i, choice in enumerate(['a', 'b', 'c', 'd']):
            entry[choice] = entry['choice_list'][i]

    return entries

# NOTE: the 'eval' test set is just to test formatting, we can't actually use it for anything since it doesn't have
# labels, so we need to create our validation/test sets manually from the 'train' set
def split_dataset(path: str):
    """
    Load the labelled data at `path` and split it into train/validation/test.

    The test split is held out first using TEST_SPLIT_RATIO; the remainder is then
    split using VALIDATE_SPLIT_RATIO to obtain the validation split. Every shuffle
    and split is seeded with SEED so the result does not depend on how many random
    numbers earlier cells have consumed.

    :param path: path to a labelled .npy file readable by get_data
    :return: the dict-like object produced by train_test_split, with the keys
             'train', 'validation' and 'test'
    """
    shuffled = Dataset.from_list(get_data(path)).shuffle(seed=SEED)
    held_out = shuffled.train_test_split(test_size=TEST_SPLIT_RATIO, seed=SEED)
    splits = held_out['train'].train_test_split(test_size=VALIDATE_SPLIT_RATIO, seed=SEED)
    # The second split names its smaller part 'test'; here it is the validation set.
    splits['validation'] = splits.pop('test')
    splits['test'] = held_out['test']
    return splits


sp_dataset = split_dataset(sp_train_path)

debug(sp_dataset)
debug(sp_dataset['train'][0])
debug(sp_dataset['validation'][0])
debug(sp_dataset['test'][0])

wp_dataset = split_dataset(wp_train_path)

debug(wp_dataset)
debug(wp_dataset['train'][0])
debug(wp_dataset['validation'][0])
debug(wp_dataset['test'][0])

# Unlabelled practice files, loaded without any splitting.
sp_test_dataset = Dataset.from_list(get_data(sp_eval_path))
wp_test_dataset = Dataset.from_list(get_data(wp_eval_path))

debug(sp_test_dataset)
debug(sp_test_dataset[0])


## MultipleChoice

This is a sample of how it would work with models that support AutoModelForMultipleChoice, modified from https://huggingface.co/docs/transformers/en/tasks/multiple_choice

NOTE: can't seem to get BERT to load on the GPU for this task...

In [None]:
%%script echo skipping

# Checkpoint used for the multiple-choice fine-tuning sample in this cell.
model_id = 'google-bert/bert-base-uncased'

# Tokenizer and multiple-choice model are both loaded from the same checkpoint id.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForMultipleChoice.from_pretrained(model_id)

def preprocess(examples, **fn_kwargs):
    """
    Tokenize a batch of multiple-choice examples as question/option pairs.

    For each example in the batch the question is paired with every option in
    'choice_list' (we use 'choice_list' as it's in the order that corresponds to
    the index in 'label'). The pairs are tokenized in one flat call and then
    regrouped so each example owns one list of per-option encodings.

    :param examples: batch mapping with 'question' (list of str) and 'choice_list'
                     (list of option lists, all of the same length)
    :param fn_kwargs: must contain 'tokenizer', a callable taking
                      (first_sentences, second_sentences, truncation=True) and
                      returning a mapping of field name -> flat list
    :return: dict of field name -> list (per example) of lists (per option)
    :raises ValueError: if the option lists are empty or differ in length
    """
    questions = examples['question']
    choice_lists = examples['choice_list']

    # The number of options comes from the data; 4 is only the fallback for an empty batch.
    num_choices = len(choice_lists[0]) if len(choice_lists) else 4
    if num_choices == 0 or any(len(choices) != num_choices for choices in choice_lists):
        raise ValueError('every example must have the same, non-zero number of choices')

    # Build the flat pair lists directly (linear time, unlike sum(list_of_lists, [])).
    first_sentences = [question for question in questions for _ in range(num_choices)]
    second_sentences = [option for choices in choice_lists for option in choices]

    if first_sentences:
        # Log the pairs of the first example only.
        debug(first_sentences[:num_choices])
        debug(second_sentences[:num_choices])

    # truncate here, but dynamically pad to longest in batch if needed later in the data collator
    tokenized_examples = fn_kwargs['tokenizer'](first_sentences, second_sentences, truncation=True)

    # unflatten for use in inference: one group of num_choices encodings per example
    return {
        k: [v[i : i + num_choices] for i in range(0, len(v), num_choices)]
        for k, v in tokenized_examples.items()
    }

# Apply preprocess to every split in batches; the tokenizer reaches it through fn_kwargs.
tokenized_sp_dataset = sp_dataset.map(preprocess, batched=True, fn_kwargs={'tokenizer': tokenizer})
# Log one tokenized training example as a sanity check.
debug(tokenized_sp_dataset['train'][0])

@dataclass
class DataCollatorForMultipleChoice:
    """
    Data collator that will dynamically pad the inputs for multiple choice received.

    Each incoming feature holds, per field, one list entry per answer option. The
    collator flattens these to one row per (example, option), pads them together
    with the tokenizer, and reshapes every field back to
    (batch_size, num_choices, -1). The labels are returned under the key "labels".
    """

    # Tokenizer whose .pad() method does the padding.
    tokenizer: PreTrainedTokenizerBase
    # The remaining fields are forwarded unchanged to tokenizer.pad().
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None

    def __call__(self, features):
        """
        Collate a list of per-example feature dicts into one padded batch.

        :param features: non-empty list of dicts; each has a "label" or "labels"
                         entry and fields whose values are indexable per option
        :return: dict of tensors shaped (batch_size, num_choices, -1) plus an
                 int64 "labels" tensor
        """
        label_name = "label" if "label" in features[0].keys() else "labels"
        # Read the labels without popping them, so the caller's dicts are not mutated.
        labels = [feature[label_name] for feature in features]
        batch_size = len(features)
        num_choices = len(features[0]["input_ids"])

        # One row per (example, option), in example-major order; built in a single
        # pass rather than with the quadratic sum(list_of_lists, []).
        flattened_features = [
            {k: v[i] for k, v in feature.items() if k != label_name}
            for feature in features
            for i in range(num_choices)
        ]

        batch = self.tokenizer.pad(
            flattened_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Restore the per-example grouping of the options.
        batch = {k: v.view(batch_size, num_choices, -1) for k, v in batch.items()}
        batch["labels"] = torch.tensor(labels, dtype=torch.int64)
        return batch

# Accuracy metric object from the `evaluate` library; compute_metrics calls its .compute().
accuracy = evaluate.load('accuracy')

def compute_metrics(eval_pred):
    """
    Turn raw evaluation outputs into an accuracy score.

    :param eval_pred: pair of (per-class scores, reference labels); the scores are
                      reduced with an argmax over axis 1
    :return: whatever the module-level `accuracy` metric's compute() returns
    """
    scores, references = eval_pred
    # Highest-scoring class per row is the predicted label.
    predicted_classes = np.argmax(scores, axis=1)
    return accuracy.compute(references=references, predictions=predicted_classes)

# Fine-tuning configuration: evaluate and checkpoint once per epoch and reload the
# best checkpoint when training finishes.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./my_awesome_model",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    learning_rate=5e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=1,
    weight_decay=0.01
)

# Train on the tokenized 'train' split and evaluate on 'validation'; the custom
# collator pads each batch of question/option pairs dynamically.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_sp_dataset["train"],
    eval_dataset=tokenized_sp_dataset["validation"],
    tokenizer=tokenizer,
    data_collator=DataCollatorForMultipleChoice(tokenizer=tokenizer),
    compute_metrics=compute_metrics,
)

trainer.train()

# Checkpoints are written to sub-folders of output_dir during training; save the
# final (best) model and tokenizer to output_dir itself so that
# from_pretrained('my_awesome_model') can load it afterwards.
trainer.save_model()

In [None]:
%%script echo skipping

# Reload the fine-tuned tokenizer and model from disk.
# Assumes both were previously saved to ./my_awesome_model.
tokenizer = AutoTokenizer.from_pretrained('my_awesome_model')
# Auto classes cannot be instantiated directly; they must be built with from_pretrained.
model = AutoModelForMultipleChoice.from_pretrained('my_awesome_model')

# Evaluation-only trainer over the held-out 'test' split.
trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir='./results', per_device_eval_batch_size=64),
    eval_dataset=tokenized_sp_dataset['test'],
    tokenizer=tokenizer,
    data_collator=DataCollatorForMultipleChoice(tokenizer=tokenizer),
    compute_metrics=compute_metrics
)

# Actually run the evaluation and log the resulting metrics.
info(f'Test metrics: {trainer.evaluate()}')


## Most Likely Response

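Similar to the approach above, we split each multiple-choice question into 4 question/option pairs and then use a causal language model (`AutoModelForCausalLM`) to compute how likely the model is to have generated each option; the most likely option is taken as the answer. No fine-tuning is performed.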

In [None]:
%%script echo skipping

# Checkpoint id passed to from_pretrained below for both the tokenizer and the causal LM.
model_id = 'gpt2'

def calculate_likelihood(model, tokenizer, question, option):
    """
    Return the log-likelihood the model assigns to `question + option`.

    Higher (closer to zero) means more likely, so callers can pick the best option
    with max(). The whole sequence is scored; because the question is identical
    for every option of one example, ranking options by this value is equivalent
    to ranking them by the likelihood of the option given the question.

    :param model: causal LM called as model(input_ids, labels=...) whose output has
                  a `.loss` equal to the mean negative log-likelihood per predicted token
    :param tokenizer: tokenizer whose encode(text, return_tensors='pt') returns a
                      tensor of shape (1, sequence_length)
    :param question: prompt text
    :param option: candidate answer text, appended directly to the question
    :return: total log-likelihood as a float; -inf if there is nothing to predict
    """
    tokens = tokenizer.encode(question + option, return_tensors='pt')

    # With n tokens the model predicts n - 1 of them (every token but the first).
    num_predicted = tokens.size(1) - 1
    if num_predicted < 1:
        return float('-inf')

    with torch.no_grad():
        # The model shifts `labels` internally, so the unshifted tokens are passed
        # as both inputs and labels; shifting here as well would misalign targets.
        outputs = model(tokens, labels=tokens)

    # loss is the MEAN negative log-likelihood: scale to a sum and flip the sign.
    return -(outputs.loss * num_predicted).item()

# Load the causal LM and its tokenizer from the checkpoint chosen above.
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)

predictions, labels = [], []

# Small smoke test: score only the first four training examples.
for example in sp_dataset['train'].select(range(4)):
    # One score per option, in choice_list order (the order 'label' indexes into).
    option_scores = [
        calculate_likelihood(model, tokenizer, example['question'], candidate)
        for candidate in example['choice_list']
    ]
    # Predict the first option that attains the highest score.
    predictions.append(option_scores.index(max(option_scores)))
    labels.append(example['label'])

debug(f'{predictions[0]}, {labels[0]}')
info(f"Accuracy: {accuracy_score(labels, predictions)}")
info(f"F1: {f1_score(labels, predictions, average='weighted')}")

## Prompt Engineering

This approach uses large language models with a variety of prompting strategies to select the best option.

In [None]:
from string import Template

# Prompt templates keyed by a strategy id.
# Each template uses the placeholders $question, $a, $b, $c and $d, which the
# evaluation loop fills with Template.substitute(**example); get_data is what adds
# the a/b/c/d keys to each example.
# NOTE(review): the '*-prompt-generation' entries were presumably written by the
# model named in the key - the key names are the only evidence here, confirm.
message_templates = {
    # Plain multiple-choice question, asks for a single letter.
    'manual-basic-mcq': Template('''
Which of the following options answers this multiple-choice question: $question
A) $a
B) $b
C) $c
D) $d
Provide the correct option as A, B, C, or D.
Answer: '''),

    # Same as above, but tells the model it is a riddle.
    'manual-basic-riddle': Template('''
Which of the following options answers this multiple-choice riddle: $question
A) $a
B) $b
C) $c
D) $d
Provide the correct option as A, B, C, or D.
This is a riddle so the answer may involve word play or lateral thinking.
Answer: '''),
    
    # Riddle prompt that additionally asks for step-by-step reasoning.
    'manual-chain-of-thought': Template('''
Which of the following options answers this multiple-choice riddle: $question
A) $a
B) $b
C) $c
D) $d
Provide the correct option as A, B, C, or D.
This is a riddle so the answer may involve word play or lateral thinking.
Explain how you deduce the answer step by step.
Answer: '''),

    'llama-3-prompt-generation': Template('''
Riddle: $question
Options:
A) $a
B) $b
C) $c
D) $d
Choose the correct answer:
(Select one of the above options by typing A, B, C, or D)'''),
    
    # This template labels options "A." rather than "A)" and does not ask for a letter.
    'gemini-prompt-generation': Template('''
Riddle:

$question

Possible Answers:

    A. $a
    B. $b
    C. $c
    D. $d

Instructions:

    Read the riddle carefully and consider the meaning of the words used.
    Analyze the possible answers and identify any clues that point towards a specific answer.
    Based on your understanding of the riddle and the logic behind the answer choices, select the option that best fits the riddle's description.
 
'''),

    'chatgpt-4-prompt-generation': Template('''
Riddle: $question

Options:
A) $a
B) $b
C) $c
D) $d

Please select the correct option that answers the riddle above. ''')
}

# Per-model settings keyed by a short name.
#   'id'        - checkpoint id passed to from_pretrained
#   'normalize' - strips the echoed prompt from a decoded generation: it keeps the
#                 text after the FIRST occurrence of the model's turn marker.
# split(marker, 1)[-1] is used instead of split(marker)[1] so that a response
# without the marker is returned unchanged (no IndexError) and a response that
# itself contains the marker is not cut off at its second occurrence.
model_configs = {
  'gemma-7b-it': {
    'id': 'google/gemma-7b-it',
    'normalize': lambda response: response.split('\nmodel\n', 1)[-1].strip()
  },
  'llama-3-8b-it': {
    'id': 'meta-llama/Meta-Llama-3-8B-Instruct',
    # Newlines are additionally flattened to spaces for this model.
    'normalize': lambda response: response.split('assistant\n', 1)[-1].strip().replace('\n', ' ')
  }
}

In [None]:
from transformers import pipeline

# Pick which entry of model_configs this run evaluates.
model_key = 'llama-3-8b-it' # Change to gemma-7b-it
model_config = model_configs[model_key]

# 4-bit bitsandbytes settings; only used by the commented-out quantized load below.
quantization_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16)

# Memory savings: either load in 16-bit precision with device_map='auto', or go
# below 16 bits with bitsandbytes.
#  - low_cpu_mem_usage=True avoids first creating a randomly initialized weight
#    matrix and then overwriting it with the model params
#  - device_map='auto' spreads the weights over GPU, then CPU, then disk, and also
#    sets low_cpu_mem_usage
#  - torch_dtype=torch.float16 halves memory use, since most models are created
#    with 32-bit params
#  - quantization_config allows parameter precision below 16 bits
# model = AutoModelForCausalLM.from_pretrained(model_config['id'], quantization_config=quantization_config, low_cpu_mem_usage=True)
model = AutoModelForCausalLM.from_pretrained(
    model_config['id'],
    device_map='auto',
    torch_dtype=torch.float16,
)

# Left padding; fall back to the EOS token when the tokenizer has no pad token.
tokenizer = AutoTokenizer.from_pretrained(model_config['id'], padding_side="left")
tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token
debug(f'Pad token: {tokenizer.pad_token}')
debug(f'EOS token: {tokenizer.eos_token}')

In [None]:
# Heuristic pattern for pulling the chosen letter out of a free-text response.
# Three alternatives, each with its own capture group: a leading "A)" / "A.",
# a letter preceded by whitespace and followed by space, '.' or ')', or a letter
# right after ": ". Raw string so that \s is not treated as a string escape.
prog = re.compile(r'^(?:([ABCD])[.)]|(?![ABCD][.)]).*\s([ABCD])[ .)]|.*: ([ABCD]))')

OPTION_LETTERS = ['A', 'B', 'C', 'D']

# Which dataset to evaluate: 'wp' or 'sp'. The same key selects the data and is
# used in the log/result file names, so the two cannot get out of sync.
eval_dataset_key = 'wp'
eval_examples = {'sp': sp_dataset, 'wp': wp_dataset}[eval_dataset_key]['test']


def extract_option(response, choice_list):
    """
    Work out which option letter a model response selects.

    First tries the `prog` regex; if that fails, falls back to checking whether the
    text of an option is contained in the response, i.e. 'the answer is <option text>'.

    :param response: normalized model response text
    :param choice_list: option texts in A, B, C, D order
    :return: 'A', 'B', 'C' or 'D', or None when no option can be determined
    """
    matches = prog.search(response)
    option = next((group for group in matches.groups() if group), None) if matches else None
    if option:
        return option

    warning('No regex match')

    response_text = response.lower()
    for letter, choice_text in zip(OPTION_LETTERS, choice_list):
        # Case-insensitive containment, ignoring a trailing full stop on the option.
        if choice_text.lower().rstrip('.') in response_text:
            return letter
    return None


for template_id in message_templates.keys():

    info(f'Using template: {template_id}')
    predictions = []
    labels = []

    # One log file and one result file per (model, template, dataset). Defined and
    # configured once per template rather than once per example, so `title` also
    # exists when the dataset is empty.
    title = f'EVAL-{model_key}-{template_id}-{eval_dataset_key}'
    logging.basicConfig(
      level=logging.getLevelName(LOG_LEVEL),
      format='%(message)s',
      force=True,
      filename=f'{log_folder}/{title}' if LOG_TO_FILE else None,
      filemode='a'
    )

    for example in eval_examples:
        info('-------------------')

        messages = [{"role": "user", "content": message_templates[template_id].substitute(**example)}]
        debug(messages[0]['content'])
        model_inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors='pt').to(DEVICE)
        generated_ids = model.generate(
            model_inputs,
            # chain-of-thought answers are given a larger token budget
            max_new_tokens=1000 if template_id == 'manual-chain-of-thought' else 750,
            do_sample=True, # greedy vs. sampling
            top_k=50,
            top_p=0.95,
        )

        response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        response = response.replace('**', '') # Some models seem to like outputting with ** for formatting purposes...

        # do any model specific processing if required
        response = model_config['normalize'](response) if 'normalize' in model_config else response

        debug(f'Response: {response}')

        option = extract_option(response, example['choice_list'])
        if not option:
            # NOTE: skipped examples are left out of both predictions and labels,
            # so the metrics below only cover responses that could be parsed.
            info('Could not get option, skipping...')
            continue

        answer = option
        info(f'Answer: {answer}')
        answer_index = ord(answer) - ord('A') # get the index of the answer
        if 'label' in example:
            correct_index = example['label']
            correct = OPTION_LETTERS[correct_index]
            info(f'Correct: {correct}')
            is_correct = answer_index == correct_index
            info(f"{'Right' if is_correct else 'Wrong'}!\n")
            labels.append(correct_index)
        predictions.append(answer_index)

    info('-------------------')
    with open(f'{title}.txt', 'w') as f:
        f.write(pformat({'predictions': predictions, 'labels': labels}))

    # Metrics need one label per prediction; unlabelled or empty data has nothing to score.
    if labels and len(labels) == len(predictions):
        # NOTE: this rebinds the module-level name `accuracy`, which the
        # multiple-choice cell also uses for its evaluate metric object.
        accuracy = accuracy_score(labels, predictions)
        f1 = f1_score(labels, predictions, average='weighted')
        info(f"Accuracy: {accuracy}\nF1: {f1}")
    else:
        warning(f'No labelled predictions for {title}, skipping metrics')