In [1]:
import datasets
from transformers import AutoTokenizer
import preprocess_utils
import importlib
importlib.reload(preprocess_utils)
from preprocess_utils import sanity_check, MultiTurnDatasetAugmented
import json 

In [2]:
# All tools

In [3]:
# Load the tool/API definitions used for task decomposition into `all_tools`.
# The encoding is given explicitly so the read does not depend on the platform's
# default locale (the read-back of the exported JSONL later in this notebook
# also uses UTF-8).
with open('/root/PycharmProjects/llm_robo_control/prompts/task_decomposition_rr_api.json', 'r', encoding='utf-8') as file:
    # Parse the whole file as a single JSON document.
    all_tools = json.load(file)

In [4]:
all_tools 

[{'name': 'general_control',
  'description': 'This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task',
  'parameters': {'type': 'object',
   'properties': {'action': {'type': 'string',
     'enum': ['start', 'stop', 'pause', 'resume']},
    'user_defined_task': {'type': 'string',
     'description': 'user-defined task described in user request'}},
   'required': ['operation_type']}},
 {'name': 'clean',
  'description': 'Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.',
  'parameters': {'type': 'object',
   'properties': {'action': {'type': 'string',
     'enum': ['start', 'stop', 'pause', 'resume']},
    'location': {'type': 'string',
     'description': "Fill in the description of the location from the user request word for word, using the same language as in the user's descript

In [5]:
# Training set exploration

In [6]:
# Load the training split from a dataset directory previously saved on disk.
train_set = datasets.load_from_disk("/root/PycharmProjects/llm_finetuning/gpt_task_decomposition/train")

In [7]:
train_set[0]

{'user_request': 'Utilize the balanced suction for a quieter clean.',
 'task_decomposition': '{"name": "set_object_property", "parameters": {"property_name": "suction power", "property_value": "Balanced/标准/lower"}}'}

In [8]:
# Classify every example by the shape of its `task_decomposition` string.
parse_errors = []            # indices whose decomposition is not a single {"name": ...} object
tools_not_found_errors = []  # indices whose tool name does not appear in `all_tools`
multi_turn = []              # indices whose decomposition looks like several tool calls
for i in range(len(train_set)):
    data = train_set[i]
    try:
        decomposition = data['task_decomposition']
        # Textual heuristic: an object separator (",{" or ", {") is taken to mean
        # the string encodes more than one tool call; such rows are not parsed here.
        if (',{' in decomposition) or (', {' in decomposition):
            multi_turn.append(i)
            continue
        tool_name = json.loads(decomposition)['name']
    except Exception:
        # Invalid JSON, or valid JSON that is not an object with a "name" key
        # (e.g. the '"<null>"' marker). `Exception` rather than a bare `except`
        # so that KeyboardInterrupt / SystemExit still propagate.
        parse_errors.append(i)
        continue
    # `==` comparison (not set membership) so an unhashable name cannot raise here.
    if not any(tool_name == tool['name'] for tool in all_tools):
        tools_not_found_errors.append(i)

In [9]:
len(parse_errors)

47

In [8]:
# Print every record flagged as a parse error, prefixed with its index, for manual inspection.
for i in parse_errors:
    print(i, train_set[i])

71 {'user_request': '开始模仿我的动作', 'task_decomposition': '"<null>"'}
110 {'user_request': 'Upgrade operation ongoing', 'task_decomposition': '"<null>"'}
229 {'user_request': 'user_defined_task:[]$$Power up operation', 'task_decomposition': '"<null>"'}
266 {'user_request': 'user_defined_task:[Living Area Dust-off|Garage Organize]$$Activate Attic Overhaul', 'task_decomposition': '"<null>"'}
308 {'user_request': 'user_defined_task:[午后休闲|深夜工作|日常维护]$$执行假日准备模式', 'task_decomposition': '"<null>"'}
331 {'user_request': '全力以赴', 'task_decomposition': '"<null>"'}
351 {'user_request': 'user_defined_task:[Patio Shine|Closet Clarify]$$Set in motion Staircase Sweep', 'task_decomposition': '"<null>"'}
412 {'user_request': 'user_defined_task:[Rapid Radiance|Deep Dive]$$Power up Flash Flare', 'task_decomposition': '"<null>"'}
433 {'user_request': 'user_defined_task:[Quick Sweep|Deep Dive Cleaning]$$Start Flash Dusting', 'task_decomposition': '"<null>"'}
446 {'user_request': 'user_defined_task:[Quick Glisten

In [9]:
tools_not_found_errors 

[]

In [10]:
train_set[multi_turn[0]] 

{'user_request': '查看前五条和后五条预约任务',
 'task_decomposition': '[{"name": "schedule_operation", "parameters": {"operation_type": "query", "index_from": 1, "index_to": 5}}, {"name": "schedule_operation", "parameters": {"operation_type": "query", "index_from": -5, "index_to": -1}}]'}

In [8]:
# Training set dataset

In [13]:
# def get_assistant(tools):
#     def check_list(lst):
#         return all(x == lst[0] for x in lst)
#     if not isinstance(tools, list):
#         content = "no corresponding tools."
#     else:
#         tool_names = [tool["name"] for tool in tools]
#         if check_list(tool_names) and len(tool_names) > 1:
#             prefix = "I need to use the following tool sequentially for {} times but each time with different parameters: ".format(len(tool_names))
#             content = prefix + tool_names[0]
#         elif len(tool_names) > 1:
#             prefix = "I need to use the following tools sequentially: "
#             content = prefix + ", ".join(tool_names)
#         else:
#             prefix = "I need to use the following tool: "
#             content = prefix + tool_names[0]
#     assistant = {"role": "assistant", "content": content}
#     return assistant
def get_assistant(tools):
    """Build the assistant turn for an example that maps to no tool.

    Args:
        tools: The parsed decomposition. Any value that is not a ``list``
            is treated as "no tool applies".

    Returns:
        ``{"role": "assistant", "content": "no corresponding tools."}`` when
        ``tools`` is not a list; ``None`` when it is a list (no assistant
        turn is produced in that case).
    """
    # A list means tool calls exist, so there is nothing for the assistant to say.
    if isinstance(tools, list):
        return None
    return {"role": "assistant", "content": "no corresponding tools."}

In [14]:
def get_user(user_request):
    """Wrap the raw request as a user-role chat message.

    Args:
        user_request: The request content; stored unchanged.

    Returns:
        A dict with keys ``"role"`` (always ``"user"``) and ``"content"``.
    """
    message = dict(role="user", content=user_request)
    return message

In [15]:
def parse_tools(tools_string):
    """Decode a task-decomposition JSON string into a list of tool messages.

    Args:
        tools_string: JSON text that is a single tool-call object, a list of
            tool-call objects, or the JSON string ``"<null>"``.

    Returns:
        The string ``"<null>"`` unchanged when that is what the JSON decodes
        to; otherwise a list in which each tool call is a dict whose first
        key is ``'role': 'tool'`` followed by the call's own keys.

    Raises:
        json.JSONDecodeError: If ``tools_string`` is not valid JSON.
    """
    def _as_tool_message(call):
        # 'role' is inserted first so it leads the key order; the call's own
        # keys follow (and would override 'role' if the call carried one).
        message = {'role': 'tool'}
        message.update(call)
        return message

    decoded = json.loads(tools_string)
    # The "<null>" marker is passed through so callers can tell "no tool" apart.
    if decoded == "<null>":
        return decoded
    # A single call is wrapped in a list to be handled like a multi-call sequence.
    calls = decoded if isinstance(decoded, list) else [decoded]
    return [_as_tool_message(call) for call in calls]

In [16]:
def transforms(examples):
    """Convert one raw record into the serialized training format.

    Despite the plural parameter name, the body handles a single record:
    ``examples["user_request"]`` and ``examples["task_decomposition"]`` are
    each used as one value.

    Args:
        examples: Mapping with keys ``"user_request"`` and
            ``"task_decomposition"`` (a JSON string).

    Returns:
        A dict with:
          - ``"tools"``: the module-level ``all_tools`` serialized as JSON;
          - ``"conversations"``: JSON-serialized list holding the user turn,
            then the assistant turn if there is one, then the tool calls;
          - ``"num_decomposition"``: number of tool calls, or ``1`` when the
            decomposition did not parse to a list;
          - ``"hit"``: ``True`` when no assistant turn was produced,
            ``False`` otherwise.
    """
    request_text = examples["user_request"]
    decomposition_text = examples["task_decomposition"]

    turns = [get_user(request_text)]
    tool_calls = parse_tools(decomposition_text)
    assistant_turn = get_assistant(tool_calls)

    # get_assistant returns a message only for the no-tool case.
    hit = assistant_turn is None
    if not hit:
        turns.append(assistant_turn)

    if isinstance(tool_calls, list):
        turns.extend(tool_calls)
        call_count = len(tool_calls)
    else:
        call_count = 1

    return {"tools": json.dumps(all_tools, ensure_ascii=False),
            "conversations": json.dumps(turns, ensure_ascii=False),
            "num_decomposition": call_count, 'hit': hit}

In [17]:
# Apply `transforms` to every record; the keys it returns show up as extra columns of `new`.
new = train_set.map(transforms)

Map:   0%|          | 0/3232 [00:00<?, ? examples/s]

In [18]:
# Export the formatted dataset to disk (one JSON record per line).
new.to_json('formatted_data/robocontrol_dataset.jsonl')

Creating json from Arrow format:   0%|          | 0/4 [00:00<?, ?ba/s]

31836730

In [19]:
new[0]

{'user_request': 'Utilize the balanced suction for a quieter clean.',
 'task_decomposition': '{"name": "set_object_property", "parameters": {"property_name": "suction power", "property_value": "Balanced/标准/lower"}}',
 'tools': '[{"name": "general_control", "description": "This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "user_defined_task": {"type": "string", "description": "user-defined task described in user request"}}, "required": ["operation_type"]}}, {"name": "clean", "description": "Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "re

In [20]:
new[parse_errors[0]]

{'user_request': '开始模仿我的动作',
 'task_decomposition': '"<null>"',
 'tools': '[{"name": "general_control", "description": "This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "user_defined_task": {"type": "string", "description": "user-defined task described in user request"}}, "required": ["operation_type"]}}, {"name": "clean", "description": "Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "location": {"type": "string", "description": "Fill in the description of the location from the user request word for word, using the same langu

In [25]:
len(new.filter(lambda example: example['hit']==False)) == len(parse_errors)

True

In [27]:
len(new.filter(lambda example: example['num_decomposition'] > 1)) == len(multi_turn)

Filter:   0%|          | 0/3232 [00:00<?, ? examples/s]

True

In [28]:
# Correctness verification

In [29]:
# Read the exported dataset back from disk into `train_data` (a list of records).
# The path is bound once instead of being repeated in every branch.
dataset_path = 'formatted_data/robocontrol_dataset.jsonl'
with open(dataset_path, "r", encoding="utf-8") as f:
    if dataset_path.endswith(".json"):
        # Plain JSON: the whole file is one document.
        train_data = json.load(f)
    elif dataset_path.endswith(".jsonl"):
        # JSON Lines: one record per line; blank lines carry no record and are skipped.
        train_data = [json.loads(line) for line in f if line.strip()]
    else:
        # Fail loudly rather than leaving `train_data` unbound (or stale from an earlier run).
        raise ValueError(f"Unsupported dataset file extension: {dataset_path}")

In [30]:
len(train_data)

3232

In [28]:
json.loads(train_data[0]['tools'])

[{'name': 'general_control',
  'description': 'This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task',
  'parameters': {'type': 'object',
   'properties': {'action': {'type': 'string',
     'enum': ['start', 'stop', 'pause', 'resume']},
    'user_defined_task': {'type': 'string',
     'description': 'user-defined task described in user request'}},
   'required': ['operation_type']}},
 {'name': 'clean',
  'description': 'Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.',
  'parameters': {'type': 'object',
   'properties': {'action': {'type': 'string',
     'enum': ['start', 'stop', 'pause', 'resume']},
    'location': {'type': 'string',
     'description': "Fill in the description of the location from the user request word for word, using the same language as in the user's descript

In [29]:
len(train_data[0]['tools'])

7808

In [16]:
# Load the tokenizer from a local chatglm3-6b snapshot directory.
# NOTE(review): trust_remote_code=True lets the loader run Python code shipped with the
# model files — only use it with a snapshot you trust.
tokenizer = AutoTokenizer.from_pretrained("/data/dataset/huggingface/hub/models--THUDM--chatglm3-6b/snapshots/456aa875cf1f46623006edaa23103774ea9c0eae", trust_remote_code=True)

In [31]:
len(tokenizer.encode(train_data[0]['tools']))

2346

In [17]:
# Wrap the records read back from the JSONL file in the project's dataset class.
# NOTE(review): 4096 is presumably the maximum sequence length in tokens —
# TODO confirm against MultiTurnDatasetAugmented in preprocess_utils.
train_dataset = MultiTurnDatasetAugmented(
    train_data,
    tokenizer,
    4096,
)

In [18]:
len(train_dataset)

3232

In [34]:
# Index every item once and discard the result — presumably a smoke test that
# each example can be built without raising (verify against MultiTurnDatasetAugmented).
for i in range(len(train_dataset)):
    train_dataset[i]

In [19]:
train_data[33]['conversations']

'[{"role": "user", "content": "赶紧走。"}, {"role": "tool", "name": "go_away", "parameters": null}]'

In [None]:
json.dumps(json.loads(train_data[33]['conversations']), ensure_ascii=False) == train_data[33]['conversations']

In [20]:
json.loads(train_data[33]['conversations'])

[{'role': 'user', 'content': '赶紧走。'},
 {'role': 'tool', 'name': 'go_away', 'parameters': None}]

In [21]:
json.loads(train_data[33]['conversations'])[1]["parameters"] is None 

True

In [23]:
import ast, astunparse 
def format_function_call(function_name: str, parameters):
    """Render a tool call as Python call source text with keyword arguments.

    Args:
        function_name: Name of the function being called.
        parameters: Mapping of argument name to value, or ``None`` for a call
            without arguments.

    Returns:
        The unparsed call expression with surrounding whitespace stripped,
        e.g. ``'mapping()'`` for ``format_function_call('mapping', None)``.
    """
    # None means "no arguments"; any other value must provide .items().
    call_arguments = {} if parameters is None else parameters

    keyword_nodes = []
    for name, value in call_arguments.items():
        keyword_nodes.append(ast.keyword(arg=name, value=ast.Constant(value)))

    # Build the call as an AST and let the unparser produce the source text.
    call_node = ast.Call(func=ast.Name(id=function_name), args=[], keywords=keyword_nodes)
    rendered = astunparse.unparse(call_node)
    return rendered.strip()

In [24]:
format_function_call('mapping', None)

'mapping()'

In [29]:
sanity_check(train_dataset[parse_errors[1]]['input_ids'], train_dataset[parse_errors[1]]['labels'], tokenizer)  

Sanity Check >>>>>>>>>>>>>
           '[gMASK]':  64790 ->   -100
               'sop':  64792 ->   -100
        '<|system|>':  64794 ->   -100
                  '':  30910 ->   -100
                '\n':     13 ->   -100
            'Answer':  20115 ->   -100
               'the':    267 ->   -100
         'following':   1762 ->   -100
         'questions':   2554 ->   -100
                'as':    362 ->   -100
              'best':   1077 ->   -100
                'as':    362 ->   -100
               'you':    344 ->   -100
               'can':    457 ->   -100
                 '.':  30930 ->   -100
               'You':    809 ->   -100
              'have':    431 ->   -100
            'access':   1675 ->   -100
                'to':    289 ->   -100
               'the':    267 ->   -100
         'following':   1762 ->   -100
             'tools':   4159 ->   -100
                 ':':  30954 ->   -100
                '\n':     13 ->   -100
                 '[':  30995 ->   -10

In [31]:
# Test set data exploration
# NOTE: from this cell on, the name `train_set` is rebound to the TEST split.
train_set = datasets.load_from_disk("/root/PycharmProjects/llm_finetuning/gpt_task_decomposition/test")

In [32]:
len(train_set)

1078

In [33]:
train_set[0]

{'user_request': '重新启动绘图程序。',
 'task_decomposition': '{"name": "mapping", "parameters": {"action": "start"}}'}

In [34]:
# Classify every example by the shape of its `task_decomposition` string.
# NOTE: `train_set` holds the test split at this point (it was rebound when
# the ".../test" directory was loaded).
parse_errors = []            # indices whose decomposition is not a single {"name": ...} object
tools_not_found_errors = []  # indices whose tool name does not appear in `all_tools`
multi_turn = []              # indices whose decomposition looks like several tool calls
for i in range(len(train_set)):
    data = train_set[i]
    try:
        decomposition = data['task_decomposition']
        # Textual heuristic: an object separator (",{" or ", {") is taken to mean
        # the string encodes more than one tool call; such rows are not parsed here.
        if (',{' in decomposition) or (', {' in decomposition):
            multi_turn.append(i)
            continue
        tool_name = json.loads(decomposition)['name']
    except Exception:
        # Invalid JSON, or valid JSON that is not an object with a "name" key
        # (e.g. the '"<null>"' marker). `Exception` rather than a bare `except`
        # so that KeyboardInterrupt / SystemExit still propagate.
        parse_errors.append(i)
        continue
    # `==` comparison (not set membership) so an unhashable name cannot raise here.
    if not any(tool_name == tool['name'] for tool in all_tools):
        tools_not_found_errors.append(i)

In [35]:
len(parse_errors)

19

In [36]:
# Print every record flagged as a parse error, prefixed with its index, for manual inspection.
# (`train_set` refers to the test split here.)
for i in parse_errors:
    print(i, train_set[i])

10 {'user_request': 'user_defined_task:[Morning Routine|Evening Wind Down]$$Begin Nighttime Ritual', 'task_decomposition': '"<null>"'}
47 {'user_request': 'user_defined_task:[例行巡查|紧急响应]$$实施安静模式', 'task_decomposition': '"<null>"'}
129 {'user_request': '加入我的队伍', 'task_decomposition': '"<null>"'}
313 {'user_request': 'user_defined_task:[]$$执行饭后清扫计划', 'task_decomposition': '"<null>"'}
367 {'user_request': 'user_defined_task:[窗帘清洗|地毯清理]$$开始床铺整理', 'task_decomposition': '"<null>"'}
392 {'user_request': 'user_defined_task:[Morning Touch-Up|Nightfall Refresh]$$Engage Afternoon Aeration', 'task_decomposition': '"<null>"'}
408 {'user_request': 'user_defined_task:[Holiday Mode|Vacation Prep]$$Engage Workday Setup', 'task_decomposition': '"<null>"'}
463 {'user_request': '开始模仿', 'task_decomposition': '"<null>"'}
487 {'user_request': 'Software upgrade in process', 'task_decomposition': '"<null>"'}
528 {'user_request': 'user_defined_task:[Whisper Wash|Turbo Thrust]$$Turn on Midday Mode', 'task_decompo

In [37]:
tools_not_found_errors

[]

In [38]:
multi_turn

[]

In [39]:
# Format the test split (currently bound to `train_set`) with `transforms`,
# then export it to its own JSONL file.
new = train_set.map(transforms)
new.to_json('formatted_data/robocontrol_dataset_testset.jsonl')

Map:   0%|          | 0/1078 [00:00<?, ? examples/s]

Creating json from Arrow format:   0%|          | 0/2 [00:00<?, ?ba/s]

10618998

In [40]:
new[0]

{'user_request': '重新启动绘图程序。',
 'task_decomposition': '{"name": "mapping", "parameters": {"action": "start"}}',
 'tools': '[{"name": "general_control", "description": "This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "user_defined_task": {"type": "string", "description": "user-defined task described in user request"}}, "required": ["operation_type"]}}, {"name": "clean", "description": "Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "location": {"type": "string", "description": "Fill in the description of the location from the u

In [41]:
new[parse_errors[0]]

{'user_request': 'user_defined_task:[Morning Routine|Evening Wind Down]$$Begin Nighttime Ritual',
 'task_decomposition': '"<null>"',
 'tools': '[{"name": "general_control", "description": "This API is used for controlling general tasks, typically providing control for listed user-defined tasks, or that is not directly tied to specific task types, or non-clean task", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "user_defined_task": {"type": "string", "description": "user-defined task described in user request"}}, "required": ["operation_type"]}}, {"name": "clean", "description": "Initiate a task of an immediate or unscheduled cleaning or partial clean, i.e., cleaning a part while leaving another part untouched.", "parameters": {"type": "object", "properties": {"action": {"type": "string", "enum": ["start", "stop", "pause", "resume"]}, "location": {"type": "string", "description": "Fill in the description of t

In [42]:
len(new.filter(lambda example: example['hit']==False)) == len(parse_errors)

Filter:   0%|          | 0/1078 [00:00<?, ? examples/s]

True

In [43]:
len(new.filter(lambda example: example['num_decomposition'] > 1)) == len(multi_turn)

Filter:   0%|          | 0/1078 [00:00<?, ? examples/s]

True