这是一个非常好的计划。在 Colab 上实现这个最小验证实验（MVES）是验证我们理论可行性的最快路径。

为了保证代码的**可读性**、**可扩展性**和**可复现性**，我将采用以下策略来构建这个 Notebook：

1.  **`transformers` 优先**：我们将尽可能使用 `transformers` 库的 `LogitsProcessor` API。这是一个专门用于在 `model.generate()` 过程中修改 logits 的标准接口，非常适合水印嵌入。
2.  **效率与简洁的平衡**：LSH-GHW 方案需要在*生成时*获取*内部*的路由权重（RW）。我们将使用一种“预计算 (pre-pass)” 的方式在 `LogitsProcessor` 内部实现这一点：每生成一个 token 都会额外执行一次完整的前向传播，因此计算量大约翻倍（在 MVES 阶段可接受），但能使我们的代码**极度模块化**，并复用 `transformers` 强大的 `generate()` 函数。
3.  **配置驱动**：所有实验参数（$L, m_e, \delta$ 等）都将集中在一个配置对象中，方便快速迭代和扩展。

**注意**：此代码是一个完整的、可运行的 Colab 示例。您需要一个**有 T4 GPU**的 Colab 运行时来加速 MoE 模型的推理。

-----

### Colab Notebook：MoE 水印最小验证实验 (MVES)

In [None]:
# @title 0. 安装与环境设置
# @markdown (运行此单元格以安装所需库)
!pip install transformers datasets torch scipy sentencepiece accelerate bitsandbytes -q

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader

from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    LogitsProcessor,
    LogitsProcessorList,
    T5ForConditionalGeneration  # 我们将使用 Switch-T5 的基类
)
from datasets import load_dataset
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np
from scipy.stats import norm
import pandas as pd
import warnings

# 忽略 transformers 的一些已知警告
warnings.filterwarnings("ignore", category=UserWarning)

# 设置设备
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# 为可复现性设置随机种子
torch.manual_seed(42)
np.random.seed(42)

Using device: cuda


In [None]:
# @title 1. 实验核心配置 (Config)
# @markdown (所有实验参数都在这里定义，便于扩展和迭代)

@dataclass
class MVESConfig:
    """All tunable parameters of the MVES experiment, gathered in one place.

    NOTE: a later cell in this file defines `MVESConfig` again (wikitext
    dataset plus a `DATASET_CONFIG` field); when both cells run, that later
    definition replaces this one.
    """
    # --- Model and data configuration ---
    MODEL_ID: str = "google/switch-base-8"  # MVES key point: use a publicly available MoE model
    DATASET_NAME: str = "c4"
    DATASET_SPLIT: str = "validation"
    NUM_SAMPLES: int = 50           # MVES key point: small sample count for fast iteration
    PROMPT_LENGTH: int = 50         # context length (in tokens) for the continuation task
    GEN_LENGTH: int = 100           # length of the generated text

    # --- KGW baseline configuration ---
    KGW_GAMMA: float = 0.5          # green-list size as a fraction of the vocabulary
    KGW_DELTA: float = 2.0          # logit boost applied to green-list tokens

    # --- LSH-GHW (our scheme) configuration ---
    LSH_L: int = 32                 # L: LSH signature length
    LSH_ME: int = 1                 # m_e: size of each expert pool
    LSH_MT: int = 200               # m_t: size of each vocabulary pool
    LSH_DELTA_MAX: float = 2.0      # delta_max: maximum watermark strength
    # tau_low: activation-entropy threshold, set dynamically by the calibration step
    LSH_TAU_LOW: float = 0.0

    # --- Auxiliary parameters ---
    BATCH_SIZE: int = 4
    VOCAB_SIZE: int = 0             # filled in by load_model_and_data
    NUM_EXPERTS: int = 0            # filled in by load_model_and_data
    RW_DIM: int = 0                 # filled in by load_model_and_data

# Instantiate the configuration
config = MVESConfig()

In [None]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader

from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    LogitsProcessor,
    LogitsProcessorList,
    T5ForConditionalGeneration  # 我们将使用 Switch-T5 的基类
)
from datasets import load_dataset
from dataclasses import dataclass
from typing import List, Dict, Tuple
import numpy as np
from scipy.stats import norm
import pandas as pd
import warnings

# 忽略 transformers 的一些已知警告
warnings.filterwarnings("ignore", category=UserWarning)

# 设置设备
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# 为可复现性设置随机种子
torch.manual_seed(42)
np.random.seed(42)


