In [13]:
# XGBoost 试验：按需现算 proxy 训练/验证
# 可配置：任务/搜索空间、特征列表（proxy + 可选架构哈希）、
# 训练采样（random），验证采样（random/percent/flops），batch_size/maxbatch/decoder_only 等。


In [14]:
import os, sys, time, random  # 基础库
from pathlib import Path  # 路径处理
import numpy as np  # 数值
import pandas as pd  # 表格
import torch  # 深度学习
from tqdm import tqdm  # 进度条

# xgboost 依赖
import xgboost as xgb  # 回归/排序模型

# 项目路径与导入（notebook 下无 __file__，用 cwd 回退）
try:
    CURRENT_DIR = Path(__file__).resolve().parent  # 脚本目录
except NameError:
    CURRENT_DIR = Path.cwd()  # notebook 回退为当前目录
ROOT_DIR = CURRENT_DIR.parent  # 项目根
NASLIB_ROOT = ROOT_DIR / "NASLib"  # NASLib 根
sys.path.insert(0, str(ROOT_DIR))  # 加入路径
sys.path.insert(0, str(NASLIB_ROOT))  # 加入 NASLib

from proxy_TransNAS.utils.load_model import (
    load_transbench_classes,  # 搜索空间类
    load_transbench_api,  # API 缓存
    make_train_loader,  # loader 构造
    truncate_loader,  # 截断 batch
    select_architectures_by_percentile,  # 百分位采样
    get_metric_name,  # metric 名
    set_op_indices_from_str,  # 写入 op_indices
)
from proxy_TransNAS.proxies.factory import compute_proxy_score  # 统一 proxy 调用
from proxy_TransNAS.proxies.zico import get_loss_fn  # 取 loss
from naslib import utils as nas_utils  # NASLib 工具

get_train_val_loaders = nas_utils.get_train_val_loaders  # 备用接口



In [15]:
# 配置区：按需修改
CFG = {
    # 训练集采样（目前仅 random）
    "train": {
        "sample_mode": "random",  # 仅支持 random
        "num_samples": 100,  # 训练采样数量
    },
    # 验证集采样方式：random / percent / flops
    "val": {
        "sample_mode": "random",  # 可改为 percent / flops
        "num_samples": 500,         # random 时有效
        "start_percent": 0.0,      # percent 时起点
        "end_percent": 10.0,       # percent 时终点
        "flops_csv": str(ROOT_DIR / "proxy_TransNAS" / "flops_lookup" / "flops_macro_autoencoder.csv"),  # flops 模式 CSV
        "start_arch_str": True,    # flops 模式起点架构
        "arch_count": 20,          # flops 模式数量
    },
    # 任务与搜索空间
    "tasks": ["autoencoder"],           # 可选: autoencoder / segmentsemantic / normal
    "search_space": "macro",          # macro / micro
    # 特征列表：可包含 proxies 中的任意名字，以及可选 "arch_hash"
    # "features": ["flops", "naswot", "swap", "zico", "fisher"],
    "features": ["flops", "fisher"],  # 当前使用的特征
    "use_arch_hash": False,             # 是否把架构码转数值特征
    # 计算相关配置
    "decoder_only": False,  # 仅 decoder
    "batch_size": 16,       # DataLoader batch
    "maxbatch": 2,          # 截断 batch 数
    "device": "cuda" if torch.cuda.is_available() else "cpu",  # 设备
    "seed": 42,             # 随机种子
}

random.seed(CFG["seed"])
np.random.seed(CFG["seed"])
torch.manual_seed(CFG["seed"])



<torch._C.Generator at 0x177df3b1bd0>

In [16]:
# 实用函数
TransBench101SearchSpaceMicro, TransBench101SearchSpaceMacro, graph_module = load_transbench_classes()  # 搜索空间类
Metric = graph_module.Metric  # metric 类

def arch_to_num(arch_str: str) -> float:
    """把架构串哈希为稳定的数值特征。"""
    return float(abs(hash(arch_str)) % (10**12)) / 1e12  # 归一化哈希

