In [35]:
import rich

In [1]:
from datasets import load_dataset

# Path of the raw dataset file, relative to this notebook.
raw_data_path = "../../data/bfcl_multi_turn.json"
# Read the JSON file with the `datasets` JSON loader as a single "train" split.
raw_ds = load_dataset("json", data_files=raw_data_path, split="train")
# Optional: subsample 5k rows (currently disabled).
# raw_ds = raw_ds.shuffle(seed=42).select(range(5000))

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Instruction text placed at the start of every system message built by
# convert_system_content (the tool list, when present, is appended after it).
# NOTE(review): the sentence "you MUST put it in the format of" ends without naming a
# format. Presumably the format description was dropped on purpose because the
# <tool_call> format is appended together with the tool list; confirm that the dangling
# sentence (and the trailing spaces at the ends of some lines) are intended.
new_instruction = """You are an expert in composing functions. You are given a question and a set of possible functions. Based on the 
question, you will need to make one or more function/tool calls to achieve the purpose.
If none of the functions can be used, point it out. If the given question lacks the parameters required by the 
function, also point it out.
You should only return the function calls in your response.

If you decide to invoke any of the function(s), you MUST put it in the format of 
You SHOULD NOT include any other text in the response.

At each turn, you should try your best to complete the tasks requested by the user within the current turn. 
Continue to output functions to call until you have fulfilled the user's request to the best of your ability. Once 
you have no more functions to call, the system will consider the current turn complete and proceed to the next turn
or task."""
# Echo the prompt so the exact text is visible in the notebook output.
print(new_instruction)

You are an expert in composing functions. You are given a question and a set of possible functions. Based on the 
question, you will need to make one or more function/tool calls to achieve the purpose.
If none of the functions can be used, point it out. If the given question lacks the parameters required by the 
function, also point it out.
You should only return the function calls in your response.

If you decide to invoke any of the function(s), you MUST put it in the format of 
You SHOULD NOT include any other text in the response.

At each turn, you should try your best to complete the tasks requested by the user within the current turn. 
Continue to output functions to call until you have fulfilled the user's request to the best of your ability. Once 
you have no more functions to call, the system will consider the current turn complete and proceed to the next turn
or task.


In [49]:
def _get_node_name(node):
    """
    递归提取复杂的函数名。
    支持: print, os.path.join, tools['search'], get_tool()
    """
    if isinstance(node, ast.Name):
        return node.id
    elif isinstance(node, ast.Attribute):
        return f"{_get_node_name(node.value)}.{node.attr}"
    elif isinstance(node, ast.Subscript):
        value = _get_node_name(node.value)
        slice_val = "?"
        if isinstance(node.slice, ast.Constant):
            slice_val = repr(node.slice.value)
        else:
            slice_val = ast.unparse(node.slice)
        return f"{value}[{slice_val}]"
    elif isinstance(node, ast.Call):
        return ast.unparse(node)
    else:
        return ast.unparse(node)

def _get_arg_value(node):
    """
    解析参数值。
    策略：优先转为 Python 原生对象 (int, str, list...)，
    如果遇到变量或表达式 (如 x+1, func())，则回退为源码字符串。
    """
    try:
        # 尝试把 AST 节点转为 Python 对象 (例如: "hello", 123, [1, 2])
        return ast.literal_eval(node)
    except (ValueError, TypeError, SyntaxError):
        # 如果包含变量、函数调用或运算 (例如: x, 1+1, call())
        # 使用 unparse 还原为代码字符串
        return ast.unparse(node)

def _json_safe_arg_value(node):
    """Return the parsed argument value, or its source text when JSON cannot encode it."""
    value = _get_arg_value(node)
    try:
        json.dumps(value, ensure_ascii=False)
    except (TypeError, ValueError):
        # Literals such as sets, bytes or complex numbers parse fine but are not JSON
        # types; keep them as a code string, like any other non-literal argument.
        return ast.unparse(node)
    return value


