In [None]:
import numpy as np
import torch
from task import Task
import trainer
from tqdm import tqdm

def is_static_cell(input_grid, output_grid, r, c):
    """
    Report whether cell (r, c) is "static" between an input and output grid.

    A cell counts as static when its own value and the values of all of its
    in-bounds north/east/south/west neighbours are identical in both grids.
    Grids whose shapes differ never yield a static cell.

    Args:
        input_grid: Input grid as numpy array
        output_grid: Output grid as numpy array
        r: Row index
        c: Column index

    Returns:
        True if the cell is static, False otherwise
    """
    # Cells cannot be compared position-by-position across different shapes
    if input_grid.shape != output_grid.shape:
        return False

    # The cell itself must keep its value
    if input_grid[r, c] != output_grid[r, c]:
        return False

    height, width = input_grid.shape[0], input_grid.shape[1]
    neighbours = ((r - 1, c), (r, c + 1), (r + 1, c), (r, c - 1))

    # Neighbours that fall off the grid are skipped; every remaining one
    # must be unchanged for the cell to be static
    return all(
        input_grid[y, x] == output_grid[y, x]
        for y, x in neighbours
        if 0 <= y < height and 0 <= x < width
    )

def pad_grids_to_same_size(grid1, grid2, padding_value=10):
    """
    Grow two 2D grids to a common shape, filling new cells with a constant.

    The common shape is the element-wise maximum of the two grids' row and
    column counts. Each original grid is placed in the top-left corner of
    its padded copy; the inputs themselves are not modified.

    Args:
        grid1: First grid as numpy array
        grid2: Second grid as numpy array
        padding_value: Value to use for padding (default: 10)

    Returns:
        Tuple of (padded_grid1, padded_grid2)
    """
    target_shape = (
        max(grid1.shape[0], grid2.shape[0]),
        max(grid1.shape[1], grid2.shape[1]),
    )

    def _grow(grid):
        # Start from a canvas full of padding (in the grid's own dtype),
        # then overwrite the top-left corner with the original content
        canvas = np.full(target_shape, padding_value, dtype=grid.dtype)
        height, width = grid.shape[0], grid.shape[1]
        canvas[:height, :width] = grid
        return canvas

    return _grow(grid1), _grow(grid2)

def _score_grid_static_aware(input_grid, expected, predicted):
    """
    Score one predicted grid against its expected output, ignoring static cells.

    The input and expected grids are padded to a common shape (padding value
    from pad_grids_to_same_size), and the prediction is cropped/padded with 10
    to that same shape. Cells flagged by is_static_cell are excluded from the
    score; a wrong prediction on a static cell is added to the denominator as
    a penalty. If every cell is static, plain cell-by-cell accuracy is used.

    Args:
        input_grid: Task input grid (nested lists or array)
        expected: Expected output grid (nested lists or array)
        predicted: Predicted output grid (array-like)

    Returns:
        Tuple (is_correct, correct_nodes, total_nodes) with a Python bool and
        two Python ints
    """
    input_np = np.array(input_grid)
    expected_np = np.array(expected)
    # Accept lists and other array-likes, not only ndarrays
    predicted = np.asarray(predicted)

    # Pad grids to same size if shapes don't match
    if input_np.shape != expected_np.shape:
        input_padded, expected_padded = pad_grids_to_same_size(input_np, expected_np)
    else:
        input_padded, expected_padded = input_np, expected_np

    # Crop/pad the prediction onto the expected canvas
    if predicted.shape != expected_padded.shape:
        predicted_padded = np.full(expected_padded.shape, 10, dtype=predicted.dtype)
        min_rows = min(predicted.shape[0], expected_padded.shape[0])
        min_cols = min(predicted.shape[1], expected_padded.shape[1])
        predicted_padded[:min_rows, :min_cols] = predicted[:min_rows, :min_cols]
    else:
        predicted_padded = predicted

    # Calculate static cell mask
    static_mask = np.zeros(expected_padded.shape, dtype=bool)
    for r in range(expected_padded.shape[0]):
        for c in range(expected_padded.shape[1]):
            static_mask[r, c] = is_static_cell(input_padded, expected_padded, r, c)
    non_static_mask = ~static_mask

    matches = predicted_padded == expected_padded

    if not non_static_mask.any():
        # Everything is static: fall back to traditional whole-grid accuracy
        return bool(matches.all()), int(matches.sum()), int(expected_padded.size)

    scored = matches[non_static_mask]
    # Wrong static cells enlarge the denominator without adding to the numerator
    static_incorrect = int((~matches[static_mask]).sum())
    total_nodes = int(non_static_mask.sum()) + static_incorrect
    return bool(scored.all()), int(scored.sum()), total_nodes


