In [None]:
import logging
import os
import re
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
import yaml
from openai import OpenAI

# Configure root logging: INFO level, timestamped "[LEVEL] message" lines.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

def load_config(config_path: str = "config.yaml") -> Dict[str, Any]:
    """Read the YAML configuration file and return its parsed content.

    Args:
        config_path: Path of the YAML file to read. Defaults to
            ``config.yaml`` relative to the current working directory.

    Returns:
        The document parsed by ``yaml.safe_load``, or an empty dict when the
        file contains no document.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode explicitly as UTF-8 so non-ASCII text is read the same way on
    # every platform instead of depending on the locale's default encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load yields None for an empty file; normalise to a dict so callers
    # can use .get() without a None check.
    return cfg if cfg is not None else {}

def load_prompts(prompts_path: str = "prompts.yaml") -> Dict[str, str]:
    """Read the YAML file holding the prompt templates.

    Args:
        prompts_path: Path of the YAML file to read. Defaults to
            ``prompts.yaml`` relative to the current working directory.

    Returns:
        The document parsed by ``yaml.safe_load``, or an empty dict when the
        file contains no document.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode explicitly as UTF-8 so non-ASCII prompt text is read the same way
    # on every platform instead of depending on the locale's default encoding.
    with open(prompts_path, "r", encoding="utf-8") as f:
        p = yaml.safe_load(f)
    # safe_load yields None for an empty file; normalise to a dict so a
    # missing template surfaces as a KeyError rather than a TypeError.
    return p if p is not None else {}

config = load_config("config.yaml")
prompts = load_prompts("prompts.yaml")

# Read the key from the environment only. A placeholder default would always
# be truthy, which made the check below unreachable and sent a bogus key to
# the API; with no default, a missing or blank variable fails fast here.
api_key = os.getenv("OPENAI_API_KEY", "").strip()
if not api_key:
    logging.error("未检测到 OPENAI_API_KEY 环境变量。请设置后重试。")
    raise EnvironmentError("OPENAI_API_KEY not set")

client = OpenAI(api_key=api_key)

# OUTPUT_DIR (environment) wins over config["output_dir"], which wins over
# the built-in default.
output_dir = os.getenv("OUTPUT_DIR", config.get("output_dir", "./intent_dataset_outputs"))
os.makedirs(output_dir, exist_ok=True)

# Per-category context sentences used when building prompts. "or {}" also
# covers a key that is present but null in the YAML.
category_context_map = config.get("category_context_map") or {}

logging.info(f"Category context for 'photo': {category_context_map.get('photo')}")
logging.info(f"Category context for 'video': {category_context_map.get('video')}")

logging.info("步骤 1：环境与配置加载完成。")

def create_generation_prompt(instruction: str, language: str, count: int, category: str) -> str:
    """Fill the ``generation_template`` prompt for one instruction.

    The ``context`` placeholder comes from ``category_context_map`` for the
    given category; a generic sentence is used when the category has no entry.
    """
    fallback_context = "The user is giving an instruction to the device."
    template_fields = {
        "instruction": instruction,
        "language": language,
        "count": count,
        "context": category_context_map.get(category, fallback_context),
    }
    return prompts["generation_template"].format(**template_fields)

def create_scoring_prompt(instruction: str, variation: str) -> str:
    """Fill the ``scoring_template`` prompt with an instruction/variation pair."""
    scoring_template = prompts["scoring_template"]
    return scoring_template.format(instruction=instruction, variation=variation)

