In [1]:
# Notebook bootstrap: load environment variables, extend sys.path so the
# project's own packages are importable, then import the project modules used
# by the cells below.
import sys
import random
import os
from pathlib import Path
from dotenv import load_dotenv

# Load variables from a .env file into the process environment.
# NOTE(review): presumably this supplies the API key used by OpenRouterClient
# -- confirm against core.llm.
load_dotenv()

# The project root is taken to be the parent of the current working directory,
# so this cell only works when the kernel is started one level below the root.
project_root = Path.cwd().parent
src_path = project_root / "sources"
sys.path.append(str(src_path))

from core.llm import OpenRouterClient
from utils.generate_dsl_docs import generate_symbolic_dsl_reference_markdown

from core.dsl_symbolic_interpreter import SymbolicRuleParser

# NOTE(review): the two paths below are absolute, machine-specific locations.
# They look like `project_root / "tests"` and `project_root / "sources"`
# (the latter would duplicate `src_path` above) -- confirm and replace with
# project-relative paths so the notebook runs on other machines.
sys.path.append(str(Path("/home/yann/ssd_storage/python/arcprize2025/tests/")))
sys.path.append(str(Path("/home/yann/ssd_storage/python/arcprize2025/sources/")))
from test_dsl_symbolic_executor import TEST_CASES
from assets.symbols import ROM_VAL_MAP

print("Cell 1 executed: Environment paths, dotenv loaded, and core modules imported.")

  from .autonotebook import tqdm as notebook_tqdm


Cell 1 executed: Environment paths, dotenv loaded, and core modules imported.


In [2]:
# Hand-written reference rules in the symbolic DSL, keyed by an 8-character
# identifier. The prompt template presents the values as examples "from real
# ARC solutions"; the keys are presumably ARC task ids -- TODO confirm.
# Only the values are used by create_pure_rule_generation_prompt().
TRUE_RULES = {
    "007bbfb7": "⧎(⟹(◨(III), ⬒(III)), ⤨(III), ⊕(IX,IX,∅))",
    "009d5c81": "◫(⧈(◎(I)), [(▦(III,III,∅I∅;III;∅I∅), ⟹(⇒(VIII, II), ⇒(I, ∅))), (▦(III,III,I∅I;∅I∅;III), ⟹(⇒(VIII, III), ⇒(I, ∅)))], ⟹(⇒(VIII, VII), ⇒(I, ∅)))",
    "00d62c1b": "⧎(⌂,∨(⏚(∅),ⓑ(◎(III))),⟹(∨(⏚(∅),ⓑ(◎(III))),⟹(⇒(∅,IV),⇒(I,∅))))",
    "00dbd492": "⧎(⇒(∅,III),⌖(⌂,▦(IX,IX,[[II,II,II,II,II,II,II,II,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,II,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,∅,∅,II],[II,II,II,II,II,II,II,II,II]])),⧎(⇒(∅,IV),⌖(⌂,▦(VII,VII,[[II,II,II,II,II,II,II],[II,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,II],[II,∅,∅,II,∅,∅,II],[II,∅,∅,∅,∅,∅,II],[II,∅,∅,∅,∅,∅,II],[II,II,II,II,II,II,II]])),⧎(⇒(∅,VIII),⌖(⌂,▦(V,V,[[II,II,II,II,II],[II,∅,∅,∅,II],[II,∅,II,∅,II],[II,∅,∅,∅,II],[II,II,II,II,II]])),⌂)))"
}

