In [None]:
# =========================================================
# 0) SETUP & IMPORTS
# =========================================================
import os, json, random, textwrap, gc, math, sys
from pathlib import Path
from typing import List, Dict, Any
from dataclasses import dataclass

import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    Trainer,
    TrainingArguments,
)
from datasets import Dataset, DatasetDict
# import evaluate

# Seed Python's `random` module and torch with the same fixed value.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# When running on Kaggle:
# INPUT_ROOT is the folder searched (recursively) for task*.json files;
# OUTPUT_ROOT is where checkpoints and task.py are written.
INPUT_ROOT = Path("/kaggle/input/google-code-golf-2025")
OUTPUT_ROOT = Path("/kaggle/working")

In [None]:
# =========================================================
# 1) FIND & READ THE JSON DATA (task001.json ... task400.json)
#    - Recursively scans INPUT_ROOT (not all of /kaggle/input)
#      for files named "task*.json".
# =========================================================

# Sorted so the task order is deterministic across runs.
json_files = sorted(INPUT_ROOT.rglob("task*.json"))
assert len(json_files) > 0, "Không tìm thấy file task*.json trong /kaggle/input"

print(f"Found {len(json_files)} JSON files (e.g., {json_files[:3]})")

def grid_to_json_str(grid: List[List[int]]) -> str:
    """Serialise a grid as compact JSON (no whitespace after ',' or ':').

    Standard JSON is used so the text the model sees has one stable format.
    """
    compact_separators = (",", ":")
    encoded = json.dumps(grid, separators=compact_separators)
    return encoded

def should_use_extras(input_grid, extras, max_len=None):
    """
    Decide whether few-shot examples still fit into the prompt length budget.

    The prompt is rebuilt here (instead of calling make_single_example) to
    avoid mutual recursion between the two functions.

    Args:
        input_grid: the query grid placed under "Input:".
        extras: list of (input_grid, output_grid) pairs; only the first two
            are considered.
        max_len: token budget. Defaults to the notebook-level MAX_INPUT_LEN
            when that name exists, otherwise 6144.

    Returns:
        True when the estimated prompt length is below 95% of the budget.

    Note:
        This function is called while the samples are built, which happens
        before the tokenizer cell runs on a fresh kernel. When no global
        `tokenizer` exists yet the length is measured in UTF-8 bytes; ByT5
        is byte-level, so this is expected to match its token count without
        special tokens (TODO confirm if another tokenizer is ever used).
    """
    compact = (",", ":")

    # Minimal prompt (no extras).
    base = (
        "Write ONLY this function (no prints, no globals):\n"
        "def transform(grid: list[list[int]]) -> list[list[int]]:\n"
        "\nInput:\n" + json.dumps(input_grid, separators=compact)
    )

    # Few-shot section (at most 2 examples).
    extra_text = "".join(
        f"\nExample {i}\nInput:\n{json.dumps(iin, separators=compact)}"
        f"\nOutput:\n{json.dumps(oout, separators=compact)}"
        for i, (iin, oout) in enumerate(extras[:2], 1)
    )

    total_text = base + ("\nExamples to infer the rule:" + extra_text if extra_text else "")

    # Look the tokenizer up lazily so a fresh-kernel run does not hit NameError.
    tok = globals().get("tokenizer")
    if tok is not None:
        total_len = len(tok(total_text, add_special_tokens=False).input_ids)
    else:
        total_len = len(total_text.encode("utf-8"))

    if max_len is None:
        max_len = globals().get("MAX_INPUT_LEN", 6144)

    # Keep ~5% headroom for tokenisation differences.
    return total_len < int(max_len * 0.95)


def make_single_example(input_grid, output_grid, extra_examples=None):
    """Build one (prompt, code) training sample.

    Args:
        input_grid: grid shown under the final "Input:" section of the prompt.
        output_grid: grid embedded as the constant returned by the target code.
        extra_examples: optional list of (input, output) pairs; at most the
            first two are shown, and only when should_use_extras() accepts them.

    Returns:
        dict with keys "prompt" and "code".
    """
    def dump(grid):
        return json.dumps(grid, separators=(',', ':'))

    sections = [
        "Write ONLY this function (no prints, no globals):",
        "def transform(grid: list[list[int]]) -> list[list[int]]:\n",
    ]
    if extra_examples and should_use_extras(input_grid, extra_examples):
        sections.append("Examples to infer the rule:")
        sections.extend(
            f"\nExample {idx}\nInput:\n{dump(ex_in)}"
            f"\nOutput:\n{dump(ex_out)}"
            for idx, (ex_in, ex_out) in enumerate(extra_examples[:2], 1)
        )
    sections.append("\nInput:\n" + dump(input_grid))
    prompt_text = "\n".join(sections).strip()

    # The target is a "return <constant>" function so the label keeps a code shape.
    signature = "def transform(grid: list[list[int]]) -> list[list[int]]:\n"
    return_line = f"    return {dump(output_grid)}\n"
    target = (signature + return_line).strip()

    return {"prompt": prompt_text, "code": target}

def load_arc_style_pairs(json_path: Path):
    """Read an ARC-style task file and return its training pairs.

    The file is expected to hold a 'train' key with a list of
    {"input": ..., "output": ...} items. Items missing either key are skipped.

    Returns:
        list of (input, output) tuples; empty when there is no 'train' key.
    """
    task = json.loads(json_path.read_text())
    if "train" not in task:
        return []
    return [
        (item["input"], item["output"])
        for item in task["train"]
        if "input" in item and "output" in item
    ]

