# 🧡 Generating **EvoProc** procedures for the **GSM8K Dataset** Using a Genetic Algorithm and Ollama Queries

## 🟧 **Step 1**: Import packages and necessary application functions & variables

In [1]:
import os, re, math, json, ollama, traceback
import pandas as pd
from hashlib import blake2b
from datetime import datetime, timezone
from datasets import load_dataset
from typing import Callable, Literal, Optional, Dict, Any, Tuple
from evoproc.ga_scaffold_structured import ProcedureGA, GAConfig
from evoproc.validators import validate_procedure_structured
from evoproc_procedures.models import Procedure
from evoproc_procedures.schemas import get_schema
from evoproc_procedures.prompts import create_procedure_prompt
from evoproc_procedures.ollama import query, query_gpt_chat, repair_fn_ollama
from evoproc_procedures.runners import run_steps_stateful_minimal
from evoproc_procedures.helpers import pretty_print

  from .autonotebook import tqdm as notebook_tqdm


## 🟧 **Step 2**: Import the GSM8K Dataset

In [2]:
# Load the "main" configuration of openai/gsm8k; the train and test splits are
# kept in separate variables.
train_dataset = load_dataset("openai/gsm8k", "main", split="train")
test_dataset = load_dataset("openai/gsm8k", "main", split="test")

## 🟧 **Step 3**: Set variable constants and instantiate necessary functions

These include:
- Defining regex search functions to grab the final numerical answer from the GSM8K `answer` parameter
- Defining the evaluation function to compare predicted and actual answers
- Defining functions for file and ID handling (for saving results)
- Grabbing the GSM final answer schema
- Setting the query function
- Setting the model
- Instantiating the GA (genetic algorithm) object
- Defining the run function which puts everything together into one easy-to-run function

In [3]:
# Number on the trailing "#### <answer>" line of a GSM8K solution. Thousands
# separators ("1,000") are accepted here and stripped before conversion.
_FINAL_AFTER_HASH_RE = re.compile(r"####\s*(-?\d[\d,]*(?:\.\d+)?)\s*$", re.MULTILINE)
# Any plain signed integer or decimal (no thousands separators).
_LAST_NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?")


def extract_gold_number(gold_answer):
    """Return the final numeric answer of a GSM8K solution text as a float.

    The number after the ``####`` marker is preferred. When no marker line is
    present, the last number anywhere in the text is used instead.

    Args:
        gold_answer: The dataset's ``answer`` string (may be None or empty).

    Returns:
        The parsed number, or None when the text is empty or has no number.
    """
    if not gold_answer:
        return None
    m = _FINAL_AFTER_HASH_RE.search(gold_answer)
    if m:
        # "1,000" -> "1000"; float() rejects grouped digits.
        return float(m.group(1).replace(",", ""))
    # Drop commas that sit between a digit and a group of exactly three digits
    # so "1,000" is read as 1000 rather than as a trailing "000".
    ungrouped = re.sub(r"(?<=\d),(?=\d{3}(?!\d))", "", gold_answer)
    nums = _LAST_NUMBER_RE.findall(ungrouped)
    return float(nums[-1]) if nums else None

def _numbers_equal(a, b, tol=1e-9):
    if a is None or b is None:
        return False
    try:
        return abs(float(a) - float(b)) < tol
    except Exception:
        return a == b

In [4]:
def eval_fn(state) -> float:
    """Return a fitness score in [0, 1] for a finished procedure state.

    Args:
        state: Mapping read for ``final_answer_numerical`` (preferred),
            ``final_answer`` (text fallback) and ``_gold_num`` (the gold
            value, injected per item by the caller).

    Returns:
        1.0 when the predicted number is within 1e-6 of the gold number,
        otherwise 0.0 (including when either number is missing or unparsable).
    """

    def _as_float(value):
        # Models sometimes return numbers as strings ("5"); math.isclose
        # raises TypeError on those, so coerce up front.
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return None

    gold_num = _as_float(state.get("_gold_num"))  # we'll inject this per item
    if gold_num is None:
        return 0.0

    # Prefer the model-extracted numeric if present, else parse the answer text.
    pred_num = _as_float(state.get("final_answer_numerical"))
    if pred_num is None:
        answer_text = str(state.get("final_answer") or "")
        matches = re.findall(r"-?\d+(?:\.\d+)?", answer_text)
        if not matches:
            return 0.0
        pred_num = float(matches[-1])

    # Exact match, or close within a small absolute tolerance.
    return 1.0 if math.isclose(pred_num, gold_num, rel_tol=0, abs_tol=1e-6) else 0.0