def call_model(prompt: str, model_name: str, temperature: float, max_tokens: int, retry_attempts: int = 3) -> str:
    """Send a single-turn chat completion request and return the reply text.

    Args:
        prompt: Text sent as the only ``user`` message.
        model_name: Model identifier passed to the API.
        temperature: Sampling temperature passed to the API.
        max_tokens: Completion token limit passed to the API.
        retry_attempts: Maximum number of tries before giving up.

    Returns:
        The content of the first choice, stripped of surrounding whitespace.

    Raises:
        RuntimeError: If no attempt produced a usable reply. The last
            underlying error is attached as ``__cause__``.
    """
    messages = [{"role": "user", "content": prompt}]
    last_error = None
    for attempt in range(retry_attempts):
        try:
            completion = client.chat.completions.create(
                model=model_name,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )
            content = completion.choices[0].message.content
            if content is None:
                # Treat a reply without text as a failed attempt with a clear
                # message instead of an AttributeError on None.strip().
                raise ValueError("model returned a message without content")
            return content.strip()
        except Exception as e:
            last_error = e
            logging.error(f"调用模型出错 (第{attempt+1}/{retry_attempts}次尝试): {e}")
            # Back off only when another attempt follows; sleeping after the
            # final failure just delays the error.
            if attempt < retry_attempts - 1:
                time.sleep(5)
    raise RuntimeError("在重试数内未成功获取模型响应") from last_error

def parse_variations(response: str) -> List[str]:
    """Split a model reply into cleaned, de-duplicated variation strings.

    Each line is stripped of a leading list marker (``12.``, ``12)``,
    optionally preceded by a bullet), bullet characters, surrounding quotes
    and all punctuation. Lines that are empty after cleaning are dropped.

    Args:
        response: Raw reply text, one candidate per line.

    Returns:
        Unique cleaned lines in order of first appearance.
    """
    unique = {}  # dict keys keep insertion order, giving a stable result
    for raw_line in response.split('\n'):
        line = raw_line.strip()
        # Remove a leading list marker such as "44.", "44)" or "- 44."
        line = re.sub(r'^(?:[-•*]\s*)?\d+[.)]\s*', '', line)
        # Remove bullet characters
        line = line.strip("-•* \t").strip()
        # Remove surrounding quotes
        line = re.sub(r'^"+|"+$', '', line)
        line = re.sub(r"^'+|'+$", '', line)
        # Remove punctuation, then trim before the emptiness check so a
        # punctuation-only line cannot produce an empty entry.
        line = re.sub(r'[^\w\s]', '', line).strip()

        if line:
            unique.setdefault(line, None)
    return list(unique)

def extract_score_from_response(response: str) -> int:
    """Return the first standalone digit 0-5 found in ``response``, else 0."""
    found = re.search(r"\b([0-5])\b", response)
    return int(found.group(1)) if found else 0

logging.info("步骤 2：辅助函数定义完成。")

# Quick sanity check of parse_variations on a numbered, quoted sample reply.
test_str = """
1. "Can you take a picture?"
2. "Please capture an image."
3. 'Snap a photo.'
"""
parsed = parse_variations(test_str)
logging.info(f"测试parse_variations结果: {parsed}")
# parse_variations removes numbering, quotes and punctuation, so the logged
# entries are 'Can you take a picture', 'Please capture an image' and
# 'Snap a photo' (without the trailing '?' or '.').

def generate_variations_for_one_iteration(instruction: str, language: str, category: str, target_count: int) -> List[str]:
    """Run one generation request and return the parsed variations.

    Model name, temperature, token limit and retry count (default 3) are read
    from ``config["models"]["generation"]``.
    """
    generation_settings = config["models"]["generation"]
    model_reply = call_model(
        create_generation_prompt(instruction, language, target_count, category),
        generation_settings["name"],
        generation_settings["temperature"],
        generation_settings["max_tokens"],
        generation_settings.get("retry_attempts", 3),
    )
    return parse_variations(model_reply)