def convert_python_to_xml(input_str:str)->list[dict[str,str]]:
    """
    Statically convert a Python-style list of calls into tool-call records.

    The input is parsed with ``ast`` only; nothing is executed.

    Args:
        input_str: Source text such as ``"[cd(folder='docs'), ls()]"``.

    Returns:
        On success, one ``{"name": <callee>, "arguments": <JSON object string>}``
        dict per call, in source order. Positional arguments are stored under
        the keys ``arg_0``, ``arg_1``, ...; keyword arguments under their own
        names. A literal ``**{...}`` with string keys is merged into the
        arguments.

        On failure, a one-element list ``[{"error": <message>}]``. Failures are:
        a syntax error, input that is not a list, a list element that is not a
        call, ``**`` unpacking of a non-literal, or any unexpected exception.
        Error records never contain a ``"name"`` key, so callers can detect
        them by checking for it.
    """
    try:
        # Parse as a single expression.
        tree = ast.parse(input_str.strip(), mode='eval')

        # The outermost node must be a list literal.
        if not isinstance(tree.body, ast.List):
            return [{"error": "Error: Input must be a list of calls, e.g., [func1(), func2()]"}]

        result_parts = []
        for node in tree.body.elts:
            if not isinstance(node, ast.Call):
                return [{"error": f"Error: list element is not a function call: {ast.unparse(node)}"}]

            func_name = _get_node_name(node.func)
            args_dict = {}

            # Positional arguments have no name in the source; number them.
            for i, arg in enumerate(node.args):
                args_dict[f"arg_{i}"] = _json_safe_arg_value(arg)

            for keyword in node.keywords:
                value = _json_safe_arg_value(keyword.value)
                if keyword.arg is not None:
                    args_dict[keyword.arg] = value
                elif isinstance(value, dict) and all(isinstance(k, str) for k in value):
                    # ``f(**{"a": 1})``: keyword.arg is None; a literal mapping can be merged.
                    args_dict.update(value)
                else:
                    # ``f(**opts)`` cannot be resolved without running code.
                    return [{"error": f"Error: cannot expand '**' argument statically: {ast.unparse(keyword.value)}"}]

            # ensure_ascii=False keeps non-ASCII text readable instead of \u escapes.
            json_args = json.dumps(args_dict, ensure_ascii=False)
            result_parts.append({"name": func_name, "arguments": json_args})
    except SyntaxError as e:
        return [{"error": f"SyntaxError: 输入的代码不符合 Python 语法。详情: {e}"}]
    except Exception as e:
        return [{"error": f"SystemError: 转换过程发生未知错误: {e}"}]

    return result_parts


In [50]:
import json

def convert_system_content(example):
    """
    Build the system message for one conversation.

    The message content is the module-level ``new_instruction`` text. When
    ``example["tools"]`` holds a non-empty JSON string, a "# Tools" section is
    appended that lists each tool as one JSON line inside ``<tools>`` tags,
    followed by the ``<tool_call>`` output-format description. If the tools
    string cannot be processed, the failure is printed and the section is
    omitted.

    Returns:
        ``{"role": "system", "content": <text>}``
    """
    section_header = (
        "\n\n# Tools\n\n"
        "You may call one or more functions to assist with the user query.\n\n"
        "You are provided with function signatures within <tools></tools> XML tags:\n"
        "<tools>"
    )
    section_footer = (
        "\n</tools>\n\n"
        "For each function call, return a json object with function name and arguments "
        "within <tool_call></tool_call> XML tags:\n"
        "<tool_call>\n"
        '{"name": <function-name>, "arguments": <args-json-object>}\n'
        "</tool_call>"
    )

    tools_section = ""
    raw_tools = example["tools"] if "tools" in example else None
    if raw_tools:
        try:
            parsed = json.loads(raw_tools)
            # A single tool object is treated as a one-element list.
            tool_list = [parsed] if isinstance(parsed, dict) else parsed
            # One JSON document per line, each preceded by a newline.
            serialized = "".join(
                "\n" + json.dumps(tool, ensure_ascii=False) for tool in tool_list
            )
            tools_section = section_header + serialized + section_footer
        except Exception as e:
            print(f"解析 tools 失败: {e}")
            tools_section = ""

    return {"role": "system", "content": new_instruction + tools_section}