def evaluate_model(model, tasks):
    """
    Evaluate model performance on a set of tasks with explicit shape prediction metrics.
    Ignores static cells (cells that don't change between input and output) when calculating accuracy.

    A task counts as correct when every one of its test pairs is scored
    correct; test pairs without a matching prediction are never counted as
    correct. Results are also printed as a summary.

    Args:
        model: Object exposing solve(task) that returns one predicted grid
            per test pair
        tasks: List of Task objects (train_pairs, test_pairs and task_id are read)

    Returns:
        Dictionary of evaluation metrics including node-level accuracy and shape prediction metrics
    """
    total_tasks = len(tasks)
    total_train_grids = sum(len(task.train_pairs) for task in tasks)
    total_test_grids = sum(len(task.test_pairs) for task in tasks)
    total_grids = total_train_grids + total_test_grids

    correct_tasks = 0
    correct_train_grids = 0
    correct_test_grids = 0

    # Node-level accuracy tracking
    total_train_nodes = 0
    total_test_nodes = 0
    correct_train_nodes = 0
    correct_test_nodes = 0

    # Shape prediction metrics
    total_shape_predictions = 0
    correct_shape_predictions = 0

    # Evaluate each task
    with tqdm(total=len(tasks), desc="Evaluating model on tasks") as pbar:
        for task in tasks:
            # Run model to get predictions
            predictions = model.solve(task)

            # Check predictions match
            if len(predictions) != len(task.test_pairs):
                print(f"Warning: Number of predictions ({len(predictions)}) doesn't match number of test pairs ({len(task.test_pairs)}) for task {task.task_id}")

            # NOTE(review): this is a second solve() call whose results are
            # paired with the TRAIN pairs below. If solve() returns test
            # predictions (as the warning above implies), the train metrics
            # compare test predictions against train outputs — confirm the
            # intended source of train predictions.
            train_predictions = model.solve(task)

            # Count correct train grids and nodes
            for (input_grid, expected), predicted in zip(task.train_pairs, train_predictions):
                is_correct, grid_correct_nodes, grid_total_nodes = _score_grid_static_aware(
                    input_grid, expected, predicted
                )
                total_train_nodes += grid_total_nodes
                correct_train_nodes += grid_correct_nodes
                if is_correct:
                    correct_train_grids += 1

            # Count correct test grids and nodes
            test_correct = 0

            for (input_grid, expected), predicted in zip(task.test_pairs, predictions):
                # Track shape prediction accuracy against the unpadded expected grid
                total_shape_predictions += 1
                if np.asarray(predicted).shape == np.array(expected).shape:
                    correct_shape_predictions += 1

                is_correct, grid_correct_nodes, grid_total_nodes = _score_grid_static_aware(
                    input_grid, expected, predicted
                )
                total_test_nodes += grid_total_nodes
                correct_test_nodes += grid_correct_nodes
                if is_correct:
                    test_correct += 1
                    correct_test_grids += 1

            # A task is correct if all its test pairs are correct
            if test_correct == len(task.test_pairs):
                correct_tasks += 1
            pbar.update(1)

    # Calculate overall grid metrics
    correct_grids = correct_train_grids + correct_test_grids
    train_grid_accuracy = correct_train_grids / total_train_grids if total_train_grids > 0 else 0
    test_grid_accuracy = correct_test_grids / total_test_grids if total_test_grids > 0 else 0
    overall_grid_accuracy = correct_grids / total_grids if total_grids > 0 else 0
    task_accuracy = correct_tasks / total_tasks if total_tasks > 0 else 0

    # Calculate shape prediction metrics
    shape_accuracy = correct_shape_predictions / total_shape_predictions if total_shape_predictions > 0 else 0

    # Calculate node-level metrics
    total_nodes = total_train_nodes + total_test_nodes
    correct_nodes = correct_train_nodes + correct_test_nodes
    train_node_accuracy = correct_train_nodes / total_train_nodes if total_train_nodes > 0 else 0
    test_node_accuracy = correct_test_nodes / total_test_nodes if total_test_nodes > 0 else 0
    overall_node_accuracy = correct_nodes / total_nodes if total_nodes > 0 else 0

    # Print summary
    print("\nEvaluation Results (Static-Cell-Aware):")
    print(f"Correct tasks: {correct_tasks}/{total_tasks} ({task_accuracy:.2%})")
    print(f"Correct test grids: {correct_test_grids}/{total_test_grids} ({test_grid_accuracy:.2%})")
    print(f"Shape prediction accuracy: {correct_shape_predictions}/{total_shape_predictions} ({shape_accuracy:.2%})")
    print(f"Correct train grids: {correct_train_grids}/{total_train_grids} ({train_grid_accuracy:.2%})")
    print(f"Correct total grids: {correct_grids}/{total_grids} ({overall_grid_accuracy:.2%})")
    print("\nNode-level Accuracy (ignoring static cells):")
    print(f"Test nodes: {correct_test_nodes}/{total_test_nodes} ({test_node_accuracy:.2%})")
    print(f"Train nodes: {correct_train_nodes}/{total_train_nodes} ({train_node_accuracy:.2%})")
    print(f"Overall nodes: {correct_nodes}/{total_nodes} ({overall_node_accuracy:.2%})")
    print("\nNote: Static cells (unchanged between input/output with same neighbors) are ignored in accuracy calculation.")

    return {
        "task_accuracy": task_accuracy,
        "test_grid_accuracy": test_grid_accuracy,
        "shape_accuracy": shape_accuracy,
        "train_grid_accuracy": train_grid_accuracy,
        "overall_grid_accuracy": overall_grid_accuracy,
        "correct_tasks": correct_tasks,
        "total_tasks": total_tasks,
        "correct_test_grids": correct_test_grids,
        "total_test_grids": total_test_grids,
        "correct_shape_predictions": correct_shape_predictions,
        "total_shape_predictions": total_shape_predictions,
        "correct_train_grids": correct_train_grids,
        "total_train_grids": total_train_grids,
        "correct_grids": correct_grids,
        "total_grids": total_grids,
        # Node-level metrics
        "test_node_accuracy": test_node_accuracy,
        "train_node_accuracy": train_node_accuracy,
        "overall_node_accuracy": overall_node_accuracy,
        "correct_test_nodes": correct_test_nodes,
        "total_test_nodes": total_test_nodes,
        "correct_train_nodes": correct_train_nodes,
        "total_train_nodes": total_train_nodes,
        "correct_nodes": correct_nodes,
        "total_nodes": total_nodes
    }

