# Building RL Data for `VeRL`

The three Datasets used in RL training:

- `SidDataset`
- `RLTitle2SidDataset`
- `RLSeqTitle2SidDataset`

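In MiniOneRec, these Datasets originally return `input_ids/labels`.
They are now changed to return the raw text `prompt` and `ground_truth` instead (see the `pre_verl` / `get_verl` calls in the cells that follow).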
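
The switch from tokenized tensors to text fields can be pictured with a toy dataset. This is only a sketch: `ToySidDataset` and its record layout are hypothetical and are not the real `SidDataset` from `data.py`. Only the method names `pre_verl` / `get_verl` and the prompt wording are taken from this notebook's cells and outputs.

```python
from typing import Any, Dict, List


class ToySidDataset:
    """Hypothetical stand-in for SidDataset; NOT the real implementation."""

    def __init__(self, records: List[Dict[str, Any]]):
        # Assumed record layout: {"history": [sid, ...], "target": sid}
        self.records = records

    def __len__(self) -> int:
        return len(self.records)

    def pre_verl(self, idx: int) -> Dict[str, str]:
        """Return one sample as plain text instead of input_ids/labels."""
        rec = self.records[idx]
        history = ", ".join(rec["history"])
        prompt = (
            f"The user has interacted with items {history} in chronological order. "
            "Can you predict the next possible item that the user may expect?"
        )
        return {"prompt": prompt, "ground_truth": rec["target"]}

    def get_verl(self) -> List[Dict[str, str]]:
        """Return every sample in text form."""
        return [self.pre_verl(i) for i in range(len(self))]


toy = ToySidDataset([
    {
        "history": ["<a_40><b_116><c_254>", "<a_120><b_233><c_163>"],
        "target": "<a_120><b_135><c_223>",
    }
])
print(toy.pre_verl(0))
```

Keeping the text untokenized leaves chat templating and tokenization to the RL framework, which is why the rows built later store a message list rather than token ids.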

Target outputs:
- `train.parquet`
- `val.parquet`

Recommended fields (schema) for each sample (row):
- `data_source`: string (subtask name, e.g. `sid` / `title2sid` / `seq_title2sid`)
- `prompt`: list[dict] (message list for the HF chat template: `[{role, content}, ...]`)
- `ability`: string (broad task category, e.g. `rec`)
- `reward_model`: dict (must contain at least `ground_truth`; may also carry `style`, etc.)
- `extra_info`: dict (any extra information you want passed through to the reward function)
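
The field list above can be turned into a quick sanity check before anything is written to disk. The helper below is a minimal sketch; `validate_row` and `REQUIRED_FIELDS` are hypothetical names introduced here for illustration, and the checks only encode what the list states.

```python
from typing import Any, Dict, List

# Field names taken from the schema list above.
REQUIRED_FIELDS = ("data_source", "prompt", "ability", "reward_model", "extra_info")


def validate_row(row: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one row; an empty list means it looks fine."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in row]

    prompt = row.get("prompt")
    if not isinstance(prompt, list) or not prompt:
        problems.append("prompt must be a non-empty list of messages")
    else:
        for i, msg in enumerate(prompt):
            # Each message needs at least a role and a content entry.
            if not isinstance(msg, dict) or not {"role", "content"} <= set(msg):
                problems.append(f"prompt[{i}] must be a dict with 'role' and 'content'")

    reward_model = row.get("reward_model")
    if not isinstance(reward_model, dict) or "ground_truth" not in reward_model:
        problems.append("reward_model must be a dict containing 'ground_truth'")

    if not isinstance(row.get("extra_info"), dict):
        problems.append("extra_info must be a dict")

    return problems


good_row = {
    "data_source": "sid",
    "prompt": [{"role": "user", "content": "Which item comes next?"}],
    "ability": "rec",
    "reward_model": {"ground_truth": "<a_1><b_2><c_3>", "style": "rule"},
    "extra_info": {"index": 0},
}
print(validate_row(good_row))
```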

> A custom reward function in veRL receives `data_source, solution_str, ground_truth, extra_info`, so make sure everything the reward needs is stored in `reward_model.ground_truth` or `extra_info`.
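
To make the note above concrete, here is a sketch of a rule-based reward that takes those four arguments. Several things are assumptions rather than anything this notebook confirms: the function name `compute_score`, the exact-match scoring rule, and the choice to score the last SID in the output. The `<a_*><b_*><c_*>` pattern is inferred from the sample outputs in this notebook.

```python
import re
from typing import Any, Dict, Optional

# Three-level SID pattern, inferred from samples such as <a_120><b_135><c_223>.
SID_PATTERN = re.compile(r"<a_\d+><b_\d+><c_\d+>")


def compute_score(
    data_source: str,
    solution_str: str,
    ground_truth: str,
    extra_info: Optional[Dict[str, Any]] = None,
) -> float:
    """Return 1.0 if the last SID in the model output equals the ground truth, else 0.0.

    Assumption: the final SID is treated as the answer, so any SIDs mentioned
    earlier (e.g. while restating the history) are ignored.
    """
    sids = SID_PATTERN.findall(solution_str)
    if not sids:
        return 0.0
    return 1.0 if sids[-1] == ground_truth.strip() else 0.0


print(compute_score("sid", "The next item is <a_120><b_135><c_223>", "<a_120><b_135><c_223>"))
```

`data_source` and `extra_info` are unused in this sketch; they are where per-task logic (for example, a different rule for `title2sid`) could branch if needed.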


In [1]:
import os, sys, json, random
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

import torch



In [2]:
from data import (
    SidDataset, 
    RLTitle2SidDataset, 
    RLSeqTitle2SidDataset
)

In [3]:
base_model="../llms/Qwen/Qwen3-0.6B"
SFT_DATA_ROOT="./data/Amazon2018"
BSZ=32
MICRO_BSZ=4

seed=42
cutoff_len=612
sample=-1

category="Industrial_and_Scientific"  # "Office_Products"   
CATEGORY=category

category_dict = {"Industrial_and_Scientific": "industrial and scientific items", "Office_Products": "office products", "Toys_and_Games": "toys and games", "Sports": "sports and outdoors", "Books": "books"}

train_file=f"{SFT_DATA_ROOT}/{CATEGORY}/train/{CATEGORY}_convert.csv"
eval_file=f"{SFT_DATA_ROOT}/{CATEGORY}/valid/{CATEGORY}_convert.csv"
test_file=f"{SFT_DATA_ROOT}/{CATEGORY}/test/{CATEGORY}_convert.csv"
info_file=f"{SFT_DATA_ROOT}/{CATEGORY}/info/{CATEGORY}_convert.txt"

sid_index_path=f"{SFT_DATA_ROOT}/{CATEGORY}/{CATEGORY}.index.json"
item_meta_path=f"{SFT_DATA_ROOT}/{CATEGORY}/{CATEGORY}.item.json"

In [4]:
train_data1 = SidDataset(train_file, category=category_dict[category], sample=sample)

train_data2 = RLTitle2SidDataset(item_file=item_meta_path, index_file=sid_index_path, category=category_dict[category], sample=sample)

train_data3 = RLSeqTitle2SidDataset(train_file, category=category_dict[category], sample=10000)

# train_data = ConcatDataset(train_datasets)
eval_data = SidDataset(eval_file, category=category_dict[category], sample=sample)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  row['history_item_sid'] = eval(row['history_item_sid'])
100%|██████████| 33185/33185 [00:03<00:00, 9184.86it/s]
100%|██████████| 6059/6059 [00:00<00:00, 689678.90it/s]
100%|██████████| 10000/10000 [00:00<00:00, 15744.30it/s]
100%|██████████| 4148/4148 [00:00<00:00, 8883.53it/s]


In [5]:
print(train_data1.pre_verl(1))
print(train_data2.pre_verl(1))
print(train_data3.pre_verl(1))
print(eval_data.pre_verl(1))

{'prompt': 'The user has interacted with items <a_40><b_116><c_254>, <a_120><b_233><c_163> in chronological order. Can you predict the next possible item that the user may expect?', 'ground_truth': '<a_120><b_135><c_223>'}
{'prompt': 'Which item has the title: Stanley TRA708T Sharpshooter 1/2-Inch Leg Length Staples, Steel (1000 Count)?', 'ground_truth': '<a_191><b_110><c_145>'}
{'prompt': 'Given the title sequence of user historical interactive items: "Fieldoor 44 Pound Strong Neodymium Indoor/Outdoor Magnet Hook, Heavy Duty Magnetic Hooks (4 Pack)", "Dewalt DW8061B5 4 x 0.045 Inch Metal and Stainless Steel Cutting Wheels , 4 Inch by 0.045-Inch", can you recommend a suitable next item for the user?', 'ground_truth': '<a_26><b_51><c_108>'}
{'prompt': 'The user has interacted with items <a_114><b_187><c_20>, <a_49><b_196><c_211>, <a_202><b_124><c_184>, <a_107><b_144><c_103> in chronological order. Can you predict the next possible item that the user may expect?', 'ground_truth': '<a_75><b_203><c_10>'}



In [6]:
train_rl_sid = train_data1.get_verl()
print(len(train_rl_sid))

train_rl_title2sid = train_data2.get_verl()
print(len(train_rl_title2sid))

train_rl_seqtitle2sid = train_data3.get_verl()
print(len(train_rl_seqtitle2sid))

eval_rl_sid = eval_data.get_verl()
print(len(eval_rl_sid))

100%|██████████| 33185/33185 [00:03<00:00, 8939.68it/s]


33185


100%|██████████| 6059/6059 [00:00<00:00, 923817.22it/s]


6059


100%|██████████| 10000/10000 [00:00<00:00, 15387.25it/s]


10000


100%|██████████| 4148/4148 [00:00<00:00, 8801.57it/s]

4148





In [7]:
from pathlib import Path
import datasets

OUT_DIR = Path("./rl_data")
OUT_DIR.mkdir(parents=True, exist_ok=True)

SYSTEM_MSG = None   # set to a string if a system message is needed, otherwise None
ABILITY = "rec"

train_sid_path = OUT_DIR / "train_sid.parquet"
train_title2sid_path = OUT_DIR / "train_title2sid.parquet"
train_seqtitle2sid_path = OUT_DIR / "train_seqtitle2sid.parquet"
val_sid_path = OUT_DIR / "val_sid.parquet"

print("OUT_DIR:", OUT_DIR.resolve())

OUT_DIR: /mnt/sdb1/sdb1_xiaojinsong/tiny-onerec/rl_data


In [16]:
jobs = [
    {
        "name": "train_sid",
        "data": train_rl_sid,
        "data_source": "SidDataset",
        "out_path": OUT_DIR / "train_sid.parquet",
        "shuffle": True,
        "print_example_idx": 1,
    },
    {
        "name": "train_title2sid",
        "data": train_rl_title2sid,
        "data_source": "RLTitle2SidDataset",
        "out_path": OUT_DIR / "train_title2sid.parquet",
        "shuffle": True,
        "print_example_idx": 1,
    },
    {
        "name": "train_seqtitle2sid",
        "data": train_rl_seqtitle2sid,
        "data_source": "RLSeqTitle2SidDataset",
        "out_path": OUT_DIR / "train_seqtitle2sid.parquet",
        "shuffle": True,
        "print_example_idx": 1,
    },
    {
        "name": "val_sid",
        "data": eval_rl_sid,
        "data_source": "SidDataset",
        "out_path": OUT_DIR / "val_sid.parquet",
        "shuffle": False,
        "print_example_idx": 1,
    },
]

In [18]:
saved_datasets = {}

for job in jobs:
    rows = []
    for i, ex in enumerate(job["data"]):
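        prompt = ex["prompt"]
        gt = ex["ground_truth"]
        # Everything besides prompt/ground_truth is passed through to the reward function.
        extra = {k: v for k, v in ex.items() if k not in ["prompt", "ground_truth"]}

        # Build the chat messages; prepend a system message only if SYSTEM_MSG is set.
        messages = [{"role": "user", "content": prompt}]
        if SYSTEM_MSG is not None:
            messages.insert(0, {"role": "system", "content": SYSTEM_MSG})

        rows.append({
            "data_source": job["data_source"],
            "prompt": messages,
            "ability": ABILITY,
            "reward_model": {"ground_truth": gt, "style": "rule"},
            # "index" is the sample's position before any shuffling.
            "extra_info": {**extra, "index": i},
        })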

    ds = datasets.Dataset.from_list(rows)
    if job["shuffle"]:
        ds = ds.shuffle(seed=42)

    # Write the dataset to parquet.
    ds.to_parquet(str(job["out_path"]))
    saved_datasets[job["name"]] = ds
    print(f"✅ saved: {job['out_path'].resolve()}  len={len(ds)}")
    
    print("schema:", ds.features)

    if job["print_example_idx"] is not None:
        idx = job["print_example_idx"]
        if 0 <= idx < len(rows):
            print(f"\nPretty sample ({job['name']}[{idx}]):")
            print(json.dumps(rows[idx], ensure_ascii=False, indent=2))
        print("-" * 80)

# ====== Schema consistency check (strongly recommended to keep) ======
features = [ds.features for ds in saved_datasets.values()]
assert all(f == features[0] for f in features), "❌ Schemas differ! Check each dataset for mismatched field types or missing fields"
print("✅ all schemas match across 4 parquet files")

Creating parquet from Arrow format: 100%|██████████| 1/1 [00:01<00:00,  1.30s/ba]


✅ saved: /mnt/sdb1/sdb1_xiaojinsong/tiny-onerec/rl_data/train_sid.parquet  len=33185
schema: {'data_source': Value('string'), 'prompt': List({'content': Value('string'), 'role': Value('string')}), 'ability': Value('string'), 'reward_model': {'ground_truth': Value('string'), 'style': Value('string')}, 'extra_info': {'index': Value('int64')}}

Pretty sample (train_sid[1]):
{
  "data_source": "SidDataset",
  "prompt": [
    {
      "role": "user",
      "content": "The user has interacted with items <a_40><b_116><c_254>, <a_120><b_233><c_163> in chronological order. Can you predict the next possible item that the user may expect?"
    }
  ],
  "ability": "rec",
  "reward_model": {
    "ground_truth": "<a_120><b_135><c_223>",
    "style": "rule"
  },
  "extra_info": {
    "index": 1
  }
}
--------------------------------------------------------------------------------


Creating parquet from Arrow format: 100%|██████████| 1/1 [00:00<00:00,  2.58ba/s]


✅ saved: /mnt/sdb1/sdb1_xiaojinsong/tiny-onerec/rl_data/train_title2sid.parquet  len=6059
schema: {'data_source': Value('string'), 'prompt': List({'content': Value('string'), 'role': Value('string')}), 'ability': Value('string'), 'reward_model': {'ground_truth': Value('string'), 'style': Value('string')}, 'extra_info': {'index': Value('int64')}}

Pretty sample (train_title2sid[1]):
{
  "data_source": "RLTitle2SidDataset",
  "prompt": [
    {
      "role": "user",
      "content": "Which item has the title: Stanley TRA708T Sharpshooter 1/2-Inch Leg Length Staples, Steel (1000 Count)?"
    }
  ],
  "ability": "rec",
  "reward_model": {
    "ground_truth": "<a_191><b_110><c_145>",
    "style": "rule"
  },
  "extra_info": {
    "index": 1
  }
}
--------------------------------------------------------------------------------


Creating parquet from Arrow format: 100%|██████████| 1/1 [00:00<00:00,  2.65ba/s]


✅ saved: /mnt/sdb1/sdb1_xiaojinsong/tiny-onerec/rl_data/train_seqtitle2sid.parquet  len=10000
schema: {'data_source': Value('string'), 'prompt': List({'content': Value('string'), 'role': Value('string')}), 'ability': Value('string'), 'reward_model': {'ground_truth': Value('string'), 'style': Value('string')}, 'extra_info': {'index': Value('int64')}}

Pretty sample (train_seqtitle2sid[1]):
{
  "data_source": "RLSeqTitle2SidDataset",
  "prompt": [
    {
      "role": "user",
      "content": "Given the title sequence of user historical interactive items: \"Fieldoor 44 Pound Strong Neodymium Indoor/Outdoor Magnet Hook, Heavy Duty Magnetic Hooks (4 Pack)\", \"Dewalt DW8061B5 4 x 0.045 Inch Metal and Stainless Steel Cutting Wheels , 4 Inch by 0.045-Inch\", can you recommend a suitable next item for the user?"
    }
  ],
  "ability": "rec",
  "reward_model": {
    "ground_truth": "<a_26><b_51><c_108>",
    "style": "rule"
  },
  "extra_info": {
    "index": 1
  }
}
--------------------------------------------------------------------------------

Creating parquet from Arrow format: 100%|██████████| 1/1 [00:00<00:00, 171.27ba/s]

✅ saved: /mnt/sdb1/sdb1_xiaojinsong/tiny-onerec/rl_data/val_sid.parquet  len=4148
schema: {'data_source': Value('string'), 'prompt': List({'content': Value('string'), 'role': Value('string')}), 'ability': Value('string'), 'reward_model': {'ground_truth': Value('string'), 'style': Value('string')}, 'extra_info': {'index': Value('int64')}}

Pretty sample (val_sid[1]):
{
  "data_source": "SidDataset",
  "prompt": [
    {
      "role": "user",
      "content": "The user has interacted with items <a_114><b_187><c_20>, <a_49><b_196><c_211>, <a_202><b_124><c_184>, <a_107><b_144><c_103> in chronological order. Can you predict the next possible item that the user may expect?"
    }
  ],
  "ability": "rec",
  "reward_model": {
    "ground_truth": "<a_75><b_203><c_10>",
    "style": "rule"
  },
  "extra_info": {
    "index": 1
  }
}
--------------------------------------------------------------------------------
✅ all schemas match across 4 parquet files





In [10]:
# OUT_DIR = Path("./rl_data")   # <<< change this
# OUT_DIR.mkdir(parents=True, exist_ok=True)
# saved_paths = {}

# # train
# for name, ds in DATASETS_TRAIN.items():
#     out_path = OUT_DIR / f"{name}.jsonl"

#     print(f"[OK] {name}: -> {out_path}")

#     with open(out_path, "w", encoding="utf-8") as f:
#         for ex in ds:
#             ex["instruction"] = ex["instruction"].strip()
#             ex["input"] = ex["input"].strip()
#             ex["output"] = ex["output"].strip()
#             f.write(json.dumps(ex, ensure_ascii=False) + "\n")
# # eval
# for name, ds in DATASETS_EVAL.items():
#     out_path = OUT_DIR / f"{name}.jsonl"
#     print(f"[OK] {name}: -> {out_path}")
#     with open(out_path, "w", encoding="utf-8") as f:
#         for ex in ds:
#             ex["instruction"] = ex["instruction"].strip()
#             ex["input"] = ex["input"].strip()
#             ex["output"] = ex["output"].strip()
#             f.write(json.dumps(ex, ensure_ascii=False) + "\n")