# Save to HF

In [None]:
import gzip
import json
from tqdm import tqdm
import re
from math_verify import verify
import hashlib

def assign_split_by_hash(key: str, train=0.95, val=0.03, test=0.02) -> str:
    """
    Map ``key`` deterministically to a dataset split.

    The first 32 bits of the MD5 digest of ``key`` (UTF-8 encoded) are scaled
    into [0, 1] and compared against the cumulative split fractions, so the
    same key always lands in the same split.

    Args:
        key: string to hash (e.g. the question text).
        train, val, test: split fractions; their sum must be 1.0 (checked
            with ``assert``).

    Returns:
        'train' | 'validation' | 'test'
    """
    assert abs(train + val + test - 1.0) < 1e-9
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    position = int(digest[:8], 16) / 0xFFFFFFFF  # in [0, 1]
    for split_name, upper_bound in (("train", train), ("validation", train + val)):
        if position < upper_bound:
            return split_name
    return "test"

def load_all_jsonl_gz(path: str):
    """
    Read every non-blank line of a gzip-compressed JSON Lines file.

    Args:
        path: path to a ``.jsonl.gz`` file containing UTF-8 text.

    Returns:
        list of decoded JSON values, in file order. Lines that are empty
        after ``str.strip()`` are skipped. A tqdm progress bar is shown while
        reading.
    """
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in tqdm(handle, desc="Loading jsonl.gz"))
        return [json.loads(text) for text in stripped_lines if text]


def split_think_answer_complete(assistant_text: str):
    """
    Split an assistant message into ``(think_text, answer_text)``.

    Intended to be called after has_complete_think() is True, i.e. when the
    message contains a closing ``</think>`` tag.

    - think_text: the (stripped) content between the first ``<think>`` that
      precedes the first ``</think>`` and that ``</think>``. If no ``<think>``
      precedes it (the opening tag lives in the prompt), everything before
      ``</think>`` is taken as the reasoning.
    - answer_text: everything after the first ``</think>`` (stripped).

    If there is no ``</think>`` at all, returns ``("", assistant_text.strip())``
    instead of slicing with a -1 index.
    """
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    close_idx = assistant_text.find(THINK_CLOSE)
    if close_idx == -1:
        # No closing tag: nothing can be attributed to reasoning.
        return "", assistant_text.strip()

    # Only look for the opening tag *before* the closing one; a -1 here used
    # to become "start at index 6" and silently chop the reasoning.
    open_idx = assistant_text.find(THINK_OPEN, 0, close_idx)
    think_start = 0 if open_idx == -1 else open_idx + len(THINK_OPEN)

    think_text = assistant_text[think_start:close_idx].strip()
    answer_text = assistant_text[close_idx + len(THINK_CLOSE):].strip()
    return think_text, answer_text



# def extract_boxed_answer(text: str):
#     _BOXED_RE = re.compile(r"\\boxed\{([^}]*)\}")
#     if not isinstance(text, str):
#         return None
#     m = _BOXED_RE.search(text)
#     return m.group(1).strip() if m else None


def extract_answer_math_verify(text: str):
    r"""
    Use math_verify to extract a final answer candidate from model output.

    Args:
        text: model output (or a slice of it), typically ending in
            ``\boxed{...}``.

    Returns:
        ``str()`` of the first candidate returned by ``math_verify.parse``
        (a sympy expression and/or string, rendered as text), or None when
        ``text`` is not a non-empty string or nothing could be parsed.
    """
    if not isinstance(text, str) or not text.strip():
        return None

    # Imported lazily (after the cheap guard) so the module-level import cost
    # and dependency are only paid when there is something to parse.
    from math_verify import LatexExtractionConfig, parse

    parsed = parse(
        text,
        extraction_mode="first_match",
        extraction_config=[
            LatexExtractionConfig(
                boxed_match_priority=0,          # prefer \boxed{...} when present
                try_extract_without_anchor=True  # more tolerant to messy outputs
            )
        ],
    )
    if not parsed:
        return None

    # parsed elements can be sympy objects and/or strings depending on the expression
    return str(parsed[0])


def is_int_strict(x) -> bool:
    """
    Return True iff ``x`` is already written in canonical integer form.

    ``x`` qualifies when ``int(x)`` succeeds and its decimal string equals
    ``str(x)`` with surrounding whitespace removed. So "42", " -7 " and 42
    pass, while "042", "+3", "4.0", 4.5 and True do not. None and anything
    whose conversion raises give False.
    """
    try:
        return x is not None and str(int(x)) == str(x).strip()
    except Exception:
        return False