# @title 1. 实验核心配置 (Config)
# @markdown (所有实验参数都在这里定义，便于扩展和迭代)

@dataclass
class MVESConfig:
    """All tunable parameters of the MVES experiment, gathered in one place.

    The three trailing fields start at 0 and are overwritten by
    `load_model_and_data`; `LSH_TAU_LOW` is overwritten by `calibrate_tau`.
    """
    # --- Model and data configuration ---
    MODEL_ID: str = "google/switch-base-8"  # MVES key point: use a publicly available MoE model
    DATASET_NAME: str = "wikitext" # Modified: Changed from c4 to wikitext
    DATASET_CONFIG: str = "wikitext-2-raw-v1" # Modified: Changed config for wikitext
    DATASET_SPLIT: str = "validation"
    NUM_SAMPLES: int = 50           # MVES key point: small sample count for fast iteration
    PROMPT_LENGTH: int = 50         # context length (in tokens) for the continuation task
    GEN_LENGTH: int = 100           # length of the generated text

    # --- KGW baseline configuration ---
    KGW_GAMMA: float = 0.5          # green-list size as a fraction of the vocabulary
    KGW_DELTA: float = 2.0          # logit boost applied to green-list tokens

    # --- LSH-GHW (our scheme) configuration ---
    LSH_L: int = 32                 # L: LSH signature length
    LSH_ME: int = 1                 # m_e: size of each expert pool
    LSH_MT: int = 200               # m_t: size of each vocabulary pool
    LSH_DELTA_MAX: float = 2.0      # delta_max: maximum watermark strength
    # tau_low: activation-entropy threshold, set dynamically by the calibration step
    LSH_TAU_LOW: float = 0.0

    # --- Auxiliary parameters ---
    BATCH_SIZE: int = 4
    VOCAB_SIZE: int = 0             # filled in by load_model_and_data
    NUM_EXPERTS: int = 0            # filled in by load_model_and_data
    RW_DIM: int = 0                 # filled in by load_model_and_data

# Instantiate the configuration
config = MVESConfig()


# @title 2. 加载模型、Tokenizer 与数据集

def load_model_and_data(config):
    """Load the MoE model, its tokenizer and a list of fixed-length prompts.

    Side effects: fills in ``config.VOCAB_SIZE``, ``config.NUM_EXPERTS`` and
    ``config.RW_DIM`` from the loaded model.

    Args:
        config: an ``MVESConfig``-like object. ``DATASET_CONFIG`` is optional;
            when the attribute is absent no dataset sub-configuration is
            passed to ``load_dataset``.

    Returns:
        ``(model, tokenizer, prompts)`` where ``prompts`` holds at most
        ``config.NUM_SAMPLES`` strings, each decoded from exactly
        ``config.PROMPT_LENGTH`` token ids.
    """
    # Local import: the file-level import list does not include this name.
    from transformers import BitsAndBytesConfig

    print(f"Loading tokenizer: {config.MODEL_ID}")
    tokenizer = AutoTokenizer.from_pretrained(config.MODEL_ID)

    print(f"Loading model: {config.MODEL_ID}")
    # Load in 8-bit to save Colab memory. The bare `load_in_8bit=True`
    # keyword is deprecated in favour of an explicit quantization config.
    model = AutoModelForSeq2SeqLM.from_pretrained(
        config.MODEL_ID,
        device_map="auto",
        quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    )
    model.eval()

    # Copy the model-specific sizes into the shared config.
    config.VOCAB_SIZE = model.config.vocab_size
    config.NUM_EXPERTS = model.config.num_experts
    # The LSH projection hashes the router-logit vector, which has one entry
    # per expert, so the projection dimension equals the expert count.
    config.RW_DIM = config.NUM_EXPERTS

    print(f"Model loaded. Vocab size: {config.VOCAB_SIZE}, Num Experts: {config.NUM_EXPERTS}, RW Dim: {config.RW_DIM}")

    print("Loading data...")
    dataset = load_dataset(
        config.DATASET_NAME,
        getattr(config, "DATASET_CONFIG", None),
        split=config.DATASET_SPLIT,
    )
    prompts = []
    # Scan the whole split until enough prompts are collected. Limiting the
    # scan to the first NUM_SAMPLES rows returned fewer prompts than asked
    # for, because rows that are too short are filtered out below.
    for item in dataset:
        if len(prompts) >= config.NUM_SAMPLES:
            break
        text = item['text']
        tokens = tokenizer(
            text,
            return_tensors="pt",
            max_length=config.PROMPT_LENGTH,
            truncation=True,
        ).input_ids
        # Keep only texts that fill the whole prompt window so every prompt
        # has the same token length.
        if tokens.shape[1] == config.PROMPT_LENGTH:
            prompts.append(tokenizer.decode(tokens[0], skip_special_tokens=True))

    print(f"Loaded {len(prompts)} prompts.")
    return model, tokenizer, prompts