# Collect every task's training pairs into (prompt, code) samples.
samples = []
for jf in json_files:
    pairs = load_arc_style_pairs(jf)
    if not pairs:
        continue

    # Few-shot context is attached only to the first pair of each task.
    # It is taken from the OTHER pairs (pairs[1], pairs[2]): using pairs[0]
    # itself would place the query's own answer inside the prompt.
    extras = pairs[1:3] if len(pairs) >= 3 else []

    for i, (iin, oout) in enumerate(pairs):
        # One training sample per pair in the file.
        ex = make_single_example(iin, oout, extra_examples=extras if i == 0 else None)
        samples.append(ex)

print(f"Prepared {len(samples)} trainable (prompt->code) samples.")

In [None]:
# =========================================================
# 2) TRAIN/VAL SPLIT
# =========================================================

# Shuffles `samples` in place, so re-running this cell reshuffles the list again.
random.shuffle(samples)
# 95% train when there are more than 20 samples; otherwise 80%, with at least 1 training sample.
split = int(0.95 * len(samples)) if len(samples) > 20 else max(1, int(0.8 * len(samples)))
train_data = samples[:split]
val_data   = samples[split:]

ds_train = Dataset.from_list(train_data)
ds_val   = Dataset.from_list(val_data)
raw_ds = DatasetDict({"train": ds_train, "validation": ds_val})
# Bare expression: displays the dataset summary as the cell output.
raw_ds

In [None]:
# =========================================================
# 3) TOKENIZER & PREPROCESSING
#    - ByT5 (google/byt5-small here) is char-level -> a good fit for JSON/code.
# =========================================================

MODEL_NAME = "google/byt5-small"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

# Truncation limits (in tokens) applied to prompts and targets in preprocess_fn.
MAX_INPUT_LEN  = 6144   
MAX_TARGET_LEN = 1024 

def preprocess_fn(batch):
    """Tokenise a batch of samples for seq2seq training.

    Prompts are truncated to MAX_INPUT_LEN and targets to MAX_TARGET_LEN;
    the target token ids are stored under the "labels" key.
    """
    encoded_inputs = tokenizer(
        batch["prompt"],
        max_length=MAX_INPUT_LEN,
        truncation=True,
    )
    encoded_targets = tokenizer(
        text_target=batch["code"],
        max_length=MAX_TARGET_LEN,
        truncation=True,
    )
    encoded_inputs["labels"] = encoded_targets["input_ids"]
    return encoded_inputs

# Tokenise both splits in batches and drop the raw text columns.
tokenized = raw_ds.map(preprocess_fn, batched=True, remove_columns=raw_ds["train"].column_names)
# Collator configured to pad each batch to its longest sequence.
data_collator = DataCollatorForSeq2Seq(tokenizer, padding="longest")

In [None]:
# =========================================================
# 4) MODEL & TRAINING HYPER-PARAMETERS — Seq2SeqTrainer for ByT5
# =========================================================
import torch, numpy as np
from transformers import (
    T5ForConditionalGeneration,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME).to(device)

# Constants used by the id-sanitising helpers below.
VOCAB_SIZE = int(model.config.vocab_size)
PAD_ID     = int(tokenizer.pad_token_id)
SPECIALS   = set(int(i) for i in tokenizer.all_special_ids)
# Falls back to 3 when the tokenizer has no `offset` attribute.
# NOTE(review): presumably the number of reserved ids before the byte ids — confirm.
OFFSET     = int(getattr(tokenizer, "offset", 3))

def _to_int_seq(seq):
    """Ép về list[int], flatten nông nếu gặp list/ndarray lồng nông."""
    out = []
    for x in seq:
        if isinstance(x, (list, tuple, np.ndarray)):
            for y in x:
                try: out.append(int(y))
                except: out.append(PAD_ID)
        else:
            try: out.append(int(x))
            except: out.append(PAD_ID)
    return out

def _sanitize_ids_batch(batch_ids):
    """Make every token id in a batch safe to decode.

    An id is replaced by PAD_ID when it is below OFFSET without being a
    special token, or when it lies outside [0, VOCAB_SIZE).
    """
    def _fix(token_id):
        reserved_non_special = token_id < OFFSET and token_id not in SPECIALS
        out_of_vocab = token_id < 0 or token_id >= VOCAB_SIZE
        if reserved_non_special or out_of_vocab:
            return PAD_ID
        return token_id

    return [[_fix(tok) for tok in _to_int_seq(row)] for row in batch_ids]

def compute_metrics(eval_pred):
    """Exact-match accuracy between decoded predictions and decoded labels.

    Both sides are sanitised with _sanitize_ids_batch and decoded with special
    tokens skipped; strings are compared after stripping surrounding whitespace.

    Returns:
        {"exact_match": fraction of matching pairs, 0.0 when there are none}
    """
    predictions, references = eval_pred
    # With predict_with_generate the predictions are token ids, but a tuple is tolerated.
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # Plain lists are easier to handle when rows are ragged.
    if isinstance(predictions, np.ndarray):
        predictions = predictions.tolist()
    if isinstance(references, np.ndarray):
        references = references.tolist()

    pred_ids = _sanitize_ids_batch(predictions)

    # -100 marks ignored label positions; map it to PAD before decoding.
    unmasked = [[PAD_ID if tok == -100 else int(tok) for tok in row] for row in references]
    label_ids = _sanitize_ids_batch(unmasked)

    decoded_preds = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    matches = [
        1 if guess.strip() == truth.strip() else 0
        for guess, truth in zip(decoded_preds, decoded_labels)
    ]
    if not matches:
        return {"exact_match": 0.0}
    return {"exact_match": float(sum(matches)) / len(matches)}