In [3]:
def create_pure_rule_generation_prompt(
    complexity_focus,
    num_rules_to_generate=5,
    doc_sigil=None,
    example_rules=None,
    true_rules=None,
):
    """Build the LLM prompt that asks for new symbolic-DSL rules.

    Args:
        complexity_focus: Free-text instruction inserted under the
            "Complexity focus" heading of the prompt.
        num_rules_to_generate: Number of rules the prompt asks the model for.
        doc_sigil: DSL grammar reference text. When ``None`` it is produced by
            ``generate_symbolic_dsl_reference_markdown()``; pass a cached copy
            to avoid regenerating it on every call.
        example_rules: Iterable of example rules, each either a string or a
            dict with a ``"rule_string"`` key (other items are ignored).
            Defaults to the module-level ``TEST_CASES``.
        true_rules: Mapping whose values are reference rule strings.
            Defaults to the module-level ``TRUE_RULES``.

    Returns:
        The complete prompt as a single string.

    Raises:
        ValueError: If the grammar reference is empty or shorter than 10
            non-whitespace-trimmed characters.
    """
    # Generate the full DSL grammar (unless the caller supplied one)
    if doc_sigil is None:
        doc_sigil = generate_symbolic_dsl_reference_markdown()
    if not doc_sigil or len(doc_sigil.strip()) < 10:
        raise ValueError("Failed to generate doc_sigil. Check generate_symbolic_dsl_reference_markdown().")

    if example_rules is None:
        example_rules = TEST_CASES
    if true_rules is None:
        true_rules = TRUE_RULES

    # Collect example rules
    collected_rules = []
    for test_case_item in example_rules:
        if isinstance(test_case_item, dict) and "rule_string" in test_case_item:
            collected_rules.append(test_case_item["rule_string"].strip())
        elif isinstance(test_case_item, str):
            collected_rules.append(test_case_item.strip())

    # Render the examples one per line, exactly as the model is asked to answer.
    # Interpolating the list itself would put a Python repr (brackets, quotes,
    # commas) into the prompt, contradicting the output rules further down.
    # dict.fromkeys drops duplicates while keeping first-seen order.
    available_rule_strings = "\n".join(
        dict.fromkeys(rule for rule in collected_rules if rule)
    )

    true_rule_strings = "\n".join(true_rules.values())

    # NOTE(review): the template lists `◆` both as an operator to combine
    # ("Hard Requirements") and as an invented symbol to avoid ("Rules for
    # Output"). Left untouched because the grammar is not visible here --
    # confirm which one is intended.
    instruction = f"""
    
--- **DSL Grammar**
{doc_sigil}

You are a master of a symbolic DSL for grid transformation. Generate exactly {num_rules_to_generate} NEW and UNIQUE programs.

--- **Hard Requirements**
* Each rule must have at least 3 levels of nesting.
* Use deep recursion: e.g., ⌖(⌂, ▦(..., ⌖(⌂, ▦(...))))
* Combine multiple operators: ⟹, ◫, ⧎, ⇒, ∨, ⏚, ◆
* No trivial rules like: ⤨(I), ⟹(⌂, ⌂), ⇌(↔, ↕)


--- **Example of GOOD Output (from real ARC solutions)**
{available_rule_strings}

{true_rule_strings}

--- **Your Task**
Generate exactly {num_rules_to_generate} meaningful DSL programs.

--- Complexity focus
{complexity_focus}

--- **Rules for Output**
* ONE rule per line
* NO numbering (no "1.", "2.", etc.)
* NO markdown, bold, headers, or explanations
* NO English words (e.g., "BlockGridBuilder", "MatchPattern", "ApplyToRow")
* NO invented symbols (e.g., ◆, ⧀, Ⳁ, ∁, ⊥, ⌊, ⌋, →, ⟶)
* Use ONLY the symbols defined in the DSL Grammar
* Every `(` must have a matching `)`
* Do NOT wrap rules in quotes or backticks
* Do NOT invent new syntax
* DO NOT use command sequences (like `⧎`, `⟹`) as arguments to transformation commands like `⌖`, `◫`, `¿`
* For example: `⌖(⌂, ⧎(...))` is INVALID
* Instead, use: `⧎(..., ⌖(⌂, ...), ...)`
* Each rule must be COMPLETE and SELF-CONTAINED.
* NO truncated grids, NO missing closing parentheses.
* The rule must be complete — all parentheses must be balanced.
* NO truncated grids, NO missing `)`.
* Example of valid output:
⟹(ⓑ(◎(I)), ⌖(⌂, ▦(III,III,[[II,II],[II,II]])), ⌖(⌂, ▦(II,II,[[∅,I],[I,∅]])))
Now output exactly {num_rules_to_generate} raw DSL rules:
"""

    return instruction

In [None]:
import random

# Run configuration for the bulk rule-generation job: the LLM client plus the
# size, batching and checkpointing parameters read by the generation loop.
print("--- Starting Bulk DSL Rule Generation Process ---")