def generate_variations(instruction: str, language: str, category: str, target_count: int, iterations: int) -> List[str]:
    """Collect unique variations from ``iterations`` concurrent generation calls.

    Failed calls are logged and skipped. A warning is logged when fewer than
    ``target_count`` unique variations were gathered.
    """
    collected = set()
    max_workers = config.get("concurrency", 10)  # 10 threads unless configured

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = []
        for _ in range(iterations):
            pending.append(
                pool.submit(generate_variations_for_one_iteration, instruction, language, category, target_count)
            )
        for finished in as_completed(pending):
            try:
                collected.update(finished.result())
            except Exception as exc:
                logging.error(f"生成变体任务出现错误：{exc}")

    unique_variations = list(collected)
    found = len(unique_variations)
    if found < target_count:
        logging.warning(f"没有达到目标数量 {target_count} 个唯一变体，'{instruction}' 仅获得 {found} 个变体。")
    return unique_variations

# Progress marker logged once the generation helpers are defined.
logging.info("步骤 3：生成变体函数定义完成。")

# Echo the run settings read from config; "skip_scoring" prints False when
# the key is absent, the other two keys are required.
print("skip_scoring:",config.get("skip_scoring", False))
print("generation_count:", config["generation_count"],"iterations:", config["iterations"])


In [None]:
def score_one_variation(instruction: str, variation: str) -> Tuple[str, int, int]:
    """Score one variation twice with the scoring model.

    Args:
        instruction: The original instruction the variation is compared to.
        variation: The candidate text to score.

    Returns:
        ``(variation, score1, score2)``, where each score is the integer
        extracted from one independent model reply.

    Raises:
        RuntimeError: Propagated from ``call_model`` when a request keeps failing.
    """
    scoring_cfg = config["models"]["scoring"]
    prompt = create_scoring_prompt(instruction, variation)

    def _score_once() -> int:
        # Honour a configured retry count the same way the generation path
        # does; the default of 3 matches call_model's own default.
        reply = call_model(
            prompt,
            scoring_cfg["name"],
            scoring_cfg["temperature"],
            scoring_cfg["max_tokens"],
            scoring_cfg.get("retry_attempts", 3),
        )
        return extract_score_from_response(reply)

    score1 = _score_once()
    score2 = _score_once()
    return variation, score1, score2

def filter_variations_by_score(instruction: str, variations: List[str], threshold: int) -> List[str]:
    """Score variations concurrently and keep those passing both scores.

    A variation is kept only when neither of its two scores is below
    ``threshold``. Scoring failures are logged and the variation is dropped.
    Results are appended in completion order.
    """
    worker_count = config.get("concurrency", 20)  # 20 threads unless configured
    accepted = []
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        pending = [
            pool.submit(score_one_variation, instruction, candidate)
            for candidate in variations
        ]
        for done in as_completed(pending):
            try:
                candidate, first, second = done.result()
                if not (first < threshold or second < threshold):
                    accepted.append(candidate)
            except Exception as exc:
                logging.error(f"打分任务出现错误：{exc}")
    return accepted

# Progress marker logged once the scoring helpers are defined.
logging.info("步骤 4：打分函数定义完成。")