In [5]:
def _safe_extract_json(text: str) -> Optional[dict]:
    """Try to pull a JSON object from a string. Returns dict or None."""
    if not text:
        return None
    s = text.strip()

    # Strip ```json ... ``` fences
    fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", s, flags=re.DOTALL | re.IGNORECASE)
    if fence:
        s = fence.group(1).strip()

    # Try direct parse
    try:
        obj = json.loads(s)
        return obj if isinstance(obj, dict) else None
    except Exception:
        pass

    # Fallback: take the first {...} block
    i, j = s.find("{"), s.rfind("}")
    if i != -1 and j != -1 and j > i:
        try:
            obj = json.loads(s[i : j + 1])
            return obj if isinstance(obj, dict) else None
        except Exception:
            return None
    return None


def _parse_baseline_output(raw: Any) -> Tuple[Dict[str, Any], Optional[str], Optional[float], str]:
    """
    Returns (state, pred_answer, pred_num, raw_text).
    - state: parsed dict if possible, else {"final_answer": <text>}
    - pred_answer: state["final_answer"] if available, else the raw text
    - pred_num: state["final_answer_numerical"] if available/coercible, else last number in pred_answer
    - raw_text: normalized string version of raw
    """
    # Normalize raw_text
    if isinstance(raw, dict):
        raw_text = json.dumps(raw, ensure_ascii=False)
        state = raw
    else:
        raw_text = "" if raw is None else str(raw)
        state = _safe_extract_json(raw_text)

    if isinstance(state, dict):
        pred_answer = state.get("final_answer")
        pred_num = state.get("final_answer_numerical")

        # Coerce numeric if possible
        try:
            pred_num = float(pred_num) if pred_num is not None else None
        except Exception:
            pred_num = None

        # If numeric missing, try parse from final_answer text
        if pred_num is None:
            txt = str(pred_answer or "")
            nums = _LAST_NUMBER_RE.findall(txt)
            pred_num = float(nums[-1]) if nums else None

        return state, (str(pred_answer) if pred_answer is not None else None), pred_num, raw_text

    # No JSON found: treat as plain text
    txt = raw_text.strip()
    nums = _LAST_NUMBER_RE.findall(txt)
    pred_num = float(nums[-1]) if nums else None
    return {"final_answer": txt}, (txt if txt else None), pred_num, raw_text


In [6]:
def _append_jsonl(path, items):
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        for it in items:
            f.write(json.dumps(it, ensure_ascii=False) + "\n")
        f.flush()
        os.fsync(f.fileno())

def _load_existing_ids(path):
    ids = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                    if "id" in rec and rec["id"] is not None:
                        ids.add(rec["id"])
                except json.JSONDecodeError:
                    # tolerate a partially written last line
                    continue
    except FileNotFoundError:
        pass
    return ids

def _stable_example_id(ex, *, salt=""):
    """
    Make a deterministic ID when dataset lacks 'id'.
    Uses question+answer (+ optional salt) to avoid changing if order changes.
    """
    # If a real id exists, reuse it
    if ex.get("id") is not None:
        return str(ex["id"])

    q = (ex.get("question") or "").strip()
    a = (ex.get("answer") or "").strip()
    h = blake2b(digest_size=16)  # 128-bit
    h.update(salt.encode("utf-8", "ignore"))
    h.update(q.encode("utf-8", "ignore"))
    h.update(b"\x1e")  # delimiter
    h.update(a.encode("utf-8", "ignore"))
    return h.hexdigest()

In [7]:
# Alternative model tags that can be swapped in for MODEL:
# MODEL = "llama4:latest"
# MODEL = "gemma3:latest"
# MODEL = "gpt-oss:120b-cloud"
# Local (non-cloud) gpt-oss tags; query_fn_no_format_for_gptoss drops the
# `fmt` argument for exactly these models.
GPT_OSS_LOCAL_BUGGY_MODELS = {"gpt-oss:20b", "gpt-oss:120b"}
# Model tag used for every query in this notebook.
MODEL = "gpt-oss:120b"
# Final-answer schema for GSM8K; passed as `fmt` to the query function and as
# `final_answer_schema` to the GA and step runner.
FINAL_SCHEMA = get_schema("gsm")
# Underlying backend query callable wrapped by query_fn_no_format_for_gptoss.
QUERY_FN = query