model, tokenizer, prompts = load_model_and_data(config)

Using device: cuda
Loading tokenizer: google/switch-base-8


The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Loading model: google/switch-base-8
Model loaded. Vocab size: 32128, Num Experts: 8, RW Dim: 8
Loading data...
Loaded 20 prompts.


In [None]:
# @title 3. 辅助函数 (Z-Score & 熵)

def z_score(num_hits, num_scored, p0):
    """Return the one-proportion z statistic used by the watermark detectors.

    Computes ``(hits - n*p0) / sqrt(n*p0*(1-p0))``. Returns ``0.0`` when no
    token was scored or when the binomial variance is exactly zero.
    """
    if num_scored == 0:
        return 0.0

    variance = num_scored * p0 * (1 - p0)
    # Guard against division by zero (p0 equal to 0 or 1).
    if variance == 0:
        return 0.0

    expected_hits = num_scored * p0
    return (num_hits - expected_hits) / np.sqrt(variance)

def entropy(logits):
    """Return the Shannon entropy (in nats) of softmax(logits) over the last axis."""
    log_p = F.log_softmax(logits, dim=-1)
    p = F.softmax(logits, dim=-1)
    weighted = p * log_p
    return -torch.sum(weighted, dim=-1)

def get_router_states(model, input_ids):
    """Run a "pre-pass" forward and return the routing state of the last token.

    The model is called with ``input_ids`` used both as encoder input and as
    decoder input (a deliberate simplification of T5-style inference), with
    ``output_router_logits=True``.

    Only batch row 0 is inspected, so callers should pass one sequence at a
    time.

    Args:
        model: callable returning an object with ``decoder_router_logits``.
        input_ids: token ids; row 0 and its last position are used.

    Returns:
        ``(R_i, H_r, Sigma_x)``, all on CPU:
        ``R_i`` — router-logit vector of the last token (one entry per expert);
        ``H_r`` — scalar entropy of softmax(``R_i``);
        ``Sigma_x`` — scalar index of the top-1 expert.

    Raises:
        ValueError: if no decoder layer reports a 3-D router-logit tensor.
    """
    with torch.no_grad():
        outputs = model(
            input_ids=input_ids,
            decoder_input_ids=input_ids,  # simplified T5 inference
            output_router_logits=True
        )

        # `decoder_router_logits` has one entry per decoder layer. Blindly
        # taking entry [0][0] fails with "too many indices for tensor of
        # dimension 1": that entry is a 1-D tensor — presumably a placeholder
        # emitted for a dense (non-MoE) layer; confirm against the installed
        # transformers version. Walk backwards and use the last layer that
        # actually carries a 3-D router-logit tensor.
        router_logits_tensor = None
        for layer_entry in reversed(outputs.decoder_router_logits):
            if isinstance(layer_entry, (tuple, list)):
                candidate = layer_entry[0] if len(layer_entry) > 0 else None
            else:
                candidate = layer_entry
            if torch.is_tensor(candidate) and candidate.dim() == 3:
                router_logits_tensor = candidate.float()
                break

        if router_logits_tensor is None:
            raise ValueError(
                "No decoder layer returned 3-D router logits; "
                "is the model a sparse MoE model?"
            )

        # Row 0, last position: the routing decision for the current step.
        router_logits_for_current_step = router_logits_tensor[0, -1, :]

        # 1. Router weight (RW) vector R_i
        R_i = router_logits_for_current_step.cpu()

        # 2. Activation entropy H(r(x))
        H_r = entropy(router_logits_for_current_step).cpu()

        # 3. Activated expert Sigma(x) (top-k with k=1)
        Sigma_x = torch.topk(router_logits_for_current_step, k=1, dim=-1).indices.cpu().squeeze()

        return R_i, H_r, Sigma_x

In [None]:
# @title 4. KGW 基线 (LogitsProcessor & Detector)

