# Write python code for training tasks

## Goal

Write python code that implements training tasks and also creates the input distributions.

## Imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
import os
import glob
import json
import random
import inspect
from itertools import islice
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import pandas as pd
from tqdm.auto import tqdm
import pyperclip
from IPython.display import Markdown, display

# add path to python path
sys.path.append(os.path.realpath('../scripts/'))
from arc24.data import load_arc_data_with_solutions
from evaluation import plot_grids, plot_grid, plot_task
from arc24.logging import logging
from arc24.data_augmentation import apply_data_augmentation, get_random_color_map, get_random_geometric_augmentation_params

logger = logging.getLogger(__name__)

sys.path.append(os.path.realpath('../arc'))
import training_inputs
import training_tasks

plt.plot()
plt.close('all')
plt.rcParams["figure.figsize"] = (25, 2)
mpl.rcParams['lines.linewidth'] = 3
mpl.rcParams['font.size'] = 16

## Code

In [None]:
eval_data = load_arc_data_with_solutions('/mnt/hdd0/Kaggle/arc24/data/arc-agi_evaluation_challenges.json')
train_data = load_arc_data_with_solutions('/mnt/hdd0/Kaggle/arc24/data/arc-agi_training_challenges.json')

In [None]:
def plot_grids_with_shape(grids, suptitle=None, facecolor='white'):
    plt.figure(facecolor=facecolor)
    for plot_idx, grid in enumerate(grids):
        plt.subplot(1, len(grids), plot_idx + 1)
        plot_grid(grid)
        plt.title(f'{len(grid)}x{len(grid[0])}')
    if suptitle is not None:
        # plt.suptitle(suptitle)
        # plt.tight_layout(pad=0.2)
        print(suptitle)
    plt.show()

In [None]:
def visualize_train_task(task_id):
    function_intro = f'def task_{task_id}(grid):'
    print(function_intro)
    pyperclip.copy(function_intro)

    inputs = [sample['input'] for sample in train_data[task_id]['train'] + train_data[task_id]['test']]
    outputs = [sample['output'] for sample in train_data[task_id]['train'] + train_data[task_id]['test']]
    plot_grids_with_shape(inputs, 'Task Inputs')
    plot_grids_with_shape(outputs, 'Ground truth Outputs', facecolor='gray')

    try:
        inputs = [getattr(training_inputs, f'task_{task_id}')() for _ in range(5)]
        plot_grids_with_shape(inputs, 'Generated Inputs')
    except AttributeError:
        logger.warning('Input generation function not found')
    except (NameError, NotImplementedError):
        logger.warning('Input generation is implemented, but it is calling not implemented functions')
    try:
        outputs = [getattr(training_tasks, f'task_{task_id}')(i) for i in inputs]
        plot_grids_with_shape(outputs, 'Generated Outputs', facecolor='gray')
    except AttributeError:
        logger.warning('Task function not found')
    except (NameError, NotImplementedError):
        logger.warning('Task function is implemented, but it is calling not implemented functions')

In [None]:
from matplotlib.colors import LinearSegmentedColormap

# Define the colors and the corresponding positions
colors = [(1, 0, 0), (1, 1, 0), (0, 1, 0)]  # Red, Yellow, Green
positions = [0, 0.5, 1]  # At 0 -> red, 0.5 -> yellow, 1 -> green
# Create the colormap
custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", list(zip(positions, colors)))

def measure_progress(module):
    progress = []
    task_ids = list(train_data.keys())
    for task_id in task_ids:
        try:
            task_function = getattr(module, f'task_{task_id}')
            function_parameters = inspect.signature(task_function).parameters
            if function_parameters:
                task_function(**create_dummy_parameters(function_parameters))
            else:
                task_function()
            progress.append('done')
        except AttributeError as e:
            progress.append('not implemented')
        except (NameError, NotImplementedError):
            progress.append('implemented but not functional')
    numeric_progress = map({'done': 1, 'not implemented': 0, 'implemented but not functional': 0.5}.get, progress)
    numeric_progress = np.array(list(numeric_progress))
    print(f'Fully functional tasks: {np.mean(numeric_progress == 1):.1%} ({np.sum(numeric_progress == 1)})')
    print(f'Implemented tasks: {np.mean(numeric_progress > 0):.1%} ({np.sum(numeric_progress > 0)})')
    plt.imshow(np.array(list(numeric_progress)).reshape(1, -1), cmap=custom_cmap, aspect='auto')