def sample_arch_strings(dataset_api, task: str, search_space: str, mode: str, cfg_val):
    api = dataset_api["api"]  # API 对象
    if mode == "random":
        pool = api.all_arch_dict[search_space]  # 架构池
        k = min(cfg_val["num_samples"], len(pool))  # 取样数量
        return random.sample(pool, k)  # 随机采样
    elif mode == "percent":
        return select_architectures_by_percentile(
            dataset_api, search_space, task, cfg_val["start_percent"], cfg_val["end_percent"],
        )  # 百分位采样
    elif mode == "flops":
        df = pd.read_csv(cfg_val["flops_csv"]).sort_values(by="flops").reset_index(drop=True)  # 读 flops CSV
        if cfg_val["start_arch_str"] is None:
            start_idx = 0  # 默认起点
        else:
            hit = df.index[df["arch_str"] == cfg_val["start_arch_str"]].tolist()  # 找起点
            assert len(hit) > 0, "start_arch_str 未找到"
            start_idx = hit[0]
        end_idx = start_idx + cfg_val["arch_count"]  # 终点
        return df.iloc[start_idx:end_idx]["arch_str"].tolist()  # 切片
    else:
        raise ValueError(f"未知采样模式: {mode}")


def build_graph(ss_name, task, arch_str):
    if ss_name == "micro":
        if task == "segmentsemantic":
            ss = TransBench101SearchSpaceMicro(dataset=task, create_graph=True, n_classes=17)  # seg 特殊类别数
        else:
            ss = TransBench101SearchSpaceMicro(dataset=task, create_graph=True)
    else:
        ss = TransBench101SearchSpaceMacro(dataset=task, create_graph=True)
    graph = ss.clone()  # 克隆
    graph = set_op_indices_from_str(ss_name, graph, arch_str)  # 写入架构
    graph.parse()  # 解析为 model
    return graph


def prepare_data_and_loss(task, device, batch_size, maxbatch):
    train_loader = make_train_loader(task, Path(CFG["train"].get("data_root", NASLIB_ROOT / "data")), batch_size, CFG["seed"])  # loader
    train_batches = truncate_loader(train_loader, maxbatch)  # 截断 batch
    loss_fn = get_loss_fn(task).to(device)  # 任务损失
    return train_batches, loss_fn


def compute_features_for_arch(model, arch_str, features, train_batches, loss_fn, device, decoder_only):
    feats = {}  # 存储特征
    for proxy_name in features:
        score = compute_proxy_score(model, proxy_name, train_batches, loss_fn, device, decoder_only=decoder_only)  # 计算 proxy
        feats[proxy_name] = float(score) if score is not None else 0.0  # 落地
    if CFG["use_arch_hash"]:
        feats["arch_hash"] = arch_to_num(arch_str)  # 加入哈希
    return feats


def collect_split(split_name: str, cfg_split, dataset_api, task, search_space, features, device, decoder_only, batch_size, maxbatch):
    arch_strings = sample_arch_strings(dataset_api, task, search_space, cfg_split["sample_mode"], cfg_split)  # 采样
    if len(arch_strings) == 0:
        print(f"[{split_name}] {task} 无样本，跳过")
        return pd.DataFrame()

    train_batches, loss_fn = prepare_data_and_loss(task, device, batch_size, maxbatch)  # 数据与损失

    rows = []
    for arch_str in tqdm(arch_strings, desc=f"[{split_name}-{task}]", unit="arch", disable=False, leave=False):
        try:
            graph = build_graph(search_space, task, arch_str)  # 构图
            model = graph.to(device)  # 上设备
            feats = compute_features_for_arch(model, arch_str, features, train_batches, loss_fn, device, decoder_only)  # 计算特征
            metric_name = get_metric_name(task)  # metric 名
            api = dataset_api["api"]
            gt = api.get_single_metric(arch_str, task, metric_name, mode="final")  # 取 GT
            feats.update({"arch_str": arch_str, "task": task, "gt": float(gt)})  # 补字段
            rows.append(feats)
        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"OOM 跳过 {arch_str}")
                torch.cuda.empty_cache()
                continue
            else:
                raise e
        finally:
            if 'model' in locals():
                model.cpu()
                del model
                torch.cuda.empty_cache()
    return pd.DataFrame(rows)



In [17]:
# 构建训练 / 验证数据
all_train = []  # 训练集列表
all_val = []    # 验证集列表

data_root = Path(NASLIB_ROOT / "data")  # 数据根

