In [None]:
from peft import PeftModel, PeftConfig
from transformers import AutoModelForCausalLM, AutoTokenizer

# 加载基础模型（这个文件很大，需要另外下载）
base_model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")

In [None]:
model = PeftModel.from_pretrained(base_model, "sqlflow-qwen-lora-final")

测试（生成sql查询）

In [None]:
def generate_sql(question, db_schema=None):
    """
    生成SQL查询
    """
    # 准备输入
    if db_schema:
        input_text = f"问题: {question}\n数据库结构: {db_schema}\nSQL:"
    else:
        input_text = question

    # 编码输入
    inputs = tokenizer(input_text, return_tensors="pt", truncation=True, max_length=512)

    # 生成SQL
    outputs = model.generate(
        inputs.input_ids,
        max_length=256,
        num_return_sequences=1,
        temperature=0.1,
        do_sample=False
    )

    # 解码输出
    sql_query = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return sql_query

# 使用示例
question = "查询销售部门工资最高的员工"
db_schema = "employees(id, name, department, salary), departments(id, name)"

sql = generate_sql(question, db_schema)
print(f"问题: {question}")
print(f"生成的SQL: {sql}")

1.根据表名获取表结构，解析schema文档

In [None]:
import re

def get_table_schema(md_content, table_name):
    """
    根据表名从MD内容中提取完整的表结构

    Args:
        md_content: MD文档内容
        table_name: 表名

    Returns:
        表结构的字符串表示
    """
    # 构建表头的正则表达式模式
    table_header_pattern = rf'# Table:\s*{re.escape(table_name)}\s*\n\s*\['

    # 查找表开始位置
    table_start = re.search(table_header_pattern, md_content)
    if not table_start:
        return f"表 '{table_name}' 未找到"

    start_pos = table_start.end()

    # 查找表的结束位置（匹配对应的]）
    bracket_count = 1
    current_pos = start_pos

    while bracket_count > 0 and current_pos < len(md_content):
        if md_content[current_pos] == '[':
            bracket_count += 1
        elif md_content[current_pos] == ']':
            bracket_count -= 1
        current_pos += 1

    if bracket_count == 0:
        table_schema = md_content[start_pos-1:current_pos]  # -1 是为了包含开始的[
        return table_schema.strip()
    else:
        return f"表 '{table_name}' 的结构不完整"

def load_and_get_schema(md_file_path, table_name):
    """
    加载MD文件并获取指定表的结构

    Args:
        md_file_path: MD文件路径
        table_name: 表名
    """
    try:
        with open(md_file_path, 'r', encoding='utf-8') as file:
            md_content = file.read()

        schema = get_table_schema(md_content, table_name)
        return schema

    except FileNotFoundError:
        return f"错误：找不到文件 {md_file_path}"
    except Exception as e:
        return f"读取文件时出错：{e}"

md_file = "final_algorithm_competition_schema.md"  # 替换为你的MD文件路径
# table_name = "dwd_argothek_abilityinfo_effects_hi"  # 替换为你要查询的表名
#
# schema_str = load_and_get_schema(md_file, table_name)
# print(schema_str)

2.加载输入json并解析，构建prompt

In [None]:
import json

def generate_sql_prompt(json_data, md_file_path, max_schema_length=1000):
    """
    根据JSON数据生成Text-to-SQL的prompt

    Args:
        json_data: 包含问题、表列表和知识的JSON数据
        md_file_path: MD文件路径
        max_schema_length: 最大schema长度，超过会截断

    Returns:
        生成的prompt字符串
    """
    try:
        # 解析JSON数据
        if isinstance(json_data, str):
            data = json.loads(json_data)
        else:
            data = json_data

        # 提取关键信息
        question = data.get("question", "")
        table_list = data.get("table_list", [])
        knowledge = data.get("knowledge", "")

        # 读取MD文件内容
        with open(md_file_path, 'r', encoding='utf-8') as file:
            md_content = file.read()

        # 收集所有表的schema
        schema_parts = []
        for table_name in table_list:
            schema = get_table_schema(md_content, table_name)
            if schema and not schema.startswith("表"):
                # 如果schema太长，进行截断
                if len(schema) > max_schema_length:
                    schema = schema[:max_schema_length] + "...\n(由于长度限制，部分内容已省略)"
                schema_parts.append(f"表 {table_name} 的结构:\n{schema}")

        # 构建完整的prompt
        prompt_parts = []

        # 1. 问题描述
        prompt_parts.append(f"问题: {question}")

        # 2. 表结构
        if schema_parts:
            prompt_parts.append("\n相关表结构:")
            prompt_parts.extend(schema_parts)

        # 3. 业务知识
        if knowledge:
            prompt_parts.append(f"\n业务知识:\n{knowledge}")

        # 4. 输出要求
        prompt_parts.append("\n请根据以上信息生成相应的SQL查询语句。")

        return "\n".join(prompt_parts)

    except Exception as e:
        return f"生成prompt时出错: {e}"