def create_dummy_parameters(function_parameters):
    kwargs = {}
    if 'grid' in function_parameters:
        kwargs['grid'] = np.eye(3, dtype=int).tolist()
    return kwargs

## Visualize tasks

In [None]:
plot_grid([np.arange(10).tolist()], write_numbers=True)

In [None]:
visualize_train_task(task_id=list(train_data.keys())[0]) #69

## Measure progress

In [None]:
measure_progress(training_tasks); plt.title('Task implementation progress');

In [None]:
measure_progress(training_inputs); plt.title('Input generation progress');

## Study data augmentation

In [None]:
import re

def study_data_augmentation(task_id, use_data_augmentation=True):
    if task_id.startswith('task_'): task_id = task_id[5:]
    print(f'Task {task_id}')

    task = train_data[task_id]
    source_code = inspect.getsource(getattr(training_tasks, f'task_{task_id}'))
    if use_data_augmentation:
        color_map = get_random_color_map()
        geometric_augmentation_params = get_random_geometric_augmentation_params()
        task = apply_data_augmentation(
            task, color_map=color_map, **geometric_augmentation_params)
        source_code = update_source_code_to_data_augmentation(source_code, color_map, geometric_augmentation_params)

    plt.figure(figsize=(25, 4)); plot_task(task); plt.show()
    formatted_source_code = f"```python\n{source_code}\n```\n"
    display(Markdown(formatted_source_code))

def update_source_code_to_data_augmentation(code, color_map, geometric_augmentation_params):
    # TODO: deal with horizontal flip
    if geometric_augmentation_params['n_rot90'] in [1, 3]:
        code = swap_axes(code)
    code = update_code_to_colormap(code, color_map)
    code = remove_comments(code)
    return code

def swap_axes(text):
    # Use regex to find 'axis=0' or 'axis=1' and swap them
    new_text = ''
    for line in text.splitlines():
        if line.endswith('# skip-augmentation'):
            new_text += line + '\n'
            continue
        new_text += re.sub(r'axis=(\d)', lambda match: 'axis=1' if match.group(1) == '0' else 'axis=0', line) + '\n'
    return new_text

def update_code_to_colormap(text, color_map):
    text = replace_colors(text, color_map)
    text = update_color_map(text, color_map)
    return text
    new_text = ''
    # for line in text.splitlines():
    #     if line.endswith('# skip-augmentation'):
    #         new_text += line + '\n'
    #         continue
    #     new_text += re.sub(r'cmap=.*', f"cmap='{color_map}'", line) + '\n'
    return new_text

def replace_colors(text, color_map):
    # Regex to match any digit after an equal sign and optional spaces (e.g., color=2, moving_object_color = 2)
    return re.sub(r'=\s*(\d+)', lambda match: f"= {color_map.get(int(match.group(1)), match.group(1))}", text)



def update_color_map(text, aug_color_map):
    # Define a function that will update each pair in the color_map
    def replace_color_map(match):
        # Extract the color_map string from the match
        color_map_str = match.group(1)
        # Evaluate the color_map string into a Python dictionary
        original_color_map = eval(f"{{{color_map_str}}}")
        
        # Create a new updated color_map based on the augmentation map
        updated_color_map = {aug_color_map.get(k, k): aug_color_map.get(v, v) for k, v in original_color_map.items()}
        
        # Return the updated color_map in string format
        return f'color_map={updated_color_map}'

    # Regex to match the color_map={...} pattern
    updated_text = re.sub(r'color_map=\{([^}]+)\}', replace_color_map, text)
    
    return updated_text


def remove_comments(text):
    new_text = ''
    for line in text.splitlines():
        new_text += re.sub(r'#.*', '', line) + '\n'
    return new_text


