In [4]:
import jsonlines
import json
from pathlib import Path
import random
import re
import os
import time
import numpy as np
import matplotlib.pyplot as plt
from openai import OpenAI
from sklearn.metrics import auc, roc_curve
import torch
from tqdm import tqdm
from utils.transformers_config import TransformersConfig  # 确保该模块可用
from transformers import AutoModelForCausalLM, AutoTokenizer
from watermark.auto_watermark import AutoWatermark


In [5]:
class WatermarkDetector:
    """Scores text with the EWD watermark algorithm loaded through ``AutoWatermark``.

    The constructor loads a Hugging Face tokenizer/model pair and hands both to
    ``AutoWatermark.load``; ``compute_z_score`` then truncates the input text
    and returns the ``'score'`` field of the detection result.
    """

    # Config file used when the caller does not supply ``config_path``.
    DEFAULT_CONFIG_PATH = "D:/study_materals/AI_Watermark/watermark2/watermark/config/EWD.json"

    # Class-level fallback for the truncation limit (overridden per instance in
    # __init__).  NOTE(review): 1024 presumably matches the GPT-2 context
    # window -- confirm before switching ``model_name``.
    max_length = 1024

    def __init__(self, model_name="gpt2", device="cuda", config_path=None, max_length=1024):
        """Load the model, tokenizer and EWD detector.

        Args:
            model_name: Identifier passed to ``from_pretrained`` for both the
                tokenizer and the causal LM.
            device: Device the model is moved to and that is forwarded to
                ``TransformersConfig``.
            config_path: Path of the EWD JSON config. ``None`` selects
                ``DEFAULT_CONFIG_PATH``.
            max_length: Maximum number of tokens kept by ``compute_z_score``.

        Raises:
            FileNotFoundError: If the config file does not exist.
        """
        config_path = self.DEFAULT_CONFIG_PATH if config_path is None else str(config_path)

        # Fail early with a clear message instead of deep inside AutoWatermark.
        if not Path(config_path).exists():
            raise FileNotFoundError(f"配置文件不存在: {config_path}")

        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

        # The config is passed as a path string, not as a parsed object.
        self.watermark_detector = AutoWatermark.load(
            algorithm_name='EWD',
            algorithm_config=config_path,
            transformers_config=TransformersConfig(
                model=model,
                tokenizer=tokenizer,
                device=device
            )
        )
        self.tokenizer = tokenizer
        self.max_length = max_length

    def compute_z_score(self, text):
        """Return the detector score for ``text`` after token-level truncation.

        The text is encoded with truncation to ``self.max_length`` tokens,
        decoded back to a string, and passed to ``detect_watermark``.

        Args:
            text: The text to score.

        Returns:
            ``float(result['score'])``. Any failure (missing ``'score'`` key or
            another exception) is reported on stdout and yields ``0.0``, so a
            returned 0.0 may mean either a genuine score or a failed detection.
        """
        try:
            input_ids = self.tokenizer.encode(
                text,
                truncation=True,
                max_length=self.max_length
            )
            processed_text = self.tokenizer.decode(input_ids)

            # detect_watermark returns a mapping; only its 'score' entry is used.
            detection_result = self.watermark_detector.detect_watermark(processed_text)
            return float(detection_result['score'])

        except KeyError as e:
            print(f"水印检测结果字段缺失: {str(e)}，使用默认值0.0")
            return 0.0
        except Exception as e:
            print(f"水印检测失败: {str(e)}")
            return 0.0
    