def _standardize_graph_features(graph, expected_dim, padding_value):
    """Rewrite one graph's `x` as a (nodes, 1) label column and set `pos` in place."""
    import torch

    x = graph.x

    # Convert from one-hot to class labels if needed
    if x.dim() == 2 and x.size(1) == 11:
        x = x.argmax(dim=1)

    # Ensure x is long and 2D
    x = x.long()
    if x.dim() == 1:
        x = x.unsqueeze(1)  # Shape: (nodes, 1)

    # Standardize shape to (nodes, expected_dim)
    if x.size(1) < expected_dim:
        # Allocate on x's device so the graph's tensors stay on one device
        padded = torch.full(
            (x.size(0), expected_dim), padding_value, dtype=torch.long, device=x.device
        )
        padded[:, :x.size(1)] = x
        x = padded
    elif x.size(1) > expected_dim:
        x = x[:, :expected_dim]

    # Columns 1-2 become the positional info, column 0 the node feature
    graph.pos = x[:, 1:3].float() if expected_dim >= 3 else None
    graph.x = x[:, 0].long().unsqueeze(1)  # Final x: shape (nodes, 1)

    # Mark as preprocessed so a later pass can skip this graph
    graph.preprocessed = True


def preprocess_task_graphs(tasks, padding_value=10):
    """
    Preprocess all graphs in the given tasks once, instead of during each solve call.

    Every graph in task.test_graphs (and task.train_graphs, when present)
    that has an `x` attribute is modified in place: `x` is reduced to a
    (nodes, 1) long tensor and `pos` is set from columns 1-2 of the
    standardized features. Graphs already carrying a truthy `preprocessed`
    flag are skipped, so calling this function repeatedly is safe; without
    that check a second pass would re-pad the reduced `x` and overwrite
    `pos` with padding values.

    Args:
        tasks: List of Task objects
        padding_value: Padding value for standardizing dimensions

    Returns:
        The tasks with preprocessed graphs
    """
    expected_dim = 3  # Standardized input dimension

    for task in tasks:
        graphs = list(task.test_graphs)
        # If the task also has train graphs, preprocess them too
        if hasattr(task, 'train_graphs'):
            graphs.extend(task.train_graphs)

        for graph in graphs:
            if not hasattr(graph, 'x'):
                continue
            if getattr(graph, 'preprocessed', False):
                continue
            _standardize_graph_features(graph, expected_dim, padding_value)

    return tasks

In [None]:
import torch
import gc
from unified_module import UnifiedReasoningModule
from nlm_module import NLMReasoningModule
import trainer
from task import Blackboard

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define model path
UNIFIED_PATH = "output/models/unified_shape/unified_shape_final.pt"
NLM_PATH = "output/models/nlm_shape/nlm_shape_final.pt"
DATA_PATH = "precomputed_tasks/evaluation_400"

NUM_RUNS = 5

results_all_runs = []

# Load tasks
tasks = trainer.load_precomputed_tasks(DATA_PATH)

# Preprocess tasks for nlm ONCE, before the run loop: the same task objects
# are reused by every run, so preprocessing inside the loop would re-process
# already-processed graphs on runs 2..N.
# (Comment this out when evaluating the unified model below.)
trainer.preprocess_task_graphs(tasks)


for i in range(NUM_RUNS):
    print(f"\nStarting evaluation run {i+1}/{NUM_RUNS}")

    # Clear CUDA cache
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Force garbage collection
    gc.collect()

    # Reset task states if needed
    for task in tasks:
        if hasattr(task, 'blackboard'):
            task.blackboard = Blackboard()

    # Initialize model
    # print("Initializing unified model...")
    # model = UnifiedReasoningModule(
    #     input_dim=3,
    #     hidden_dim=128,
    #     output_dim=11,
    #     device=device
    # )
    # model.load_complete_state(UNIFIED_PATH)


    # Initialize nlm model
    print("Initializing nlm model...")
    model = NLMReasoningModule(
            input_dim=3,
            hidden_dim=128,
            output_dim=11,
            device=device
        )
    model.load_complete_state(NLM_PATH)

    # Move model to device
    model.model = model.model.to(device)
    model.model.eval() # Set to evaluation mode

    # Run evaluation
    results = evaluate_model(model, tasks)
    results_all_runs.append(results)

    # Delete model to ensure clean state for next iteration
    del model

    # Clear CUDA cache again
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

# Analyze results across all runs
print("\nResults across all runs:")
for i, run_results in enumerate(results_all_runs):
    print(f"Run {i+1}: {run_results}")