In [None]:
# Raw OpenThoughts3 math conversations, stored as gzipped JSON Lines.
data = load_all_jsonl_gz(
    "/mnt/local/shared/michaelw/mlf2/verl/reproduce/data/openthoughts3/"
    "openthoughts3-math_examples_complete_cot.jsonl.gz"
)
print("N =", len(data))
print("keys =", data[0].keys())

In [None]:
# Convert raw OpenThoughts3 conversations into verl-style RL records.
instruction = 'Let\'s think step by step and solve this problem. '
# "boxed" must occur within this many trailing characters for a sample to be
# kept, i.e. the solution should end with a \boxed{...} final answer.
BOXED_TAIL_CHARS = 100

hf_ready = []
kept_idx = 0  # running index over *kept* samples (not positions in `data`)

for element in tqdm(data):
    # First turn is the user question, last turn is the model's solution.
    question = element["conversations"][0]["value"]
    solution = element["conversations"][-1]["value"]

    tail = solution[-BOXED_TAIL_CHARS:]
    if "boxed" not in tail:
        continue

    # Parse from the start of the last \boxed in the tail region so the whole
    # expression reaches math_verify. A shorter fixed window (e.g. the last
    # 50 chars) can cut a \boxed{...} that the check above accepted in half,
    # producing a truncated or unrelated ground truth. The extra -1 lets a
    # "\boxed" whose "boxed" starts exactly at the tail boundary still match.
    boxed_start = solution.rfind("\\boxed", max(0, len(solution) - BOXED_TAIL_CHARS - 1))
    answer_region = solution[boxed_start:] if boxed_start != -1 else tail

    question_id = hashlib.md5(question.encode()).hexdigest()[:16]
    extracted_answer = extract_answer_math_verify(answer_region)
    is_answer_int = is_int_strict(extracted_answer)

    hf_ready_data = {
        "data_source": "open-thoughts/OpenThoughts3-1.2M",
        "prompt": [{"role": "user", "content": instruction + question}],
        "ability": "math",
        "reward_model": {"style": "rule", "ground_truth": extracted_answer},
        "extra_info": {
            "split": assign_split_by_hash(question),
            "index": kept_idx,
            "answer": solution,
            "question": question,
            "is_answer_int": is_answer_int,
            "question_id": question_id,
        },
    }
    hf_ready.append(hf_ready_data)
    kept_idx += 1

In [None]:
from datasets import Dataset, DatasetDict

def to_datasetdict(hf_ready):
    """
    Bucket records by ``ex["extra_info"]["split"]`` into a ``DatasetDict``.

    Only the splits "train", "validation" and "test" are kept; records tagged
    with any other split are silently dropped. A split that ends up with no
    records is omitted from the result.

    Args:
        hf_ready: iterable of record dicts, each with ``extra_info["split"]``.

    Returns:
        DatasetDict mapping split name -> ``Dataset.from_list(records)``.
    """
    buckets = {name: [] for name in ("train", "validation", "test")}
    for record in hf_ready:
        target = buckets.get(record["extra_info"]["split"])
        if target is not None:
            target.append(record)

    return DatasetDict(
        {name: Dataset.from_list(rows) for name, rows in buckets.items() if rows}
    )


ds = to_datasetdict(hf_ready)
# NOTE: len() of a DatasetDict counts splits, not rows.
print(len(ds))

In [None]:
# Display the first training record.
ds["train"][0]

In [None]:
from huggingface_hub import create_repo

repo_id = "michaelw-cerebras/openthoughts3-math-gsm8kstyle"

# exist_ok=True lets this cell be re-run against an already-created repo.
create_repo(repo_id=repo_id, repo_type="dataset", private=True, exist_ok=True)
# Upload every split of `ds` to the dataset repo created above.
ds.push_to_hub(repo_id=repo_id)

