In [None]:
from IPython.display import HTML, display

def set_css():
  display(HTML('''
  <style>
    pre {
        white-space: pre-wrap;
    }
  </style>
  '''))
get_ipython().events.register('pre_run_cell', set_css)

In [None]:
# # Mount Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# gdrive_path = "/content/drive/MyDrive/"
gdrive_path = ""

In [None]:
%pip install -q -U bitsandbytes
%pip install -q -U git+https://github.com/huggingface/transformers.git
%pip install -q -U git+https://github.com/huggingface/peft.git
%pip install -q -U git+https://github.com/huggingface/accelerate.git

In [None]:
%pip install scipy
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
)
from peft import LoraConfig, PeftModel, PeftConfig

In [None]:
download_base_tokenizer = True

In [None]:
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0.1,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
)

In [None]:
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=False,
)

In [None]:
# Load the entire model on the GPU 0
device_map = {"": 0}

In [None]:
peft_model_id = gdrive_path + "adapters/Mistral-7B-Instruct_finetuned_on_20000_core_puzzles"

config = PeftConfig.from_pretrained(peft_model_id)
model = AutoModelForCausalLM.from_pretrained(config.base_model_name_or_path,
                                                    quantization_config=bnb_config,
                                                    return_dict=True,
                                                    load_in_4bit=True,
                                                    device_map={"":0})

if download_base_tokenizer:
    base_model_name = "mistralai/Mistral-7B-Instruct-v0.1"
    tokenizer = AutoTokenizer.from_pretrained(base_model_name, trust_remote_code=True)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"
else:
    tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path)

tokenizer.pad_token = tokenizer.eos_token

# Load the Lora model
model = PeftModel.from_pretrained(model, peft_model_id)

In [None]:
def create_instruction(train_input_grids, train_output_grids, test_input_grids):
    instruction = "This is an abstract reasoning puzzle: "
    instruction += (
        "Below are input/output pairs of grids in two dimensional array format. "
    )
    instruction += "Each grid square contains an integer from 0 to 9. "
    instruction += "These intergers are symbolic representations of colors, and should not be thought of as scalar values. "
    instruction += "The integer 0 represents an empty grid square. The other integers 1 through 9 represent "
    instruction += "colored tiles that occupy the grid square positions corresponding to their locations in the "
    instruction += "two dimensional array. "
    instruction += (
        "Given the following "
        + str(len(train_input_grids))
        + " input/output train pairs: "
    )

    for i, (train_input_grid, train_output_grid) in enumerate(
        zip(train_input_grids, train_output_grids)
    ):
        instruction += f"Train_Input_{i+1}=["
        for j in range(len(train_input_grid)):
            instruction += "["
            for k in range(len(train_input_grid[j])):
                instruction += str(train_input_grid[j][k])
                if k != len(train_input_grid[j]) - 1:
                    instruction += ","
            instruction += "]"
            if j != len(train_input_grid) - 1:
                instruction += ","
        instruction += "]"
        instruction += f" and Train_Output_{i+1}=["
        for j in range(len(train_output_grid)):
            instruction += "["
            for k in range(len(train_output_grid[j])):
                instruction += str(train_output_grid[j][k])
                if k != len(train_output_grid[j]) - 1:
                    instruction += ","
            instruction += "]"
            if j != len(train_output_grid) - 1:
                instruction += ","
        instruction += "]"
        if i != len(train_output_grids) - 1:
            instruction += ", "

    instruction += (
        " find the transformation from each input grid to output grid that is common to all "
        + str(len(train_input_grids))
        + " train pairs. "
    )
    instruction += "For example, tiles can be created, destroyed, moved, scaled, rotated, or change color. "
    instruction += "Groups of tiles that are adjacent to each other can be considered as objects, or sub-grids. "
    instruction += "Sub-grids can also be created, destroyed, moved, scaled, rotated, or change color. "
    instruction += " Grid tiles and sub-grids can spawn into lines that extend vertically, horizontally, or diagonally. "
    instruction += (
        "Devise a candidate transformation and test it on each of the train grids. "
    )
    instruction += "If the candidate transformation fails to produce the correct train output grid for any of the train pairs, "

    instruction += "then that candidate transformation is incorrect. Devise a new candidate transformation and re-test on the train pairs. "
    instruction += "Repeat this testing until you have discovered the correct common transformation that works on all train pairs. "
    instruction += "Then apply this transformation to the following test input grid to get the test output grid: "

    for i, (test_input_grid) in enumerate(test_input_grids):
        instruction += f"Test_Input_{i+1}=["
        for j in range(len(test_input_grid)):
            instruction += "["
            for k in range(len(test_input_grid[j])):
                instruction += str(test_input_grid[j][k])
                if k != len(test_input_grid[j]) - 1:
                    instruction += ","
            instruction += "]"
            if j != len(test_input_grid) - 1:
                instruction += ","
        instruction += "]"
    instruction += "."
    instruction += " Use your knowledge of core principals in physics, mathematics, and chain of thought reasoning to solve this puzzle. "

    return instruction