# Calculate average performance.
# evaluate_model() returns no plain 'accuracy' key, so average the named
# accuracy metrics it does return instead of silently reporting 0.
if results_all_runs:
    for metric in ("task_accuracy", "test_grid_accuracy", "shape_accuracy", "test_node_accuracy"):
        avg_value = sum(r[metric] for r in results_all_runs) / len(results_all_runs)
        print(f"Average {metric}: {avg_value:.4f}")

In [None]:
import torch
import gc
import numpy as np
from unified_module import UnifiedReasoningModule
from nlm_module import NLMReasoningModule
from task_adaptation_runner import TaskAdaptationRunner
import trainer
from task5 import Blackboard, Task
from tqdm import tqdm
import json

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Evaluation switches
USE_BLACKBOARD_INSIGHTS = True  # Whether to use insights from blackboard

# Checkpoints of the MAML-trained models, one per reasoning module
NLM_MAML_PATH = "output/models/nlm_shape/nlm_maml_final_model.pt"
UNIFIED_MAML_PATH = "output/models/unified_shape/unified_maml_final_model.pt"

def evaluate_task_adaptation(tasks, model_path, reasoning_module_class, method="maml", inner_lr=0.001, inner_steps=5):
    """
    Evaluate using task adaptation with the TaskAdaptationRunner

    A TaskAdaptationRunner is built from the given checkpoint and wrapped in
    a small adapter exposing solve(task), so that evaluate_model can score it
    like any other model.

    Args:
        tasks: List of tasks to evaluate
        model_path: Path to the pre-trained meta-learning model
        reasoning_module_class: Class of reasoning module (UnifiedReasoningModule or NLMReasoningModule)
        method: Meta-learning method ("maml" or "proto")
        inner_lr: Inner loop learning rate
        inner_steps: Number of inner loop steps

    Returns:
        Dictionary with evaluation results including shape metrics. The keys
        "full_accuracy" and "content_accuracy" are aliases of
        "test_grid_accuracy" and "test_node_accuracy" respectively.
    """
    print(f"Evaluating with {method.upper()} task adaptation with ILR:{inner_lr} and IS:{inner_steps}...")

    # Initialize task adaptation runner with the correct device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    runner = TaskAdaptationRunner(
        inner_lr=inner_lr,
        inner_steps=inner_steps,
        model_path=model_path,
        model_config={'input_dim': 3},
        reasoning_module_class=reasoning_module_class,
        method=method,
        device=device
    )

    # Create a wrapper class to make TaskAdaptationRunner compatible with evaluate_model
    class TaskAdaptationWrapper:
        def __init__(self, runner):
            self.runner = runner

        def solve(self, task):
            # evaluate_model only needs the predictions; no plots or files
            return self.runner.adapt_to_task(
                task=task,
                visualize=False,
                save_dir=None
            )

    # Create the wrapper instance
    model_wrapper = TaskAdaptationWrapper(runner)

    # Get full results from evaluate_model
    results = evaluate_model(model_wrapper, tasks)

    # Return the metrics without redundant recalculation
    return {
        "full_accuracy": results['test_grid_accuracy'],
        "shape_accuracy": results['shape_accuracy'],
        "content_accuracy": results['test_node_accuracy'],
        "task_accuracy": results['task_accuracy'],
        "test_grid_accuracy": results['test_grid_accuracy'],
        "test_node_accuracy": results['test_node_accuracy']
    }

def run_evaluation(tasks, reasoning_module_class, model_path, inner_lr, inner_steps, num_runs=5):
    """
    Run multiple evaluation runs and return averaged results with shape metrics

    Before each run the CUDA cache is cleared, garbage is collected and every
    task that has a `blackboard` attribute gets a fresh Blackboard. Each run
    calls evaluate_task_adaptation with method="maml".

    Args:
        tasks: List of tasks to evaluate (reused across runs)
        reasoning_module_class: Class of reasoning module to adapt
        model_path: Path to the pre-trained meta-learning model
        inner_lr: Inner loop learning rate
        inner_steps: Number of inner loop steps
        num_runs: Number of evaluation runs to average over (default: 5)

    Returns:
        Dictionary with "all_runs" (list of per-run result dictionaries) and
        one "average_<metric>" entry per averaged metric

    Raises:
        ValueError: If num_runs is smaller than 1
    """
    if num_runs < 1:
        raise ValueError(f"num_runs must be at least 1, got {num_runs}")

    all_results = []

    for run in range(num_runs):
        print(f"\n=== Starting evaluation run {run+1}/{num_runs} ===")

        # Clear CUDA cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Force garbage collection
        gc.collect()

        # Reset tasks' blackboards if needed
        for task in tasks:
            if hasattr(task, 'blackboard'):
                task.blackboard = Blackboard()

        # Run evaluation with task adaptation
        run_results = evaluate_task_adaptation(
            tasks=tasks,
            model_path=model_path,
            reasoning_module_class=reasoning_module_class,
            inner_lr=inner_lr,
            inner_steps=inner_steps,
            method="maml"
        )

        all_results.append(run_results)

        # Print comprehensive metrics for this run
        print(f"Run {run+1} metrics:")
        print(f"  Full accuracy: {run_results.get('full_accuracy', 0.0):.4f} (shape + content)")
        print(f"  Shape accuracy: {run_results.get('shape_accuracy', 0.0):.4f} (correct dimensions)")
        print(f"  Content accuracy: {run_results.get('content_accuracy', 0.0):.4f} (overlapping cells)")
        print(f"  Grid accuracy: {run_results.get('test_grid_accuracy', 0.0):.4f}")
        print(f"  Node accuracy: {run_results.get('test_node_accuracy', 0.0):.4f}")

    # Calculate average performance for all metrics
    metrics = [
        "full_accuracy", "shape_accuracy", "content_accuracy",
        "test_grid_accuracy", "test_node_accuracy", "task_accuracy"
    ]

    avg_metrics = {}
    for metric in metrics:
        # Use get() to handle cases where old metrics might be missing
        values = [result.get(metric, 0.0) for result in all_results]
        avg_metrics[f"average_{metric}"] = sum(values) / len(values) if values else 0.0

    # Print comprehensive average metrics
    print(f"\nAverage metrics across {num_runs} runs:")
    print(f"Full accuracy: {avg_metrics.get('average_full_accuracy', 0.0):.4f} (shape + content)")
    print(f"Shape accuracy: {avg_metrics.get('average_shape_accuracy', 0.0):.4f} (correct dimensions)")
    print(f"Content accuracy: {avg_metrics.get('average_content_accuracy', 0.0):.4f} (overlapping cells)")
    print(f"Task accuracy: {avg_metrics.get('average_task_accuracy', 0.0):.4f}")
    print(f"Grid accuracy: {avg_metrics.get('average_test_grid_accuracy', 0.0):.4f}")
    print(f"Node accuracy: {avg_metrics.get('average_test_node_accuracy', 0.0):.4f}")

    # Return comprehensive results
    return {
        "all_runs": all_results,
        **avg_metrics  # Include all average metrics
    }