# 示例JSON数据
json_input = {
    "sql_id": "sql_1",
    "question": "统计2025.07.24的手游全量用户且标签为其他，在竞品业务下2025.05.30-2025.07.24的在线时长。\n输出：suserid、sgamecode、ionlinetime\n\n",
    "复杂度": "中等",
    "table_list": [
        "dws_mgamejp_login_user_activity_di",
        "dim_vplayerid_vies_df"
    ],
    "knowledge": "竞品业务：\nsgamecode in (\"initiatived\",\"jordass\",\"esports\",\"allianceforce\",\"strategy\",\"playzone\",\"su\")\nsaccounttype = \"-100\" -- 账号体系，取-100表示汇总\nand suseridtype in (\"qq\",\"wxid\") -- 用户类型\nand splattype = \"-100\" -- 平台类型\nand splat = \"-100\" -- 平台，写死为-100\n"
}

prompt = generate_sql_prompt(json_input, md_file)
print(prompt)

3.加载json文件

In [None]:
import json
import os
from datetime import datetime

def batch_generate_prompts(json_array_file, md_file_path, output_file=None, max_schema_length=1000):
    """
    批量处理JSON数组文件，为每个对象生成prompt

    Args:
        json_array_file: JSON数组文件路径
        md_file_path: MD文件路径
        output_file: 输出文件路径，如果为None则自动生成
        max_schema_length: 最大schema长度

    Returns:
        生成的prompt数量
    """
    try:
        # 读取JSON数组文件
        with open(json_array_file, 'r', encoding='utf-8') as file:
            data_array = json.load(file)

        # 读取MD文件内容
        with open(md_file_path, 'r', encoding='utf-8') as file:
            md_content = file.read()

        # 生成输出文件名
        if output_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f"sql_prompts_{timestamp}.txt"

        # 生成所有prompt
        prompts = []
        for i, data in enumerate(data_array, 1):
            print(f"正在处理第 {i}/{len(data_array)} 个对象...")

            prompt = generate_single_prompt(data, md_content, max_schema_length)
            prompts.append(prompt)

        # 保存到文件
        with open(output_file, 'w', encoding='utf-8') as file:
            for i, prompt in enumerate(prompts, 1):
                file.write(f"=== Prompt {i} (SQL ID: {data_array[i-1].get('sql_id', 'N/A')}) ===\n")
                file.write(prompt)
                file.write("\n\n" + "="*80 + "\n\n")

        print(f"成功生成 {len(prompts)} 个prompt，已保存到: {output_file}")
        return len(prompts)

    except Exception as e:
        print(f"批量处理时出错: {e}")
        return 0

def generate_single_prompt(data, md_content, max_schema_length=1000):
    """
    为单个JSON对象生成prompt

    Args:
        data: 单个JSON对象
        md_content: MD文件内容
        max_schema_length: 最大schema长度

    Returns:
        生成的prompt字符串
    """
    # 提取关键信息
    question = data.get("question", "")
    table_list = data.get("table_list", [])
    knowledge = data.get("knowledge", "")
    complexity = data.get("复杂度", "")

    # 收集所有表的schema
    schema_parts = []
    for table_name in table_list:
        schema = get_table_schema(md_content, table_name)
        if schema and not schema.startswith("表"):
            # 如果schema太长，进行截断
            if len(schema) > max_schema_length:
                schema = schema[:max_schema_length] + "...\n(由于长度限制，部分内容已省略)"
            schema_parts.append(f"表 {table_name} 的结构:\n{schema}")

    # 构建完整的prompt
    prompt_parts = []

    # 1. 问题描述
    prompt_parts.append(f"问题: {question}")

    # 2. 复杂度信息
    if complexity:
        prompt_parts.append(f"复杂度: {complexity}")

    # 3. 表结构
    if schema_parts:
        prompt_parts.append("\n相关表结构:")
        prompt_parts.extend(schema_parts)

    # 4. 业务知识
    if knowledge:
        prompt_parts.append(f"\n业务知识:\n{knowledge}")

    # 5. 输出要求
    prompt_parts.append("\n请根据以上信息生成相应的SQL查询语句。")

    return "\n".join(prompt_parts)
# 文件路径配置
json_array_file = "final_dataset.json"  # 替换为你的JSON数组文件路径
output_file = "sql_prompts.txt"  # 可选的输出文件路径，如果为None则自动生成

# 批量生成prompt
count = batch_generate_prompts(json_array_file, md_file, output_file)

if count > 0:
    print(f"处理完成！共生成 {count} 个prompt。")
else:
    print("处理失败！")

4.分类/分解（这个没做）

5.自校正（暂时没做）

6.批量生成结果

In [None]:
import json
import re
import torch
from tqdm import tqdm