In [8]:
def query_fn_no_format_for_gptoss(prompt, model, fmt, seed):
    """Forward a query to QUERY_FN, dropping `fmt` for local gpt-oss models.

    Only the model tags listed in GPT_OSS_LOCAL_BUGGY_MODELS (the non-cloud
    gpt-oss:20b / gpt-oss:120b) have structured output disabled; cloud models
    do not have the bug, so their `fmt` is passed through unchanged.
    """
    is_buggy_local_model = isinstance(model, str) and model in GPT_OSS_LOCAL_BUGGY_MODELS
    effective_fmt = None if is_buggy_local_model else fmt
    return QUERY_FN(prompt, model, effective_fmt, seed)

In [9]:
# Earlier configuration, kept for reference: it calls QUERY_FN directly and
# hands the Procedure JSON schema to the GA through schema_json_fn.
# ga = ProcedureGA(
#     model=MODEL,
#     create_proc_fn=lambda task: create_procedure_prompt(task),
#     query_fn=QUERY_FN,                                     # backend call
#     schema_json_fn=lambda: Procedure.model_json_schema(),
#     validate_fn=validate_procedure_structured,          # pure function
#     repair_fn=repair_fn_ollama,                         # GA expects (proc, model) -> proc
#     cfg=GAConfig(population_size=3, max_generations=3, crossover_rate=0.5, mutation_rate=0.5, seed=42),
# )

# Active configuration: queries go through query_fn_no_format_for_gptoss and
# schema_json_fn returns None (presumably so no procedure schema is supplied
# for the local gpt-oss models; confirm against ProcedureGA).
ga = ProcedureGA(
    model=MODEL,
    create_proc_fn=lambda task: create_procedure_prompt(task),
    query_fn=query_fn_no_format_for_gptoss,             # backend call
    schema_json_fn=lambda: None,
    validate_fn=validate_procedure_structured,          # pure function
    repair_fn=repair_fn_ollama,                         # GA expects (proc, model) -> proc
    cfg=GAConfig(population_size=3, max_generations=3, crossover_rate=0.5, mutation_rate=0.5, seed=42),
)

Runners for running the procedural queries vs. the baseline queries

In [10]:
RunnerFn = Callable[[str, Optional[float]], Dict[str, Any]]


def make_baseline_runner(query_fn, model: str, seed: int = 1234, print_bool: bool = False) -> RunnerFn:
    """Build a runner that asks the model for the answer in a single query.

    Args:
        query_fn: Callable invoked as ``query_fn(prompt, model, fmt, seed)``.
        model: Model tag forwarded to ``query_fn``.
        seed: Seed forwarded to ``query_fn``.
        print_bool: When True, print the prompt and the raw reply.

    Returns:
        A ``(question, gold_num) -> dict`` runner. The dict has the keys
        ``mode``, ``state``, ``pred_answer``, ``pred_num``, ``correct``, ``raw``.
    """

    def _log(title: str, payload: Any) -> None:
        if print_bool:
            print(title)
            print(payload)
            print("-----")

    def _runner(question: str, gold_num: Optional[float]) -> Dict[str, Any]:
        prompt = (
            "Solve the following GSM8K problem.\n"
            'Return ONLY JSON with keys: "final_answer" (string), "final_answer_numerical" (number).\n'
            "No extra text.\n\n"
            f"PROBLEM:\n{question}\n"
        )
        _log("Prompt to model:", prompt)
        # query_fn may ignore the schema (e.g. for the local gpt-oss models).
        raw = query_fn(prompt, model, FINAL_SCHEMA, seed)
        _log("Raw model output:", raw)

        state, pred_ans, pred_num, raw_text = _parse_baseline_output(raw)

        # Correct only when both numbers exist and agree within 1e-6.
        if pred_num is None or gold_num is None:
            is_correct = False
        else:
            is_correct = math.isclose(float(pred_num), float(gold_num), rel_tol=0, abs_tol=1e-6)

        outcome: Dict[str, Any] = {"mode": "baseline"}
        outcome["state"] = state
        outcome["pred_answer"] = pred_ans
        outcome["pred_num"] = pred_num
        outcome["correct"] = bool(is_correct)
        outcome["raw"] = raw_text
        return outcome

    return _runner