class KGWLogitsProcessor(LogitsProcessor):
    """Standard KGW (red/green list) watermark applied during generation.

    For every sequence in the batch, the previous token seeds a pseudo-random
    permutation of the vocabulary; the first ``gamma * vocab_size`` entries
    form the green list, whose logits are raised by ``delta``.
    """

    def __init__(self, config):
        """Read ``KGW_GAMMA``, ``KGW_DELTA`` and ``VOCAB_SIZE`` from ``config``."""
        self.gamma = config.KGW_GAMMA
        self.delta = config.KGW_DELTA
        self.vocab_size = config.VOCAB_SIZE
        self.green_list_size = int(self.vocab_size * self.gamma)

    def _get_green_list(self, last_token_id):
        """Return the green-list token ids (CPU tensor) seeded by ``last_token_id``.

        For simplicity this uses torch's seeded PRNG rather than the hash
        function of the original KGW scheme. The detector calls this method
        too, so it must stay deterministic in ``last_token_id``.
        """
        g = torch.Generator()
        g.manual_seed(last_token_id)
        indices = torch.randperm(self.vocab_size, generator=g)
        return indices[:self.green_list_size]

    def __call__(self, input_ids, scores):
        """Boost green-list logits in ``scores`` (modified in place) and return it.

        Every batch row is watermarked with the green list seeded by its own
        last token, so beams or batched prompts are no longer left unmarked.
        """
        last_token_ids = input_ids[:, -1].tolist()
        for row, last_token_id in enumerate(last_token_ids):
            green_list = self._get_green_list(last_token_id).to(scores.device)
            scores[row, green_list] = scores[row, green_list] + self.delta
        return scores

def detect_kgw(text, tokenizer, config):
    """Score ``text`` with the KGW detector and return its z-score.

    Each token after the first counts as a hit when it lies in the green list
    seeded by the token before it; the null hit rate is ``config.KGW_GAMMA``.
    """
    token_ids = tokenizer(text, return_tensors="pt").input_ids[0].to(DEVICE)
    green_source = KGWLogitsProcessor(config)

    hits = 0
    position = 1
    while position < len(token_ids):
        previous_id = token_ids[position - 1].item()
        observed_id = token_ids[position].item()
        if observed_id in green_source._get_green_list(previous_id):
            hits += 1
        position += 1

    # Expected hit probability under H0 (no watermark).
    return z_score(hits, len(token_ids) - 1, config.KGW_GAMMA)

In [None]:
# @title 5. LSH-GHW (我们的方案) - 辅助类 (Keys)
# @markdown (这个类管理 LSH-GHW 的所有秘密密钥)

class LSH_GHW_Keys:
    """Secret key material of the LSH-GHW scheme.

    Attributes:
        K_lsh: ``(L, RW_DIM)`` random projection matrix for the LSH signature.
        E_Pools: list of ``L`` sets of expert indices, one per signature bit.
        T_Pools: list of ``NUM_EXPERTS`` sets of token ids, one per expert.
        p0: estimated probability that a token falls in the green list under
            the null hypothesis (no watermark).

    All key material comes from generators seeded with ``seed``, so embedder
    and detector can rebuild identical keys regardless of how much global
    random state earlier code has consumed.
    """

    def __init__(self, config, seed=42):
        """Build the keys from ``config``.

        Args:
            config: object exposing ``LSH_L``, ``LSH_ME``, ``LSH_MT``,
                ``NUM_EXPERTS``, ``VOCAB_SIZE`` and ``RW_DIM``.
            seed: seed of the key generators (the secret).

        Raises:
            ValueError: if the model-dependent sizes are still unset.
        """
        self.L = config.LSH_L
        self.Me = config.LSH_ME
        self.Mt = config.LSH_MT
        self.NumExperts = config.NUM_EXPERTS
        self.VocabSize = config.VOCAB_SIZE
        self.RwDim = config.RW_DIM

        if self.NumExperts <= 0 or self.VocabSize <= 0 or self.RwDim <= 0:
            raise ValueError(
                "NUM_EXPERTS, VOCAB_SIZE and RW_DIM must be positive; "
                "load the model before creating the keys."
            )

        # 1. K_lsh: L x d_RW random projection vectors (own seeded stream).
        projection_rng = torch.Generator()
        projection_rng.manual_seed(seed + 1)
        self.K_lsh = torch.randn((self.L, self.RwDim), generator=projection_rng)

        # 2. E_Pools: L expert pools, each holding Me distinct experts.
        pool_rng = np.random.default_rng(seed)
        self.E_Pools = [
            {int(e) for e in pool_rng.choice(self.NumExperts, self.Me, replace=False)}
            for _ in range(self.L)
        ]

        # 3. T_Pools: one vocabulary pool of (up to) Mt tokens per expert.
        token_rng = torch.Generator()
        token_rng.manual_seed(seed)
        self.T_Pools = [
            set(torch.randperm(self.VocabSize, generator=token_rng)[:self.Mt].tolist())
            for _ in range(self.NumExperts)
        ]

        # Null-hypothesis hit probability p0.
        # A token can only hit when the single routed expert is in E_green,
        # and then it must fall in that expert's pool of Mt tokens.
        # Approximation: each signature bit is an independent fair coin and
        # routing is uniform over experts, so an expert appearing in c pools
        # is gated in with probability 1 - 0.5**c.
        # (Multiplying expected counts instead can exceed 1 and yield a p0
        # larger than the largest possible green-list fraction Mt / V.)
        gate_probs = [
            1.0 - 0.5 ** sum(expert in pool for pool in self.E_Pools)
            for expert in range(self.NumExperts)
        ]
        mean_gate_prob = sum(gate_probs) / self.NumExperts
        pool_fraction = min(self.Mt, self.VocabSize) / self.VocabSize
        self.p0 = min(1.0, mean_gate_prob * pool_fraction)
        print(f"LSH-GHW H0 (p0) = {self.p0:.6f}")