In [None]:
# {'difficulty': None, 
#  'source': 'ai2-adapt-dev/openmath-2-math', 
#  'domain': 'math', 
#  'conversations': [{'from': 'human', 'value': 'A bookshelf has 5 shelves, and each shelf can hold up to 3 books. In how many ways can 6 distinct books be placed on the bookshelf such that no shelf contains more than 2 books by the same author, and no two books by the same author are on adjacent shelves?'}, 
#                    {'from': 'gpt', 'value': '<think> \n</think>\n\nTo determine the number of ways to place 6 distinct books on a bookshelf with 5 shelves, considering each shelf can hold up to 3 books, and the constraints that no shelf contains more than 2 books by the same author and no two books by the same author are on adjacent shelves:\n\n1. **Interpretation of Constraints**:\n   - Each shelf can hold up to 3 books, but no shelf can have more than 2 books by the same author.\n   - No two books by the same author can be on adjacent shelves.\n\n2. **Assumption**:\n   - Since the books are distinct, if all books are by different authors, the constraints are automatically satisfied (each author has only one book).\n\n3.'}]}

# Patch: add token count

In [None]:
import re
from datasets import load_dataset
from transformers import AutoTokenizer

REPO_ID = "michaelw-cerebras/openthoughts3-math-gsm8kstyle"
# Tokenizer used only to measure CoT / answer lengths in tokens.
TOKENIZER_NAME = "Qwen/Qwen3-0.6B"

tokenizer = AutoTokenizer.from_pretrained(
    TOKENIZER_NAME,
    use_fast=True,
)

# Non-greedy with DOTALL: each <think>...</think> pair is matched on its own
# and may span newlines; tag case is ignored.
THINK_RE = re.compile(r"<think>(.*?)</think>", flags=re.DOTALL | re.IGNORECASE)


def split_think_answer(text: str):
    """
    Separate reasoning from the final answer.

    Returns ``(cot_text, answer_text)``:
    - cot_text: the stripped contents of every ``<think>...</think>`` pair,
      with empty ones dropped and the rest joined by a blank line.
    - answer_text: stripped text after the last closing tag. When no complete
      pair exists, the whole stripped text is the answer and cot_text is "".
    ``None`` input yields ``("", "")``.
    """
    if text is None:
        return "", ""

    thoughts = []
    answer_start = None
    for think_block in THINK_RE.finditer(text):
        thoughts.append(think_block.group(1).strip())
        answer_start = think_block.end()

    if answer_start is None:
        # No complete <think>...</think> pair: everything is answer.
        return "", text.strip()

    cot_text = "\n\n".join(filter(None, thoughts))
    return cot_text, text[answer_start:].strip()

def count_tokens(s: str) -> int:
    """
    Return how many tokens the module-level ``tokenizer`` produces for ``s``.

    Special tokens are not added. Empty or None input counts as 0 without
    calling the tokenizer.
    """
    return len(tokenizer.encode(s, add_special_tokens=False)) if s else 0

def add_token_counts(example):
    """
    ``datasets.map`` callback that records CoT and answer token lengths.

    Splits ``example["extra_info"]["answer"]`` with ``split_think_answer`` and
    stores ``cot_tokens`` / ``answer_tokens`` inside ``extra_info``. The
    example is mutated in place and returned.
    """
    info = example["extra_info"]
    cot_text, ans_text = split_think_answer(info["answer"])
    info["cot_tokens"] = count_tokens(cot_text)
    info["answer_tokens"] = count_tokens(ans_text)
    return example


ds = load_dataset(REPO_ID)
ds2 = ds.map(add_token_counts, desc="Add cot_tokens & answer_tokens")
# Re-upload all splits, now carrying the token-count fields, to the same repo.
ds2.push_to_hub(REPO_ID, private=True)

# Patch: add question id

In [None]:
import hashlib
from datasets import load_dataset

REPO_ID = "michaelw-cerebras/openthoughts3-math-gsm8kstyle"


def add_question_id(example):
    """
    ``datasets.map`` callback that sets ``extra_info["question_id"]``.

    The id is the first 16 hex characters of the MD5 digest of the question
    text (``str.encode()`` default, UTF-8), so identical questions always
    share an id. The example is mutated in place and returned.
    """
    info = example["extra_info"]
    info["question_id"] = hashlib.md5(info["question"].encode()).hexdigest()[:16]
    return example

# Pull every split, attach question_id to each row, and push back to the Hub.
ds = load_dataset(path=REPO_ID)
ds_updated = ds.map(function=add_question_id, desc="Adding question_id")
ds_updated.push_to_hub(repo_id=REPO_ID, private=True)

print("Done! question_id added to all splits.")

# Load from the HF Hub and write local Parquet files for training

In [2]:
from datasets import load_dataset
from tqdm import tqdm
from datasets import Dataset
import os
import random
from collections import defaultdict

