In [1]:
# Cell 1: imports & helpers

import os, json, math, random
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import pandas as pd

from sklearn.neighbors import NearestNeighbors
from sentence_transformers import SentenceTransformer
import joblib
import torch

# ---------- config ----------
# Default model path handed to SentenceTransformer by get_embedder().
MODEL_PATH = "/root/autodl-tmp/EasyR1/ckpt/bge-m3"
# Default device string handed to SentenceTransformer by get_embedder().
DEVICE = "cuda"  # or "cpu"
# Default batch_size forwarded to model.encode() by encode_texts().
BATCH_SIZE = 8
# Default normalize_embeddings flag forwarded to model.encode() by encode_texts().
NORMALIZE_EMBEDDINGS = True

# Seed the stdlib, NumPy and PyTorch global RNGs with the same fixed value.
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# ---------- I/O ----------
def read_json_any(path: str) -> List[Dict[str, Any]]:
    """Load records from a ``.jsonl`` or ``.json`` file and return them as a list.

    * ``.jsonl``: every non-blank line is parsed as one JSON value.
    * ``.json``: a top-level list is returned as is; a top-level dict is
      unwrapped through the first of the keys ``"data"``, ``"items"``,
      ``"rows"`` that holds a list, otherwise the dict itself is returned as a
      single-element list.

    Args:
        path: File path; the format is chosen from the (case-insensitive) suffix.

    Returns:
        The list of loaded records.

    Raises:
        ValueError: If the suffix is neither ``.jsonl`` nor ``.json``, or if a
            ``.json`` file holds a top-level value that is neither a list nor
            a dict (previously this case silently returned ``None``).
    """
    p = Path(path)
    suffix = p.suffix.lower()

    if suffix == ".jsonl":
        with p.open("r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    if suffix == ".json":
        with p.open("r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            # Common wrapper layouts: {"data": [...]}, {"items": [...]}, {"rows": [...]}.
            for k in ("data", "items", "rows"):
                if isinstance(data.get(k), list):
                    return data[k]
            return [data]
        # A bare scalar/string/null cannot be interpreted as a list of records.
        raise ValueError(f"unsupported top-level JSON type: {type(data).__name__}")

    raise ValueError(f"unsupported file type: {p.suffix}")

def get_by_dotted_key(obj: Dict[str, Any], dotted: str, default=None):
    """Walk nested dicts along a dot-separated key path.

    Args:
        obj: The outermost mapping.
        dotted: Key path such as ``"a.b.c"``; each segment is looked up in turn.
        default: Value returned when a segment is absent or the current node
            is not a dict.

    Returns:
        The value found at the end of the path, or ``default``.
    """
    node = obj
    segments = dotted.split(".")
    for segment in segments:
        if isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return default
    return node

def build_text(row: Dict[str, Any]) -> str:
    """Render a QA row as ``"Q: <question>\\nA: <answer>"`` with outer whitespace stripped.

    Missing or falsy ``question`` / ``answer`` values are rendered as empty strings.
    """
    question, answer = (row.get(field) or "" for field in ("question", "answer"))
    return f"Q: {question}\nA: {answer}".strip()

def build_text_from_row(row: Dict[str, Any], source: str) -> Optional[str]:
    """
    Convert a sample from one of several sources/formats into a single text
    suitable for embedding.

    Args:
        row: The raw sample.
        source: One of ``"qa"``, ``"mcq"``, ``"cs"``, ``"paragraph"``, ``"tfq"``.

    Returns:
        The text for the sample, or ``None`` when the required field(s) are
        missing / ``None`` or when ``source`` is not recognised (a warning is
        printed in the latter case).
    """
    if source == "qa":
        # Reuse the existing QA formatting.
        return build_text(row)

    if source == "mcq":
        # Use the raw fine_mcq field as is.
        return row.get("fine_mcq")

    if source == "cs":
        # Rendered as "C: <category>\nS: <summarised_text>".
        c = row.get("category")
        s = row.get("summarised_text")
        # A row lacking either field yields None. Previously a row lacking
        # BOTH keys produced the empty shell "C: \nS: " while a row lacking
        # only one produced None; both cases are now treated the same way.
        if c is None or s is None:
            return None
        return f"C: {c}\nS: {s}"

    if source == "paragraph":
        # Use the paragraph field directly.
        return row.get("paragraph")

    if source == "tfq":
        # Use tfq_part directly (it already contains "...  Answer: True/False").
        return row.get("tfq_part")

    # Unknown source.
    print("Warning!!!!")
    return None
# ---------- embedding ----------
_model_cache = {}
def get_embedder(model_path: str = MODEL_PATH, device: str = DEVICE):
    """Return the SentenceTransformer for ``(model_path, device)``, creating it on first use.

    Instances are kept in the module-level ``_model_cache`` dict so repeated
    calls with the same arguments reuse the same object.
    """
    cache_key = (model_path, device)
    if cache_key in _model_cache:
        return _model_cache[cache_key]
    embedder = SentenceTransformer(model_path, device=device)
    _model_cache[cache_key] = embedder
    return embedder

def encode_texts(texts: List[str],
                 model_path: str = MODEL_PATH,
                 device: str = DEVICE,
                 batch_size: int = BATCH_SIZE,
                 normalize_embeddings: bool = NORMALIZE_EMBEDDINGS) -> np.ndarray:
    """Embed ``texts`` with the cached model and return the result cast to float32.

    Args:
        texts: Strings to embed.
        model_path: Forwarded to ``get_embedder``.
        device: Forwarded to ``get_embedder``.
        batch_size: Forwarded to ``model.encode``.
        normalize_embeddings: Forwarded to ``model.encode``.

    Returns:
        The array returned by ``model.encode`` (requested as NumPy), as ``np.float32``.
    """
    encode_kwargs = dict(
        batch_size=batch_size,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=normalize_embeddings,
    )
    embedder = get_embedder(model_path, device)
    vectors = embedder.encode(texts, **encode_kwargs)
    return vectors.astype(np.float32)

# ---------- Bayes 评分所需 ----------
class DiscreteIndexer:
    """Map discrete score values (e.g. 1..5) to contiguous indices 0..C-1.

    Also supports the reverse direction in expectation: turning a distribution
    over class indices back into an expected score value.

    Attributes:
        values: Sorted unique integer score values; position == class index.
        val2idx: Dict from integer score value to its class index.
    """

    def __init__(self, values: np.ndarray):
        # np.unique already returns its result sorted, so no extra sort is needed.
        vals = np.unique(np.asarray(values).astype(int))
        self.values = vals
        self.val2idx = {int(v): i for i, v in enumerate(vals)}

    @property
    def n_classes(self) -> int:
        """Number of distinct score values."""
        return len(self.values)

    def to_index(self, arr: np.ndarray) -> np.ndarray:
        """Translate score values to class indices, element-wise.

        Args:
            arr: Array-like of score values (any shape); cast to int first.

        Returns:
            Integer array of the same shape holding class indices.

        Raises:
            KeyError: If ``arr`` contains a value that is not in ``values``.
        """
        vals = np.asarray(arr).astype(int)
        if vals.size == 0:
            return np.zeros(vals.shape, dtype=int)
        if self.values.size == 0:
            raise KeyError(int(vals.flat[0]))
        # self.values is sorted, so a binary search gives the candidate slot;
        # clip so that values above the maximum still index safely, then
        # verify the slot really holds the requested value.
        pos = np.clip(np.searchsorted(self.values, vals), 0, self.values.size - 1)
        unknown = self.values[pos] != vals
        if unknown.any():
            raise KeyError(int(vals[unknown][0]))
        return pos.astype(int)

    def to_value_expectation(self, post: np.ndarray) -> float:
        """Return the dot product of the score values with the distribution ``post``."""
        return float(np.dot(self.values.astype(float), post))

def estimate_T_from_B(B_emb: np.ndarray,
                      B_scores_idx: np.ndarray,
                      n_classes: int,
                      k_t: int = 2,
                      metric: str = "cosine",
                      smoothing: float = 1e-2) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimate T (an approximation of P(obs=j | true=i)) and the class prior p
    from k_t-NN label co-occurrence inside the historical library B.

    Args:
        B_emb: Library embeddings, one row per item.
        B_scores_idx: Class index (0..n_classes-1) of each library item.
        n_classes: Number of classes C.
        k_t: Number of neighbours (excluding the item itself) counted per item.
        metric: Distance metric passed to ``NearestNeighbors``.
        smoothing: Initial value of every cell of the co-occurrence matrix.

    Returns:
        ``(T, p)`` where ``T`` is the C x C row-normalised co-occurrence matrix
        and ``p`` is the normalised class frequency vector (with 1e-8 added to
        each count).
    """
    labels = np.asarray(B_scores_idx, dtype=int)
    C = np.full((n_classes, n_classes), smoothing, dtype=float)

    if k_t > 0:
        nn = NearestNeighbors(n_neighbors=k_t, metric=metric)
        nn.fit(B_emb)
        # Calling kneighbors() without X queries the fitted points themselves
        # and excludes each point from its own neighbour list. Dropping
        # column 0 of a self-query instead is unreliable: with duplicate /
        # zero-distance embeddings the point itself is not guaranteed to come
        # first, so it could be counted as its own neighbour.
        idx = nn.kneighbors(return_distance=False)
        centers = np.repeat(labels, idx.shape[1])
        # Unbuffered add so that repeated (i, j) pairs all accumulate.
        np.add.at(C, (centers, labels[idx].ravel()), 1.0)

    T = C / C.sum(axis=1, keepdims=True)
    counts = np.bincount(labels, minlength=n_classes).astype(float) + 1e-8
    p = counts / counts.sum()
    return T, p

def bayes_from_hist(neigh_hist: np.ndarray, T: np.ndarray, p: np.ndarray) -> Tuple[np.ndarray, float]:
    """Posterior over classes from a neighbour-label histogram.

    For every class i the unnormalised log score is
    ``log p_i + sum_j hist_j * log T[i, j]``; a max-shifted softmax turns the
    scores into the posterior.

    Returns:
        ``(post, entropy)``: the posterior vector and its entropy (natural log).
    """
    log_prior = np.log(p + 1e-12)
    log_trans = np.log(T + 1e-12)
    n_cls = T.shape[0]
    scores = np.array(
        [log_prior[c] + float(np.dot(neigh_hist, log_trans[c])) for c in range(n_cls)],
        dtype=float,
    )
    # Subtract the maximum before exponentiating for numerical stability.
    unnorm = np.exp(scores - scores.max())
    post = unnorm / unnorm.sum()
    entropy = float(-np.sum(post * np.log(post + 1e-12)))
    return post, entropy

In [2]:
# Cell 2: build & persist library assets

DATA_PATH = "/root/autodl-tmp/EasyR1/examples/scored_raw2qa.jsonl"
OUT_DIR = "/root/autodl-tmp/EasyR1/knn_bayes_lib"              # asset directory (customisable)
METRIC = "cosine"                        # nearest-neighbour metric
K_T = 2                                  # number of in-library neighbours used to estimate T
K_FOR_ONLINE = 16                        # default K for online retrieval

os.makedirs(OUT_DIR, exist_ok=True)

# 1) Read the rows and extract text + historical score (Overall_compressed)
rows = read_json_any(DATA_PATH)
B_texts, B_scores, B_uids = [], [], []
for i, r in enumerate(rows):
    sc = get_by_dotted_key(r, "rating_detail.Overall_compressed", None)
    # Rows without a score are left out of the library.
    if sc is None:
        continue
    B_texts.append(build_text(r))
    B_scores.append(int(sc))
    # Fall back to the row's position in the file when it has no "id".
    B_uids.append(r.get("id", i))

B_scores = np.array(B_scores, dtype=int)
print(f"[INFO] B size = {len(B_scores)}; score classes = {np.unique(B_scores)}")

# 2) Embed the library texts
B_emb = encode_texts(B_texts)  # shape [NB, d]

# 3) Class mapping + estimate T, p
class_values = np.sort(np.unique(B_scores))
indexer = DiscreteIndexer(class_values)
B_idx = indexer.to_index(B_scores)
T, p = estimate_T_from_B(B_emb, B_idx, indexer.n_classes, k_t=K_T, metric=METRIC)

# 4) Fit the KNN index (used for online retrieval)
nn = NearestNeighbors(n_neighbors=K_FOR_ONLINE, metric=METRIC)
nn.fit(B_emb)

# 5) Persist the assets
np.save(Path(OUT_DIR) / "B_emb.npy", B_emb)
np.save(Path(OUT_DIR) / "B_scores.npy", B_scores)
# One JSON object per line, in the same order as the rows of B_emb / B_scores.
with open(Path(OUT_DIR) / "B_uids.jsonl", "w", encoding="utf-8") as f:
    for u in B_uids:
        f.write(json.dumps({"uid": u}, ensure_ascii=False) + "\n")

# Everything needed to reload the library and score new samples without re-estimating.
meta = {
    "model_path": MODEL_PATH,
    "device_hint": DEVICE,
    "normalize_embeddings": NORMALIZE_EMBEDDINGS,
    "metric": METRIC,
    "k_t": K_T,
    "k_default": K_FOR_ONLINE,
    "class_values": class_values.tolist(),
    "T": T.tolist(),
    "p": p.tolist(),
}
with open(Path(OUT_DIR) / "meta.json", "w", encoding="utf-8") as f:
    json.dump(meta, f, ensure_ascii=False, indent=2)

joblib.dump(nn, Path(OUT_DIR) / "nn_index.pkl")
print(f"[OK] assets saved to: {OUT_DIR}")

[INFO] B size = 22037; score classes = [0 2 3 4 5]


Batches:   0%|          | 0/1378 [00:00<?, ?it/s]

[OK] assets saved to: /root/autodl-tmp/EasyR1/knn_bayes_lib


In [3]:
# Cell 3: load assets & define online scoring

LIB_DIR = "./knn_bayes_lib"  # keep in sync with OUT_DIR from the build step
# NOTE(review): OUT_DIR in the build cell is an absolute path while this one is
# relative to the current working directory — confirm both resolve to the same place.
assert Path(LIB_DIR).exists(), "library dir not found"

# 1) Load the persisted assets
B_emb = np.load(Path(LIB_DIR) / "B_emb.npy")
B_scores = np.load(Path(LIB_DIR) / "B_scores.npy")
with open(Path(LIB_DIR) / "meta.json", "r", encoding="utf-8") as f:
    meta = json.load(f)

# Rebuild the score-value -> class-index mapping and index the library scores with it.
class_values = np.array(meta["class_values"], dtype=int)
indexer = DiscreteIndexer(class_values)
B_idx = indexer.to_index(B_scores)

# T, p and the retrieval settings exactly as stored in meta.json.
T = np.array(meta["T"], dtype=float)
p = np.array(meta["p"], dtype=float)
METRIC = meta["metric"]
K_DEFAULT = int(meta.get("k_default", 16))

# Load the KNN index (fit one on the fly if the pickle does not exist)
# NOTE(review): joblib.load unpickles the file; only load an index you produced yourself.
if Path(LIB_DIR, "nn_index.pkl").exists():
    nn = joblib.load(Path(LIB_DIR, "nn_index.pkl"))
else:
    nn = NearestNeighbors(n_neighbors=K_DEFAULT, metric=METRIC)
    nn.fit(B_emb)

# 2) 在线评分函数
def score_with_bayes(new_samples: List[Dict[str, Any]],
                     k: int = K_DEFAULT,
                     return_neighbors: int = 0) -> pd.DataFrame:
    """
    Score new samples by retrieving their k nearest library items and combining
    the neighbours' class histogram with T and p into a Bayes posterior.

    Reads the module-level ``nn``, ``B_idx``, ``B_scores``, ``indexer``, ``T``
    and ``p`` prepared by the asset-loading code.

    Args:
        new_samples: One dict per sample. A truthy ``"text"`` entry is embedded
            as is; otherwise the text is built from ``question`` / ``answer``
            via ``build_text``.
        k: Number of neighbours to retrieve.
        return_neighbors: If > 0, each row also gets a ``"neighbors"`` list with
            the first n neighbours as ``{"lib_index", "lib_score", "sim"}``
            (``lib_index`` is the row position in the library, not a uid).

    Returns:
        DataFrame with columns ``sample_id``, ``bayes_score`` (posterior mode),
        ``bayes_exp`` (posterior expectation), ``bayes_entropy`` and, when
        requested, ``neighbors``. An empty input yields an empty frame with
        those columns.
    """
    columns = ["sample_id", "bayes_score", "bayes_exp", "bayes_entropy"]
    if return_neighbors > 0:
        columns.append("neighbors")
    # Nothing to embed or retrieve; the neighbour search would otherwise be
    # handed a non-2D empty array and fail.
    if len(new_samples) == 0:
        return pd.DataFrame(columns=columns)

    # Assemble the texts.
    texts = [r["text"] if r.get("text") else build_text(r) for r in new_samples]

    # Embed (encode_texts defaults: same model / normalisation settings as the library build).
    X = encode_texts(texts)

    # Retrieve neighbours.
    dists, idx = nn.kneighbors(X, n_neighbors=k, return_distance=True)

    # Per sample: neighbour class histogram -> Bayes posterior.
    results = []
    for i, (row_d, row_i) in enumerate(zip(dists, idx)):
        neigh_cls_idx = B_idx[row_i]
        hist = np.bincount(neigh_cls_idx, minlength=indexer.n_classes).astype(float)
        post, H = bayes_from_hist(hist, T, p)
        # Discrete prediction (posterior mode) and continuous expectation.
        pred_mode = int(indexer.values[np.argmax(post)])
        pred_exp = float(indexer.to_value_expectation(post))

        rec = {
            "sample_id": i,
            "bayes_score": pred_mode,     # discrete score
            "bayes_exp": pred_exp,        # continuous expectation
            "bayes_entropy": H,           # uncertainty
        }

        if return_neighbors > 0:
            # Similarity taken as 1 - distance.
            # NOTE(review): this is only a similarity when the index metric is cosine — confirm.
            sims = 1.0 - row_d
            topn = min(return_neighbors, len(row_i))
            rec["neighbors"] = [
                {
                    "lib_index": int(row_i[j]),
                    "lib_score": int(B_scores[row_i[j]]),
                    "sim": float(sims[j]),
                }
                for j in range(topn)
            ]

        results.append(rec)

    return pd.DataFrame(results)

In [4]:
# Cell 4: example usage

new_samples = [
    {
        "title": "Sibley's and James Store Historic District",
        "question": "How does the architectural connection between the Sibley Brothers General Store and the Old Thomas James Store reflect historical continuity?",
        "answer": "The hyphen linking the 1899 Sibley Brothers store and the c.1810 Old Thomas James Store manifests continuity in Virginia’s small-town commerce..."
    },
    # Append more samples here; a sample may also be passed as {"text": "..."}
]

# Score with 16 neighbours and keep the first 5 neighbours of each sample in the result.
df_pred = score_with_bayes(new_samples, k=16, return_neighbors=5)
df_pred

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,sample_id,bayes_score,bayes_exp,bayes_entropy,neighbors
0,0,4,3.979516,0.152872,"[{'lib_index': 22036, 'lib_score': 4, 'sim': 0..."


In [1]:
import json
import os

in_path = "/root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy.json"
out_dir = "/root/autodl-tmp/EasyR1/sft/data/qa"

# Load the full dataset.
with open(in_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# Position at which the leading 30% ends (in file order, no shuffling).
ratio = int(len(data) * 0.3)
data_30, data_70 = data[:ratio], data[ratio:]

# Destination files for the two parts.
out_path_30 = os.path.join(out_dir, "merged_train_qa_noisy_30.json")
out_path_70 = os.path.join(out_dir, "merged_train_qa_noisy_70.json")

# Write both parts first, then report on both.
for subset, out_path in ((data_30, out_path_30), (data_70, out_path_70)):
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(subset, f, ensure_ascii=False, indent=2)

for subset, out_path in ((data_30, out_path_30), (data_70, out_path_70)):
    print(f"Saved {len(subset)} samples to {out_path}")

Saved 9332 samples to /root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy_30.json
Saved 21776 samples to /root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy_70.json


In [2]:
import json, random
from pathlib import Path

# ==== parameters ====
src_path = Path("/root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy_70.json")
test_ratio = 0.1
seed = 42
drop_instruction = True   # set to True to remove the "instruction" field
# ====================

# Load the data.
with src_path.open("r", encoding="utf-8") as f:
    data = json.load(f)

# Fold "instruction" into "input": the non-empty stripped parts are joined by a
# blank line (a single non-empty part is used alone; none gives "").
processed = []
for item in data:
    parts = [(item.get(field) or "").strip() for field in ("instruction", "input")]
    item["input"] = "\n\n".join(part for part in parts if part)
    if drop_instruction:
        item.pop("instruction", None)
    processed.append(item)

# Seeded shuffle, then cut off the head as the test split (at least one item).
rng = random.Random(seed)
rng.shuffle(processed)
n = len(processed)
n_test = max(1, int(n * test_ratio))
test_set, train_set = processed[:n_test], processed[n_test:]

# Outputs live next to the source file, named after its stem.
out_dir = src_path.parent
stem = src_path.stem
train_out = out_dir / f"{stem}.train.json"
test_out  = out_dir / f"{stem}.test.json"

# Save train first, then test.
for split, target in ((train_set, train_out), (test_set, test_out)):
    with target.open("w", encoding="utf-8") as f:
        json.dump(split, f, ensure_ascii=False, indent=2)

print(f"总样本数: {n}")
print(f"训练集: {len(train_set)} -> {train_out}")
print(f"测试集: {len(test_set)} -> {test_out}")

总样本数: 21776
训练集: 19599 -> /root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy_70.train.json
测试集: 2177 -> /root/autodl-tmp/EasyR1/sft/data/qa/merged_train_qa_noisy_70.test.json
