# Brain teaser

[Original GitHub repository](https://github.com/1171-jpg/BrainTeaser)

[Leaderboard](https://huggingface.co/spaces/HuggingFaceH4/open_llm_leaderboard)


### Importing required functions

In [None]:
!pip install -q -U langchain transformers bitsandbytes accelerate

In [None]:
import numpy as np
import json
from datasets import load_dataset
from random import shuffle
import random

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

import torch
from torch.nn.functional import normalize
from transformers import AutoModel, AutoTokenizer

from tqdm.auto import tqdm
import numpy as np
import os
from transformers import StoppingCriteriaList

from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM
import gc

In [None]:
# Answer labels currently in use, and the "X)" strings that mark a finished answer.
letters = None
good_responses = None


def set_letters(k):
    """Select the first ``k`` answer labels.

    Updates the module-level ``letters`` (e.g. ``['A', 'B', 'C']``) and
    ``good_responses`` (e.g. ``['A)', 'B)', 'C)']``). At most 16 labels
    ('A' through 'P') are available.
    """
    global letters, good_responses
    letters = list('ABCDEFGHIJKLMNOP'[:k])
    good_responses = [label + ')' for label in letters]


set_letters(4)
print(letters)
print(good_responses)

### Metrics

Taken from the repository of the original authors of the dataset: https://github.com/1171-jpg/BrainTeaser/tree/main

In [None]:
import json
from tqdm import tqdm
import torch
import logging
import argparse
import numpy as np

def getResultdata(result_data):
    """Collect per-question correctness flags, split by puzzle type.

    Each item needs an ``id`` of the form ``<TYPE>-<number>[_suffix]``, a
    ``predict`` letter (or ``None``) and an integer ``label``. Items whose
    type is ``'WP'`` go to the first dictionary, everything else to the
    second. Both dictionaries map the question number to a three-element
    list ``[original, SR, CR]`` where a 1 means that variant was answered
    correctly. The variant is chosen by whether ``'SR'`` or ``'CR'`` occurs
    anywhere in the id.

    Returns:
        ``(word_play, reverse_play)``.
    """
    global letters
    choice_to_index = dict(zip(letters, range(len(letters))))
    # A missing prediction maps to an index one past the last letter.
    choice_to_index[None] = len(letters)

    def split_id(sample_id):
        parts = sample_id.split("-")
        return parts[0], parts[1].split("_")[0]

    word_play = {}
    reverse_play = {}

    # First pass: make sure every question has an all-zero row.
    for item in result_data:
        kind, base_id = split_id(item['id'])
        bucket = word_play if kind == 'WP' else reverse_play
        bucket.setdefault(base_id, [0, 0, 0])

    # Second pass: flag the variants that were predicted correctly.
    for item in result_data:
        kind, base_id = split_id(item['id'])
        variant = 1 if 'SR' in item['id'] else (2 if 'CR' in item['id'] else 0)
        if choice_to_index[item['predict']] != item['label']:
            continue
        bucket = word_play if kind == 'WP' else reverse_play
        bucket[base_id][variant] = 1

    return word_play,reverse_play


def getMetric(data_list):
    """Compute and print accuracy metrics for a list of correctness rows.

    Args:
        data_list: sequence of ``[original, semantic, context]`` rows, each
            entry 1 when that variant of the question was answered correctly
            and 0 otherwise.

    Returns:
        Dict with the keys ``'over_all accuracy'``, ``'original_accuracy'``,
        ``'semantic_accuracy'``, ``'context_accuracy'``, ``'ori_sema'``
        (original and semantic both correct) and ``'ori_sema_cont'`` (all
        three correct). When ``data_list`` is empty every metric is NaN,
        because an accuracy over zero questions is undefined.
    """
    data_list = np.array(data_list)
    total = len(data_list)

    if total == 0:
        # Empty group: avoid the IndexError / division by zero the column
        # arithmetic below would hit.
        undefined = float('nan')
        overall_accuracy = original_accuracy = undefined
        semantic_accuracy = context_accuracy = undefined
        ori_sema = ori_sema_cont = undefined
    else:
        column_hits = np.sum(data_list, axis=0)
        overall_accuracy = np.sum(data_list) / 3 / total
        original_accuracy = column_hits[0] / total
        semantic_accuracy = column_hits[1] / total
        context_accuracy = column_hits[2] / total

        original_ok = data_list[:, 0] == 1
        semantic_ok = data_list[:, 1] == 1
        context_ok = data_list[:, 2] == 1
        ori_sema = np.sum(original_ok & semantic_ok) / total
        ori_sema_cont = np.sum(original_ok & semantic_ok & context_ok) / total

    print("over_all accuracy {}".format(overall_accuracy))
    print("single_original_accuracy {}".format(original_accuracy))
    print("single_semantic_accuracy {}".format(semantic_accuracy))
    print("single_context_accuracy {}".format(context_accuracy))
    print("sr_accuracy {}".format(ori_sema))
    print("cr_accuracy {}".format(ori_sema_cont))

    return {'over_all accuracy':overall_accuracy,'original_accuracy':original_accuracy,'semantic_accuracy':semantic_accuracy,'context_accuracy':context_accuracy,'ori_sema':ori_sema,'ori_sema_cont':ori_sema_cont}


def getSeperateResult(word_play,reverse_thinking):
    """Report metrics for word puzzles, sentence puzzles and both combined.

    Both arguments are dictionaries whose values are correctness rows as
    accepted by ``getMetric``. A banner is printed before each group.

    Returns:
        Dict with the keys ``'wordplay'``, ``'sentence'`` and ``'all'``,
        each holding the ``getMetric`` result for that group.
    """
    word_rows = [*word_play.values()]
    print('#########Wordplay##########')
    wordplay_metrics = getMetric(word_rows)

    sentence_rows = list(reverse_thinking.values())
    print('#########Sentence##########')   
    sentence_metrics = getMetric(sentence_rows)

    print('#########All data##########') 
    combined_metrics = getMetric(word_rows + sentence_rows)

    return {
        'wordplay': wordplay_metrics,
        'sentence': sentence_metrics,
        'all': combined_metrics,
    }

### Data saving

The file name format follows: model_name + name + serial number.

If a file with the same name already exists, the serial number is incremented until an unused name is found.

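The text file contains the model name, the optional few-shot text, one example prompt, and the results; the per-item data is written to a separate JSON file.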

In [None]:
# Output folders: human-readable summaries and per-item JSON dumps.
dirname= 'results'
pickle_dir = 'pickled_results'

def create_dir(name):
    """Create directory ``name`` (and missing parents) if it does not exist.

    An already existing directory is not an error. Any other failure
    (for example a regular file with that name, or missing permissions)
    raises ``OSError`` instead of being silently ignored.
    """
    os.makedirs(name, exist_ok=True)

def save(final_result, name=None, few_shot=False, example=None, data=None):
    """Write a results summary (and optionally the raw data) to disk.

    The summary goes to ``{dirname}/{model}{_name}_{i}.txt`` where ``model``
    is the last path component of the global ``model_path`` and ``i`` is the
    first serial number (starting at 1) whose text file does not exist yet.

    Args:
        final_result: JSON-serializable metrics written at the end of the
            summary file.
        name: optional tag inserted into the file name.
        few_shot: falsy, or a string written on its own line after the
            model name.
        example: example prompt written to the summary; non-string values
            are JSON-encoded. When omitted, the current module-level
            ``format`` template is used (empty string if none is defined).
        data: optional JSON-serializable per-item data, written to
            ``{pickle_dir}/...json`` with the same stem. A warning is
            printed when it is omitted.
    """
    create_dir(dirname)
    create_dir(pickle_dir)

    # Resolve the default at call time: a def-time default would capture
    # whatever ``format`` happened to be when this cell was executed.
    if example is None:
        example = globals().get('format', '')
    # Serialize before touching the file system so that a failure here does
    # not leave a half-written summary behind.
    if not isinstance(example, str):
        example = json.dumps(example)

    model_name = model_path.split('/')[-1]
    inserted_name = "" if name is None else "_" + name
    i = 1
    while True:
        file_path = f'{dirname}/{model_name}{inserted_name}_{i}.txt'
        pickle_path = f'{pickle_dir}/{model_name}{inserted_name}_{i}.json'
        try:
            # Mode 'x' fails if the file exists, so an earlier run is never
            # overwritten.
            file = open(file_path, 'x')
        except FileExistsError:
            i += 1
            continue

        with file:
            file.write(model_name+'\n')
            if few_shot:
                file.write(few_shot + '\n')
            file.write(example + '\n')
            file.write(json.dumps(final_result, indent=4))

        if data is None:
            print(f"Warning, {model_name} results data not saving in json, please provide data arg")
        else:
            with open(pickle_path, 'w') as file:
                file.write(json.dumps(data, indent=4))
        return

### Model loading

Log in to your Hugging Face account to load gated models (e.g. Llama 3, Gemma).

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [None]:
from huggingface_hub import login
# NOTE(review): the token argument is an empty string here — presumably
# redacted before sharing; supply a valid access token before running.
login('', add_to_git_credential=False, write_permission=True)

In [None]:
# Hugging Face model id to evaluate. Exactly one assignment should be active;
# the commented lines are alternatives.
# model_path = 'mistralai/Mistral-7B-Instruct-v0.2'
# model_path = 'microsoft/Phi-3-mini-128k-instruct'
# model_path = "meta-llama/Meta-Llama-3-8B-Instruct"
model_path = "google/gemma-1.1-7b-it"

# bitsandbytes settings passed to the model loader: 4-bit loading with the
# "nf4" quantization type, double quantization enabled, float16 compute dtype.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
)

def get_model():
    """Load the tokenizer and the quantized causal LM for ``model_path``.

    Both are loaded with ``trust_remote_code=True``, i.e. code shipped with
    the model repository may be executed. The model additionally receives
    ``device_map="auto"`` and the module-level ``quantization_config``.

    Returns:
        ``(tokenizer, model)``.
    """
    shared_kwargs = {"trust_remote_code": True}
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_path, **shared_kwargs)
    loaded_model = AutoModelForCausalLM.from_pretrained(
        model_path,
        device_map="auto",
        quantization_config=quantization_config,
        **shared_kwargs,
    )
    return loaded_tokenizer, loaded_model

# Module-level tokenizer and model used by the prompt and generation helpers.
tokenizer, model = get_model()

### Dataset

- Taken from the original authors' link: https://drive.google.com/drive/u/0/folders/1kiFXp5fqpf8--NQJJAlBIBpfSaXTk1UY

In [None]:
# Location of the BrainTeaser .npy files (sentence puzzles and word puzzles).
data_path = '/kaggle/input/brainteaser/data'
sentence_data_path = f"{data_path}/SP-train.npy"
wordplay_data_path = f"{data_path}/WP-train.npy"

# NOTE: allow_pickle=True unpickles Python objects stored in the files, which
# can execute arbitrary code — only load files from a trusted source.
sentence_data_list = list(np.load(sentence_data_path,allow_pickle=True))
wordplay_data_list = list(np.load(wordplay_data_path,allow_pickle=True))

# Evaluation set: all sentence puzzles followed by all word puzzles.
test_data_list = sentence_data_list + wordplay_data_list
print(f"Dataset length {len(test_data_list)}")

In [None]:
# Current prompt template; filled in by the helpers below.
format = ""


def get_format():
    """Build the bare question template for the current ``letters``.

    The template has one ``{}`` slot for the question followed by one
    ``(X) {}`` line per answer letter. It is stored in the module-level
    ``format`` and also returned.
    """
    global format, letters
    choice_lines = ''.join('(' + letter + ') {}\n' for letter in letters)
    format = "Question: {}\n\nChoices:\n" + choice_lines
    return format

def get_llama_format(demonstration=None):
    """Build the prompt template using the tokenizer's chat template.

    The conversation consists of an optional system message holding
    ``demonstration``, a user message holding the question template from
    ``get_format()``, and an assistant message starting with ``Answer:(``.
    The rendered text is cut right after that final ``Answer:(`` so that the
    model continues with the answer letter. The result is stored in the
    module-level ``format``.

    If rendering fails while a system message is present (some templates,
    e.g. Mistral's, require strictly alternating roles), the demonstration
    is folded into the user message and rendering is retried. Without a
    demonstration there is nothing to fold in, so the original error is
    re-raised.
    """
    global tokenizer, format
    answer_prefix = "Answer:("
    user_prompt = get_format()

    messages = []
    if demonstration is not None:
        messages.append({"role": "system", "content": demonstration})
    messages.append({"role": "user", "content": user_prompt})
    messages.append({"role": "assistant", "content": answer_prefix})

    try:
        rendered = tokenizer.apply_chat_template(messages, tokenize=False)
    except Exception:
        if demonstration is None:
            # No system message was sent, so the failure has another cause.
            raise
        merged_messages = [
            {"role": "user", "content": demonstration + "\n\n" + user_prompt},
            {"role": "assistant", "content": answer_prefix},
        ]
        rendered = tokenizer.apply_chat_template(merged_messages, tokenize=False)

    # Drop whatever the template appends after the assistant text (such as an
    # end-of-turn token). Search from the right: a demonstration may itself
    # contain "Answer:(" and must not be truncated.
    cut = rendered.rindex(answer_prefix) + len(answer_prefix)
    format = rendered[:cut]

def get_appended_format(demonstration=None):
    """Build the plain-text prompt template and store it in ``format``.

    The template is the output of ``get_format()``, preceded by
    ``demonstration`` and a blank line when one is given, and followed by a
    final ``Answer:(`` line.
    """
    global format
    question_template = get_format()
    if demonstration is None:
        prefix = ""
    else:
        prefix = demonstration + "\n\n"
    format = prefix + question_template + "\nAnswer:("
# Build the plain-text template for four choices and display it.
# Chat-template alternative (disabled): get_llama_format("Primjer")
set_letters(4)
get_appended_format()
print(format)

In [None]:
# Hand-written four-choice template. Running this cell overwrites the
# module-level `format`; slots are the question followed by choices A-D.
format = """The following question is a brainteaser.

Question: {}

Choices:
(A) {}
(B) {}
(C) {}
(D) {}

Answer:("""

In [None]:
def get_single_demo(sample):
    """Fill the current ``format`` template with one sample.

    The first slot receives ``sample['question']``; the remaining slots
    receive the entries of ``sample['choice_list']`` in order.
    """
    global format
    slot_values = [sample['question'], *sample['choice_list']]
    return format.format(*slot_values)

### Utility functions

In [None]:
def set_predictions(data_list):
    """Parse each item's ``response`` and store the answer letter in ``predict``.

    A letter counts as present when ``"<letter>)"`` occurs anywhere in the
    response. If several letters are present, the one that comes latest in
    ``letters`` wins. Items without any match keep ``predict = None`` and
    their index is printed.
    """
    global letters
    for position, item in enumerate(data_list):
        item['predict'] = None
        matches = [letter for letter in letters if f'{letter})' in item['response']]
        if matches:
            item['predict'] = matches[-1]

        if item['predict'] is None:
            print(position)

def custom_stopping_criteria(input_ids: torch.LongTensor, score: torch.FloatTensor, **kwargs) -> bool:
    """
    Stop generation dynamically: not after a fixed number of tokens, but as
    soon as the model has written an answer. Usually that happens within
    the first two tokens.

    Decodes the last three token ids of the first sequence and returns True
    when the text contains any entry of ``good_responses``.
    """
    global good_responses
    tail_text = tokenizer.decode(input_ids[0][-3:])
    return any(marker in tail_text for marker in good_responses)

# Wrapped in a StoppingCriteriaList so it can be passed to model.generate().
stopping_criteria = StoppingCriteriaList([custom_stopping_criteria])

In [None]:
def generate(samples, tokens=10, all_tokens=True, few_shot=False):
    """Run the model on every sample and record its answer in place.

    Args:
        samples: list of sample dicts; each gets a ``response`` entry, and
            ``set_predictions`` is called on the list afterwards.
        tokens: maximum number of newly generated tokens per sample.
        all_tokens: when True the decoded response includes the prompt
            tokens; when False only the newly generated tokens are decoded.
        few_shot: when True the prompt is prefixed with the global
            ``demonstration``.
            NOTE(review): no global named ``demonstration`` is defined in
            the visible part of this notebook — confirm before using.
    """
    global model
    for sample in tqdm(samples):
        prompt = (demonstration + get_single_demo(sample)) if few_shot else get_single_demo(sample)
        prompt_ids = tokenizer.encode(prompt, return_tensors="pt", return_attention_mask=False)
        prompt_ids = prompt_ids.to(device)
        prompt_length = len(prompt_ids[0])

        # Greedy decoding; the stopping criteria may end generation early.
        generated = model.generate(
            prompt_ids,
            pad_token_id=tokenizer.eos_token_id,
            do_sample=False,
            max_new_tokens=tokens,
            stopping_criteria=stopping_criteria,
        )

        first_kept = 0 if all_tokens else prompt_length
        sample['response'] = tokenizer.decode(generated[0][first_kept:])
    set_predictions(samples)

## Experiments

### Prompt modifications

In [None]:
# Instruction variants that can be passed as the `demonstration` argument of
# the prompt builders. The strings are sent to the model verbatim.
demonstration1 = "Please pick the best choice for the brain teaser. Each brain teaser has only one possible solution including the choice none of above, answer should only provide the choice:"
demonstration2 = "The following question is a brainteaser. One choice is correct, other choices are semanticaly derived from the question."
demonstration3 = "The following question is a brainteaser."
demonstration4 = "The following question is a brainteaser. One choice is correct, other choices are semanticaly derived from the question. Sometimes none of the above is correct."
demonstration5 = "The following question is a brainteaser. Sometimes none of the above is correct."

In [None]:
def generate_with_demonstration(demonstration=None,test_data_list=None, name="with-instruction", llama_format=False, limit=None):
    """Run one instruction-prompt experiment end to end and save the results.

    Builds the prompt template (chat-template based when ``llama_format`` is
    true, plain text otherwise), generates answers, computes the metrics and
    writes them with ``save``.

    Args:
        demonstration: optional instruction text placed before the question.
        test_data_list: samples to evaluate; defaults to the module-level
            ``test_data_list`` as it is at call time.
        name: tag used in the output file names.
        llama_format: choose the chat-template prompt builder.
        limit: when given, only the first ``limit`` samples are used (handy
            for a quick smoke run). Generation, evaluation and saving always
            work on the same set of samples, so evaluation never meets an
            item that has no prediction.
    """
    if test_data_list is None:
        # Look the list up at call time so a reloaded dataset is picked up.
        test_data_list = globals()['test_data_list']
    samples = test_data_list if limit is None else test_data_list[:limit]

    if llama_format:
        get_llama_format(demonstration)
    else:
        get_appended_format(demonstration)

    # The large token budget is only an upper bound; generate() passes the
    # stopping criteria that can end each generation early.
    generate(samples, tokens=1000, all_tokens=False, few_shot=False)
    word_play,sentence_play = getResultdata(samples)
    final_result = getSeperateResult(word_play, sentence_play)
    save(final_result, name=name, example=get_single_demo(samples[0]), data=samples)
### Multiple answers

In [None]:
def create_with_k_questions(i, k):
    """Return a copy of sample ``i`` whose choice list is extended to ``k`` entries.

    Extra choices are taken from the ``distractor1`` / ``distractor2`` fields
    of other samples, stepping back 11 positions at a time (wrapping around
    the start of the list), and are inserted before the last choice.
    # NOTE(review): the step of 11 is presumably meant to land on a different
    # puzzle than the current one — confirm against the dataset layout.

    All choices except the last one are then shuffled with the module-level
    ``random`` state, and the last choice stays in last position. ``label``
    is the index of ``answer`` in the new list; if the answer text is not
    found, it is printed and the last position is used.

    The original sample and its choice list are not modified. If the sample
    already has ``k`` or more choices, none are added.
    """
    global test_data_list
    current = test_data_list[i]
    choice_list = [*current['choice_list']]

    total = len(test_data_list)
    donor_index = i
    while len(choice_list) < k:
        donor_index = (donor_index - 11) % total
        donor = test_data_list[donor_index]
        # Add one distractor at a time so odd values of k are reached exactly.
        for key in ('distractor1', 'distractor2'):
            if len(choice_list) < k:
                choice_list.insert(-1, donor[key])

    # Keep the final choice pinned at the end; shuffle only the others.
    last_choice = choice_list.pop()
    shuffle(choice_list)
    choice_list.append(last_choice)

    try:
        label = choice_list.index(current['answer'])
    except ValueError:
        print(current['answer'])
        label = len(choice_list)-1

    return {**current, 'label': label, 'choice_list': choice_list}

def test_with_k_questions(generated_data_list):
    """Check that every generated sample still contains its original choices.

    Samples are paired positionally with ``test_data_list``. An
    ``AssertionError`` showing both samples is raised for the first pair
    whose original choices are not all present in the generated list.
    """
    for original, new in zip(test_data_list, generated_data_list):
        original_choices = set(original['choice_list'])
        assert original_choices <= set(new['choice_list']), str(original) + '\n'+ str(new)

def create_list_with_k_questions(k):
    """Build a ``k``-choice variant of every sample in ``test_data_list``.

    The ``random`` module is seeded with 10 first so the shuffles are
    reproducible. The result is checked with ``test_with_k_questions``
    before it is returned.

    Note: the prompt template has one slot per entry of ``letters``, so call
    ``set_letters(k)`` with the same ``k`` before building prompts from the
    returned samples.
    """
    random.seed(10)
    generated_data_list = [
        create_with_k_questions(i, k) for i in range(len(test_data_list))
    ]
    test_with_k_questions(generated_data_list)
    return generated_data_list

### Fixed position of correct answer

In [None]:
def create_with_ordered_correct_answer(i, target_position):
    """Return a copy of sample ``i`` with the correct answer moved.

    The choice at index ``label`` is removed from a copy of the choice list
    and re-inserted at ``target_position``; the copy's ``label`` is set to
    ``target_position``. The original sample is left untouched.
    """
    source = test_data_list[i]
    options = list(source['choice_list'])
    answer = options[source["label"]]
    options.remove(answer)
    options.insert(target_position, answer)
    return {**source, 'choice_list': options, 'label': target_position}

# Variant of the whole dataset with the correct answer always at index 3.
generated_data_list = []
for i in range(len(test_data_list)):
    new = create_with_ordered_correct_answer(i, 3)
    generated_data_list.append(new)
    

### Repeated distractors

In [None]:
def all_same_incorrect_answers(i):
    """Return a copy of sample ``i`` with a single distractor repeated.

    The new choice list holds the correct answer once and the first
    remaining (incorrect) choice three times, shuffled with the
    module-level ``random`` state. ``label`` is the index of the first
    entry equal to the correct answer. The original sample is not modified.
    """
    source = test_data_list[i]
    remaining = [*source['choice_list']]
    correct_answer = remaining[source["label"]]
    remaining.remove(correct_answer)
    repeated_wrong = remaining[0]

    choice_list = [correct_answer, repeated_wrong, repeated_wrong, repeated_wrong]
    random.shuffle(choice_list)

    return {
        **source,
        'choice_list': choice_list,
        'label': choice_list.index(correct_answer),
    }

# Build the repeated-distractor variant of the whole dataset.
generated_data_list = []
for i in range(len(test_data_list)):
    new = all_same_incorrect_answers(i)
    generated_data_list.append(new)

# Save the list to a file
import pickle
with open('generated_data_list.pkl', 'wb') as file:
    pickle.dump(generated_data_list, file)

# Loading the generated list to ensure consistency across all models (since we use random.shuffle)
# This replaces the list that was just built above with the stored copy.
# NOTE(review): pickle.load can execute arbitrary code — only load this file
# if the dataset it comes from is trusted.
with open('/kaggle/input/generated-data-list/generated_data_list.pkl', 'rb') as file:
    generated_data_list = pickle.load(file)

### Replace one distractor with question

In [None]:
def replace_distractor_with_question():
    """Replace the first distractor of every sample with the question text.

    For each sample returned by ``load_dataset()``, ``distractor1`` is set
    to the question, and so is the entry of ``choice_list`` at the position
    where ``choice_order`` holds the value 1. Samples are modified in place.

    NOTE(review): ``load_dataset`` is called without arguments; the only
    ``load_dataset`` visible in this notebook is the one imported from
    ``datasets`` — presumably a local helper is intended; confirm.

    Returns:
        ``(samples, output_name)``.
    """
    output_name = "replace_distractor_with_question"

    samples = load_dataset()

    for sample in samples:
        question = sample["question"]
        sample["distractor1"] = question
        slot = sample["choice_order"].index(1)
        sample["choice_list"][slot] = question

    return samples, output_name
# Running this cell overwrites any previously built generated_data_list.
generated_data_list, output_name = replace_distractor_with_question()

### Shuffled text (gibberish)

In [None]:
def turn_to_gibberish():
    """Scramble the characters of every incorrect choice.

    For each sample returned by ``load_dataset()``, every entry of
    ``choice_list`` except the one at index ``label`` is replaced by a
    random permutation of its own characters. ``random`` is seeded with 10
    first, and samples are modified in place. The sample with id
    ``"SP-88"`` is printed field by field as a spot check.

    NOTE(review): ``load_dataset`` is called without arguments; the only
    ``load_dataset`` visible in this notebook is the one imported from
    ``datasets`` — presumably a local helper is intended; confirm.

    Returns:
        ``(samples, output_name)``.
    """
    output_name = "gibberish_all"

    samples = load_dataset()

    random.seed(10)
    for sample in samples:
        options = sample["choice_list"]
        for position, text in enumerate(options):
            if position == sample["label"]:
                continue
            options[position] = ''.join(random.sample(text, len(text)))

        if sample["id"] == "SP-88":
            for key, value in sample.items():
                print(f"{key}: {value}")

    return samples, output_name
# Running this cell overwrites any previously built generated_data_list.
generated_data_list, output_name = turn_to_gibberish()

## Generate and save

In [None]:
# Answer every sample of the selected variant; only newly generated tokens
# (at most 20) are kept as the response.
generate(generated_data_list, tokens=20, all_tokens=False, few_shot=False)

In [None]:
# Turn the predictions into per-question correctness rows, then print and
# collect the metrics per puzzle type.
word_play,sentence_play = getResultdata(generated_data_list)
final_result = getSeperateResult(word_play, sentence_play)

In [None]:
# Persist the metrics together with one example prompt and the per-item data.
# NOTE(review): name="" is not None, so save() still inserts "_" and the file
# stem contains a double underscore; pass a descriptive name (e.g. output_name)
# if that is not intended.
save(final_result, name="", example=get_single_demo(generated_data_list[0]), data=generated_data_list)

In [None]:
# Cleanup before loading new model
import gc
# Drop the reference to the model, then rebind the name so later cells that
# reference `model` do not hit a NameError.
del model
model = None
# Collect garbage first, then ask PyTorch to release its cached CUDA memory.
gc.collect()
torch.cuda.empty_cache()