In [4]:
def process_instruction(instruction_data: Dict[str, Any], config: Dict[str, Any], output_dir: str) -> Dict[str, Any]:
    """Generate, score and save variations for a single instruction.

    Args:
        instruction_data: Mapping with required keys ``"instruction"`` and
            ``"language"`` and optional ``"category"`` (defaults to ``"chat"``).
        config: Run settings; ``generation_count``, ``iterations`` and
            ``scoring_threshold`` are read from it.
        output_dir: Directory receiving three ``.xlsx`` files: all generated
            variations, the scoring table, and the validated variations.

    Returns:
        Dict with the ``instruction``, the list of ``validated_variations``
        and the path of the scoring file (``scored_results_file``).
    """
    instruction = instruction_data["instruction"]
    language = instruction_data["language"]
    category = instruction_data.get("category", "chat")
    # Read up front so a missing key fails before any API call is made.
    threshold = config["scoring_threshold"]

    # Generate variations
    variations = generate_variations(
        instruction,
        language,
        category,
        config["generation_count"],
        config["iterations"]
    )
    logging.info(f"Generated {len(variations)} variations for instruction: '{instruction}'")

    # Build a file-name-safe stem: spaces become underscores as before, and
    # path separators or characters that are invalid in file names on common
    # platforms are replaced too, so e.g. "turn on/off" cannot break the save.
    safe_name = re.sub(r'[\\/:*?"<>|\r\n\t]', '_', instruction.replace(' ', '_'))

    # Save the generated variations
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    variations_filename = os.path.join(output_dir, f"{safe_name}_variations_{timestamp}.xlsx")
    pd.DataFrame({"variations": variations}).to_excel(variations_filename, index=False)
    logging.info(f"Saved generated variations to: {variations_filename}")

    # Score the variations
    scored_results = []
    validated_variations = []
    for v in variations:
        try:
            var, score1, score2 = score_one_variation(instruction, v)
        except Exception as e:
            # One failed scoring call should not discard the whole batch; log
            # and skip, as filter_variations_by_score does.
            logging.error(f"打分任务出现错误：{e}")
            continue
        passed = score1 >= threshold and score2 >= threshold
        scored_results.append([var, score1, score2, var if passed else ""])
        if passed:
            validated_variations.append(var)

    # Save the scoring table
    score_filename = os.path.join(output_dir, f"{safe_name}_scores_{timestamp}.xlsx")
    pd.DataFrame(scored_results, columns=["variation", "score1", "score2", "passed_variation"]).to_excel(score_filename, index=False)
    logging.info(f"Saved scoring results to: {score_filename}")

    # Save the variations that passed validation
    validated_filename = os.path.join(output_dir, f"{safe_name}_validated_{timestamp}.xlsx")
    pd.DataFrame({"validated_variations": validated_variations}).to_excel(validated_filename, index=False)
    logging.info(f"Saved validated variations to: {validated_filename}")

    return {
        "instruction": instruction,
        "validated_variations": validated_variations,
        "scored_results_file": score_filename
    }

In [None]:
# Negative-sample generation
def create_generation_prompt_negative(instruction: str, language: str, count: int, category: str) -> str:
    """Fill the ``negative_generation_template`` prompt for one instruction.

    The ``context`` placeholder comes from ``category_context_map`` for the
    given category; a generic sentence is used when the category has no entry.
    """
    fallback_context = "The user is giving an instruction to the device."
    template_fields = {
        "instruction": instruction,
        "language": language,
        "count": count,
        "context": category_context_map.get(category, fallback_context),
    }
    return prompts["negative_generation_template"].format(**template_fields)

def create_scoring_prompt_negative(instruction: str, variation: str) -> str:
    """Fill the ``negative_scoring_template`` prompt with an instruction/variation pair."""
    scoring_template = prompts["negative_scoring_template"]
    return scoring_template.format(instruction=instruction, variation=variation)

# Second sanity check of parse_variations; this rebinds the module-level
# names test_str and parsed used by the earlier check.
test_str = """
1. "How to take a picture?"
2. "Please teach me to capture an image."
3. 'Analyze a photo.'
"""
parsed = parse_variations(test_str)
logging.info(f"测试parse_variations结果: {parsed}")

def generate_variations_for_one_iteration_negative(instruction: str, language: str, category: str, target_count: int) -> List[str]:
    """Run one negative-sample generation request and return the parsed lines.

    Model name, temperature, token limit and retry count (default 3) are read
    from ``config["models"]["generation"]``.
    """
    generation_settings = config["models"]["generation"]
    model_reply = call_model(
        create_generation_prompt_negative(instruction, language, target_count, category),
        generation_settings["name"],
        generation_settings["temperature"],
        generation_settings["max_tokens"],
        generation_settings.get("retry_attempts", 3),
    )
    return parse_variations(model_reply)