def make_procedural_runner(ga, query_fn, model: str, seed: int = 1234, print_bool: bool = False) -> RunnerFn:
    """Build a runner that evolves a procedure with the GA and then executes it.

    Args:
        ga: GA object; ``ga.run(...)`` must return ``(best, history)`` and
            ``ga.model`` is the model used to execute the best procedure.
        query_fn: Query callable forwarded to ``run_steps_stateful_minimal``.
        model: Accepted for signature parity with ``make_baseline_runner``;
            the body uses ``ga.model`` instead.
        seed: Accepted for signature parity; not used by the body.
        print_bool: When True, print GA progress and the best procedure.

    Returns:
        A ``(question, gold_num) -> dict`` runner. The dict has the keys
        ``mode``, ``procedure``, ``fitness``, ``steps``, ``state``,
        ``pred_answer``, ``pred_num``, ``correct``.
    """

    def _coerce_number(value, fallback_text) -> Optional[float]:
        # The model may return the numeric field as a string, possibly with
        # units ("120 pages"). A bare float() would raise and turn a finished
        # run into an error record, so fall back to the last number in the
        # answer text, the same way the baseline parser does.
        if value is not None:
            try:
                return float(value)
            except (TypeError, ValueError, OverflowError):
                pass
        found = _LAST_NUMBER_RE.findall(str(fallback_text or ""))
        return float(found[-1]) if found else None

    def _runner(question: str, gold_num: Optional[float]) -> Dict[str, Any]:
        best, _history = ga.run(
            task_description=question,
            final_answer_schema=FINAL_SCHEMA,
            eval_fn=None,
            print_progress=print_bool,
        )
        if print_bool:
            print("\nBest procedure found:")
            pretty_print(best.proc)
            print("\nRunning final procedure to get final answer...")
        final_state = run_steps_stateful_minimal(
            best.proc, question, FINAL_SCHEMA, ga.model, query_fn=query_fn
        )
        pred_ans = final_state.get("final_answer")
        pred_num = _coerce_number(final_state.get("final_answer_numerical"), pred_ans)
        correct = (
            pred_num is not None
            and gold_num is not None
            and math.isclose(pred_num, float(gold_num), rel_tol=0, abs_tol=1e-6)
        )
        return {
            "mode": "procedural",
            "procedure": best.proc,
            "fitness": best.fitness,
            "steps": len(best.proc.get("steps", [])),
            "state": final_state,
            "pred_answer": pred_ans,
            "pred_num": pred_num,
            "correct": bool(correct),
        }
    return _runner

Runner for running an entire GSM8K batch with file saving IO capabilities

In [11]:
def run_gsm8k_batch(
    examples,
    runner: RunnerFn,       # <-- inject baseline/procedural behavior here
    out_path=None,          # e.g., "runs/gsm8k_results.jsonl"
    save_every=10,          # write every N examples
    resume=False,           # skip examples whose IDs are already in out_path
    id_salt="",             # optional: add dataset name/split/version here for extra uniqueness
    *,
    skip_errors: bool = False,          # continue after per-item failures
    save_error_records: bool = True,    # write an "error" record to JSONL
    include_traceback: bool = False,    # optionally store traceback (can be large)
    print_bool: bool = False,
):
    """Run ``runner`` over GSM8K examples, optionally persisting results to JSONL.

    Args:
        examples: Iterable of dicts like ``{"question": "...", "answer": "..."}``.
        runner: Callable ``(question, gold_num) -> dict``; its keys are merged
            into the per-item record.
        out_path: JSONL file to append records to; nothing is written when None.
        save_every: Number of buffered records that triggers a write.
        resume: When True (and ``out_path`` is set), skip examples whose ID is
            already present in ``out_path``. Error records count as present.
        id_salt: Extra text mixed into generated example IDs.
        skip_errors: When True, a failing item becomes an ``"error"`` record
            and the loop continues; when False the exception propagates.
        save_error_records: When False, error records are returned in memory
            but not written to ``out_path``.
        include_traceback: When True, error records carry the formatted traceback.
        print_bool: Accepted for call-site compatibility; not used here.

    Returns:
        List of per-item record dicts, in processing order.

    Raises:
        Exception: Whatever ``runner`` raised, when ``skip_errors`` is False.

    Buffered records are written even when the loop is interrupted (for
    example by KeyboardInterrupt), so at most the in-flight item is lost.
    """
    pending = []
    results = []
    existing_ids = _load_existing_ids(out_path) if (resume and out_path) else set()

    def _flush():
        if out_path and pending:
            # Take the batch out of the buffer before writing, so a failed
            # write is not retried (and partially duplicated) by the
            # finally-flush below.
            batch = list(pending)
            pending.clear()
            _append_jsonl(out_path, batch)

    try:
        for idx, ex in enumerate(examples):
            qid = _stable_example_id(ex, salt=id_salt)
            if resume and out_path and (qid in existing_ids):
                continue

            question = ex.get("question")
            gold_text = ex.get("answer")
            gold_num = extract_gold_number(gold_text) if gold_text else None

            try:
                extra = runner(question, gold_num)

                rec = {
                    "id": qid,
                    "row_index": idx,
                    "question": question,
                    "gold_answer": gold_text,
                    "gold_num": gold_num,
                    "status": "ok",
                    **extra,
                }

            except Exception as e:
                if not skip_errors:
                    # Buffered records are flushed by the finally block.
                    raise

                # create an error record (and optionally mark it as "done" for resume)
                rec = {
                    "id": qid,
                    "row_index": idx,
                    "question": question,
                    "gold_answer": gold_text,
                    "gold_num": gold_num,
                    # The runner's partial state is not visible here, so no
                    # procedure can be attached; the key is kept so error and
                    # ok records share columns.
                    "procedure": None,
                    "status": "error",
                    "error_type": type(e).__name__,
                    "error": str(e),
                    "timestamp_utc": datetime.now(timezone.utc).isoformat(),
                }
                if include_traceback:
                    rec["traceback"] = traceback.format_exc()

                # If you *don't* want resume=True to skip errored items later,
                # set save_error_records=False OR change _load_existing_ids
                # to only count status=="ok".
                if not save_error_records:
                    # don't save it; still return it in-memory
                    results.append(rec)
                    continue

            results.append(rec)

            if out_path:
                pending.append(rec)
                if len(pending) >= save_every:
                    _flush()
    finally:
        # Runs on normal completion, on a propagated runner error, and on
        # KeyboardInterrupt, so buffered records are never silently dropped.
        _flush()

    return results