for task in CFG["tasks"]:  # 遍历任务
    dataset_api = load_transbench_api(data_root, task)  # 载入 API
    # train（仅 random）
    df_tr = collect_split(
        split_name="train",
        cfg_split=CFG["train"],
        dataset_api=dataset_api,
        task=task,
        search_space=CFG["search_space"],
        features=CFG["features"],
        device=CFG["device"],
        decoder_only=CFG["decoder_only"],
        batch_size=CFG["batch_size"],
        maxbatch=CFG["maxbatch"],
    )
    if len(df_tr):
        all_train.append(df_tr)
    # val（可 random/percent/flops）
    df_val = collect_split(
        split_name="val",
        cfg_split=CFG["val"],
        dataset_api=dataset_api,
        task=task,
        search_space=CFG["search_space"],
        features=CFG["features"],
        device=CFG["device"],
        decoder_only=CFG["decoder_only"],
        batch_size=CFG["batch_size"],
        maxbatch=CFG["maxbatch"],
    )
    if len(df_val):
        all_val.append(df_val)

train_df = pd.concat(all_train, ignore_index=True) if all_train else pd.DataFrame()  # 合并训练
val_df = pd.concat(all_val, ignore_index=True) if all_val else pd.DataFrame()        # 合并验证
print("train shape:", train_df.shape)
print("val shape:", val_df.shape)
train_df.head(), val_df.head()



                                                                        

train shape: (100, 5)
val shape: (500, 5)




(       flops    fisher         arch_str         task        gt
 0  22.399253  0.000025  64-441114-basic  autoencoder  0.502551
 1  23.204164  0.000024   64-14213-basic  autoencoder  0.569561
 2  22.676382  0.000018    64-3421-basic  autoencoder  0.540097
 3  21.865235  0.000022  64-414341-basic  autoencoder  0.423320
 4  21.432799  0.000020  64-334111-basic  autoencoder  0.497628,
        flops    fisher         arch_str         task        gt
 0  21.558844  0.000025  64-331124-basic  autoencoder  0.472741
 1  23.158980  0.000016  64-121331-basic  autoencoder  0.591058
 2  21.516149  0.000015   64-31431-basic  autoencoder  0.502346
 3  21.652650  0.000031  64-314321-basic  autoencoder  0.508427
 4  24.372371  0.000021  64-111242-basic  autoencoder  0.618440)

In [18]:
# 训练并验证 XGBoost 回归
from scipy.stats import kendalltau, spearmanr  # 相关性

def eval_rank(y_true, y_pred):
    kt = kendalltau(y_true, y_pred, nan_policy="omit").correlation  # Kendall
    sp = spearmanr(y_true, y_pred, nan_policy="omit").correlation  # Spearman
    return kt, sp

if len(train_df) == 0 or len(val_df) == 0:
    print("训练或验证数据为空，检查采样配置")
else:
    feature_cols = [c for c in train_df.columns if c not in ["gt", "arch_str", "task"]]  # 特征列
    X_train = train_df[feature_cols]
    y_train = train_df["gt"]
    X_val = val_df[feature_cols]
    y_val = val_df["gt"]

    model = xgb.XGBRegressor(
        n_estimators=500,       # 树数
        learning_rate=0.05,    # 学习率
        max_depth=8,           # 深度
        subsample=0.9,         # 行采样
        colsample_bytree=0.9,  # 列采样
        objective="reg:squarederror",  # 回归
        random_state=CFG["seed"],
    )
    model.fit(X_train, y_train)  # 训练

    val_pred = model.predict(X_val)  # 预测
    kt, sp = eval_rank(y_val, val_pred)
    mse = np.mean((val_pred - y_val) ** 2)
    print(f"val kendall={kt:.4f}, spearman={sp:.4f}, mse={mse:.4f}")

    # 对比单 proxy 的排名相关性
    print("\n单 proxy 对比 (val 集)：")
    for col in feature_cols:
        kt_single, sp_single = eval_rank(y_val, val_df[col])
        print(f"  {col:12s}  kendall={kt_single:.4f}, spearman={sp_single:.4f}")



val kendall=0.3221, spearman=0.4638, mse=0.0052

单 proxy 对比 (val 集)：
  flops         kendall=0.5728, spearman=0.7763
  fisher        kendall=-0.2821, spearman=-0.4162