# Test the static cell evaluation
# test_static_cell_evaluation()

def convert_numpy(obj):
    """
    Recursively convert numpy values to Python native types for JSON serialization.

    Numpy scalars become their native Python equivalent (int stays int,
    float stays float, bool stays bool), arrays become nested lists, and
    dicts / lists / tuples are converted element by element (tuples come
    back as lists). Anything else is returned unchanged.
    """
    if isinstance(obj, np.generic):
        # .item() yields the matching native Python scalar
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {k: convert_numpy(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_numpy(i) for i in obj]
    return obj


if __name__ == "__main__":
    MODELS = ["unified", "nlm"]
    INNER_LR = 0.05
    INNER_STEPS = [5, 10, 15]

    for MODEL in MODELS:
        for IS in INNER_STEPS:

            # Load evaluation tasks fresh for every configuration, since the
            # nlm branch preprocesses the loaded task graphs
            print("Loading evaluation tasks...")
            tasks = trainer.load_precomputed_tasks("precomputed_tasks/evaluation_400")

            # Run evaluation
            if MODEL=="nlm":
                tasks = trainer.preprocess_task_graphs(tasks)
                results = run_evaluation(tasks, NLMReasoningModule, NLM_MAML_PATH, INNER_LR, IS)
            else:
                results = run_evaluation(tasks, UnifiedReasoningModule, UNIFIED_MAML_PATH, INNER_LR, IS)

            # One result file per (model, inner steps) configuration
            with open(f"{MODEL}_{IS}_05_task_adaptation_results.json", "w") as f:
                json.dump(convert_numpy(results), f, indent=2)

            print("Evaluation completed and results saved.")

### LLM Module

In [None]:
import os
import json
import numpy as np
import torch
from tqdm.notebook import tqdm
from task import Task
from llm_module import LLMReasoningModule
import traceback
import gc
import time

# Fix the numpy and torch RNG seeds so repeated runs are reproducible
np.random.seed(42)
torch.manual_seed(42)

# Where results are written (created up front) and where tasks are read from
RESULTS_DIR = "output/evaluation/llm"
DATA_DIR = "data/training"
os.makedirs(RESULTS_DIR, exist_ok=True)

# Load tasks
def load_tasks(directory, limit=None):
    """
    Load tasks from directory with optional limit

    The directory tree is walked in sorted order (sub-directories and file
    names), so the returned list — and the subset selected by `limit` — does
    not depend on filesystem enumeration order. Files that are not valid
    task JSON are reported with a printed message and skipped.

    Args:
        directory: Root directory searched recursively for ".json" files
        limit: Maximum number of tasks to load; None or 0 means no limit

    Returns:
        List of Task objects
    """
    tasks = []
    for root, dirs, files in os.walk(directory):
        # Sorting in place makes os.walk descend in a deterministic order
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".json"):
                continue

            file_path = os.path.join(root, file)
            with open(file_path, "r", encoding="utf-8") as f:
                try:
                    data = json.load(f)
                    if "train" not in data or "test" not in data:
                        print(f"Warning: Invalid task format in {file_path}")
                        continue

                    task = Task(
                        task_id=os.path.basename(file_path),
                        train_pairs=[(pair["input"], pair["output"]) for pair in data["train"]],
                        test_pairs=[(pair["input"], pair["output"]) for pair in data["test"]],
                    )
                    tasks.append(task)

                    if limit and len(tasks) >= limit:
                        return tasks
                except Exception as e:
                    # Best-effort loading: report the broken file and keep going
                    print(f"Error loading {file_path}: {e}")
    return tasks