In [None]:
import numpy as np
import json
import os
import re
import ast
import gc

gdrive_path = "/content/drive/MyDrive/"

directory = gdrive_path + "data/ARC/evaluation/"
file_list = []

total_files = 0
num_correct = 0
for root, dirs, files in os.walk(directory):
  for name in files:

    torch.cuda.empty_cache()
    gc.collect()

    if ".json" not in name:
      continue

    print("##################################################")
    print(name)

    file_path = os.path.join(root, name)

    with open(file_path, "r") as f:
        data = json.load(f)
    train_tasks = data["train"]
    test_tasks = data["test"]
    file_list.append(file_path)

    train_input_grids = []
    train_output_grids = []
    test_input_grids = []
    test_output_grids = []

    for train_task in train_tasks:
        ti = train_task["input"]
        train_input_grids.append(ti)

        to = train_task["output"]
        train_output_grids.append(to)

    for test_task in test_tasks:
        ti = test_task["input"]
        test_input_grids.append(ti)

        to = test_task["output"]
        test_output_grids.append(to)

    instruction = create_instruction(
        train_input_grids, train_output_grids, test_input_grids
    )

    query = '<s>[INST] ' + instruction + ' [/INST] '
    encodeds = tokenizer(query, return_tensors="pt", add_special_tokens=False)
    model_inputs = encodeds.to("cuda")

    model.eval()
    with torch.no_grad():
      generated_ids = model.generate(**model_inputs, max_new_tokens=4096, do_sample=False, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)
    result = decoded[0]

    match = re.search(r"Test_Output_1=\[\[.*?\]\]", result)
    if match:
        array_string = match.group(0).replace("Test_Output_1=", "")
        array = ast.literal_eval(array_string)
    else:
      print("No match found")
      print(result)
      array = []

    ground_truth = test_output_grids[0]

    print("Ground Truth")
    for row in ground_truth:
        print(row)
    print("Prediction" )
    for row in array:
        print(row)

    total_files += 1
    if array == ground_truth:
        print("...................................Correct!!!!!!!!!!!!!\n")
        num_correct += 1
    else:
        print("...................................Incorrect.\n")

    del query
    del encodeds
    del model_inputs
    del generated_ids
    del decoded
    del result
    del array
    del ground_truth
    del train_input_grids
    del train_output_grids
    del test_input_grids
    del test_output_grids
    del train_tasks
    del test_tasks
    del instruction
    del file_path
    del data
    gc.collect()
    torch.cuda.empty_cache()
    gc.collect()
    torch.cuda.empty_cache()
    gc.collect()
    torch.cuda.empty_cache()

print(f"Total files: {total_files}")
print(f"Correct: {num_correct}")
print(f"Accuracy: {num_correct/total_files}")