BATCH_SIZE = 1          # previously 2
GRAD_ACC   = 32         # previously 8 (keeps the effective batch ≈ 32)
LR         = 2e-4
NUM_EPOCHS = 5 
MAX_GEN    = MAX_TARGET_LEN  # not referenced again in the visible cells

training_args = Seq2SeqTrainingArguments(
    output_dir=str(OUTPUT_ROOT / "byt5-codegen"),
    num_train_epochs=NUM_EPOCHS,
    learning_rate=LR,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=GRAD_ACC,

    eval_strategy="steps",
    eval_steps=100,

    save_steps=200,
    logging_steps=50,
    save_total_limit=2,

    # Mixed precision only when a GPU is present.
    fp16=torch.cuda.is_available(),
    bf16=False,
    gradient_checkpointing=True,

    # Important: make Seq2SeqTrainer call generate() during evaluation
    predict_with_generate=True,
    generation_max_length=MAX_TARGET_LEN,
    generation_num_beams=1,   # optional

    report_to=[],
    # The best checkpoint is chosen by the exact_match value from compute_metrics.
    load_best_model_at_end=True,
    metric_for_best_model="eval_exact_match",
    greater_is_better=True,
)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized["train"],
    eval_dataset=tokenized["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)


In [None]:
# =========================================================
# 5) TRAINING
# =========================================================

train_result = trainer.train()
# Saved to the folder that the later inference cells load as MODEL_DIR.
trainer.save_model(OUTPUT_ROOT / "byt5-codegen-best")

In [None]:
# =========================================================
# 6) INFERENCE: GENERATE CODE FOR ONE SPECIFIC TASK & TRY RUNNING IT
# =========================================================

def build_inference_prompt(task_json_path: Path, max_examples: int = 3) -> Dict[str, Any]:
    """Build a few-shot prompt for the first test item of a task file.

    Args:
        task_json_path: path to a task JSON with 'train' and 'test' lists.
        max_examples: upper bound on the number of train pairs passed as context.

    Returns:
        dict with "prompt", "test_input" and "gt_output" (None when the test
        item carries no output).

    Raises:
        AssertionError: when the file has no usable train pairs or no test items.
    """
    task = json.loads(task_json_path.read_text())
    train_pairs = [
        (entry["input"], entry["output"])
        for entry in task.get("train", [])
        if "input" in entry and "output" in entry
    ]
    test_items = task.get("test", [])
    assert train_pairs, "No train pairs found."
    assert test_items, "No test items found."

    # Use at most a few train pairs as context.
    n_context = min(max_examples, len(train_pairs))
    context_pairs = train_pairs[:n_context]

    # test[0] is used for the demo.
    first_test = test_items[0]
    test_input = first_test["input"]
    # In real inference the test output is unknown; only the prompt half of the
    # example is kept, so the second argument is just a placeholder.
    placeholder_output = first_test.get("output", test_input)
    example = make_single_example(test_input, placeholder_output, extra_examples=context_pairs)
    return {
        "prompt": example["prompt"],
        "test_input": test_input,
        "gt_output": first_test.get("output"),
    }

def generate_code(prompt: str, max_new_tokens: int = 384) -> str:
    """Generate a `transform` function with the fine-tuned model.

    The decoder is seeded with the function header so the output always starts
    with the expected signature.

    Args:
        prompt: encoder input text.
        max_new_tokens: generation budget beyond the forced prefix.

    Returns:
        Decoded text, guaranteed to start with the header prefix.
    """
    prefix = "def transform(grid: list[list[int]]) -> list[list[int]]:\n    "
    enc = tokenizer(prompt, return_tensors="pt").to(device)
    # add_special_tokens=False: by default the tokenizer appends EOS, which
    # would make the forced decoder prefix end with an end-of-sequence token
    # and ask the model to continue after it.
    dec_prefix = tokenizer(
        prefix, add_special_tokens=False, return_tensors="pt"
    ).input_ids.to(device)

    gen_ids = model.generate(
        **enc,
        decoder_input_ids=dec_prefix,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        num_beams=4,
        early_stopping=True,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
        length_penalty=1.0,
    )
    text = tokenizer.decode(gen_ids[0], skip_special_tokens=True)
    # Make sure the prefix is present.
    if not text.startswith(prefix):
        text = prefix + text
    return text

# Pick a random task file for the inference demo.
demo_path = random.choice(json_files)
print("Demo with:", demo_path)
demo = build_inference_prompt(demo_path, max_examples=3)
print("\n=== PROMPT (truncated) ===\n", demo["prompt"][:800], "...\n")

gen_code = generate_code(demo["prompt"])
print("=== GENERATED CODE ===\n", gen_code[:1000], "\n")

In [None]:
# =========================================================
# 7) EXECUTE THE CODE (WITH CAUTION) & EVALUATE
#    - Basic sandbox (NOT guaranteed to be fully safe; use only offline/on Kaggle).
# =========================================================

# Whitelist of builtins exposed to generated code.
# This is NOT a security boundary; it only limits accidental misuse.
SAFE_BUILTINS = {
    "range": range, "len": len, "enumerate": enumerate, "list": list, "max": max, "min": min, "sum": sum,
    "abs": abs, "zip": zip, "map": map, "filter": filter, "all": all, "any": any
}

def run_transform(code_str: str, grid: List[List[int]]) -> List[List[int]]:
    """Execute generated source and call its `transform(grid)`.

    Args:
        code_str: Python source expected to define `transform`.
        grid: input grid passed to `transform`.

    Returns:
        Whatever `transform` returns, or None when compilation or execution
        fails (the error is printed, not raised).
    """
    # One dict serves as both globals and locals. With separate dicts the
    # top-level helpers land in the locals dict, while functions resolve names
    # through the globals dict, so `transform` could not see its own helpers.
    env = {"__builtins__": SAFE_BUILTINS}
    try:
        exec(compile(code_str, "<generated>", "exec"), env)
        if "transform" not in env:
            raise RuntimeError("No function 'transform' defined.")
        return env["transform"](grid)
    except Exception as e:
        print("Execution error:", e)
        return None

# Run the generated code on the demo input; pred_grid is None when execution failed.
pred_grid = run_transform(gen_code, demo["test_input"])
print("Pred grid:", pred_grid)
# Compare with the ground truth only when the task file provides one.
if demo["gt_output"] is not None:
    print("GT grid   :", demo["gt_output"])
    print("Exact match:", pred_grid == demo["gt_output"])

In [None]:
# =========================================================
# 8) WRITE task.py FROM THE GENERATED CODE & IMPORT THE EVALUATOR
#    (TODO: the evaluator import itself is missing from this cell)
# =========================================================
import re, ast
from pathlib import Path

_HEADER = "def transform(grid: list[list[int]]) -> list[list[int]]:"

def _clean_text(s: str) -> str:
    # bỏ markdown ```...``` và control chars
    s = re.sub(r"```(?:python)?\s*([\s\S]*?)```", r"\1", s, flags=re.I)
    s = "".join(ch for ch in s if ch == "\n" or 32 <= ord(ch) <= 0x10FFFF)
    # cắt mọi thứ trước header (nếu model còn in thêm lời)
    i = s.find(_HEADER)
    if i >= 0: s = s[i:]
    # giữ lại chỉ thân hàm + wrapper
    return s.strip()

def _ensure_indent(code: str) -> str:
    # nếu sau header không có indent, thêm 4 spaces vào mỗi dòng thân
    lines = code.splitlines()
    if not lines or not lines[0].startswith("def transform"):
        return code
    body = lines[1:]
    if not body:
        body = ["    return grid"]
    # nếu tất cả body đã indent đúng thì giữ nguyên, nếu không thì ép indent
    if not all((not ln) or ln.startswith("    ") for ln in body):
        body = [("    " + ln if ln and not ln.startswith("    ") else ln) for ln in body]
    return "\n".join([lines[0]] + body)

def _compile_ok(src: str) -> bool:
    try:
        ast.parse(src)
        return True
    except SyntaxError:
        return False

def write_task_py_from_generated(gen_code: str, path=OUTPUT_ROOT / "task.py"):
    """Sanitise generated code and write it to `path` with a `p()` wrapper.

    Steps: clean the text, force the header, make sure the body has a
    `return`, fix the indentation, and fall back to an identity `transform`
    when the result still does not parse. A `p(grid)` function delegating to
    `transform(grid)` is appended.
    """
    fallback_src = _HEADER + "\n    return grid\n"

    source = _clean_text(gen_code or "")
    # Add the header when it is missing.
    if not source.startswith(_HEADER):
        source = _HEADER + "\n    " + source

    # Guarantee at least one return in the body; otherwise return the input grid.
    after_header = source.split(_HEADER, 1)[-1]
    if "return " not in after_header:
        source = source.rstrip() + "\n    return grid\n"

    source = _ensure_indent(source)

    # Still not valid syntax -> use the minimal but valid body.
    if not _compile_ok(source):
        source = fallback_src

    wrapper = "\n\ndef p(grid):\n    return transform(grid)\n"
    Path(path).write_text(source.rstrip() + wrapper, encoding="utf-8")
    print(f"Wrote {path} (forced header + sanitized)")

In [None]:
# =========================================================
# 9) GENERATE CODE, CREATE task.py, AND SCORE IT
#    (uses the model/tokenizer/trainer from the previous cells)
# =========================================================
import json, random

# ---- pick one task for the demo (e.g. task400.json) ----
# Falls back to any task*.json when task400.json is absent.
task_glob = list(INPUT_ROOT.rglob("task400.json")) or list(INPUT_ROOT.rglob("task*.json"))
assert task_glob, "Không tìm thấy task*.json trong /kaggle/input"
task_path = task_glob[0]
# The task number is parsed from the file name; 0 when the name does not match.
task_num_match = re.search(r"task(\d+)\.json$", str(task_path))
task_num = int(task_num_match.group(1)) if task_num_match else 0
print("Using task:", task_path, "| task_num:", task_num)

# ---- build a few-shot prompt from the task file ----
obj = json.loads(task_path.read_text())
pairs = [(x["input"], x["output"]) for x in obj.get("train", []) if "input" in x and "output" in x]
assert pairs, "task.json thiếu cặp train input/output"
extras = pairs[: min(3, len(pairs))]

test_items = obj.get("test", [])
if not test_items:
    # Without test items, fall back to train[0] as the query input.
    test_items = [{"input": pairs[0][0]}]

test_input = test_items[0]["input"]
# Only the "prompt" half is used, so the output argument is a placeholder.
prompt = make_single_example(test_input, pairs[0][1], extra_examples=extras)["prompt"]  # reuses the function from an earlier cell

# ---- generate code with the fine-tuned model ----
gen_code = generate_code(prompt, max_new_tokens=384)
print("=== GENERATED (truncated) ===\n", gen_code[:600], "...\n")

# ---- write task.py (p() calls transform()) ----
write_task_py_from_generated(gen_code, OUTPUT_ROOT/"task.py")

# ---- prepare the examples for the evaluator ----
examples = {
    "train": obj.get("train", []),
    "test":  obj.get("test", []),
    "arc-gen": obj.get("arc-gen", []),  # becomes [] when missing
}

# ---- run verify_program from the evaluator ----
# NOTE(review): `evaluator` is not imported or defined in any visible cell, so
# this line raises NameError on a fresh kernel — confirm which module it is
# and import it at the top of the notebook.
evaluator.verify_program(task_num=task_num, examples=examples)

def _lengths(ds, field):
    """Untruncated token counts for the first 50 entries of `ds[field]`."""
    sample_texts = ds[field][:50]  # a sample of 50 keeps this quick
    encoded = tokenizer(sample_texts, truncation=False)
    return list(map(len, encoded["input_ids"]))

# Compare observed lengths on the training split with the truncation limits.
in_lens  = _lengths(raw_ds["train"], "prompt")
tgt_lens = _lengths(raw_ds["train"], "code")
print("max input len:", max(in_lens), " | MAX_INPUT_LEN =", MAX_INPUT_LEN)
print("max target len:", max(tgt_lens), " | MAX_TARGET_LEN =", MAX_TARGET_LEN)



In [None]:
# =========================================================
# 9b) HEURISTIC SYNTHESIS (in case the model generates garbage)
#      - Search for a spatial transform (rotate/flip/transpose)
#        + a colour map that together match every train pair.
#      - When one is found, write task.py directly with "real" code.
# =========================================================
from copy import deepcopy

def transpose(g):
    """Swap rows and columns; every output row is a new list."""
    return list(map(list, zip(*g)))


def rot90(g):
    """Quarter turn clockwise: transpose of the vertically reversed grid."""
    upside_down = g[::-1]
    return transpose(upside_down)


def rot180(g):
    """Half turn: reverse the row order and each row."""
    return [row[::-1] for row in g[::-1]]


def rot270(g):
    """Three quarter turns, built as rot90 applied after rot180."""
    half_turn = rot180(g)
    return rot90(half_turn)


def flip_h(g):
    """Mirror left-right by reversing each row."""
    return [row[::-1] for row in g]


def flip_v(g):
    """Mirror top-bottom by reversing the row order (rows are not copied)."""
    return g[::-1]


def identity(g):
    """Return a row-wise copy of the grid."""
    return [row[:] for row in g]


# (name, function) pairs searched by find_rule, in this order.
SPATIAL_OPS = [
    ("identity", identity),
    ("rot90", rot90),
    ("rot180", rot180),
    ("rot270", rot270),
    ("flip_h", flip_h),
    ("flip_v", flip_v),
    ("transpose", transpose),
]

def same_shape(a, b):
    """True when both grids have the same row count and matching row lengths."""
    if len(a) != len(b):
        return False
    for row_a, row_b in zip(a, b):
        if len(row_a) != len(row_b):
            return False
    return True

def learn_color_map(src, tgt):
    """Learn a position-independent colour map from `src` cells to `tgt` cells.

    Returns:
        dict mapping each source colour to exactly one target colour, or None
        when some source colour would need two different targets.
    """
    mapping = {}
    cell_pairs = (
        (a, b)
        for row_src, row_tgt in zip(src, tgt)
        for a, b in zip(row_src, row_tgt)
    )
    for a, b in cell_pairs:
        # setdefault records the first target seen; a later mismatch is a conflict.
        if mapping.setdefault(a, b) != b:
            return None
    return mapping

def apply_color_map(g, m, default=None):
    """Recolour every cell of `g` through the mapping `m`.

    Colours missing from `m` are kept as-is when `default` is None, otherwise
    replaced by `default`.
    """
    def recolour(value):
        if value in m:
            return m[value]
        return value if default is None else default

    return [[recolour(value) for value in row] for row in g]

def compose_ops(ops):
    """Chain (name, function) pairs into one callable.

    Returns:
        (label, pipeline): label joins the op names with "|" ("identity" for
        an empty list); pipeline applies the functions left to right.
    """
    label = "|".join(op_name for op_name, _ in ops) if ops else "identity"

    def pipeline(grid):
        result = grid
        for _name, step in ops:
            result = step(result)
        return result

    return label, pipeline

def find_rule(train_pairs):
    """
    Search for a spatial op chain (0..2 steps) plus one global colour map that
    reproduces every training pair exactly.

    Candidates are tried in order: no op, each single op from SPATIAL_OPS,
    then every ordered pair of ops. The first match wins.

    The colour map is accumulated over ALL pairs, not just the first one. A
    colour that first appears in a later pair therefore extends the map
    instead of silently being treated as unchanged.

    Args:
        train_pairs: list of (input_grid, output_grid).

    Returns:
        (ops_list, color_map_dict) for the first matching candidate, or None
        when nothing matches or `train_pairs` is empty.
    """
    if not train_pairs:
        return None

    candidates = [()]                                    # length 0
    candidates.extend((op,) for op in SPATIAL_OPS)       # length 1
    candidates.extend(                                   # length 2 (op1 -> op2)
        (op1, op2) for op1 in SPATIAL_OPS for op2 in SPATIAL_OPS
    )

    for ops in candidates:
        _, f = compose_ops(list(ops))
        cmap = {}
        ok = True
        for src, tgt in train_pairs:
            mid = f(src)
            if not same_shape(mid, tgt):
                ok = False
                break
            pair_map = learn_color_map(mid, tgt)
            if pair_map is None:
                ok = False
                break
            # Merge into the global map; one colour may never get two targets.
            for colour, target in pair_map.items():
                if cmap.setdefault(colour, target) != target:
                    ok = False
                    break
            if not ok:
                break
        if ok:
            return list(ops), cmap
    return None

def synthesize_code_from_rule(ops, cmap):
    """Generate minimal Python source for `transform()` from (ops + colour map).

    Args:
        ops: list of (name, function) pairs; only the names are used.
            "identity" steps are skipped.
        cmap: dict mapping source colour -> target colour.

    Returns:
        Source text with the needed helper functions, `transform(grid)` and a
        `p(grid)` wrapper.

    Raises:
        KeyError: when an op name is not one of the known spatial ops.
    """
    # Listed in dependency order, so helpers are emitted before their users.
    helper_src = {
        "transpose": "def _transpose(g): return [list(r) for r in zip(*g)]",
        "rot90":     "def _rot90(g): return _transpose(g[::-1])",
        "rot180":    "def _rot180(g): return [r[::-1] for r in g[::-1]]",
        "rot270":    "def _rot270(g): return _rot90(_rot180(g))",
        "flip_h":    "def _flip_h(g): return [r[::-1] for r in g]",
        "flip_v":    "def _flip_v(g): return g[::-1]",
    }
    # Helpers that call other helpers; emitting only the named op would leave
    # e.g. _rot90 calling an undefined _transpose.
    helper_deps = {"rot90": ("transpose",), "rot270": ("rot90", "rot180")}

    seq = [name for name, _ in ops if name != "identity"]
    for name in seq:
        if name not in helper_src:
            raise KeyError(name)

    # Transitive closure of the helpers the pipeline needs.
    needed = set()
    pending = list(seq)
    while pending:
        name = pending.pop()
        if name in needed:
            continue
        needed.add(name)
        pending.extend(helper_deps.get(name, ()))
    helpers = [src for name, src in helper_src.items() if name in needed]

    # Body is built line by line so that EVERY line receives the function
    # indent (multi-line snippets must not fall back to column 0).
    body_lines = ["mid = [row[:] for row in grid]"]
    body_lines.extend(f"mid = _{name}(mid)" for name in seq)

    items = ", ".join(f"{k}:{v}" for k, v in sorted(cmap.items()))
    body_lines.extend([
        "cmap = {" + items + "}",
        "out = []",
        "for r in mid:",
        "    out.append([cmap.get(v, v) for v in r])",
        "return out",
    ])
    body = "\n".join("    " + line for line in body_lines)

    code = (
        "def transform(grid: list[list[int]]) -> list[list[int]]:\n"
        + body
        + "\n\ndef p(grid):\n    return transform(grid)"
    )

    # Prepend helpers when any are needed.
    if helpers:
        code = "\n".join(helpers) + "\n\n" + code
    return code

# ===== Run the synthesis when needed =====
train_pairs = pairs  # from cell 9 (obj was already read there)
rule = find_rule(train_pairs)
if rule is not None:
    ops, cmap = rule
    synth_code = synthesize_code_from_rule(ops, cmap)
    # Overwrites the task.py written from the model output in the previous cell.
    (OUTPUT_ROOT/"task.py").write_text(synth_code, encoding="utf-8")
    print("Wrote task.py via heuristic synthesis ✅")
else:
    print("No simple rule found (ops<=2 + color-map). Keeping previous task.py.")


In [None]:
# ===== Clean inference for a single task =====
import json, re, ast
from pathlib import Path
import torch
from transformers import AutoTokenizer, T5ForConditionalGeneration

DEVICE     = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_DIR  = OUTPUT_ROOT / "byt5-codegen-best"   # <- where trainer.save_model() wrote the weights
# NOTE(review): training used "google/byt5-small" as MODEL_NAME while the
# tokenizer here comes from "google/byt5-base"; presumably both share the same
# byte-level vocabulary — confirm, or load the tokenizer from MODEL_DIR.
TOKENIZER  = "google/byt5-base"

# These rebind the notebook-level `tokenizer` and `model` names.
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
model     = T5ForConditionalGeneration.from_pretrained(str(MODEL_DIR)).to(DEVICE).eval()

# Function signature plus the body indent; used as the forced decoder prefix.
HEADER = "def transform(grid: list[list[int]]) -> list[list[int]]:\n    "

def generate_code(prompt: str, max_new_tokens: int = 384) -> str:
    """Generate a `transform` function, forcing the decoder to start with HEADER.

    Note: this redefines the `generate_code` from the earlier inference cell
    and uses DEVICE/HEADER from this cell.

    Args:
        prompt: encoder input text.
        max_new_tokens: generation budget beyond the forced prefix.

    Returns:
        Decoded text, guaranteed to start with HEADER.
    """
    enc = tokenizer(prompt, return_tensors="pt").to(DEVICE)
    # add_special_tokens=False: by default the tokenizer appends EOS, which
    # would make the forced prefix end with an end-of-sequence token.
    dec_prefix = tokenizer(
        HEADER, add_special_tokens=False, return_tensors="pt"
    ).input_ids.to(DEVICE)
    with torch.no_grad():
        ids = model.generate(
            **enc,
            decoder_input_ids=dec_prefix,     # force the output to open with the header
            max_new_tokens=max_new_tokens,
            do_sample=False,                   # beam search -> cleaner output
            num_beams=4,
            early_stopping=True,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
            length_penalty=1.0,
        )
    text = tokenizer.decode(ids[0], skip_special_tokens=True)
    if not text.startswith(HEADER):
        text = HEADER + text
    return text

def _clean_code(s: str) -> str:
    """Turn raw generated text into a parseable task.py source.

    Removes markdown fences and control characters, cuts everything before the
    function signature, guarantees an indented body containing a `return`,
    appends a `p(grid)` wrapper, and falls back to an identity `transform`
    when the result does not parse.
    """
    wrapper = "def p(grid):\n    return transform(grid)\n"

    # Strip ```...``` fences and control characters (newlines are kept).
    text = re.sub(r"```(?:python)?\s*([\s\S]*?)```", r"\1", s, flags=re.I)
    text = "".join(c for c in text if c == "\n" or 32 <= ord(c) <= 0x10FFFF)

    # Cut everything before the signature line.
    start = text.find(HEADER.strip().splitlines()[0])
    if start >= 0:
        text = text[start:]
    # No signature at all: replace the text with an identity function.
    if not text.startswith(HEADER.splitlines()[0]):
        text = HEADER + "return grid\n"

    # Make sure the body exists, has a return, and is indented.
    body = text.splitlines()[1:]
    has_return = any("return" in ln for ln in body)
    if not (body and has_return):
        body = ["    return grid"]
    if any(ln and not ln.startswith("    ") for ln in body):
        body = [("    " + ln if ln and not ln.startswith("    ") else ln) for ln in body]

    candidate = "\n".join([HEADER.rstrip(), *body]) + "\n\n" + wrapper
    # Parse check.
    try:
        ast.parse(candidate)
    except SyntaxError:
        return HEADER + "    return grid\n\n" + wrapper
    return candidate

# ---- pick the task ----
# Prefers task400.json; falls back to any task*.json.
task_glob = list(INPUT_ROOT.rglob("task400.json")) or list(INPUT_ROOT.rglob("task*.json"))
assert task_glob, "Không tìm thấy task*.json"
task_path = task_glob[0]
# The task number is whatever remains of the file stem after removing "task".
task_num  = int(Path(task_path).stem.replace("task",""))
print("Using task:", task_path, "| task_num:", task_num)

# ---- build a SHORT prompt to avoid truncation (no few-shot extras) ----
obj = json.loads(Path(task_path).read_text())
pairs = [(x["input"], x["output"]) for x in obj.get("train", []) if "input" in x and "output" in x]
assert pairs, "task.json thiếu cặp train input/output"
# Falls back to train[0]'s input when the file has no test items.
test_items = obj.get("test", []) or [{"input": pairs[0][0]}]

def prompt_minimal(iin):
    """Build the shortest prompt: instruction, signature and one input grid."""
    instruction = "Write ONLY this function (no prints, no globals):\n"
    signature = "def transform(grid: list[list[int]]) -> list[list[int]]:\n\n"
    grid_json = json.dumps(iin, separators=(',', ':'))
    return instruction + signature + "Input:\n" + grid_json

prompt   = prompt_minimal(test_items[0]["input"])
gen_code = generate_code(prompt, max_new_tokens=512)
print("=== GEN PREVIEW ===\n", gen_code[:300], "...\n")

# Sanitise the generation and overwrite task.py with it.
code = _clean_code(gen_code)
(OUTPUT_ROOT/"task.py").write_text(code, encoding="utf-8")
print("Wrote", OUTPUT_ROOT/"task.py")

# ---- verifier (this API takes no task_path argument) ----
examples = {"train": obj.get("train", []), "test": obj.get("test", []), "arc-gen": obj.get("arc-gen", [])}
# NOTE(review): `evaluator` is not imported or defined in any visible cell — confirm its source.
evaluator.verify_program(task_num=task_num, examples=examples)


In [None]:
from pathlib import Path
import json, torch
from transformers import AutoTokenizer, T5ForConditionalGeneration

# Quick smoke test: generate for the first training input of one task.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_DIR = OUTPUT_ROOT / "byt5-codegen-best"   # where save_model wrote the weights
tokenizer = AutoTokenizer.from_pretrained("google/byt5-base")
model = T5ForConditionalGeneration.from_pretrained(str(MODEL_DIR)).to(DEVICE).eval()

sample_task = next(iter(INPUT_ROOT.rglob("task*.json")))
obj = json.loads(Path(sample_task).read_text())
iin, oout = obj["train"][0]["input"], obj["train"][0]["output"]

prompt = (
    "Write ONLY this function (no prints, no globals):\n"
    "def transform(grid: list[list[int]]) -> list[list[int]]:\n\n"
    "Input:\n" + json.dumps(iin, separators=(',',':'))
)

HEADER = "def transform(grid: list[list[int]]) -> list[list[int]]:\n    "
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
# add_special_tokens=False keeps the forced decoder prefix from ending with
# the EOS token that the tokenizer appends by default.
dec_prefix = tokenizer(HEADER, add_special_tokens=False, return_tensors="pt").input_ids.to(DEVICE)
with torch.no_grad():
    ids = model.generate(**inputs, decoder_input_ids=dec_prefix, max_new_tokens=256,
                         do_sample=False, num_beams=4, eos_token_id=tokenizer.eos_token_id,
                         pad_token_id=tokenizer.pad_token_id)
text = tokenizer.decode(ids[0], skip_special_tokens=True)
print(text[:300])


In [None]:
from pathlib import Path
import json, random

def build_supervised_set_with_heuristics():
    """Label every task that the heuristic search can solve with real code.

    For each task*.json under INPUT_ROOT, find_rule() is run on the train
    pairs. Solved tasks yield one sample whose prompt shows the first test
    input (or train[0]'s input when there is no test item) and whose code is
    the synthesised solution.

    Returns:
        list of {"prompt": ..., "code": ...} dicts.
    """
    task_files = sorted(INPUT_ROOT.rglob("task*.json"))
    print("Scanning", len(task_files), "tasks...")

    labelled = []
    for task_file in task_files:
        task = json.loads(Path(task_file).read_text())
        train_pairs = [
            (entry["input"], entry["output"])
            for entry in task.get("train", [])
            if "input" in entry and "output" in entry
        ]
        if not train_pairs:
            continue

        found = find_rule(train_pairs)  # (ops, cmap) or None, from cell 9b
        if found is None:
            continue
        ops, cmap = found
        solution_code = synthesize_code_from_rule(ops, cmap)  # "real" code solving the task

        # Short prompt: only the input of test[0], to stay consistent.
        test_items = task.get("test", []) or [{"input": train_pairs[0][0]}]
        prompt = (
            "Write ONLY this function (no prints, no globals):\n"
            "def transform(grid: list[list[int]]) -> list[list[int]]:\n\n"
            "Input:\n" + json.dumps(test_items[0]["input"], separators=(',',':'))
        )
        labelled.append({"prompt": prompt, "code": solution_code})

    print(f"Prepared {len(labelled)} heuristic-labeled samples.")
    return labelled

heuristic_samples = build_supervised_set_with_heuristics()
# Stop here when no task was solved: the mixing cell below needs heuristic samples.
assert len(heuristic_samples) > 0, "Heuristic couldn't solve any task; mở rộng phép biến đổi hoặc giữ cả label hằng số."


In [None]:
# samples_hard = heuristic_samples
# samples_easy = samples (the older prompt -> 'return <constant>' set)
mix_ratio = 0.8  # 80% real code, 20% constant-return samples
n_true = int(mix_ratio * len(heuristic_samples))
# Both lists are shuffled in place, so re-running this cell changes the mix.
random.shuffle(heuristic_samples)
random.shuffle(samples)  # 'samples' is the older set built earlier
# Total size equals len(heuristic_samples): n_true real-code samples plus the remainder from `samples`.
mixed = heuristic_samples[:n_true] + samples[: max(0, len(heuristic_samples)-n_true)]
random.shuffle(mixed)

# Rebinds ds_train / ds_val / tokenized.
# NOTE(review): `trainer` was built from the earlier `tokenized`; no visible cell re-creates it with this one.
ds_train = Dataset.from_list(mixed[: int(0.95*len(mixed))])
ds_val   = Dataset.from_list(mixed[int(0.95*len(mixed)):])
tokenized = DatasetDict({"train": ds_train, "validation": ds_val}).map(
    preprocess_fn, batched=True, remove_columns=["prompt","code"]
)


In [None]:
# =========================================================
# 10) (OPTIONAL) BATCH EVALUATION OVER ALL TASKS
# =========================================================
from collections import defaultdict

results = defaultdict(list)
all_tasks = sorted(INPUT_ROOT.rglob("task*.json"))
print(f"Found {len(all_tasks)} tasks.")

for tpath in all_tasks:
    # The task number is parsed from the file name; 0 when the name does not match.
    m = re.search(r"task(\d+)\.json$", str(tpath))
    tnum = int(m.group(1)) if m else 0
    obj = json.loads(tpath.read_text())
    pairs = [(x["input"], x["output"]) for x in obj.get("train", []) if "input" in x and "output" in x]
    if not pairs:
        continue
    extras = pairs[: min(3, len(pairs))]
    # Fall back to train[0]'s input when the file has no test items.
    test_items = obj.get("test", []) or [{"input": pairs[0][0]}]
    test_input = test_items[0]["input"]

    # Only the prompt half is used; the output argument is a placeholder.
    prompt = make_single_example(test_input, pairs[0][1], extra_examples=extras)["prompt"]
    code = generate_code(prompt, max_new_tokens=384)
    # `WORKING` was never defined; task.py lives under OUTPUT_ROOT as in the other cells.
    write_task_py_from_generated(code, OUTPUT_ROOT/"task.py")

    examples = {"train": obj.get("train", []), "test": obj.get("test", []), "arc-gen": obj.get("arc-gen", [])}
    print(f"\n=== VERIFY task{tnum:03d} ===")
    # NOTE(review): `evaluator` is not imported or defined in any visible cell — confirm its source.
    evaluator.verify_program(task_num=tnum, examples=examples)


In [None]:
!pip -q install transformers==4.44.2 datasets==2.21.0 accelerate==0.34.2 evaluate==0.4.2