# Exactly one `model=` line is active; the commented lines are alternative
# model ids kept for quick switching.
openrouter_client = OpenRouterClient(
    # model="meta-llama/llama-3.1-70b-instruct",
    # model="deepseek/deepseek-r1-0528-qwen3-8b:free",
    model = "mistralai/mistral-small-3.2-24b-instruct",
    # model = "qwen/qwen3-235b-a22b-07-25",
    # model = "mistralai/mixtral-8x7b-instruct",
    temperature=1.1)

# The generation loop runs until this many validated rules have been collected.
total_rules_needed = 1000
# Number of rules requested from the model in each prompt.
rules_per_batch = 5
# A checkpoint is written each time the rule count crosses a multiple of this.
save_interval =10
# Relative path, so it resolves against the kernel's current working directory.
output_file = Path("generated_dsl_rules.txt")

# Pool of "complexity focus" instructions. The generation loop picks one at
# random and passes it to create_pure_rule_generation_prompt() as
# `complexity_focus`.
# NOTE(review): the Level 1 entries ask for single-operation / "No nesting"
# rules, while the prompt template's hard requirements demand at least 3
# levels of nesting -- the model receives both; confirm this is intended.
complexity_prompts = [
    # --- Level 1: Atomic Building Blocks ---
    # Focus on simple, valid, parseable rules
    "simple, generate rules with exactly one operation: ⇒(from, to), ↔(grid), ↕(grid), ⇌(cmd1, cmd2). Use only I-X, ∅, or ⌂ as inputs.",
    "simple, generate rules using only ⇒(from, to) with diverse pairs: (I,II), (II,I), (III,V), (X,∅), (∅,I). No nesting.",
    "simple, generate rules using only grid transformations: ↔(⌂), ↕(⌂), ⤨(I), ◨(II), ⬒(III). One per rule.",
    "simple, generate rules using only boolean/logical ops on masks: ⏚(color), ⓑ(x), ◎(x), ≡(a,b).",
    "simple, generate rules using only pattern builder: ▦(rows, cols, [[pattern]]). Use only I, II, ∅ in the grid.",

    # --- Level 2: Valid Compositions ---
    # Focus on 2-3 level nesting, correct argument roles
    "moderate, generate rules like ⟹(condition, then, else) where condition is ⏚(∅), ⓑ(◎(I)), or ≡(a,b), and then/else are simple transforms.",
    "moderate, generate rules like ⌖(⌂, pattern) where pattern is ▦(III,III,[[II,∅,II],[∅,II,∅],[II,∅,II]]) or ⇒(I,II).",
    "moderate, generate rules like ◫(⌂, [(pattern1, action1), (else, action2)]) with one or two branches. Use ▦(...) as pattern.",
    "moderate, generate rules using ⧎(cmd1, cmd2, cmd3) to apply a sequence of transformations to ⌂.",
    "moderate, generate rules that combine arithmetic (⊞, ⊟) or logic (∨, ∧) with simple transforms.",

    # --- Level 3: Real ARC-Like Patterns (Based on TRUE_RULES) ---
    # Force use of real solution structures
    "complex, generate a rule that applies ⌖(⌂, ▦(...)) multiple times in a sequence, like in puzzle '00dbd492'. Use ⟹ or ⧎ to chain them.",
    "complex, generate a rule that uses ◫(⌂, [...]) with 2-3 branches, each applying a different ▦(...) pattern and transform.",
    "complex, generate a rule that uses ⇒(∅, N) to map background to a color, then applies ⌖ or ⟹ based on it.",
    "complex, generate a rule that uses nested logical checks: ⟹(∨(⏚(∅), ⓑ(◎(III))), transform1, transform2).",
    "complex, generate a rule that uses ⧎(⇒(∅,N), ⌖(⌂, ▦(...)), ...) to progressively modify the grid — like a multi-step solver.",

    # --- Level 4: Structural Innovation ---
    # Force new patterns, not just repetition
    "advanced, generate a rule that uses ◫(⌂, [...]) with a branch that itself contains another ◫ or ⟹.",
    "advanced, generate a rule that uses recursive-like structure: ⌖(⌂, ⟹(..., ⌖(⌂, ...))) or ⌖(⌂, ⌖(⌂, ...)).",
    "advanced, generate a rule that uses ⊕(grid1, grid2, mode) to combine two ⌖(⌂, ▦(...)) results.",
    "advanced, generate a rule that uses ⧈(cmd) to extract a bounding box, then applies a transform to it.",
    "advanced, generate a rule that uses ⇌(⇒(a,b), ⇒(c,d)) to apply symmetric value mapping.",
]
# A new complexity prompt is drawn every this-many batches (1 = every batch).
switch_complexity_every_n_batches = 1