def generate_variations_negative(instruction: str, language: str, category: str, target_count: int, iterations: int) -> List[str]:
    """Collect unique negative samples from ``iterations`` concurrent calls.

    Failed calls are logged and skipped. A warning is logged when fewer than
    ``target_count`` unique results were gathered.
    """
    collected = set()
    max_workers = config.get("concurrency", 10)  # 10 threads unless configured

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = []
        for _ in range(iterations):
            pending.append(
                pool.submit(generate_variations_for_one_iteration_negative, instruction, language, category, target_count)
            )
        for finished in as_completed(pending):
            try:
                collected.update(finished.result())
            except Exception as exc:
                logging.error(f"生成变体任务出现错误：{exc}")

    unique_variations = list(collected)
    found = len(unique_variations)
    if found < target_count:
        logging.warning(f"没有达到目标数量 {target_count} 个唯一变体，'{instruction}' 仅获得 {found} 个变体。")
    return unique_variations

# Progress marker after the negative-sample generation helpers; the message
# text is the same "step 3" string logged after the positive-sample helpers.
logging.info("步骤 3：生成变体函数定义完成。")

# Echo the run settings read from config; "skip_scoring" prints False when
# the key is absent, the other two keys are required.
print("skip_scoring:",config.get("skip_scoring", False))
print("generation_count:", config["generation_count"],"iterations:", config["iterations"])

def score_one_variation_negative(instruction: str, variation: str) -> Tuple[str, int, int]:
    """Score one negative sample twice with the scoring model.

    Args:
        instruction: The original instruction the sample is compared to.
        variation: The candidate text to score.

    Returns:
        ``(variation, score1, score2)``, where each score is the integer
        extracted from one independent model reply.

    Raises:
        RuntimeError: Propagated from ``call_model`` when a request keeps failing.
    """
    scoring_cfg = config["models"]["scoring"]
    prompt = create_scoring_prompt_negative(instruction, variation)

    def _score_once() -> int:
        # Honour a configured retry count the same way the generation path
        # does; the default of 3 matches call_model's own default.
        reply = call_model(
            prompt,
            scoring_cfg["name"],
            scoring_cfg["temperature"],
            scoring_cfg["max_tokens"],
            scoring_cfg.get("retry_attempts", 3),
        )
        return extract_score_from_response(reply)

    score1 = _score_once()
    score2 = _score_once()
    return variation, score1, score2

def filter_variations_by_score_negative(instruction: str, variations: List[str], threshold: int) -> List[str]:
    """Score negative samples concurrently and keep those passing both scores.

    A sample is kept only when neither of its two scores is below
    ``threshold``. Scoring failures are logged and the sample is dropped.
    Results are appended in completion order.
    """
    worker_count = config.get("concurrency", 20)  # 20 threads unless configured
    accepted = []
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        pending = [
            pool.submit(score_one_variation_negative, instruction, candidate)
            for candidate in variations
        ]
        for done in as_completed(pending):
            try:
                candidate, first, second = done.result()
                if not (first < threshold or second < threshold):
                    accepted.append(candidate)
            except Exception as exc:
                logging.error(f"打分任务出现错误：{exc}")
    return accepted

# Progress marker logged once the negative-sample scoring helpers are defined.
logging.info("步骤 4：打分函数定义完成。")