In [None]:
# @title 6. LSH-GHW (我们的方案) - LogitsProcessor
# @markdown (MVES 核心：实现 LSH-GHW 嵌入逻辑)

class LSH_GHW_LogitsProcessor(LogitsProcessor):
    """Embeds the LSH-GHW (hierarchical) watermark during generation.

    At every decoding step a "pre-pass" forward through the model recovers
    the router state of the current context. That state selects the green
    token list, whose logits are boosted.
    """

    def __init__(self, model, keys, config):
        """Store the model used for the pre-pass, the secret keys and the config."""
        self.model = model
        self.keys = keys
        self.config = config

    def _get_lsh_ghw_state(self, input_ids):
        """Return ``(H_r, Sigma_watermark)`` for the sequence in ``input_ids``.

        ``H_r`` is the router entropy as a float; ``Sigma_watermark`` is the
        set holding the routed expert if it passes the LSH gate, otherwise
        the empty set. Only row 0 of ``input_ids`` is considered, because
        ``get_router_states`` reads row 0 only.
        """
        # 1. Internal routing state of the current context.
        R_i, H_r, Sigma_x = get_router_states(self.model, input_ids.to(DEVICE))

        # 2. Semantic gate (LSH): sign of L random projections of R_i.
        #    R_i (d_RW) @ K_lsh.T (d_RW, L) -> L continuous scores.
        S_i_cont = R_i.float() @ self.keys.K_lsh.T.float()
        S_i_binary = (S_i_cont > 0).tolist()  # L-bit signature

        # Union of the expert pools whose signature bit is set.
        E_green = set()
        for bit_is_set, pool in zip(S_i_binary, self.keys.E_Pools):
            if bit_is_set:
                E_green.update(pool)

        # 3. Routing gate: keep the routed expert only if LSH selected it.
        Sigma_watermark = E_green.intersection({Sigma_x.item()})

        return H_r.item(), Sigma_watermark

    def __call__(self, input_ids, scores):
        """Boost green-list logits in ``scores`` (modified in place) and return it.

        Each batch row is processed independently with its own pre-pass, so
        rows beyond the first are watermarked too.
        """
        for row in range(input_ids.shape[0]):
            # 1. LSH-GHW state of this row.
            H_r, Sigma_watermark = self._get_lsh_ghw_state(input_ids[row:row + 1])

            # 2. Adaptive strength: watermark only when the router is
            #    "uncertain" (entropy at or above the calibrated threshold).
            delta_adaptive = 0.0
            if H_r >= self.config.LSH_TAU_LOW:
                delta_adaptive = self.config.LSH_DELTA_MAX

            # 3. Green list G_i: union of the token pools of gated experts.
            G_i = set()
            for k in Sigma_watermark:
                G_i.update(self.keys.T_Pools[k])

            # 4. Apply the boost.
            if delta_adaptive > 0 and G_i:
                green_list_indices = torch.tensor(list(G_i), dtype=torch.long, device=scores.device)
                scores[row, green_list_indices] += delta_adaptive

        return scores

In [None]:
# @title 7. LSH-GHW (我们的方案) - Detector