In [None]:
study_data_augmentation(task_id='task_05f2a901', use_data_augmentation=True)

In [None]:
plot_grid([np.arange(10).tolist()], write_numbers=True)

## Architecture design

### V1

In [None]:
import numpy as np
import inspect
from functools import partial

In [None]:
def combine_pipeline_into_single_function(pipeline):
    code = 'def pipeline(grid):\n'
    

In [None]:
def generate_pipeline_code(pipeline):
    import inspect
    from functools import partial

    code_lines = []
    code_lines.append('def pipeline(grid):')
    indent = '    '
    first_function = True
    for p in pipeline:
        # Determine if p is a partial function or a regular function
        if isinstance(p, partial):
            func = p.func
            args = p.args
            keywords = p.keywords
        else:
            func = p
            args = ()
            keywords = {}

        # Assign parameter values from partial function
        for param_name, value in keywords.items():
            code_lines.append(f'{indent}{param_name} = {repr(value)}')

        # Get source code of the function
        source = inspect.getsource(func)
        source_lines = source.strip().split('\n')

        # Remove the 'def' line
        body_lines = source_lines[1:]

        # Adjust indentation and remove 'return' statements
        body_lines = [
            line.lstrip()
            for line in body_lines
            if not line.strip().startswith('return')
        ]

        # Determine input variable
        input_var = func.__code__.co_varnames[0]
        if first_function:
            input_var_in_code = 'grid'
            first_function = False
        else:
            input_var_in_code = 'output'

        # Replace input variable names in the function body
        body_lines = [
            line.replace(input_var, input_var_in_code)
            for line in body_lines
        ]

        # Append the function body to code_lines
        for line in body_lines:
            code_lines.append(indent + line)

    # Add the final return statement
    code_lines.append(f'{indent}return output')

    # Join all lines into a single string
    return '\n'.join(code_lines)


In [None]:
print(generate_pipeline_code(pipeline = [partial(rotate, angle=90), partial(flip, axis=0)]
))

In [None]:

def rotate(grid, angle):
    # Rotate the grid by the given angle (90, 180, 270)
    output = np.rot90(grid, k=angle // 90)
    for _ in range(2):
        print('hello')
    return output

def flip(grid, axis):
    # Flip the grid horizontally or vertically
    output = np.flip(grid, axis)
    return output

# pipeline.py

class Pipeline:
    def __init__(self, *functions):
        self.functions = functions

    def apply(self, grid):
        for func in self.functions:
            grid = func(grid)
        return grid

    def generate_code(self):
        code_lines = []
        for func in self.functions:
            code = inspect.getsource(func)
            code_lines.append(code)
        return '\n'.join(code_lines)

# code_generator.py

def combine_functions(pipeline):
    code_lines = ['def transform(grid):']
    indent = '    '
    code_lines.append(f'{indent}import copy')
    code_lines.append(f'{indent}grid = copy.deepcopy(grid)')
    for func in pipeline.functions:
        func_code = inspect.getsource(func).split('\n')
        func_code = [indent + line for line in func_code if not line.startswith('def')]
        code_lines.extend(func_code)
        func_call = f'{indent}grid = {func.__name__}(grid)'
        code_lines.append(func_call)
    code_lines.append(f'{indent}return grid')
    return '\n'.join(code_lines)

functions = Pipeline(rotate, flip)
print(functions.generate_code())

print(combine_functions(functions))

### V2

To ease the process of combining the functions into a single function I will use by convention the same variable for input and output: `grid`

Additionally always use keywords on partial.

In [None]:
import numpy as np
import inspect
from functools import partial
from IPython.display import Markdown, display

def rotate(grid, angle):
    grid = np.rot90(grid, k=angle // 90)
    return grid

def flip(grid, axis):
    grid = np.flip(grid, axis)
    return grid

def foo(grid):
    for i in range(10):
        print(i)
    return grid

In [None]:
def generate_pipeline_code(pipeline):
    code_lines = []
    code_lines.append('def pipeline(grid):')
    indent = '    '
    for p in pipeline:
        # Determine if p is a partial function or a regular function
        if isinstance(p, partial):
            func = p.func
            keywords = p.keywords
        else:
            func = p
            keywords = {}

        # Assign parameter values from partial function
        for param_name, value in keywords.items():
            code_lines.append(f'{indent}{param_name} = {repr(value)}')

        # Get source code of the function
        source = inspect.getsource(func)
        source_lines = source.strip().split('\n')

        # Remove the 'def' and 'return' lines
        body_lines = source_lines[1:-1]
        code_lines.extend(body_lines)

    # Add the final return statement
    code_lines.append(f'{indent}return grid')

    # Join all lines into a single string
    return '\n'.join(code_lines)

code = generate_pipeline_code(pipeline = [
    partial(rotate, angle=90),
    partial(flip, axis=0),
    foo])
display(Markdown(f'```python\n{code}\n```'))

This looks good and it could work, I just have to be careful with the code. I might test that all the functions follow that criteria.

In [None]:
def verify_function_follows_convention(function):
    assert inspect.getsource(function).splitlines()[-1].endswith('return grid')
    assert 'grid' in inspect.signature(function).parameters

[verify_function_follows_convention(function) for function in [rotate, flip, foo]];

I could also automatically test that the code generated by combining the functions of the pipeline gives the same results as the code from the pipeline. Functions need to be deterministic to do this.

### V3

Let's combine all together.

In [None]:
import numpy as np
import inspect
from functools import partial
from IPython.display import Markdown, display


class Pipeline:
    def __init__(self, *functions):
        self.functions = functions
        [verify_function_follows_convention(function) for function in functions]

    def apply(self, grid):
        grid = np.array(grid, dtype=np.int8)
        for func in self.functions:
            grid = func(grid)
        grid = grid.tolist()
        return grid

    def generate_code(self):
        # TODO: maybe assign a name to the pipeline function
        return generate_pipeline_code(self.functions)

def generate_pipeline_code(functions, function_name='pipeline', parameter_name='grid'):
    code_lines = []
    code_lines.append(f'def {function_name}({parameter_name}):')
    indent = '    '
    for function in functions:
        if isinstance(function, partial):
            keywords = function.keywords
            function = function.func
        else:
            keywords = {key: value.default for key, value in inspect.signature(function).parameters.items() if key != parameter_name}
        for param_name, value in keywords.items():
            code_lines.append(f'{indent}{param_name} = {repr(value)}')

        source = inspect.getsource(function)
        source_lines = source.strip().split('\n')
        body_lines = source_lines[1:-1] # Remove the 'def' and 'return' lines
        code_lines.extend(body_lines)

    # Add the final return statement
    code_lines.append(f'{indent}return {parameter_name}')
    return '\n'.join(code_lines)

def verify_function_follows_convention(function):
    """ All functions in the pipeline should follow the convention of taking a grid as input and returning a grid """
    if isinstance(function, partial):
        function = function.func
    assert inspect.getsource(function).splitlines()[-1].endswith('return grid')
    assert 'grid' in inspect.signature(function).parameters

In [None]:
def rotate(grid, angle):
    grid = np.rot90(grid, k=angle // 90)
    return grid

def flip(grid, axis):
    grid = np.flip(grid, axis)
    return grid

def foo(grid, n=10):
    for i in range(n):
        print(i)
    return grid

In [None]:
pipeline = Pipeline(
    partial(rotate, angle=90),
    partial(flip, axis=0),
    foo)
code = pipeline.generate_code()
display(Markdown(f'```python\n{code}\n```'))

In [None]:
pipeline = Pipeline(
    partial(rotate, angle=90),)
code = pipeline.generate_code()
display(Markdown(f'```python\n{code}\n```'))
pipeline.apply(np.eye(3))

In [None]:
class Task():
    def __init__(self, input_generator, input_augmentation, task_pipeline):
        self.input_generator = input_generator
        self.input_augmentation = input_augmentation
        self.task_pipeline = task_pipeline

## TODO

- [x] Create stats about the progress of the tasks implementation
- [ ] Is the implementation robust to data augmentation?