# Initialize LLM module with your fine-tuned model
def create_llm_module(model_name="gpt-4", api_key=None, temperature=0.3):
    """
    Build an LLMReasoningModule for the given model.

    The API key is taken from the `api_key` argument when given, otherwise
    from the OPENAI_API_KEY environment variable.

    Args:
        model_name: Name of the model passed to the module
        api_key: API key; falls back to the environment when empty
        temperature: Sampling temperature passed to the module

    Returns:
        The configured LLMReasoningModule

    Raises:
        ValueError: If no API key is available from either source
    """
    resolved_key = api_key if api_key else os.environ.get("OPENAI_API_KEY")

    if resolved_key:
        # Fixed evaluation settings; only model, key and temperature vary
        settings = {
            "model": model_name,
            "api_key": resolved_key,
            "temperature": temperature,
            "max_tokens": 2048,
            "log_path": "logs/llm_evaluation",
            "cache_responses": True,
        }
        return LLMReasoningModule(**settings)

    raise ValueError("OpenAI API key not provided. Set OPENAI_API_KEY environment variable.")

# Evaluate a single prediction
def evaluate_prediction(prediction, target):
    """
    Compare a predicted grid with its target grid.

    Returns a dictionary with:
        "exact_match": True only when the grids are identical
        "shape_match": whether both grids have the same shape
        "accuracy": fraction of matching cells (0.0 when shapes differ)
    """
    predicted_grid, target_grid = np.array(prediction), np.array(target)

    # Differently shaped grids are not compared cell by cell
    if predicted_grid.shape != target_grid.shape:
        return {
            "exact_match": False,
            "shape_match": False,
            "accuracy": 0.0
        }

    cell_count = target_grid.size
    hits = np.sum(predicted_grid == target_grid)
    if cell_count > 0:
        fraction_correct = hits / cell_count
    else:
        fraction_correct = 0.0

    return {
        "exact_match": np.array_equal(predicted_grid, target_grid),
        "shape_match": True,
        "accuracy": float(fraction_correct)
    }

def evaluate_prediction_static_aware(prediction, target, input_grid):
    """Score a prediction against the target with emphasis on non-static cells.

    A cell is "static" when ``is_static_cell`` reports that it and its
    in-bounds NESW neighbours are unchanged between input and target. Accuracy
    is computed over the non-static cells; every wrongly predicted static cell
    is added to the denominator as a penalty. When every cell is static, plain
    cell accuracy over the whole grid is used instead.

    Args:
        prediction: Predicted grid (anything ``np.array`` accepts).
        target: Expected output grid.
        input_grid: Input grid the target was derived from.

    Returns:
        Dict of plain Python values (safe to pass to ``json.dump``):
            exact_match: True only if the prediction has the target's shape
                and every cell, static or not, equals the target.
            shape_match: True if the prediction has the target's shape.
            accuracy: Static-aware cell accuracy in [0, 1].
            static_cells: Number of static cells in the working grid.
            non_static_cells: Number of non-static cells in the working grid.
    """
    prediction = np.array(prediction)
    target = np.array(target)
    input_grid = np.array(input_grid)

    # Judge the shape against the real target. The padded working grid below
    # also grows to cover a larger input, so comparing against it would flag
    # correctly shaped predictions as mismatches (and vice versa).
    shape_match = prediction.shape == target.shape

    # Pad input and target to a common working size if their shapes differ
    if input_grid.shape != target.shape:
        input_padded, target_padded = pad_grids_to_same_size(input_grid, target)
    else:
        input_padded, target_padded = input_grid, target

    # Bring the prediction onto the working grid. 10 is the same fill value
    # pad_grids_to_same_size uses by default.
    if prediction.shape != target_padded.shape:
        if prediction.ndim == 2:
            prediction_padded = np.full(target_padded.shape, 10, dtype=prediction.dtype)
            min_rows = min(prediction.shape[0], target_padded.shape[0])
            min_cols = min(prediction.shape[1], target_padded.shape[1])
            prediction_padded[:min_rows, :min_cols] = prediction[:min_rows, :min_cols]
        else:
            # Malformed prediction (e.g. an empty list): nothing can be
            # copied, so every cell is treated as padding.
            prediction_padded = np.full(target_padded.shape, 10, dtype=target_padded.dtype)
    else:
        prediction_padded = prediction

    # Calculate static cell mask
    static_mask = np.zeros(target_padded.shape, dtype=bool)
    for r in range(target_padded.shape[0]):
        for c in range(target_padded.shape[1]):
            static_mask[r, c] = is_static_cell(input_padded, target_padded, r, c)

    non_static_mask = ~static_mask

    if non_static_mask.sum() == 0:
        # If all cells are static, use traditional accuracy
        total_cells = target_padded.size
        matching_cells = np.sum(prediction_padded == target_padded)
    else:
        # Count only non-static cells
        total_cells = non_static_mask.sum()
        matching_cells = np.sum(
            prediction_padded[non_static_mask] == target_padded[non_static_mask]
        )
        # Penalise wrongly predicted static cells by enlarging the denominator
        total_cells += np.sum(prediction_padded[static_mask] != target_padded[static_mask])

    cell_accuracy = matching_cells / total_cells if total_cells > 0 else 0.0

    # An exact match means the whole grid is right: a wrong shape or a wrong
    # static cell (already penalised in accuracy) must not count as exact.
    exact_match = shape_match and np.array_equal(prediction, target)

    return {
        "exact_match": bool(exact_match),
        "shape_match": bool(shape_match),
        "accuracy": float(cell_accuracy),
        "static_cells": int(static_mask.sum()),
        "non_static_cells": int(non_static_mask.sum())
    }

