In [None]:
import os
import sys
import pandas as pd
import numpy as np
import torch
from pickle import load

from transformers import Trainer, GPT2ForSequenceClassification

from MiCoGPT.utils.corpus import (
    SequenceClassificationDataset,
)
from MiCoGPT.utils.mgm_utils import eval_and_save

from argparse import Namespace
from configparser import ConfigParser


In [None]:
cfg = ConfigParser()
cfg.read("config.ini")  # 如果没有可以先跳过或改成其他路径

# 手动构造一个等价于命令行的 args 对象
args = Namespace(
    input="../data/try2_withCC/ResMicroDB_90338.pkl",
    model="../models/finetuned_model_ResMicroDB_90338",
    output="../outputs/predict_ResMicroDB_90338",
    evaluate=True,
)

args


In [None]:
# 1. 载入 corpus.pkl
corpus = load(open(args.input, "rb"))
tokenizer = corpus.tokenizer

print("样本数量（整个 corpus）:", len(corpus))
display(corpus.data.head())

meta = corpus.metadata

# 如果你的这个 pkl 已经是纯 B 组，其实可以去掉 Split_Group == "B" 这一条
if "Split_Group" in meta.columns:
    group_mask = (meta["Split_Group"] == "B")
else:
    # 如果没有这个列，就全 True，当成都属于 B 组
    group_mask = pd.Series(True, index=meta.index)

if args.evaluate:
    # 评估模式：只使用有标签的样本（Is_Healthy 非 NA）
    mask = group_mask & meta["Is_Healthy"].notna()
    print("用于评估的样本数:", mask.sum())
else:
    # 仅预测：可以对整个 B 组样本做预测（包括 Is_Healthy 为 NA 的）
    mask = group_mask
    print("用于预测的样本数:", mask.sum())

# 2. 根据 mask 取出样本的行索引（在 corpus.tokens 中的位置）
idx = np.where(mask.to_numpy())[0]

# 对应样本 ID（用于 y_score.csv 的 index）
sample_ids = corpus.data.index[mask]

print("前几个样本 ID:", sample_ids[:5])

# 3. 准备 input_ids 和 attention_mask
input_ids = corpus.tokens[idx]                      # shape: [N_subset, max_len]
pad_id = tokenizer.pad_token_id
attention_mask = (input_ids != pad_id).long()       # 0/1 tensor

# 4. 载入 label encoder（无论是否 evaluate，我们都需要它来确定类别顺序）
le = load(open(f"{args.model}/label_encoder.pkl", "rb"))    
num_labels = len(le.categories_[0])
print("类别数:", num_labels)
print("类别名称:", le.categories_[0])

# 5. 构造 labels_tensor
if args.evaluate:
    # 从 metadata 中提取 Is_Healthy 标签（与 mask 对齐）
    labels_series = meta.loc[mask, "Is_Healthy"]

    # 用训练时的 label encoder 做 transform，保证类别 index 一致
    labels_array = le.transform(labels_series.values.reshape(-1, 1)).toarray()
    labels_tensor = torch.tensor(labels_array.argmax(axis=1), dtype=torch.long)
else:
    # 仅预测时，Trainer 不一定需要 labels，这里构建一个 dummy 向量占位即可
    labels_tensor = torch.zeros(len(idx), dtype=torch.long)

# 6. 构建 SequenceClassificationDataset（用位置参数）
dataset = SequenceClassificationDataset(
    input_ids,
    attention_mask,
    labels_tensor,
)

print("Dataset 大小:", len(dataset))



In [None]:
# 这里 num_labels 已在上一个 chunk 里计算好了
model = GPT2ForSequenceClassification.from_pretrained(
    args.model,
    num_labels=num_labels,
)

model.eval()  # 进入 eval 模式

trainer = Trainer(model=model)
model


In [None]:
# 运行预测
predictions = trainer.predict(dataset)

# 确保输出目录存在
os.makedirs(args.output, exist_ok=True)

# 预测得分矩阵（样本数 × 类别数）
y_score = predictions.predictions

# 保存为 csv，index 对齐我们刚才筛选出的 sample_ids，列名来自 label encoder
score_path = os.path.join(args.output, "y_score.csv")
pd.DataFrame(
    y_score,
    index=sample_ids,          # 注意：是筛选后的样本 ID 子集
    columns=le.categories_[0], # 类别名来自 label encoder
).to_csv(score_path)

print("y_score 已保存到:", score_path)

# 如果需要 evaluation，就计算并保存
if args.evaluate:
    # Trainer.predict 会把 labels_tensor 作为 label_ids 原样返回
    y_true = predictions.label_ids  # shape: [N_subset]
    eval_dir = os.path.join(args.output, "evaluation")

    eval_and_save(
        y_score,
        y_true,
        le.categories_[0],
        eval_dir,
    )

    print("evaluation 结果已保存到:", eval_dir)
else:
    print("只做预测，没有 evaluation。记得根据输出得分设置你自己的阈值或判定规则。")
