# ARC-AGI Starter Notebook

This notebook provides a baseline solution for the ARC-AGI competition using a zero-shot approach with a Qwen2.5 instruct model (`ARCSolver` defaults to Qwen2.5-7B-Instruct; the main cell below runs the smaller Qwen2.5-1.5B-Instruct). It loads the test dataset, processes grids, generates predictions, and creates submission files. Each code cell is explained to help beginners understand the process.

## Import Libraries

We import necessary libraries for data processing, machine learning, and logging:
- `json` and `csv` for handling input/output files.
- `numpy` for grid manipulation.
- `transformers` for loading the Qwen2.5 model and tokenizer (with DistilGPT2 as a fallback).
- `torch` for model inference.
- `logging` for tracking errors and progress.
- `re` for text parsing.

In [None]:
import torch
import json
import csv
import logging
from typing import Dict, List, Optional, Tuple
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
import numpy as np
import re

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


## Utility Functions

These functions convert grids to text and back for model input/output:
- `grid_to_text`: Converts a 2D grid of integers to a string, with each row as a continuous sequence of digits.
- `text_to_grid`: Parses model-generated text back to a 2D grid, handling potential errors and cleaning up text.

In [None]:
def grid_to_text(grid: List[List[int]]) -> str:
    """Render a grid as text: one line per row, cells concatenated without separators."""
    # Each row becomes a run of digits; rows are joined with newlines.
    return '\n'.join(''.join(map(str, row)) for row in grid)

def text_to_grid(text: str) -> List[List[int]]:
    """Parse model-generated text into a rectangular grid of single digits.

    A leading label ("Output grid:", "Predicted output:", "Answer:" or
    "Result:", case-insensitive) is removed, then every ASCII digit on each
    line becomes one cell. Lines without digits are skipped.

    Args:
        text: Text that is expected to contain one grid row per line.

    Returns:
        The parsed grid, or ``[[0]]`` when no digits are found, the rows have
        differing lengths, or parsing raises.
    """
    try:
        text = text.strip()
        text = re.sub(r'^(Output grid:|Predicted output:|Answer:|Result:)', '', text, flags=re.IGNORECASE)
        text = text.strip()

        grid = []
        for line in text.split('\n'):
            # Only ASCII digits are accepted: str.isdigit() is also true for
            # characters such as superscripts ('²') that int() rejects, which
            # would otherwise abort the whole parse on one stray character.
            numbers = [int(char) for char in line if char in '0123456789']
            if numbers:
                grid.append(numbers)

        # Reject ragged output: every row must be as wide as the first.
        if grid and all(len(row) == len(grid[0]) for row in grid):
            return grid
        return [[0]]
    except Exception as e:
        logger.error(f"Error parsing grid: {e}")
        return [[0]]

## Visualization Function

This function visualizes input and predicted grids side by side using Matplotlib. It helps understand the dataset and model predictions.
- Uses `tab10` colormap to map integers 0–9 to colors.
- Displays input and output/predicted grids for comparison.

In [None]:
import matplotlib.pyplot as plt

def plot_grid_pair(input_grid, output_grid, title="Grid Pair"):
    """Show an input grid and an output/predicted grid next to each other.

    Both grids are drawn with the `tab10` colormap pinned to the value range
    0-9, so a given cell value gets the same color in both panels.
    """
    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    panels = (
        (input_grid, "Input Grid"),
        (output_grid, "Output/Predicted Grid"),
    )
    # Left panel is the input, right panel is the output/prediction.
    for axis, (grid, caption) in zip(axes, panels):
        axis.imshow(np.array(grid), cmap='tab10', vmin=0, vmax=9)
        axis.set_title(caption)
        axis.axis('off')
    plt.suptitle(title)
    plt.show()

## ARCSolver Class

This class encapsulates the zero-shot solver using Qwen2.5-7B-Instruct:
- Initializes the model and tokenizer, with GPU support if available.
- `solve`: Processes a single test case, generating a prediction or falling back to copying the input.
- `predict_with_fallback`: Handles multiple test cases per problem.
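- `adjust_grid_size`: Pads or truncates a predicted grid to the required number of rows and columns (`create_fallback_grid` does the same for a copy of the input when no usable prediction is available).
- `is_valid_grid`: Validates that a grid is non-empty, rectangular, and contains only values 0–9; `is_reasonable_prediction` additionally checks the predicted size against the input.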

In [None]:
class ARCSolver:
    """Zero-shot ARC solver built on a Hugging Face causal language model.

    The solver renders a test input grid as text, prompts the model for the
    output grid, parses the reply back into a grid and validates it. Whenever
    generation, parsing or validation does not yield a usable grid, it falls
    back to a padded or truncated copy of the input grid.
    """

    def __init__(self, row_counts: Dict[str, int], model_name: str = "Qwen/Qwen2.5-7B-Instruct"):
        """
        Initialize solver with Qwen model for enhanced zero-shot reasoning.

        Args:
            row_counts: Dictionary mapping problem_id to required rows
            model_name: Hugging Face model name (default: Qwen2.5-7B-Instruct)
        """
        self.row_counts = row_counts
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Sets self.model and self.tokenizer; both are None if the requested
        # model and the DistilGPT2 fallback both fail to load.
        self._load_model()

        # Generation configuration for better outputs
        self.generation_config = GenerationConfig(
            max_new_tokens=512,
            temperature=0.1,  # Low temperature for more deterministic outputs
            top_p=0.95,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id if self.tokenizer else None,
            repetition_penalty=1.1
        )

    def _load_model(self):
        """Load the requested model and tokenizer, falling back to DistilGPT2 on failure."""
        try:
            logger.info(f"Loading {self.model_name}...")

            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                padding_side="left"
            )

            # Set pad token if not exists
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Half precision and automatic device placement only when CUDA is available
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None,
                trust_remote_code=True,
                low_cpu_mem_usage=True
            )

            self.model.eval()
            logger.info(f"Successfully loaded {self.model_name} on {self.device}")

        except Exception as e:
            logger.error(f"Failed to load {self.model_name}: {e}")
            logger.info("Falling back to DistilGPT2...")
            self._load_fallback_model()

    def _load_fallback_model(self):
        """Load DistilGPT2; on failure leave model and tokenizer set to None."""
        try:
            self.model_name = "distilgpt2"
            self.tokenizer = AutoTokenizer.from_pretrained("distilgpt2")
            self.model = AutoModelForCausalLM.from_pretrained("distilgpt2")

            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            self.model.to(self.device)
            self.model.eval()
            logger.info("Fallback model DistilGPT2 loaded successfully")

        except Exception as e:
            logger.error(f"Failed to load fallback model: {e}")
            # generate_prediction() returns "" when these are None
            self.model = None
            self.tokenizer = None

    def grid_to_text(self, grid: List[List[int]]) -> str:
        """Convert grid to text: one line per row, cells concatenated without separators."""
        return '\n'.join(''.join(map(str, row)) for row in grid)

    def text_to_grid(self, text: str) -> List[List[int]]:
        """
        Convert text back to grid format with robust parsing.

        Each line is stripped of everything except ASCII digits and
        whitespace. A line that still contains a space is read as
        space-separated integers; otherwise every digit is one cell. The
        result is not checked for rectangular shape or value range here.

        Args:
            text: Generated text containing grid

        Returns:
            Parsed grid as List[List[int]], or [[0]] if no row could be parsed
        """
        try:
            grid = []

            for line in text.strip().split('\n'):
                # Clean the line - remove non-digit characters except whitespace
                cleaned = re.sub(r'[^0-9\s]', '', line.strip())
                if not cleaned:
                    continue

                if ' ' in cleaned:
                    # Space-separated format: each token is one (possibly multi-digit) value
                    row = [int(token) for token in cleaned.split() if token.isdigit()]
                else:
                    # Concatenated format: each digit is one cell
                    row = [int(char) for char in cleaned if char.isdigit()]

                if row:  # Only add non-empty rows
                    grid.append(row)

            return grid if grid else [[0]]

        except Exception as e:
            logger.warning(f"Failed to parse grid from text: {e}")
            return [[0]]

    def create_enhanced_prompt(self, input_grid: List[List[int]], target_rows: int,
                             training_examples: Optional[List] = None) -> str:
        """
        Create an enhanced prompt for better ARC reasoning.

        Args:
            input_grid: Input grid for the problem (must have at least one row)
            target_rows: Expected number of rows in output
            training_examples: Accepted for few-shot prompting but currently
                not used when building the prompt

        Returns:
            Formatted prompt string
        """
        input_text = self.grid_to_text(input_grid)
        rows, cols = len(input_grid), len(input_grid[0])

        prompt = f"""You are an expert at solving ARC (Abstraction and Reasoning Corpus) puzzles. These puzzles require identifying patterns and transformations in grids of colored cells (represented by digits 0-9).

TASK: Analyze the input grid and predict the output grid by identifying the underlying transformation pattern.

INPUT GRID ({rows}x{cols}):
{input_text}

ANALYSIS STEPS:
1. Identify unique values and their positions
2. Look for geometric patterns (lines, shapes, symmetries)
3. Check for directional transformations
4. Consider boundary effects and edge cases
5. Apply the transformation rule consistently

OUTPUT REQUIREMENTS:
- Generate exactly {target_rows} rows
- Each row should have {cols} digits (0-9)
- Maintain consistent column width
- Apply identified pattern systematically

OUTPUT GRID:"""

        return prompt

    def generate_prediction(self, prompt: str) -> str:
        """
        Generate prediction using the loaded model.

        Args:
            prompt: Input prompt for the model (truncated to 2048 tokens)

        Returns:
            Generated text with the prompt removed, or "" when no model is
            loaded or generation fails
        """
        if self.model is None or self.tokenizer is None:
            return ""

        try:
            # Tokenize input
            inputs = self.tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Generate response
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    generation_config=self.generation_config
                )

            # Keep only the newly generated tokens. Slicing the decoded string
            # by len(prompt) is unreliable: decoding does not always reproduce
            # the prompt character-for-character (tokenizer normalization,
            # truncation), which would cut the response in the wrong place.
            prompt_token_count = inputs["input_ids"].shape[1]
            new_tokens = outputs[0][prompt_token_count:]
            response = self.tokenizer.decode(new_tokens, skip_special_tokens=True)
            return response.strip()

        except Exception as e:
            logger.error(f"Generation failed: {e}")
            return ""

    def solve(self, problem: Dict, problem_id: str) -> List[List[int]]:
        """
        Solve a single test case using enhanced reasoning.

        Only the first entry of problem['test'] is considered.

        Args:
            problem: Problem dictionary containing test cases
            problem_id: Unique identifier for the problem

        Returns:
            Predicted output grid, or a fallback grid derived from the input
        """
        try:
            input_grid = problem['test'][0]['input']
            # Without a known row count, aim for as many rows as the input has
            target_rows = self.row_counts.get(problem_id, len(input_grid))

            # Validate input
            if not self.is_valid_grid(input_grid):
                logger.warning(f"Invalid input grid for {problem_id}")
                return self.create_fallback_grid(input_grid, target_rows)

            # Create enhanced prompt
            prompt = self.create_enhanced_prompt(input_grid, target_rows)

            # Generate prediction
            response = self.generate_prediction(prompt)

            if response:
                # Parse generated grid
                predicted_grid = self.text_to_grid(response)

                # Validate and adjust prediction
                if self.is_valid_grid(predicted_grid):
                    adjusted_grid = self.adjust_grid_size(predicted_grid, target_rows, len(input_grid[0]))
                    if self.is_reasonable_prediction(adjusted_grid, input_grid):
                        return adjusted_grid

            # Fallback to input-based prediction
            logger.info(f"Using fallback for {problem_id}")
            return self.create_fallback_grid(input_grid, target_rows)

        except Exception as e:
            logger.error(f"Solve failed for {problem_id}: {e}")
            return self.create_fallback_grid(problem['test'][0]['input'],
                                           self.row_counts.get(problem_id, 11))

    def predict_with_fallback(self, problem: Dict, problem_id: str) -> List[List[List[int]]]:
        """
        Predict outputs for all test cases with enhanced fallback logic.

        Args:
            problem: Problem dictionary
            problem_id: Problem identifier

        Returns:
            List of predicted grids for each test case
        """
        predictions = []
        target_rows = self.row_counts.get(problem_id, 11)
        total = len(problem['test'])

        for i, test_case in enumerate(problem['test']):
            logger.info(f"Processing test case {i+1}/{total} for {problem_id}")

            try:
                # solve() only looks at the first test entry, so wrap each
                # test case in its own single-entry problem dict.
                prediction = self.solve({'test': [test_case]}, problem_id)
                predictions.append(prediction)

            except Exception as e:
                logger.warning(f"Failed to process test case {i+1} for {problem_id}: {e}")
                fallback = self.create_fallback_grid(test_case['input'], target_rows)
                predictions.append(fallback)

        return predictions

    def adjust_grid_size(self, grid: List[List[int]], target_rows: int, target_cols: int) -> List[List[int]]:
        """
        Adjust grid to match target dimensions.

        Extra rows and columns are dropped. Missing rows are filled with
        copies of the last row, missing columns with zeros. The input grid is
        left untouched; a new grid is returned.

        Args:
            grid: Input grid to adjust
            target_rows: Desired number of rows
            target_cols: Desired number of columns

        Returns:
            Adjusted grid
        """
        if not grid:
            return [[0] * target_cols for _ in range(target_rows)]

        # Copy the rows that are kept so the caller's grid is never mutated.
        rows = [list(row) for row in grid[:target_rows]]

        # Pad missing rows by repeating the last row.
        while len(rows) < target_rows:
            rows.append(list(rows[-1]))

        # Truncate long rows and zero-pad short ones ([0] * n is [] for n <= 0).
        return [
            row[:target_cols] + [0] * (target_cols - len(row))
            for row in rows
        ]

    def create_fallback_grid(self, input_grid: List[List[int]], target_rows: int) -> List[List[int]]:
        """
        Create a reasonable fallback grid based on input.

        The input is truncated to target_rows rows, or padded with copies of
        its last row. Rows are copied, so the result shares no row objects
        with input_grid.

        Args:
            input_grid: Original input grid
            target_rows: Target number of rows

        Returns:
            Fallback grid; a single column of zeros when input_grid is empty
        """
        if not input_grid:
            return [[0] for _ in range(target_rows)]

        # Copy rows so later edits to the fallback cannot alter the input.
        fallback = [list(row) for row in input_grid[:target_rows]]

        # Pad with copies of the last input row
        while len(fallback) < target_rows:
            fallback.append(list(input_grid[-1]))
        return fallback

    def is_valid_grid(self, grid: List[List[int]]) -> bool:
        """
        Validate if grid format is correct.

        A valid grid is non-empty, has a non-empty first row, is rectangular
        and contains only ints in the range 0-9.

        Args:
            grid: Grid to validate

        Returns:
            True if valid, False otherwise
        """
        try:
            if not grid or not grid[0]:
                return False

            # Check rectangular shape
            col_count = len(grid[0])
            if any(len(row) != col_count for row in grid):
                return False

            # Check value range
            return all(
                isinstance(val, int) and 0 <= val <= 9
                for row in grid
                for val in row
            )

        except Exception:
            # Anything that cannot be measured or iterated is not a grid
            return False

    def is_reasonable_prediction(self, predicted_grid: List[List[int]],
                               input_grid: List[List[int]]) -> bool:
        """
        Check if prediction is reasonable compared to input.

        Args:
            predicted_grid: Predicted output grid
            input_grid: Original input grid

        Returns:
            True if reasonable, False otherwise
        """
        try:
            # Basic validation
            if not self.is_valid_grid(predicted_grid):
                return False

            # Size reasonableness check
            pred_size = len(predicted_grid) * len(predicted_grid[0])
            input_size = len(input_grid) * len(input_grid[0])

            # Allow up to 4x larger or 4x smaller
            if pred_size > input_size * 4 or pred_size < max(1, input_size // 4):
                return False

            # Check for some variation (not all same values)
            flat_pred = [val for row in predicted_grid for val in row]
            if len(set(flat_pred)) == 1 and len(flat_pred) > 4:
                # All same values might be suspicious for larger grids
                return False

            return True

        except Exception:
            return False

## Load Test Data

Loads the test dataset from a JSON file (for example `test (2).json`, the path used in the main cell) into a dictionary for processing.

In [None]:
def load_test_data(test_path: str) -> Dict:
    """Load the test problems from a JSON file.

    Args:
        test_path: Path to the JSON file.

    Returns:
        The decoded JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly instead of relying on the platform's default
    # encoding, which is not UTF-8 on every system.
    with open(test_path, 'r', encoding='utf-8') as f:
        return json.load(f)

## Parse Sample Submission

Extracts the required number of rows for each problem from the sample submission CSV (for example `SampleSubmission (30).csv`, the path used in the main cell). This ensures predictions match the expected output size.

In [None]:
def get_row_counts(sample_csv_path: str) -> Dict[str, int]:
    """Parse the sample submission CSV to determine required rows per problem.

    Each value in the ``ID`` column is split on underscores: the first two
    fields form the problem id and the last field is read as a row number.
    The largest row number seen for a problem is its required row count.

    Args:
        sample_csv_path: Path to a CSV file with an ``ID`` column.

    Returns:
        Dictionary mapping problem id to the highest row number found.

    Raises:
        KeyError: If the file has no ``ID`` column.
        ValueError: If the last field of an ID is not an integer.
    """
    row_counts: Dict[str, int] = {}
    # 'utf-8-sig' reads plain UTF-8 and also drops a leading byte-order mark,
    # which would otherwise turn the first header into '\ufeffID'.
    # newline='' is what the csv module expects for file objects.
    with open(sample_csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            parts = row['ID'].split('_')
            problem_id = '_'.join(parts[:2])
            row_number = int(parts[-1])
            row_counts[problem_id] = max(row_counts.get(problem_id, 0), row_number)
    return row_counts

## Create Submission

Generates submission files in JSON and CSV formats:
- Iterates through test problems, predicting outputs for each test case.
- Handles multiple test cases per problem and formats IDs correctly.
- Includes error handling and fallback to input copying.
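- Prints summary statistics and a few sample CSV rows (it does not plot anything; call `plot_grid_pair` separately to visualize a test case’s input and prediction).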

In [None]:
def _prediction_csv_rows(problem_id: str, predictions: List[List[List[int]]]) -> List[Dict[str, str]]:
    """Flatten one problem's predicted grids into submission CSV rows."""
    # IDs carry the matrix number only when the problem has several grids.
    multiple_grids = len(predictions) > 1
    csv_rows = []
    for matrix_num, grid in enumerate(predictions, start=1):
        for row_num, row in enumerate(grid, start=1):
            if multiple_grids:
                csv_id = f"{problem_id}_{matrix_num}_{row_num}"
            else:
                csv_id = f"{problem_id}_{row_num}"
            csv_rows.append({
                'ID': csv_id,
                'row': ''.join(str(cell) for cell in row)
            })
    return csv_rows


def create_submission(test_data: Dict, model: ARCSolver,
                             output_file: str = 'submission.json',
                             csv_file: str = 'submission.csv') -> Tuple[Dict, List]:
    """
    Create submission files with enhanced error handling and progress tracking.

    For every problem, predictions are requested from the model. If that
    raises, fallback grids built from the test inputs are used instead. The
    results are written as JSON (problem id -> list of grids) and as CSV
    (one line per grid row), and a summary is printed.

    Args:
        test_data: Dictionary of test problems
        model: Solver providing predict_with_fallback, create_fallback_grid,
            row_counts, model_name and device
        output_file: JSON output filename
        csv_file: CSV output filename

    Returns:
        Tuple of (submission_json, submission_csv)
    """
    submission_json = {}
    submission_csv = []
    total_problems = len(test_data)
    successful_predictions = 0

    print(f"\n Generating submissions for {total_problems} test problems...")
    print(f" Using model: {model.model_name}")
    print(f" Device: {model.device}")

    for i, (problem_id, problem) in enumerate(test_data.items()):
        # Progress tracking
        if (i + 1) % 5 == 0 or i == 0:
            print(f" Progress: {i+1}/{total_problems} ({(i+1)/total_problems*100:.1f}%)")

        try:
            num_test_cases = len(problem['test'])
            logger.info(f"Processing {problem_id} with {num_test_cases} test case(s)")

            # Generate predictions using enhanced model
            problem_for_prediction = {
                'train': problem.get('train', []),  # Include training if available
                'test': problem['test']
            }

            predictions = model.predict_with_fallback(problem_for_prediction, problem_id)

            # Validate predictions count
            if len(predictions) != num_test_cases:
                logger.warning(f"Prediction count mismatch for {problem_id}: "
                             f"expected {num_test_cases}, got {len(predictions)}")
                # Fill each missing slot from its own test case's input
                # rather than always reusing the first test case.
                target_rows = model.row_counts.get(problem_id, 11)
                for test_case in problem['test'][len(predictions):]:
                    predictions.append(model.create_fallback_grid(test_case['input'], target_rows))
                predictions = predictions[:num_test_cases]

            # Build this problem's rows before touching the shared outputs, so
            # a failure here cannot leave partial rows behind for the fallback
            # path to duplicate.
            problem_rows = _prediction_csv_rows(problem_id, predictions)

        except Exception as e:
            logger.error(f"Error processing {problem_id}: {e}")

            # Enhanced fallback handling
            target_rows = model.row_counts.get(problem_id, 11)
            predictions = [
                model.create_fallback_grid(test_case['input'], target_rows)
                for test_case in problem['test']
            ]
            problem_rows = _prediction_csv_rows(problem_id, predictions)

        else:
            successful_predictions += 1

        submission_json[problem_id] = predictions
        submission_csv.extend(problem_rows)

    # Save files
    print(f"\n Saving submission files...")

    # Save JSON
    with open(output_file, 'w') as f:
        json.dump(submission_json, f, separators=(',', ':'))

    # Save CSV
    with open(csv_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['ID', 'row'])
        writer.writeheader()
        writer.writerows(submission_csv)

    # Avoid dividing by zero when there were no problems at all
    success_rate = successful_predictions / total_problems * 100 if total_problems else 0.0

    # Summary statistics
    print(f"\n Submission Generation Complete!")
    print(f" Summary:")
    print(f"   JSON format: {output_file}")
    print(f"   CSV format: {csv_file}")
    print(f"   Total problems: {len(submission_json)}")
    print(f"   Successful predictions: {successful_predictions}")
    print(f"   Total CSV rows: {len(submission_csv)}")
    print(f"   Success rate: {success_rate:.1f}%")

    # Show sample entries
    if submission_csv:
        print(f"\n Sample CSV entries (first 5 rows):")
        for entry in submission_csv[:5]:
            print(f"   {entry['ID']}: {entry['row'][:50]}{'...' if len(entry['row']) > 50 else ''}")

    # Multi-test case analysis
    multi_test_problems = [(p_id, p) for p_id, p in test_data.items() if len(p['test']) > 1]
    if multi_test_problems:
        print(f"\n Multi-test case problems: {len(multi_test_problems)}")
        example_id, example_prob = multi_test_problems[0]
        print(f"   Example '{example_id}': {len(example_prob['test'])} test cases")

        if example_id in submission_json:
            for i, pred in enumerate(submission_json[example_id]):
                print(f"   Matrix {i+1}: {len(pred)}×{len(pred[0]) if pred else 0}")

    return submission_json, submission_csv

## Main Execution

Executes the pipeline:
- Loads test data and row counts.
- Initializes the solver and generates submissions.
- Handles errors and logs progress.

In [None]:

# Hard-coded input and output locations; adjust them to where the files live.
test_path = "/content/test (2).json"
sample_csv_path = "/content/SampleSubmission (30).csv"
output_json = "/content/submission.json"
output_csv = "/content/submission.csv"

try:
    test_data = load_test_data(test_path)
    logger.info(f"Loaded {len(test_data)} test problems")

    # Required number of output rows per problem, taken from the sample submission
    row_counts = get_row_counts(sample_csv_path)
    logger.info(f"Determined row counts for {len(row_counts)} problems")

    # Overrides the class default model with the smaller 1.5B variant
    solver = ARCSolver(
        row_counts=row_counts,
        model_name="Qwen/Qwen2.5-1.5B-Instruct"
    )
    submission_json, submission_csv = create_submission(test_data, solver, output_json, output_csv)

    print(f"\n Submission generation completed successfully!")

except Exception as e:
    print(f" Submission generation failed: {e}")
    # logger.exception logs at ERROR level like logger.error, but also records
    # the traceback, so the failing step can be located.
    logger.exception(f"Submission generation failed: {e}")

## Next Steps

- Run the notebook to generate `submission.json` and `submission.csv`.
- Visualize test cases by calling `plot_grid_pair(input_grid, predicted_grid)` on a test input and its entry in the returned `submission_json` (for example from inside the loop in `create_submission`).
- Experiment with different prompts or models to improve predictions.
- Share your results or questions on the Zindi leaderboard discussion!

In [None]:
import pandas as pd

# Preview the generated submission. Read it from `output_csv`, the path the
# main cell wrote to, instead of a relative 'submission.csv' that only
# resolves to the same file when the working directory is the output folder.
pd.read_csv(output_csv)

In [None]:
# Display the sample submission for comparison with the generated file
# (same path as `sample_csv_path` in the main cell).
pd.read_csv("/content/SampleSubmission (30).csv")