# Rules accepted so far (unique, parse-validated); the generation loop appends
# to this list and periodically writes it back to `output_file`.
all_validated_rules = []
parser = SymbolicRuleParser()

if output_file.exists():
    # The rules are made of non-ASCII DSL symbols, so read explicitly as UTF-8
    # instead of relying on the platform's default encoding.
    with open(output_file, 'r', encoding='utf-8') as f:
        # dict.fromkeys de-duplicates while keeping the file's line order; a
        # plain set would reshuffle the file on every resume. `existing_rules`
        # is therefore an ordered list of unique rules.
        existing_rules = list(dict.fromkeys(line.strip() for line in f if line.strip()))
    all_validated_rules.extend(existing_rules)
    print(f"Resuming generation. Loaded {len(all_validated_rules)} existing rules from {output_file}.")
else:
    print(f"Starting new generation. Output will be saved to {output_file}.")

print(f"\n### Target: {total_rules_needed} valid rules. Requesting {rules_per_batch} per batch.")
print(f"### Complexity Prompts Defined: {len(complexity_prompts)}")
print(f"### Switching complexity every {switch_complexity_every_n_batches} batches.")

--- Starting Bulk DSL Rule Generation Process ---
Resuming generation. Loaded 510 existing rules from generated_dsl_rules.txt.

### Target: 1000 valid rules. Requesting 5 per batch.
### Complexity Prompts Defined: 20
### Switching complexity every 1 batches.


In [None]:
import time
import random
import re

def clean_rule_line(line):
    """Strip common LLM formatting noise from a single output line.

    Removes a leading list number ("1. " or "2) "), blanks out a line that is
    entirely a bold markdown header ("**...**"), and finally trims surrounding
    whitespace followed by surrounding backticks and quote characters.

    Returns the cleaned text, which is an empty string for header-only lines.
    """
    text = line.strip()
    noise_patterns = (
        r"^\s*\d+[\.\)]\s*",      # leading "1." / "2)" numbering
        r"^\s*\*\*.*\*\*\s*$",    # whole-line **bold** header
    )
    for pattern in noise_patterns:
        text = re.sub(pattern, "", text)
    # Whitespace first, then wrapping backticks / double / single quotes.
    return text.strip().strip('`"\'')


    