# %% [2] 修复后的攻击类（参数合并逻辑修正）
class AdvancedWatermarkAttacker:
    """Copy-paste attack: embeds pieces of a watermarked text into fresh LLM text.

    Pipeline (see ``attack``): cut spans out of the watermarked answer, ask the
    chat model for an unrelated context passage, insert the spans between the
    context paragraphs with a transition phrase, and re-draw the insertion
    points while the LLM-rated coherence stays below the threshold.
    """

    # Model requested for every chat call.  The client is bound to the Moonshot
    # endpoint, so both generation and coherence rating must use a model that
    # endpoint serves.
    CHAT_MODEL = "moonshot-v1-8k"

    def __init__(self, api_key, language='en', params=None):
        """Create the API client and merge ``params`` over the defaults.

        Args:
            api_key: Key for the Moonshot OpenAI-compatible endpoint.
            language: ``'en'`` or ``'zh'``; selects prompts, sentence
                delimiters and transition phrases.
            params: Optional (possibly nested) overrides for the defaults
                defined below.

        Raises:
            ValueError: If any attack mode has ``num_spans`` below 1.
        """
        self.client = OpenAI(api_key=api_key, base_url="https://api.moonshot.cn/v1")
        self.language = language

        default_params = {
            'attack_modes': {
                'CP-3-10%': {'num_spans': 3, 'ratio': 0.1},
                'CP-1-25%': {'num_spans': 1, 'ratio': 0.25}
            },
            # Minimum span length in characters (it is compared against
            # len() of the joined span string in split_watermark).
            'min_span_length': 50 if language == 'en' else 20,
            'transition_phrases': {
                'en': ["Furthermore,", "However,", "Recent studies show"],
                'zh': ["值得注意的是，", "相关研究表明，", "根据最新数据"]
            },
            'coherence_threshold': 0.8,
            'max_retries': 3
        }

        def deep_update(source, overrides):
            """Recursively merge ``overrides`` into ``source`` and return it."""
            for key, value in overrides.items():
                # Recurse only when both sides are dicts; otherwise the
                # override simply replaces whatever was there.
                if isinstance(value, dict) and isinstance(source.get(key), dict):
                    source[key] = deep_update(source[key], value)
                else:
                    source[key] = value
            return source

        self.params = deep_update(default_params, params or {})

        for mode in self.params['attack_modes'].values():
            if mode['num_spans'] < 1:
                raise ValueError(f"攻击模式{mode}的num_spans必须≥1")

    def generate_context(self, prompt, target_length=600):
        """Ask the chat model for a multi-paragraph passage about ``prompt``.

        Args:
            prompt: User message sent to the model.
            target_length: Length requested in the system message (words for
                English, characters for Chinese).

        Returns:
            The first non-blank reply, or ``None`` once ``max_retries``
            attempts have failed or produced blank content.
        """
        system_msg = {
        'en': f"Generate a coherent {target_length}-word passage with at least 5 paragraphs.",
        'zh': f"生成包含至少5个自然段落的{target_length}字文章"
        }[self.language]

        for retry in range(self.params['max_retries']):
            try:
                response = self.client.chat.completions.create(
                    model=self.CHAT_MODEL,
                    messages=[
                        {"role": "system", "content": system_msg},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=1024
                )
                content = response.choices[0].message.content
                if content and content.strip():
                    return content
            except Exception as e:
                print(f"尝试 {retry+1}/{self.params['max_retries']} 失败: {str(e)}")
                if "rate limit" in str(e).lower():
                    time.sleep(5)  # back off before the next attempt
        return None


    def _segment_text(self, text):
        """Split ``text`` after sentence-ending punctuation for the configured language.

        Fragments of at most 10 (English) or 5 (Chinese) raw characters are
        dropped; the remaining ones are returned stripped.
        """
        if self.language == 'en':
            return [s.strip() for s in re.split(r'(?<=[.!?])', text) if len(s) > 10]
        else:
            return [s.strip() for s in re.split(r'(?<=[。！？])', text) if len(s) > 5]

    def split_watermark(self, text, mode='CP-3-10%'):
        """Cut ``num_spans`` evenly spaced spans out of the watermarked ``text``.

        Each span starts with ``int(total_sentences * ratio / num_spans)``
        sentences (at least one) and is extended sentence by sentence, without
        running into the next span's start, until it holds at least
        ``min_span_length`` characters.

        Args:
            text: The watermarked text.
            mode: Key into ``params['attack_modes']``.

        Returns:
            The spans that reach ``min_span_length`` characters; this can be
            fewer than ``num_spans`` for short inputs.
        """
        config = self.params['attack_modes'][mode]
        num_spans = config['num_spans']
        min_chars = self.params['min_span_length']

        sentences = self._segment_text(text)
        total_sents = len(sentences)
        if total_sents == 0:
            return []

        # Span size is counted in sentences.  min_span_length is a character
        # count and must not be used as a floor here, otherwise every span
        # swallows up to that many sentences and the ratio is ignored.
        span_size = max(1, int(total_sents * config['ratio'] / num_spans))
        stride = total_sents // num_spans

        spans = []
        for i in range(num_spans):
            start = i * stride
            end = min(start + span_size, total_sents)

            # A span may grow up to the start of the next one (the last span
            # may grow to the end of the text).
            if i == num_spans - 1:
                window_end = total_sents
            else:
                window_end = min(total_sents, max(start + span_size, (i + 1) * stride))
            while end < window_end and len(' '.join(sentences[start:end])) < min_chars:
                end += 1

            spans.append(' '.join(sentences[start:end]))
        return [s for s in spans if len(s) >= min_chars]

    def insert_watermark(self, context, spans, mode):
        """Insert ``spans`` after randomly chosen paragraphs of ``context``.

        Paragraphs are delimited by blank lines. Insertion points are drawn
        from the paragraphs after the first one, so at most
        ``len(paragraphs) - 1`` spans can be placed; surplus spans are dropped.

        Args:
            context: The carrier text.
            spans: Watermarked spans, inserted in the given order.
            mode: Key into ``params['attack_modes']``; its ``num_spans`` caps
                the number of insertions.

        Returns:
            The paragraphs re-joined with blank lines, or ``context`` unchanged
            when nothing can be inserted.
        """
        paras = [p.strip() for p in re.split(r'\n\n+', context) if p.strip()]
        num_paras = len(paras)

        config = self.params['attack_modes'][mode]
        required_spans = config['num_spans']

        # Never draw more positions than there are slots or spans.
        available_positions = max(0, num_paras - 1)
        actual_spans = min(required_spans, available_positions, len(spans))
        if actual_spans <= 0:
            return context

        positions = sorted(random.sample(range(1, num_paras), actual_spans))
        phrases = self.params['transition_phrases'][self.language]

        new_paras = []
        span_idx = 0
        for i, para in enumerate(paras):
            new_paras.append(para)
            # Guard on len(positions): it can be shorter than len(spans).
            if span_idx < len(positions) and i >= positions[span_idx]:
                transition = random.choice(phrases)
                new_paras.append(f"{transition} {spans[span_idx]}")
                span_idx += 1

        return '\n\n'.join(new_paras)

    def validate_coherence(self, text):
        """Ask the chat model to rate the coherence of ``text``.

        Returns:
            The first number found in the reply, clamped to ``[0, 1]``. If the
            request fails or the reply holds no number, the problem is printed
            and ``1.0`` is returned so that the attack is not blocked.
        """
        prompt = {
            'en': "Rate text coherence from 0-1. Respond with number only.\nText:\n",
            'zh': "请为以下文本的连贯性打分（0-1），只需返回数字：\n文本："
        }[self.language]

        try:
            response = self.client.chat.completions.create(
                model=self.CHAT_MODEL,
                messages=[{"role": "user", "content": prompt + text}],
                temperature=0.0
            )
            reply = response.choices[0].message.content or ""
            # Tolerate replies such as "Score: 0.85" instead of a bare number.
            match = re.search(r'\d*\.?\d+', reply)
            if match is None:
                raise ValueError(f"no numeric score in reply: {reply!r}")
            return min(1.0, max(0.0, float(match.group())))
        except Exception as e:
            # Deliberate best effort: a failed rating counts as a pass, but it
            # is reported instead of being swallowed silently.
            print(f"连贯性验证失败，默认通过: {str(e)}")
            return 1.0

    def attack(self, original_data, mode='CP-3-10%'):
        """Run the full attack for one record.

        Args:
            original_data: Mapping with ``'prompt'`` and ``'generated_answer'``.
            mode: Key into ``params['attack_modes']``.

        Returns:
            A dict with ``original_text``, ``attacked_text``, ``context`` and
            ``attack_mode``; ``None`` if the answer does not yield exactly
            ``num_spans`` spans or no context could be generated. The text of
            the last attempt is returned even if it never reached the
            coherence threshold.
        """
        # Split first: it is local and cheap, so a record that cannot be
        # attacked does not cost a remote generation call.
        spans = self.split_watermark(original_data['generated_answer'], mode)
        if len(spans) != self.params['attack_modes'][mode]['num_spans']:
            return None

        context = self.generate_context(original_data['prompt'])
        if not context:
            return None

        attacked_text = self.insert_watermark(context, spans, mode)

        # Re-draw insertion points while the rating stays below the threshold.
        for _ in range(self.params['max_retries']):
            if self.validate_coherence(attacked_text) >= self.params['coherence_threshold']:
                break
            attacked_text = self.insert_watermark(context, spans, mode)

        return {
            "original_text": original_data['generated_answer'],
            "attacked_text": attacked_text,
            "context": context,
            "attack_mode": mode
        }
    

# %% [3] 修改后的实验流程
class WatermarkExperiment:
    """Runs the attacker over a JSONL dataset and records detector scores."""

    def __init__(self, attacker, detector):
        """Store the collaborators.

        Args:
            attacker: Object exposing ``attack(record, mode=...)`` that returns
                a dict with ``'attacked_text'`` and ``'attack_mode'`` or a
                falsy value when the record cannot be attacked.
            detector: Object exposing ``compute_z_score(text)``.
        """
        self.attacker = attacker
        self.detector = detector

    def run(self, input_path, output_path, mode=None):
        """Attack every record of ``input_path`` and write results to ``output_path``.

        Each input record must provide ``'prompt'``, ``'generated_answer'`` and
        ``'generated_score'``. Records for which the attacker returns a falsy
        value are skipped.

        Args:
            input_path: JSONL file to read; it is loaded completely before the
                output file is opened.
            output_path: JSONL file to (over)write.
            mode: Attack mode forwarded to ``attacker.attack``. ``None`` calls
                the attacker without a mode so that its own default applies.

        Returns:
            The list of result dicts, identical to what was written.
        """
        with jsonlines.open(input_path) as reader:
            dataset = list(reader)

        results = []
        # Each record is written as soon as it is finished.  If a later record
        # raises, the ``with`` block still closes the file, so the work done
        # up to that point is kept instead of being lost with the run.
        with jsonlines.open(output_path, 'w') as writer:
            for data in tqdm(dataset):
                # Parse the stored score before the expensive attack so that a
                # malformed record fails fast.
                original_score = float(data["generated_score"])

                attack_input = {
                    "prompt": data["prompt"],
                    "generated_answer": data["generated_answer"]
                }

                if mode is None:
                    attacked_data = self.attacker.attack(attack_input)
                else:
                    attacked_data = self.attacker.attack(attack_input, mode=mode)
                if not attacked_data:
                    continue

                attacked_score = float(
                    self.detector.compute_z_score(attacked_data['attacked_text'])
                )

                record = {
                    "original_text": data["generated_answer"],
                    "attacked_text": attacked_data["attacked_text"],
                    "original_score": original_score,
                    "attacked_score": attacked_score,
                    # Positive delta: the attack lowered the detector score.
                    "delta": original_score - attacked_score,
                    "attack_params": attacked_data["attack_mode"]
                }
                writer.write(record)
                results.append(record)

        return results
    
 

In [None]:
#moonshot_api_key = "<MOONSHOT_API_KEY>"  # placeholder only: supply the real key at runtime, never store it in the notebook
#input_jsonl_path = "D:/study_materals/AI_Watermark/watermark2/watermark/results/backtranslation_KGW.jsonl"
#output_jsonl_path = "D:/study_materals/AI_Watermark/watermark2/watermark/results/KGW_attack_results.jsonl"
    
# %% [4] 最终使用示例
if __name__ == "__main__":
    # The API key is read from the environment so that it never lands in the
    # notebook or its history.  The key that was previously hard-coded here has
    # been exposed and should be revoked.
    api_key = os.environ.get("MOONSHOT_API_KEY")
    if not api_key:
        raise RuntimeError("未设置环境变量 MOONSHOT_API_KEY，无法初始化攻击器")

    # Build the attacker; the override is merged over the built-in defaults.
    attacker = AdvancedWatermarkAttacker(
        api_key=api_key,
        language='en',
        params={'attack_modes': {'CP-3-10%': {'num_spans': 3, 'ratio': 0.1}}}
    )
    print("攻击参数验证:", attacker.params.keys())  # should list 'max_retries' and the other defaults

    # Build the detector.  A failure is reported and re-raised: continuing
    # would either hit a NameError below or silently reuse a stale ``detector``
    # left in the kernel by an earlier run.
    try:
        detector = WatermarkDetector("gpt2")
        print("水印检测器初始化成功")
    except Exception as e:
        print(f"检测器初始化失败: {str(e)}")
        raise

    # Run the experiment over the stored dataset.
    experiment = WatermarkExperiment(attacker, detector)
    results = experiment.run(
        input_path="D:/study_materals/AI_Watermark/watermark2/watermark/results/backtranslation_EWD.jsonl",
        output_path="D:/study_materals/AI_Watermark/watermark2/watermark/achieve/EWD_attack_results.jsonl"
    )

攻击参数验证: dict_keys(['attack_modes', 'min_span_length', 'transition_phrases', 'coherence_threshold', 'max_retries'])
水印检测器初始化成功


100%|██████████| 721/721 [2:45:44<00:00, 13.79s/it]  


: 