## 🟧 **Step 4**: Run the batch function to get results

Running the first example to make sure everything goes smoothly before running larger batches

Uncomment the cells below if you want to test before running the larger set

In [12]:
# first = train_dataset.select(range(1))

# res = ollama.generate(
#         model=MODEL,
#         prompt=create_procedure_prompt(first[0]["question"]),
#         format=None,
#         options={"temperature": 1, "seed": 1234},
#     )
# print(res["response"])

In [13]:
# # just a small batch for demo, run if you want to test quickly
# first = train_dataset.select(range(1))  
# first_result = run_gsm8k_batch(
#     first,
#     runner=make_baseline_runner(query_fn_no_format_for_gptoss, MODEL, seed=1234, print_bool=True),
#     out_path="runs/gsm8k_results_testing.jsonl",
#     save_every=1,  # write every example for demo
#     id_salt="gsm8k_train",  # optional: add dataset name/split/version here for extra uniqueness
#     print_bool=True
# )
# print(first_result)

Running the actual data set (full). Adjust params below if needed before continuing.

In [14]:
# Change these variables to control current file
CURRENT_FILE_PATH = "runs/gsm8k_train_v6_baseline.jsonl"
CURRENT_ID_SALT = "gsm8k-train-v6_baseline"
# CHANGE TO TRUE ONLY IF YOU NEED TO RESUME
RESUME = False
# CHANGE ONLY IF YOU WANT TO PRINT LOGS AS IT RUNS
PRINT_BOOL = False

In [15]:
# Instantiate the runners
proc_runner = make_procedural_runner(ga, query_fn_no_format_for_gptoss, MODEL, seed=1234, print_bool=PRINT_BOOL)
baseline_runner = make_baseline_runner(query_fn_no_format_for_gptoss, MODEL, seed=1234, print_bool=PRINT_BOOL)
# Set which runner you want to use
RUNNER = baseline_runner

In [16]:
# Run the selected runner over the full train split, appending to
# CURRENT_FILE_PATH every 5 records and recording per-item failures instead
# of stopping.
results = run_gsm8k_batch(
    train_dataset, 
    runner=RUNNER,
    out_path=CURRENT_FILE_PATH, 
    save_every=5,
    resume=RESUME,
    id_salt=CURRENT_ID_SALT,   # optional but nice to set (dataset name/split/version)
    skip_errors=True,          # NEW: continue after per-item failures
    print_bool=PRINT_BOOL
)

KeyboardInterrupt: 

## 🟧 **Step 5**: Analyze results

Read your results from your file