def _save_rules(path, rules):
    """Write `rules` to `path`, one per line, as UTF-8.

    The rules are first written to a sibling ``<name>.tmp`` file which then
    replaces `path`, so an interrupted save cannot truncate the rules that were
    already on disk. Raises OSError if writing or replacing fails.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, 'w', encoding='utf-8') as f:
        f.writelines(rule + '\n' for rule in rules)
    tmp_path.replace(path)


print("--- Beginning LLM Calls and Validation Loop ---")
loop_count = 0
current_complexity_prompt = None

MAX_RETRIES = 5
INITIAL_BACKOFF_SECONDS = 5
# Stop the whole run after this many batches in a row got no text from the
# LLM; otherwise an unreachable API would keep the loop spinning forever.
MAX_CONSECUTIVE_FAILED_BATCHES = 3

# Set mirror of all_validated_rules for O(1) duplicate checks.
seen_rules = set(all_validated_rules)
consecutive_failed_batches = 0

while len(all_validated_rules) < total_rules_needed:
    loop_count += 1

    if loop_count == 1 or (loop_count - 1) % switch_complexity_every_n_batches == 0:
        chosen_complexity = random.choice(complexity_prompts)
        current_complexity_prompt = chosen_complexity
        print("\n--- Switching Complexity! ---")
        print(f"New complexity focus: '{current_complexity_prompt}'")

    print(f"\n--- Batch {loop_count} --- (Current valid rules: {len(all_validated_rules)}/{total_rules_needed})")

    generation_prompt = create_pure_rule_generation_prompt(
        num_rules_to_generate=rules_per_batch,
        complexity_focus=current_complexity_prompt,
    )

    # --- Call the LLM, retrying with exponential backoff ---
    generated_text = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"Calling LLM for {rules_per_batch} rules (Attempt {attempt}/{MAX_RETRIES})...")
            generated_text = openrouter_client(
                generation_prompt
            )
            print(f"\n--- RAW LLM OUTPUT (Batch {loop_count}) ---")
            print(generated_text)
            print("--- END RAW OUTPUT ---\n")
            if generated_text:
                break
            print("LLM returned empty text. Retrying...")
        except Exception as e:
            print(f"LLM call failed for batch {loop_count} (Attempt {attempt}/{MAX_RETRIES}): {e}")

        if attempt < MAX_RETRIES:
            wait_time = INITIAL_BACKOFF_SECONDS * (2 ** (attempt - 1))  # Exponential backoff
            print(f"Waiting {wait_time} seconds before retrying...")
            time.sleep(wait_time)
        else:
            print(f"Max retries ({MAX_RETRIES}) reached for batch {loop_count}. Skipping this batch.")

    if not generated_text:  # If after all retries, still no text
        consecutive_failed_batches += 1
        print(f"Skipping batch {loop_count} due to persistent LLM errors.")
        if consecutive_failed_batches >= MAX_CONSECUTIVE_FAILED_BATCHES:
            print(f"Aborting: {consecutive_failed_batches} consecutive batches failed. "
                  "Collected rules will still be saved.")
            break
        continue  # Skip to next outer loop iteration (next batch)
    consecutive_failed_batches = 0

    # --- Normalise the raw lines before validation ---
    # clean_rule_line removes numbering, bold headers and wrapping quotes or
    # backticks that the model adds despite the prompt; lines that are empty
    # after cleaning, or that start with '#', are dropped.
    generated_rules_list = []
    for raw_line in generated_text.split('\n'):
        if raw_line.strip().startswith('#'):
            continue
        cleaned_line = clean_rule_line(raw_line)
        if cleaned_line:
            generated_rules_list.append(cleaned_line)

    if not generated_rules_list:
        print("No parsable rules found in raw LLM output for this batch. Retrying in next loop.")
        time.sleep(2)
        continue

    print(f"Attempting to Parse and Validate {len(generated_rules_list)} DSL Rules from LLM response...")

    # --- Keep only new rules that the parser accepts ---
    batch_valid_count = 0
    batch_duplicate_count = 0
    batch_invalid_count = 0
    for rule_str in generated_rules_list:
        if rule_str in seen_rules:
            batch_duplicate_count += 1
            continue
        try:
            parser.parse_rule(rule_str)
        except Exception as e:
            # A rejected rule is expected, not fatal; report it instead of
            # silently discarding the reason.
            batch_invalid_count += 1
            print(f"  Rejected (parse error: {e}): {rule_str}")
            continue
        seen_rules.add(rule_str)
        all_validated_rules.append(rule_str)
        batch_valid_count += 1

    print(f"Batch {loop_count} Summary: {batch_valid_count} new unique valid rules added. Total valid rules: {len(all_validated_rules)}.")
    print(f"  ({batch_invalid_count} rejected by the parser, {batch_duplicate_count} duplicates skipped)")

    if len(all_validated_rules) >= total_rules_needed:
        pass  # The final save below covers this case.
    elif len(all_validated_rules) // save_interval > (len(all_validated_rules) - batch_valid_count) // save_interval:
        # The count crossed a multiple of save_interval in this batch: checkpoint.
        print(f"Saving {len(all_validated_rules)} rules to {output_file}...")
        try:
            _save_rules(output_file, all_validated_rules)
            print("Save complete.")
        except OSError as e:
            print(f"Error saving rules to file: {e}")

print("\n--- Generation Loop Complete! ---")
print(f"Final count: {len(all_validated_rules)} unique valid rules collected.")
print(f"Performing final save of all {len(all_validated_rules)} rules to {output_file}...")
try:
    _save_rules(output_file, all_validated_rules)
    print("Final save complete.")
except OSError as e:
    print(f"Error during final save: {e}")

print("Cell 5 executed: Bulk rule generation loop completed, rules saved.")

--- Beginning LLM Calls and Validation Loop ---

--- Switching Complexity! ---
New complexity focus: 'moderate, generate rules using ⧎(cmd1, cmd2, cmd3) to apply a sequence of transformations to ⌂.'

--- Batch 1 --- (Current valid rules: 510/1000)
Calling LLM for 5 rules (Attempt 1/5)...