In [51]:
import json

def convert_assistant_content(example):
    """
    Convert one assistant turn into the target message format.

    Everything up to and including the last ``</think>`` tag is removed. If the
    remaining text looks like a Python list (``[...]``), it is parsed with
    ``convert_python_to_xml`` and rewritten as one ``<tool_call>`` block per
    call, each holding a JSON object ``{"name": ..., "arguments": ...}``.
    Other text is passed through unchanged.

    Returns:
        ``{"role": "assistant", "content": <text>}`` on success, or
        ``{"role": "", "content": ""}`` when the call list could not be
        converted. Callers detect the failure through the empty role.
    """
    content = example["content"]
    if "</think>" in content:
        content = content.rsplit("</think>", 1)[-1].strip()

    if content.startswith("[") and content.endswith("]"):
        rejected = {"role": "", "content": ""}
        tool_calls = convert_python_to_xml(content)
        # Anything other than a list of records is a conversion failure.
        if not isinstance(tool_calls, list):
            return rejected

        blocks = []
        for tool_call in tool_calls:
            if not (isinstance(tool_call, dict) and "name" in tool_call and "arguments" in tool_call):
                return rejected
            # json.dumps escapes quotes/backslashes in the callee name so the
            # emitted object stays valid JSON; plain names render unchanged.
            name_json = json.dumps(tool_call["name"], ensure_ascii=False)
            # "arguments" is already a serialized JSON object; embed it as is.
            args_json = tool_call["arguments"]
            blocks.append(f'<tool_call>\n{{"name": {name_json}, "arguments": {args_json}}}\n</tool_call>')
        content = "\n".join(blocks)

    return {
        "role": "assistant",
        "content": content
    }

In [52]:
def convert_user_content(example):
    """Wrap the turn's ``content`` unchanged in a message with role ``"user"``."""
    user_message = {"role": "user"}
    user_message["content"] = example["content"]
    return user_message

In [53]:
import ast

def convert_tool_content(example):
    """
    Convert one tool turn into a user message made of ``<tool_response>`` blocks.

    ``example["content"]`` is a string holding a list of dicts (a single dict
    is treated as a one-element list). It is parsed as a Python literal first
    and, if that fails, as JSON, so payloads that use ``true``/``false``/``null``
    are accepted too. Every dict value becomes its own block, in order; the
    keys are not emitted.

    Returns:
        ``{"role": "user", "content": <blocks joined by newlines>}``. The role
        is ``"user"`` because that is how tool results are represented in the
        converted conversations.

    Raises:
        ValueError / SyntaxError: the original ``ast.literal_eval`` error when
            the content is neither a Python literal nor valid JSON.
    """
    raw = example["content"]
    try:
        tool_calls = ast.literal_eval(raw)
    except (ValueError, SyntaxError) as literal_error:
        try:
            tool_calls = json.loads(raw)
        except (TypeError, ValueError):
            # Neither parser accepts it: report the first failure, as before.
            raise literal_error

    if isinstance(tool_calls, dict):
        tool_calls = [tool_calls]

    blocks = [
        f"<tool_response>\n{value}\n</tool_response>"
        for tool_call in tool_calls
        for value in tool_call.values()
    ]
    return {
        "role":"user",
        "content":"\n".join(blocks)
    }