def detect_lsh_ghw(text, tokenizer, model, keys, config):
    """Score ``text`` with the LSH-GHW detector and return its z-score.

    For every token after the first, the router state of the preceding
    context is rebuilt with ``get_router_states`` — the same helper the
    embedder uses, so both sides read the same layer and position. Tokens
    whose router entropy reaches ``config.LSH_TAU_LOW`` are scored; a scored
    token is a hit when it lies in the reconstructed green list.

    NOTE(review): the embedder sees the decoder ids produced by ``generate``
    while this function re-tokenizes decoded text, so the two contexts may
    differ in special tokens — confirm whether alignment is needed.
    """
    tokens = tokenizer(text, return_tensors="pt").input_ids[0]
    projection = keys.K_lsh.T.float()  # loop-invariant, hoisted

    num_hits = 0
    num_scored = 0

    # Start at the second token: at least one token of context is needed.
    for i in range(1, len(tokens)):
        C_i = tokens[:i].unsqueeze(0).to(DEVICE)  # context
        t_i = tokens[i].item()                    # observed token

        # 1. Rebuild the routing state with the clean model.
        R_i, H_r, Sigma_x = get_router_states(model, C_i)

        # 2. Strength decision: would the embedder have watermarked here?
        if not (H_r.item() >= config.LSH_TAU_LOW):
            continue
        num_scored += 1

        # 3. Rebuild the LSH signature and the gated expert set.
        S_i_binary = ((R_i.float() @ projection) > 0).tolist()
        E_green = set()
        for bit_is_set, pool in zip(S_i_binary, keys.E_Pools):
            if bit_is_set:
                E_green.update(pool)
        Sigma_watermark = E_green.intersection({Sigma_x.item()})

        # 4. Hit if the token is in the pool of any gated expert (G_i).
        if any(t_i in keys.T_Pools[k] for k in Sigma_watermark):
            num_hits += 1

    return z_score(num_hits, num_scored, keys.p0)

In [None]:
# @title 8. 步骤 8: 校准 (Calibration)
# @markdown (MVES 关键：动态设置 tau_low 阈值)

def calibrate_tau(model, tokenizer, prompts, config, percentile=25):
    """Calibrate ``config.LSH_TAU_LOW`` from the router-entropy distribution.

    One entropy value is collected per prompt (the last-token entropy from
    ``get_router_states``); the threshold is the ``percentile``-th percentile
    of those values. For example, ``percentile=25`` watermarks roughly the
    75% "most uncertain" positions.

    Args:
        model, tokenizer: the loaded model and its tokenizer.
        prompts: non-empty sequence of prompt strings.
        config: object whose ``LSH_TAU_LOW`` attribute is overwritten.
        percentile: percentile in [0, 100].

    Returns:
        The calibrated threshold as a plain float (also stored in ``config``).

    Raises:
        ValueError: if ``prompts`` is empty — a threshold computed from no
            data would be meaningless.
    """
    if not prompts:
        raise ValueError("calibrate_tau needs at least one prompt to calibrate on")

    print("Starting calibration for LSH_TAU_LOW...")
    entropies = []

    for prompt in prompts:
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(DEVICE)
        # One entropy sample per prompt: that of its last token.
        _, H_r, _ = get_router_states(model, input_ids)
        entropies.append(H_r.item())

    tau_low = float(np.percentile(entropies, percentile))
    config.LSH_TAU_LOW = tau_low
    print(f"Calibration complete. LSH_TAU_LOW set to: {tau_low:.4f}")
    return tau_low

# 运行校准
calibrate_tau(model, tokenizer, prompts, config, percentile=25)

Starting calibration for LSH_TAU_LOW...


IndexError: too many indices for tensor of dimension 1

In [None]:
# @title 9. 实验执行：文本生成 (Generation)

# Holds the generated text of every condition, index-aligned with `prompts`.
results = {
    "prompts": prompts,
    "C1_Clean": [],
    "C2_KGW": [],
    "C3_LSH_GHW": [],
}

# Initialise the LSH-GHW keys
lsh_keys = LSH_GHW_Keys(config)

# Initialise the LogitsProcessors
kgw_processor = KGWLogitsProcessor(config)
lsh_ghw_processor = LSH_GHW_LogitsProcessor(model, lsh_keys, config)

print("Starting text generation...")