def parse_prompts_file(prompts_file):
    """
    解析prompts文件，提取每个prompt块

    Args:
        prompts_file: prompts文件路径

    Returns:
        包含所有prompt块的列表，每个块是一个字典
    """
    with open(prompts_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # 使用正则表达式分割不同的prompt块
    # 每个prompt块以 "=== Prompt X (SQL ID: sql_X) ===" 开始，以 "=" * 80 结束
    pattern = r'=== Prompt \d+ \(SQL ID: (sql_\d+)\) ===(.*?)' + re.escape('=' * 80)
    matches = re.findall(pattern, content, re.DOTALL)

    prompts = []
    for sql_id, prompt_content in matches:
        prompts.append({
            'sql_id': sql_id.strip(),
            'content': prompt_content.strip()
        })

    return prompts

def generate_sql_from_prompts(prompts_file, model, tokenizer, output_file=None, device=None):
    """
    使用已加载的模型从prompts文件生成SQL语句

    Args:
        prompts_file: 包含prompts的文件路径
        model: 已加载的模型
        tokenizer: 已加载的tokenizer
        output_file: 输出文件路径，如果为None则自动生成
        device: 使用的设备，如果为None则自动选择

    Returns:
        生成的SQL数量
    """
    # 设置设备
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"使用设备: {device}")

    # 设置生成参数
    generation_config = {
        "max_new_tokens": 512,
        "do_sample": False,
        "temperature": 0.1,
        "top_p": 0.9,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }

    # 解析prompts文件
    prompts = parse_prompts_file(prompts_file)
    print(f"找到 {len(prompts)} 个prompts")

    results = []

    print("开始批量生成SQL...")
    for prompt in tqdm(prompts):
        sql_id = prompt['sql_id']
        prompt_text = prompt['content']

        # 生成SQL
        try:
            # 编码输入
            inputs = tokenizer(prompt_text, return_tensors="pt", truncation=True, max_length=2048)
            inputs = {k: v.to(device) for k, v in inputs.items()}

            # 生成SQL
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    **generation_config
                )

            # 解码输出
            generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)

            # 提取SQL部分（去掉prompt部分）
            sql = generated_text[len(prompt_text):].strip()

            results.append({
                'sql_id': sql_id,
                'prompt': prompt_text,
                'sql': sql
            })
        except Exception as e:
            print(f"生成SQL时出错 (SQL ID: {sql_id}): {e}")
            results.append({
                'sql_id': sql_id,
                'prompt': prompt_text,
                'sql': f"ERROR: {str(e)}"
            })

    # 生成输出文件名
    if output_file is None:
        import datetime
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"generated_sql_{timestamp}.json"

    # 保存结果
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"成功生成 {len(results)} 个SQL语句，已保存到: {output_file}")
    return len(results), results

# 使用示例
if __name__ == "__main__":
    # 假设 model 和 tokenizer 已经在笔记本中加载

    # 配置路径
    prompts_file = "sql_prompts.txt"  # 替换为你的prompts文件路径
    output_file = "generated_sql_results.json"  # 可选的输出文件路径

    # 生成SQL
    count, results = generate_sql_from_prompts(prompts_file, model, tokenizer, output_file)

    if count > 0:
        print(f"SQL生成完成！共生成 {count} 个SQL语句。")

        # 打印前几个结果作为示例
        print("\n前几个生成结果示例:")
        for i, result in enumerate(results[:3]):
            print(f"\n--- 结果 {i+1} (SQL ID: {result['sql_id']}) ---")
            print(f"生成的SQL: {result['gsql']}")
    else:
        print("SQL生成失败！")

8.整合结果到结果文件里

In [None]:
import json

def merge_sql_to_questions(sql_file, questions_file, output_file):
    """
    将SQL语句根据sql_id合并到问题对象中

    Args:
        sql_file: 包含SQL语句的JSON文件路径
        questions_file: 包含问题的JSON文件路径
        output_file: 输出文件路径
    """

    # 读取SQL数据
    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_data = json.load(f)

    # 读取问题数据
    with open(questions_file, 'r', encoding='utf-8') as f:
        questions_data = json.load(f)

    # 创建SQL字典便于查找
    sql_dict = {item["sql_id"]: item["sql"] for item in sql_data}

    # 合并数据
    merged_data = []
    for question_item in questions_data:
        sql_id = question_item["sql_id"]

        # 创建新对象，包含原问题对象的所有属性
        merged_item = question_item.copy()

        # 添加SQL语句
        if sql_id in sql_dict:
            merged_item["sql"] = sql_dict[sql_id]
        else:
            merged_item["sql"] = None  # 或者设置为空字符串 ""

        merged_data.append(merged_item)

    # 保存结果
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(merged_data, f, ensure_ascii=False, indent=2)

    print(f"合并完成！共处理 {len(merged_data)} 个对象")
    print(f"结果已保存到: {output_file}")

# 使用示例
if __name__ == "__main__":
    # 文件路径 - 请根据实际情况修改
    sql_file_path = "sql_data.json"  # 包含SQL语句的JSON文件
    questions_file_path = "final_dataset.json"  # 包含问题的JSON文件
    output_file_path = "merged_data.json"  # 输出文件

    merge_sql_to_questions(sql_file_path, questions_file_path, output_file_path)