## Generating results from an LLM on 3-Shot AugARC

### This script generates predictions from an LLM on the 3-Shot AugARC Evaluation set, saves them in a JSON file and produces the final accuracy of the model.

The first step is to host the model for predictions. Please download [text-generation-webui](https://github.com/oobabooga/text-generation-webui) and load the LLM that you would like to evaluate. Expose the model via the public API on port 5010.

In [1]:
import requests

# Generation endpoint that every prompt is POSTed to; it points at a server on
# localhost, port 5010 (the port the notebook introduction asks you to expose).
url = "http://127.0.0.1:5010/api/v1/generate"

In [2]:
import os
import json

def read_json_files(directory):
    """Load every ``*.json`` file found directly inside ``directory``.

    Args:
        directory: Path of the folder to scan (not recursive).

    Returns:
        A dict mapping each file's name without the ``.json`` extension to
        its parsed JSON content. Files that fail to parse are reported on
        stdout and left out of the result.
    """
    json_data = {}

    for filename in os.listdir(directory):
        if not filename.endswith('.json'):
            continue

        # Derive the key from the name itself rather than from fixed slice
        # offsets, so stems of any length map to the right key.
        task_id = os.path.splitext(filename)[0]
        file_path = os.path.join(directory, filename)

        with open(file_path, 'r', encoding='utf-8') as file:
            try:
                json_data[task_id] = json.load(file)
            except json.JSONDecodeError as e:
                # Best effort: one malformed file must not abort the load.
                print(f"Error reading {task_id}: {e}")

    return json_data

# Load all evaluation tasks up front. The path is relative to the current
# working directory; the resulting dict is keyed by an id derived from each
# file name.
directory = 'arc_data/evaluation'
all_json_data = read_json_files(directory)

In [3]:
# Task ids that are passed to the transform_* helpers with special_task=True:
# their 'test' entry is kept as a dict with separate '0' and '1' cases, and the
# evaluation loop queries both of them.
# NOTE(review): presumably these are the evaluation tasks that have two test
# inputs — verify against the dataset.
special_task_ids = ['12997ef3', '1d398264', '31d5ba1a', '3b4c2228', '4852f2fa', '4c177718', '5d2a5c43', '6ea4a07e', '8b28cd80', '9110e3c5', '9b4c17c4', 'b1fc8b8e', 'bbb1b8b6', 'c074846d', 'd5c634a2', 'da2b0fe3', 'e21a174a', 'e345f17b', 'f3e62deb']

In [4]:
def rotate_matrix_90_degrees(matrix):
    """Return a new list-of-lists holding ``matrix`` turned 90 degrees clockwise."""
    rotated = []
    # Reading the columns of the vertically flipped grid yields the rows of
    # the clockwise-rotated grid.
    for column in zip(*matrix[::-1]):
        rotated.append(list(column))
    return rotated

def rotate_matrix_270_degrees(matrix):
    """Return a new list-of-lists holding ``matrix`` turned 270 degrees clockwise."""
    # Transpose first, then flip the resulting rows top-to-bottom.
    transposed = list(map(list, zip(*matrix)))
    transposed.reverse()
    return transposed

def transform_input_90(data, special_task=False):
    """Render a task as prompt text with every grid rotated by 90 degrees.

    Each case in ``data['train']`` and ``data['test']`` has its ``'input'``
    and ``'output'`` grids rotated with ``rotate_matrix_90_degrees`` and is
    then rendered as ``'\\n###Input:\\n<grid>\\n###Output:\\n<grid>'``, where a
    grid is one line per row with cells separated by single spaces.

    Args:
        data: Mapping with ``'train'`` and ``'test'`` lists of cases; each
            case is a mapping holding ``'input'`` and ``'output'`` grids.
        special_task: When true, the test cases are kept separate instead of
            being concatenated.

    Returns:
        A dict with ``'train'`` mapped to the concatenated rendering of all
        training cases. ``'test'`` maps to the same kind of string, or, when
        ``special_task`` is true, to a dict keyed by the stringified case
        index that always contains at least the keys ``'0'`` and ``'1'``
        (empty strings when there is no such case).

    Raises:
        KeyError: If ``data`` or one of its cases lacks a required key.
    """

    def _grid_to_text(grid):
        # One newline-terminated line per row; empty rows contribute nothing.
        return ''.join(' '.join(map(str, row)) + '\n' for row in grid if row)

    def _format_case(case):
        new_input = _grid_to_text(rotate_matrix_90_degrees(case['input']))
        new_output = _grid_to_text(rotate_matrix_90_degrees(case['output']))
        return f'\n###Input:\n{new_input}\n###Output:\n{new_output}'

    transformed_data = {}

    for key in ['train', 'test']:
        if key == 'test' and special_task:
            # The '0' and '1' placeholders are kept so callers can always
            # index both entries, even if fewer cases exist.
            transformed_data_special = {'0': '', '1': ''}
            for indx, case in enumerate(data[key]):
                transformed_data_special[str(indx)] = _format_case(case)
            transformed_data[key] = transformed_data_special
        else:
            transformed_data[key] = ''.join(_format_case(case) for case in data[key])

    return transformed_data

In [5]:
def transform_input_270(data, special_task=False):
    """Render a task as prompt text with every grid rotated by 270 degrees.

    Each case in ``data['train']`` and ``data['test']`` has its ``'input'``
    and ``'output'`` grids rotated with ``rotate_matrix_270_degrees`` and is
    then rendered as ``'\\n###Input:\\n<grid>\\n###Output:\\n<grid>'``, where a
    grid is one line per row with cells separated by single spaces.

    Args:
        data: Mapping with ``'train'`` and ``'test'`` lists of cases; each
            case is a mapping holding ``'input'`` and ``'output'`` grids.
        special_task: When true, the test cases are kept separate instead of
            being concatenated.

    Returns:
        A dict with ``'train'`` mapped to the concatenated rendering of all
        training cases. ``'test'`` maps to the same kind of string, or, when
        ``special_task`` is true, to a dict keyed by the stringified case
        index that always contains at least the keys ``'0'`` and ``'1'``
        (empty strings when there is no such case).

    Raises:
        KeyError: If ``data`` or one of its cases lacks a required key.
    """

    def _grid_to_text(grid):
        # One newline-terminated line per row; empty rows contribute nothing.
        return ''.join(' '.join(map(str, row)) + '\n' for row in grid if row)

    def _format_case(case):
        new_input = _grid_to_text(rotate_matrix_270_degrees(case['input']))
        new_output = _grid_to_text(rotate_matrix_270_degrees(case['output']))
        return f'\n###Input:\n{new_input}\n###Output:\n{new_output}'

    transformed_data = {}

    for key in ['train', 'test']:
        if key == 'test' and special_task:
            # The '0' and '1' placeholders are kept so callers can always
            # index both entries, even if fewer cases exist.
            transformed_data_special = {'0': '', '1': ''}
            for indx, case in enumerate(data[key]):
                transformed_data_special[str(indx)] = _format_case(case)
            transformed_data[key] = transformed_data_special
        else:
            transformed_data[key] = ''.join(_format_case(case) for case in data[key])

    return transformed_data

In [6]:
def transform_input(data, special_task=False):
    """Render a task as prompt text, leaving the grids in their original orientation.

    Each case in ``data['train']`` and ``data['test']`` is rendered as
    ``'\\n###Input:\\n<grid>\\n###Output:\\n<grid>'``, where a grid is one line
    per row with cells separated by single spaces.

    Args:
        data: Mapping with ``'train'`` and ``'test'`` lists of cases; each
            case is a mapping holding ``'input'`` and ``'output'`` grids.
        special_task: When true, the test cases are kept separate instead of
            being concatenated.

    Returns:
        A dict with ``'train'`` mapped to the concatenated rendering of all
        training cases. ``'test'`` maps to the same kind of string, or, when
        ``special_task`` is true, to a dict keyed by the stringified case
        index that always contains at least the keys ``'0'`` and ``'1'``
        (empty strings when there is no such case).

    Raises:
        KeyError: If ``data`` or one of its cases lacks a required key.
    """

    def _grid_to_text(grid):
        # One newline-terminated line per row; empty rows contribute nothing.
        return ''.join(' '.join(map(str, row)) + '\n' for row in grid if row)

    def _format_case(case):
        new_input = _grid_to_text(case['input'])
        new_output = _grid_to_text(case['output'])
        return f'\n###Input:\n{new_input}\n###Output:\n{new_output}'

    transformed_data = {}

    for key in ['train', 'test']:
        if key == 'test' and special_task:
            # The '0' and '1' placeholders are kept so callers can always
            # index both entries, even if fewer cases exist.
            transformed_data_special = {'0': '', '1': ''}
            for indx, case in enumerate(data[key]):
                transformed_data_special[str(indx)] = _format_case(case)
            transformed_data[key] = transformed_data_special
        else:
            transformed_data[key] = ''.join(_format_case(case) for case in data[key])

    return transformed_data

In [7]:
# Render every loaded task three times: unrotated, rotated by 90 degrees and
# rotated by 270 degrees. Tasks listed in special_task_ids keep their test
# cases separate (special_task=True); all other tasks use the default.
eval_data = {
    task_id: transform_input(task, task_id in special_task_ids)
    for task_id, task in all_json_data.items()
}

eval_data_90 = {
    task_id: transform_input_90(task, task_id in special_task_ids)
    for task_id, task in all_json_data.items()
}

eval_data_270 = {
    task_id: transform_input_270(task, task_id in special_task_ids)
    for task_id, task in all_json_data.items()
}

In [8]:
import ast

def extract_after_output(text):
    """Return what follows the first '###Output:' line marker in ``text``.

    If the marker does not occur, ``text`` is returned unchanged.
    """
    _, marker, remainder = text.partition('###Output:\n')
    if not marker:
        return text
    return remainder

def extract_before_output(text):
    """Return what precedes the first '###Output:' line marker in ``text``.

    If the marker does not occur, ``text`` is returned unchanged.
    """
    leading, marker, _ = text.partition('###Output:\n')
    if not marker:
        return text
    return leading

In [9]:
# Base JSON payload that is POSTed to `url` for every generation. The
# evaluation loop stores the current prompt under the 'prompt' key before
# each request.
request = {
    'temperature': 0.1,
    'top_p': 0.7,
    # NOTE(review): presumably ends generation at the first blank line, which
    # is what separates grids in the prompt — confirm against the API docs.
    'stopping_strings': ['\n\n'],
    'max_new_tokens': 400
}

In [None]:
DEFAULT_PROMPT = "We are playing a game which involves transforming a 2D input grid of digits into an output grid of digits. Every below pair of grids contains the same transformation. Each Input grid is followed by an Output grid which applies the same transformation as previous Input/Output pairs. Given the provided examples, output only the correct grid for the last input without any additional explanation"

correct_tasks = set()
count = 0
results_dict = {}

# The three renderings of every task, in attempt order: attempt '0' uses the
# unrotated grids, '1' the 90-degree rotation and '2' the 270-degree rotation.
augmented_views = (eval_data, eval_data_90, eval_data_270)


def _build_prompt(train_data, test_case):
    """Assemble the prompt: instructions, training pairs, then the test input."""
    question = extract_before_output(test_case).strip()
    return f'{DEFAULT_PROMPT}\n{train_data}\n{question}\n\n###Output:\n'


def _query_model(prompt):
    """POST ``prompt`` to the generation endpoint and return the stripped completion."""
    request['prompt'] = prompt
    response = requests.post(url, json=request)
    # Surface HTTP failures directly instead of as an opaque parsing error.
    response.raise_for_status()
    # The body is JSON, so decode it as JSON: a Python-literal parser rejects
    # true/false/null and treats some JSON escapes differently.
    return response.json()["results"][0]['text'].strip()


def _attempt_is_correct(view, key, test_case):
    """Query the model for one test case; return (completion, exact-match flag)."""
    current_solution = _query_model(_build_prompt(view[key]['train'], test_case))
    correct_output = extract_after_output(test_case).strip()
    return current_solution, correct_output == current_solution


for key in eval_data:
    results_dict[key] = {}
    if key in special_task_ids:
        # Both test cases must be solved, each in any of the three attempts.
        correct_list = [False, False]

        for indx in range(2):
            results_dict[key][str(indx)] = {}

            for j, view in enumerate(augmented_views):
                test_case = view[key]['test'][str(indx)]
                current_solution, is_correct = _attempt_is_correct(view, key, test_case)
                results_dict[key][str(indx)][str(j)] = current_solution

                if is_correct:
                    correct_list[indx] = True

        if all(correct_list):
            correct_tasks.add(key)
    else:
        # The task counts as solved if any of the three attempts matches.
        for j, view in enumerate(augmented_views):
            test_case = view[key]['test']
            current_solution, is_correct = _attempt_is_correct(view, key, test_case)
            results_dict[key][str(j)] = current_solution

            if is_correct:
                correct_tasks.add(key)


In [None]:
# Report how many tasks were counted as solved by the evaluation loop.
num_correct_tasks = len(correct_tasks)
print(f'Number of correct tasks: {num_correct_tasks}')

In [None]:
# Write the collected model outputs to a JSON file in the working directory;
# the file name is prefixed with the model name.
model_name = 'Llama-3-8B'
filename = f'{model_name}_AugARC_Evaluation_Results_Dict.json'

with open(filename, 'w') as f:
    f.write(json.dumps(results_dict))