def process_instruction_negative(instruction_data: Dict[str, Any], config: Dict[str, Any], output_dir: str) -> Dict[str, Any]:
    """Generate, score and save negative samples for a single instruction.

    Args:
        instruction_data: Mapping with required keys ``"instruction"`` and
            ``"language"`` and optional ``"category"`` (defaults to ``"chat"``).
        config: Run settings; ``generation_count``, ``iterations`` and
            ``scoring_threshold`` are read from it.
        output_dir: Directory receiving three ``.xlsx`` files (names carry a
            ``_negative`` marker): all generated samples, the scoring table,
            and the validated samples.

    Returns:
        Dict with the ``instruction``, the list of ``validated_variations``
        and the path of the scoring file (``scored_results_file``).
    """
    instruction = instruction_data["instruction"]
    language = instruction_data["language"]
    category = instruction_data.get("category", "chat")
    # Read up front so a missing key fails before any API call is made.
    threshold = config["scoring_threshold"]

    # Generate variations
    variations = generate_variations_negative(
        instruction,
        language,
        category,
        config["generation_count"],
        config["iterations"]
    )
    logging.info(f"Generated {len(variations)} variations for instruction: '{instruction}'")

    # Build a file-name-safe stem: spaces become underscores as before, and
    # path separators or characters that are invalid in file names on common
    # platforms are replaced too, so e.g. "turn on/off" cannot break the save.
    safe_name = re.sub(r'[\\/:*?"<>|\r\n\t]', '_', instruction.replace(' ', '_'))

    # Save the generated variations
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    variations_filename = os.path.join(output_dir, f"{safe_name}_variations_negative_{timestamp}.xlsx")
    pd.DataFrame({"variations": variations}).to_excel(variations_filename, index=False)
    logging.info(f"Saved generated variations to: {variations_filename}")

    # Score the variations
    scored_results = []
    validated_variations = []
    for v in variations:
        try:
            var, score1, score2 = score_one_variation_negative(instruction, v)
        except Exception as e:
            # One failed scoring call should not discard the whole batch; log
            # and skip, as filter_variations_by_score_negative does.
            logging.error(f"打分任务出现错误：{e}")
            continue
        passed = score1 >= threshold and score2 >= threshold
        scored_results.append([var, score1, score2, var if passed else ""])
        if passed:
            validated_variations.append(var)

    # Save the scoring table
    score_filename = os.path.join(output_dir, f"{safe_name}_scores_negative_{timestamp}.xlsx")
    pd.DataFrame(scored_results, columns=["variation", "score1", "score2", "passed_variation"]).to_excel(score_filename, index=False)
    logging.info(f"Saved scoring results to: {score_filename}")

    # Save the variations that passed validation
    validated_filename = os.path.join(output_dir, f"{safe_name}_validated_negative_{timestamp}.xlsx")
    pd.DataFrame({"validated_variations": validated_variations}).to_excel(validated_filename, index=False)
    logging.info(f"Saved validated variations to: {validated_filename}")

    return {
        "instruction": instruction,
        "validated_variations": validated_variations,
        "scored_results_file": score_filename
    }

#### 最终生成

In [None]:
final_data = []  # result dicts, one per processed instruction

# Run switches
TEST_MODE = True  # True: process only the first instruction; False: process all of them
NEGATIVE_MODE = True    # True: generate negative samples instead of positive ones

# Resolve the pipeline once instead of branching inside every code path.
process_fn = process_instruction_negative if NEGATIVE_MODE else process_instruction

if TEST_MODE:
    # Test mode uses a shallow copy of the config with fixed, small generation
    # settings, so the original config stays untouched.
    test_config = config.copy()
    test_config["generation_count"] = 10
    test_config["iterations"] = 10
    run_config = test_config
    # Slicing (rather than [0]) yields an empty list instead of an IndexError
    # when no instructions are configured.
    instructions_to_run = test_config["instructions_list"][:1]
else:
    # Full mode processes every instruction with the original config.
    run_config = config
    instructions_to_run = config["instructions_list"]

if not instructions_to_run:
    logging.warning("instructions_list is empty; nothing was processed.")

for instr_data in instructions_to_run:
    r = process_fn(instr_data, run_config, output_dir)
    final_data.append(r)

# Per-instruction result files are written inside the process_* functions.

# Completion message
if TEST_MODE:
    if instructions_to_run:
        logging.info(f"测试模式已完成，处理指令：{instructions_to_run[0]['instruction']}")
else:
    logging.info(f"全量处理已完成，总计处理了 {len(instructions_to_run)} 个指令。")