def test_static_cell_evaluation():
    """Print static-aware scores for three hand-made predictions.

    Uses a 3x3 grid in which only cell (1, 2) changes between input and
    expected output, then scores a perfect prediction, one that is wrong in a
    static cell, and one that is wrong in the changing cell. Results are only
    printed; nothing is asserted or returned.
    """
    print("Testing static cell evaluation...")

    # Input: a single coloured cell in the centre.
    source = np.array([
        [0, 0, 0],
        [0, 1, 0],
        [0, 0, 0]
    ])

    # Expected output: identical except that (1, 2) becomes 1.
    goal = np.array([
        [0, 0, 0],
        [0, 1, 1],
        [0, 0, 0]
    ])

    flawless = goal.copy()

    # (0, 0) and its neighbours never change, so this error hits a static cell.
    wrong_static = goal.copy()
    wrong_static[0, 0] = 1

    # (1, 2) is the only cell that changes, so this error hits a dynamic cell.
    wrong_dynamic = goal.copy()
    wrong_dynamic[1, 2] = 0

    scenarios = (
        ("Perfect prediction", flawless),
        ("Static error prediction", wrong_static),
        ("Dynamic error prediction", wrong_dynamic),
    )
    for label, candidate in scenarios:
        outcome = evaluate_prediction_static_aware(candidate, goal, source)
        print(f"{label}: accuracy={outcome['accuracy']:.3f}, exact_match={outcome['exact_match']}")
        print(f"  Static cells: {outcome['static_cells']}, Non-static cells: {outcome['non_static_cells']}")

    print("\nNote: Static cell errors add penalty to total count, dynamic cell errors directly reduce accuracy.")

# Function to examine blackboard state
def examine_blackboard(task, prefix=""):
    """Summarise a task's blackboard and optionally print the summary.

    Args:
        task: Object expected to expose ``blackboard`` and
            ``get_reasoning_history()``.
        prefix: String prepended to each printed line. The summary is only
            printed when this is non-empty.

    Returns:
        Dict with ``reasoning_steps`` plus, when the blackboard provides the
        corresponding attributes, ``confidence_scores``,
        ``has_transformations``, ``transformation_keys`` and
        ``knowledge_items``. An empty dict if the task has no blackboard.
    """
    if not hasattr(task, 'blackboard'):
        print(f"{prefix}No blackboard found")
        return {}

    summary = {"reasoning_steps": len(task.get_reasoning_history())}
    board = task.blackboard

    # Confidence scores are optional on the blackboard.
    if hasattr(board, 'confidence_scores'):
        summary["confidence_scores"] = board.confidence_scores

    # Knowledge-base style blackboards: collect transformation-related keys.
    if hasattr(board, 'knowledge_base'):
        knowledge = board.knowledge_base
        matching_keys = [key for key in knowledge.keys() if 'transformation' in key]
        summary["has_transformations"] = bool(matching_keys)
        summary["transformation_keys"] = matching_keys
        summary["knowledge_items"] = len(knowledge)

    # A non-empty prefix doubles as the switch for human-readable output.
    if prefix:
        print(f"{prefix}Blackboard summary:")
        print(f"{prefix}  - Reasoning steps: {summary['reasoning_steps']}")
        print(f"{prefix}  - Knowledge items: {summary.get('knowledge_items', 'N/A')}")
        print(f"{prefix}  - Has transformations: {summary.get('has_transformations', 'N/A')}")

    return summary