In [None]:
def deduplicate_by_question(sample_list, max_per_question=1, seed=2026):
    """
    Keep at most ``max_per_question`` samples per ``extra_info["question_id"]``.

    Groups are visited in first-appearance order. Groups with more than
    ``max_per_question`` samples are randomly subsampled with a private
    ``random.Random(seed)``. The result is reproducible for a given seed, and
    the global ``random`` state is left untouched.

    Args:
        sample_list: list of record dicts carrying ``extra_info["question_id"]``.
        max_per_question: maximum number of samples kept per question id.
        seed: seed for the subsampling RNG.

    Returns:
        ``(deduped_samples, stats)``. ``stats`` has ``total_questions``,
        ``samples_before``, ``samples_after`` and ``reduction_rate``, which is
        0.0 when ``sample_list`` is empty.
    """
    # A dedicated RNG draws the same sequence that random.seed(seed) followed
    # by random.sample would, without clobbering the caller's global RNG.
    rng = random.Random(seed)

    # Group by question_id
    question_groups = defaultdict(list)
    for sample in sample_list:
        question_groups[sample["extra_info"]["question_id"]].append(sample)

    # Sample up to max_per_question from each group
    deduped_samples = []
    for samples in question_groups.values():
        if len(samples) <= max_per_question:
            deduped_samples.extend(samples)  # small enough: keep all
        else:
            deduped_samples.extend(rng.sample(samples, max_per_question))

    before = len(sample_list)
    after = len(deduped_samples)
    stats = {
        "total_questions": len(question_groups),
        "samples_before": before,
        "samples_after": after,
        # Guard against ZeroDivisionError on an empty input list.
        "reduction_rate": 1 - (after / before) if before else 0.0,
    }
    return deduped_samples, stats


# ========== Load Data ==========
# Pull the train and validation splits from the HF dataset repo.
openthoughts_math_train, openthoughts_math_val = (
    load_dataset(
        "michaelw-cerebras/openthoughts3-math-gsm8kstyle",
        split=split_name,
        streaming=False,
    )
    for split_name in ("train", "validation")
)

# ========== Filter by is_answer_int ==========
# Keep only samples whose extracted ground truth is a plain integer.
train_parquet_list = [
    sample
    for sample in tqdm(openthoughts_math_train, desc="Filtering train")
    if sample["extra_info"]["is_answer_int"]
]
val_parquet_list = [
    sample
    for sample in tqdm(openthoughts_math_val, desc="Filtering val")
    if sample["extra_info"]["is_answer_int"]
]

print("\n" + "=" * 60)
print("After is_answer_int filtering:")
print(f"  Train: {len(train_parquet_list)} samples")
print(f"  Val: {len(val_parquet_list)} samples")

# ========== Deduplication ==========
# At most one sample per question_id in each split, same seed for both.
train_deduped, train_stats = deduplicate_by_question(
    train_parquet_list, max_per_question=1, seed=2026
)
val_deduped, val_stats = deduplicate_by_question(
    val_parquet_list, max_per_question=1, seed=2026
)

print("\n" + "=" * 60)
print("After deduplication (max 1 per question):")
for split_label, split_stats in (("TRAIN", train_stats), ("VAL", val_stats)):
    print(f"\n{split_label}:")
    print(f"  Unique questions: {split_stats['total_questions']}")
    print(f"  Samples: {split_stats['samples_before']} → {split_stats['samples_after']}")
    print(f"  Reduction: {split_stats['reduction_rate']:.1%}")

# ========== Create Datasets ==========
train_parquet_ds = Dataset.from_list(train_deduped)
val_parquet_ds = Dataset.from_list(val_deduped)

print("\n" + "=" * 60)
print("Final dataset sizes:")
print(f"  Train: {len(train_parquet_ds)} samples")
print(f"  Val: {len(val_parquet_ds)} samples")

# ========== Save to Parquet ==========
import os
os.makedirs("local_parquet_dir", exist_ok=True)

# NOTE(review): the validation split is written as test.parquet — presumably
# the eval file name the downstream trainer expects; confirm.
for parquet_ds, file_name in (
    (train_parquet_ds, "train.parquet"),
    (val_parquet_ds, "test.parquet"),
):
    parquet_ds.to_parquet(os.path.join("local_parquet_dir", file_name))

print("\n✅ Saved to local_parquet_dir/")