In [None]:
# Load every saved record from the results file into a DataFrame.
# The file is written as UTF-8 with ensure_ascii=False, so it must be read as
# UTF-8 too (the platform default encoding may differ). Blank lines and a
# truncated last line (e.g. after an interrupted run) are skipped, matching
# how _load_existing_ids reads the same file.
read_results = []
with open(CURRENT_FILE_PATH, "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            read_results.append(json.loads(line))
        except json.JSONDecodeError:
            continue
procs = read_results  # kept as an alias for cells that refer to `procs`
results_df = pd.DataFrame(read_results)

In [None]:
# Tally correct vs. incorrect predictions and show the same counts as a bar chart.
print(results_df["correct"].value_counts())
results_df["correct"].value_counts().plot(kind='bar')

correct
False    32
True     17
Name: count, dtype: int64

Printing to further investigate false answers

In [None]:
# Rows whose prediction was judged incorrect. `.eq(False)` is an element-wise
# comparison, so rows where "correct" is missing (error records) are excluded.
false_answers = results_df[results_df["correct"].eq(False)]
for _, row in false_answers.iterrows():
    print(f"Question: {row['question']}")
    print(f"Gold Answer: {row['gold_answer']}")
    print(f"Predicted Answer: {row['pred_answer']}")
    print("Procedure:")
    # Baseline records carry no procedure (the column is absent or NaN), so
    # only pretty-print when a procedure dict was actually stored.
    procedure = row.get("procedure")
    if isinstance(procedure, dict):
        pretty_print(procedure)
    else:
        print("(no procedure recorded)")
    print("\n---\n")

Question: Betty is saving money for a new wallet which costs $100. Betty has only half of the money she needs. Her parents decided to give her $15 for that purpose, and her grandparents twice as much as her parents. How much more money does Betty need to buy the wallet?
Gold Answer: In the beginning, Betty has only 100 / 2 = $<<100/2=50>>50.
Betty's grandparents gave her 15 * 2 = $<<15*2=30>>30.
This means, Betty needs 100 - 50 - 30 - 15 = $<<100-50-30-15=5>>5 more.
#### 5
Predicted Answer: Betty needs $30 more to buy the wallet.
Procedure:

--- Procedure: Calculate how much more money Betty needs. ---
Steps:

Step 1: Extract the problem text from the input.
  **Inputs**:
    - problem_text: The problem description.
  **Outputs**:
    - problem_text: The problem description.

Step 2: Calculate the amount Betty has.
  **Inputs**:
    - problem_text: The problem description.
  **Outputs**:
    - money_she_has: The amount of money Betty has.

Step 3: Calculate the amount Betty needs.
  **

Looking at an isolated false answer

In [None]:
# Replay the stored procedure of the second incorrect row with step-by-step
# logging. The gpt-oss-aware wrapper is used so the replay queries the model
# the same way the batch run did (no `fmt` for the local gpt-oss models).
this = false_answers.iloc[1]
run_steps_stateful_minimal(
    this['procedure'],
    this['question'],
    FINAL_SCHEMA,
    ga.model,
    print_bool=True,
    query_fn=query_fn_no_format_for_gptoss,
)

[step 1] inputs: {'problem_text': 'Julie is reading a 120-page book. Yesterday, she was able to read 12 pages and today, she read twice as many pages as yesterday. If she wants to read half of the remaining pages tomorrow, how many pages should she read?'}
[step 1] outputs: {k: state[k] for k in ['total_pages'] if k in state}
[step 2] inputs: {'total_pages': 120}
[step 2] outputs: {k: state[k] for k in ['yesterdays_pages'] if k in state}
[step 3] inputs: {'total_pages': 120, 'yesterdays_pages': 0}
[step 3] outputs: {k: state[k] for k in ['todays_pages'] if k in state}
[step 4] inputs: {'todays_pages': 120, 'yesterdays_pages': 0}
[step 4] outputs: {k: state[k] for k in ['tomorrows_pages'] if k in state}
[step 5] inputs: {'tomorrows_pages': 120}
[step 5] outputs: {k: state[k] for k in ['final_answer', 'final_answer_numerical', 'confidence', 'units'] if k in state}


{'problem_text': 'Julie is reading a 120-page book. Yesterday, she was able to read 12 pages and today, she read twice as many pages as yesterday. If she wants to read half of the remaining pages tomorrow, how many pages should she read?',
 'total_pages': 120,
 'yesterdays_pages': 0,
 'todays_pages': 120,
 'tomorrows_pages': 120,
 'final_answer': 'Julie will read 120 pages tomorrow.',
 'final_answer_numerical': 120,
 'confidence': 1,
 'units': 'pages'}