# Detailed evaluation of LLM module
def evaluate_llm_module(llm_module, tasks, verbose=True, save_path=None):
    """Evaluate an LLM module on a list of tasks and collect metrics.

    Each task is solved with ``llm_module.solve(task)``; the predictions are
    compared with the targets in ``task.test_pairs`` via
    ``evaluate_prediction``. An exception raised while handling a task is
    recorded under ``errors`` and does not stop the run; that task then
    contributes nothing to the overall counts.

    Args:
        llm_module: Object exposing ``model`` (used for labels) and
            ``solve(task)`` returning a sequence of predicted grids.
        tasks: Tasks exposing ``task_id`` and ``test_pairs`` (a sequence of
            ``(input, target)`` pairs).
        verbose: When True, print per-task progress and blackboard summaries.
        save_path: Optional path of a JSON file to write the results to.
            Missing parent directories are created.

    Returns:
        Dict with ``model_name``, per-task ``task_results``,
        ``overall_accuracy``, ``exact_matches``, ``total_grids``, ``errors``
        and ``timing``.
    """
    results = {
        "model_name": llm_module.model,
        "task_results": {},
        "overall_accuracy": 0.0,
        "exact_matches": 0,
        "total_grids": 0,
        "errors": [],
        "timing": {
            "total_time": 0,
            "avg_time_per_task": 0
        }
    }

    start_time = time.time()

    # Process each task
    for task_index, task in enumerate(tqdm(tasks, desc=f"Evaluating {llm_module.model}")):
        if verbose:
            print(f"\n\n{'='*80}")
            print(f"Task {task_index+1}/{len(tasks)}: {task.task_id}")
            print(f"{'='*80}")

        task_start_time = time.time()

        try:
            # Examine blackboard before LLM reasoning
            if verbose:
                print("\nInitial blackboard state:")
                examine_blackboard(task, prefix="  ")
                print("\nSolving with LLM module...")

            predictions = llm_module.solve(task)

            # Examine blackboard after LLM reasoning
            if verbose:
                print("\nBlackboard state after LLM reasoning:")
                examine_blackboard(task, prefix="  ")

            # Evaluate each prediction
            task_exact_matches = 0
            task_total = len(task.test_pairs)
            prediction_results = []

            # A dedicated index name keeps the outer task counter intact.
            for pair_index, (_, target_grid) in enumerate(task.test_pairs):
                # Test pairs without a prediction still count in task_total,
                # i.e. they are scored as misses.
                if pair_index < len(predictions):
                    eval_result = evaluate_prediction(predictions[pair_index], target_grid)
                    prediction_results.append(eval_result)
                    task_exact_matches += int(eval_result["exact_match"])

                    if verbose:
                        print(f"\nTest example {pair_index+1}:")
                        print(f"  - Exact match: {eval_result['exact_match']}")
                        print(f"  - Cell accuracy: {eval_result['accuracy']:.4f}")

            # Calculate task accuracy
            task_accuracy = task_exact_matches / task_total if task_total > 0 else 0.0
            task_duration = time.time() - task_start_time

            # Store results
            results["task_results"][task.task_id] = {
                "exact_match_accuracy": task_accuracy,
                "exact_matches": task_exact_matches,
                "total": task_total,
                "prediction_details": prediction_results,
                "execution_time": task_duration,
            }

            # Update overall counts
            results["exact_matches"] += task_exact_matches
            results["total_grids"] += task_total

            if verbose:
                print(f"\nTask summary:")
                print(f"  - Exact matches: {task_exact_matches}/{task_total}")
                print(f"  - Task accuracy: {task_accuracy:.4f}")
                print(f"  - Execution time: {task_duration:.2f} seconds")

        except Exception as e:
            # Best effort: record the failure and continue with the next task.
            error_info = {
                "task_id": task.task_id,
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            results["errors"].append(error_info)
            if verbose:
                print(f"\nError evaluating task {task.task_id}: {e}")
                print(traceback.format_exc())

    # Calculate overall metrics
    total_time = time.time() - start_time
    results["overall_accuracy"] = (
        results["exact_matches"] / results["total_grids"]
        if results["total_grids"] > 0 else 0.0
    )
    results["timing"]["total_time"] = total_time
    results["timing"]["avg_time_per_task"] = total_time / len(tasks) if tasks else 0

    # Print summary
    print(f"\n{llm_module.model} Evaluation Results:")
    print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
    print(f"Exact Matches: {results['exact_matches']}/{results['total_grids']}")
    print(f"Total evaluation time: {total_time:.2f} seconds")
    print(f"Average time per task: {results['timing']['avg_time_per_task']:.2f} seconds")

    if results["errors"]:
        print(f"Encountered {len(results['errors'])} errors during evaluation")

    # Save results if requested
    if save_path:
        import os  # local import keeps this block self-contained

        # Create the output directory first: the results come from a long,
        # costly run and must not be lost to a missing folder.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(save_path, "w") as f:
            json.dump(results, f, indent=2)
        print(f"Results saved to {save_path}")

    return results

# Main evaluation function
def main(model_name="gpt-4", limit_tasks=5, verbose=True):
    """Load tasks, build the LLM module and run the evaluation.

    Args:
        model_name: Model identifier passed to ``create_llm_module``; also
            used (with ``-`` replaced by ``_``) to name the results file.
        limit_tasks: Maximum number of tasks to load from ``DATA_DIR``.
        verbose: Forwarded to ``evaluate_llm_module``.

    Returns:
        The results dict from ``evaluate_llm_module``, or a dict with
        ``error`` and ``traceback`` keys if module creation or evaluation
        raised.
    """
    print(f"Loading up to {limit_tasks} tasks...")
    tasks = load_tasks(DATA_DIR, limit=limit_tasks)
    print(f"Loaded {len(tasks)} tasks")

    try:
        print(f"Creating LLM module with model: {model_name}")
        module = create_llm_module(model_name=model_name)

        print("Evaluating LLM module...")
        output_name = f"{model_name.replace('-', '_')}_evaluation.json"
        return evaluate_llm_module(
            llm_module=module,
            tasks=tasks,
            verbose=verbose,
            save_path=os.path.join(RESULTS_DIR, output_name),
        )
    except Exception as exc:
        # Top-level boundary: report the failure and hand it back as data.
        print(f"Critical error in evaluation: {exc}")
        traceback.print_exc()
        return {"error": str(exc), "traceback": traceback.format_exc()}

# Run the main function
if __name__ == "__main__":
    # Adjust these parameters as needed.
    results = main(
        # Replace with your fine-tuned model name
        model_name="ft:gpt-4o-mini-2024-07-18:personal:arc-agi-blackboard:BJoraD1a",
        limit_tasks=3,
        # Set to True for detailed output
        verbose=True,
    )