In [54]:
def convert_messages(example):
    """
    Convert one raw dataset row into ``{"messages": [...]}``.

    The row stores its turns under numeric string keys. Key ``"0"`` (when not
    None) is converted with ``convert_system_content``; the remaining numeric
    keys are visited in ascending numeric order and dispatched on the turn's
    ``role`` (``user`` / ``assistant`` / ``tool``). Empty turns are skipped.

    Returns:
        ``{"messages": [<message>, ...]}`` on success. ``{"messages": []}`` when
        any turn is rejected: either its converter returned a message with an
        unexpected role, or the turn's role is not one of the three known
        roles. Rejecting the unknown-role case avoids placing an empty ``{}``
        message in the conversation.
    """
    # role of the raw turn -> (converter, role the converted message must carry)
    converters = {
        "user": (convert_user_content, "user"),
        "assistant": (convert_assistant_content, "assistant"),
        "tool": (convert_tool_content, "user"),
    }

    messages = []
    system_turn = example.get("0")
    if system_turn is not None:
        messages.append(convert_system_content(system_turn))

    # Sort numerically so that "10" comes after "2".
    turn_keys = sorted((k for k in example.keys() if k.isdigit() and k != "0"), key=int)
    for key in turn_keys:
        turn_data = example[key]
        if not turn_data:
            continue

        handler = converters.get(turn_data.get("role"))
        if handler is None:
            return {"messages": []}

        convert, expected_role = handler
        message = convert(turn_data)
        if message.get("role") != expected_role:
            return {"messages": []}
        messages.append(message)

    return {"messages": messages}

In [55]:
# Spot-check the converter on a single row (index 4716) and pretty-print the result.
rich.print(convert_messages(raw_ds[4716]))

In [45]:
from transformers import AutoTokenizer

In [46]:
# Load the tokenizer from a local checkpoint directory.
# NOTE(review): the path is absolute and machine-specific, and `tokenizer` is not
# referenced by any other cell visible here — confirm it is still needed.
tokenizer=AutoTokenizer.from_pretrained("/dfs/data/models/Qwen3-4B-Instruct-2507/")

In [56]:
# Convert every raw row into {"messages": [...]} and drop all original columns.
messages_ds=raw_ds.map(convert_messages,remove_columns=raw_ds.column_names)

Map: 100%|██████████| 16978/16978 [00:25<00:00, 657.68 examples/s]


In [57]:
# Drop rejected rows: convert_messages returns an empty message list for those.
messages_ds = messages_ds.filter(lambda x: x["messages"] is not None and len(x["messages"]) > 0)

Filter: 100%|██████████| 16978/16978 [00:01<00:00, 8605.25 examples/s]


In [58]:
# Row count after removing rejected conversations.
len(messages_ds)

16931

In [60]:
def truncate_conversation_at_last_tool_call(example):
    """
    Cut a conversation so that it ends on its last assistant tool call.

    A message counts as a tool call when its role is ``"assistant"`` and its
    content starts with ``<tool_call>``. Everything after the last such
    message is dropped.

    Returns:
        ``{"messages": [...]}`` holding the messages up to and including the
        last tool call, or ``{"messages": []}`` when the conversation is empty
        or contains no tool call at all.
    """
    conversation = example["messages"]
    if not conversation:
        return {"messages": []}

    def is_tool_call(message):
        return message["role"] == "assistant" and message["content"].startswith("<tool_call>")

    # Walk backwards until a tool-call message is found (or the list is exhausted).
    position = len(conversation) - 1
    while position >= 0 and not is_tool_call(conversation[position]):
        position -= 1

    if position < 0:
        # No assistant tool call anywhere in the conversation.
        return {"messages": []}
    return {"messages": conversation[:position + 1]}


In [61]:
# Cut every conversation so that it ends on its last assistant <tool_call> message.
messages_ds = messages_ds.map(truncate_conversation_at_last_tool_call)

Map: 100%|██████████| 16931/16931 [00:03<00:00, 4307.74 examples/s]


In [62]:
# Remove conversations that were truncated to nothing (no assistant <tool_call> found).
messages_ds = messages_ds.filter(lambda x: len(x["messages"]) > 0)

Filter: 100%|██████████| 16931/16931 [00:01<00:00, 14363.76 examples/s]


In [63]:
# Row count after truncation and filtering.
len(messages_ds)

14941

In [68]:
import json

# Write one JSON object per line (JSON Lines layout, although the file is named .json).
with open("../../data/openai_messages_fc.json", "w", encoding="utf-8") as f:
    for data in messages_ds:
        # ensure_ascii=False writes non-ASCII characters as-is instead of \u escapes.
        f.write(json.dumps(data, ensure_ascii=False) + "\n")