# For each prompt, sample one continuation per condition (top-k sampling, k=10).
# NOTE(review): `max_length` is PROMPT_LENGTH + GEN_LENGTH; for an
# encoder-decoder model this presumably bounds the decoder output alone, so
# outputs may be longer than GEN_LENGTH — confirm, and consider
# `max_new_tokens` (change it together with the robustness cell).
for i, prompt in enumerate(prompts):
    print(f"Generating sample {i+1}/{len(prompts)}...")
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(DEVICE)

    # --- C1: clean model (baseline) ---
    outputs_c1 = model.generate(
        input_ids,
        max_length=config.PROMPT_LENGTH + config.GEN_LENGTH,
        do_sample=True, top_k=10
    )
    results["C1_Clean"].append(tokenizer.decode(outputs_c1[0], skip_special_tokens=True))

    # --- C2: KGW baseline ---
    outputs_c2 = model.generate(
        input_ids,
        logits_processor=LogitsProcessorList([kgw_processor]),
        max_length=config.PROMPT_LENGTH + config.GEN_LENGTH,
        do_sample=True, top_k=10
    )
    results["C2_KGW"].append(tokenizer.decode(outputs_c2[0], skip_special_tokens=True))

    # --- C3: LSH-GHW (our scheme) ---
    outputs_c3 = model.generate(
        input_ids,
        logits_processor=LogitsProcessorList([lsh_ghw_processor]),
        max_length=config.PROMPT_LENGTH + config.GEN_LENGTH,
        do_sample=True, top_k=10
    )
    results["C3_LSH_GHW"].append(tokenizer.decode(outputs_c3[0], skip_special_tokens=True))

print("Generation complete.")

In [None]:
# @title 10. 实验执行：指标评估 (PPL & Z-Score)
# @markdown (计算文本质量 PPL 和可检测性 Z-Score)

# (Note: computing PPL is slow. For the MVES we can look at the z-score only, or sample PPL.)
# (For brevity PPL is skipped here; the z-score alone is enough for this validation.)

print("Starting evaluation...")
report_data = []

# One report row per prompt: each detector is run on its own watermarked
# text and on the clean text of the same prompt.
for i in range(len(prompts)):
    # Texts generated under C1, C2 and C3
    text_c1 = results["C1_Clean"][i]
    text_c2 = results["C2_KGW"][i]
    text_c3 = results["C3_LSH_GHW"][i]

    # --- Detectability (z-score) ---

    # 1. KGW detector (TP and FP)
    z_c2_as_c2 = detect_kgw(text_c2, tokenizer, config) # true positive (TP): watermarked text
    z_c1_as_c2 = detect_kgw(text_c1, tokenizer, config) # false positive (FP): clean text

    # 2. LSH-GHW detector (TP and FP)
    z_c3_as_c3 = detect_lsh_ghw(text_c3, tokenizer, model, lsh_keys, config) # TP
    z_c1_as_c3 = detect_lsh_ghw(text_c1, tokenizer, model, lsh_keys, config) # FP

    report_data.append({
        "sample_id": i,
        "z_KGW_TP": z_c2_as_c2,
        "z_KGW_FP": z_c1_as_c2,
        "z_LSH_TP": z_c3_as_c3,
        "z_LSH_FP": z_c1_as_c3,
    })

report_df = pd.DataFrame(report_data)
print("Evaluation complete.")

In [None]:
# @title 11. 实验执行：语义鲁棒性 (R_input) 评估
# @markdown (这是我们方案的核心优势验证)

print("Starting R_input (Robustness) evaluation...")

# MVES key point: a simple, simulated paraphrase attack.
# A real experiment would need an actual paraphrasing model (e.g. T5-based).
def run_paraphrase_attack(prompt):
    """Return ``prompt`` stripped of outer whitespace with ". In other words," appended."""
    trimmed = prompt.strip()
    return f"{trimmed}. In other words,"

robustness_data = []

# For each prompt: perturb the prompt, regenerate under both watermark
# schemes, and record the z-score each detector reports on the new text.
# NOTE(review): `max_length` is PROMPT_LENGTH + GEN_LENGTH as in the
# generation cell; keep the two cells in sync if that limit is changed.
for i, prompt in enumerate(prompts):
    print(f"Running robustness attack {i+1}/{len(prompts)}...")

    # 1. Build the paraphrased prompt
    para_prompt = run_paraphrase_attack(prompt)
    para_input_ids = tokenizer(para_prompt, return_tensors="pt").input_ids.to(DEVICE)

    # 2. Generate from the paraphrased prompt with KGW
    outputs_c2_para = model.generate(
        para_input_ids,
        logits_processor=LogitsProcessorList([kgw_processor]),
        max_length=config.PROMPT_LENGTH + config.GEN_LENGTH,
        do_sample=True, top_k=10
    )
    text_c2_para = tokenizer.decode(outputs_c2_para[0], skip_special_tokens=True)

    # 3. Generate from the paraphrased prompt with LSH-GHW
    outputs_c3_para = model.generate(
        para_input_ids,
        logits_processor=LogitsProcessorList([lsh_ghw_processor]),
        max_length=config.PROMPT_LENGTH + config.GEN_LENGTH,
        do_sample=True, top_k=10
    )
    text_c3_para = tokenizer.decode(outputs_c3_para[0], skip_special_tokens=True)

    # 4. Detect and record the z-scores (later compared with TP to measure decay)
    z_c2_para_decay = detect_kgw(text_c2_para, tokenizer, config)
    z_c3_para_decay = detect_lsh_ghw(text_c3_para, tokenizer, model, lsh_keys, config)

    robustness_data.append({
        "sample_id": i,
        "z_KGW_Robust": z_c2_para_decay,
        "z_LSH_Robust": z_c3_para_decay,
    })

robust_df = pd.DataFrame(robustness_data)
print("Robustness evaluation complete.")

# Merge the robustness columns into the main report (joined on sample_id)
report_df = report_df.merge(robust_df, on="sample_id")

In [None]:
# @title 12. 最终结果报告
# @markdown (显示所有指标的平均值)

# --- Column means over all samples ---
avg_z_kgw_tp = report_df['z_KGW_TP'].mean()
avg_z_kgw_fp = report_df['z_KGW_FP'].mean()
avg_z_kgw_robust = report_df['z_KGW_Robust'].mean()

avg_z_lsh_tp = report_df['z_LSH_TP'].mean()
avg_z_lsh_fp = report_df['z_LSH_FP'].mean()
avg_z_lsh_robust = report_df['z_LSH_Robust'].mean()

# --- Z-score decay: mean TP z-score minus mean z-score under attack ---
# Expectation: KGW ignores semantics, so its decay should be close to 0.
# Expectation: LSH-GHW depends on semantics, so decay > 0 is expected while
# the attacked z-score should still stay well above the FP level.
decay_kgw = avg_z_kgw_tp - avg_z_kgw_robust
decay_lsh = avg_z_lsh_tp - avg_z_lsh_robust

# --- Print the report (values pre-formatted to two decimals) ---
summary = pd.DataFrame({
    "Metric": [
        "Avg. Z-Score (TP)",
        "Avg. Z-Score (FP)",
        "Avg. Z-Score (Robustness Attack)",
        "Z-Score Decay (TP - Robust)"
    ],
    "C2 (KGW Baseline)": [
        f"{avg_z_kgw_tp:.2f}",
        f"{avg_z_kgw_fp:.2f}",
        f"{avg_z_kgw_robust:.2f}",
        f"{decay_kgw:.2f}"
    ],
    "C3 (LSH-GHW)": [
        f"{avg_z_lsh_tp:.2f}",
        f"{avg_z_lsh_fp:.2f}",
        f"{avg_z_lsh_robust:.2f}",
        f"{decay_lsh:.2f}"
    ]
})

print("--- MVES 最终报告 ---")
print(summary.to_string(index=False))

# The lines below print the *expected* interpretation next to the measured
# values; they do not check whether the expectations actually hold.
print("\n--- 结果分析 (期望) ---")
print(f"1. 可检测性: LSH-GHW (TP={avg_z_lsh_tp:.2f}) 和 KGW (TP={avg_z_kgw_tp:.2f}) 都应远高于 FP (Z > 4.0)。")
print(f"2. 隐蔽性: LSH-GHW (FP={avg_z_lsh_fp:.2f}) 和 KGW (FP={avg_z_kgw_fp:.2f}) 都应接近 0。")
print(f"3. 语义鲁棒性 (核心):")
print(f"   - KGW 不关心语义，其 Z-Score 几乎不衰减 (Decay={decay_kgw:.2f})。")
print(f"   - LSH-GHW 依赖语义，我们*期望* Z-Score 衰减 (Decay={decay_lsh:.2f})。")
print(f"   - **成功标准**: 尽管衰减了，LSH-GHW 的鲁棒性 Z-Score ({avg_z_lsh_robust:.2f}) 仍然远高于 FP 阈值。")