In [None]:
import os
import json
import yaml
from datetime import datetime
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

import importlib
import help_functions
importlib.reload(help_functions)
from help_functions import *

In [2]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model_path = "../../Qwen2.5-Coder-3B-Instruct"

model_code = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype="auto",
    device_map="auto"
)
tokenizer_code = AutoTokenizer.from_pretrained(model_path)

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [3]:
with open('prompt.yaml', 'r') as file:
    prompts = yaml.safe_load(file)
prompt_code = prompts['prompt_code']

In [4]:
def extract_code_from_text(text):
    """
    从文本中提取 Python 代码块（代码在 ```python 和 ``` 之间）
    """
    # 使用正则表达式匹配 ```python 和 ``` 之间的内容
    pattern1 = r"```python\n(.*?)\n```"
    match1 = re.search(pattern1, text, re.DOTALL)
    if match1:
        code = match1.group(1)
        
        # 这里是为了加上可执行的代码
        pattern2 = r'def\s+(.*?):'
        match2 = re.search(pattern2, code)
        execute_code = match2.group(1)
        code_add = f"\n# Execute and return result\nresult = {execute_code}"
        code += code_add

        return code
    else:
        raise ValueError("未能在文本中找到 Python 代码块")

def get_code_response(subtask, prompt):
    # subtask = "Subtask: Enter the search term \"winter coat\" into the search bar."
    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": subtask}
    ]
    text = tokenizer_code.apply_chat_template(
        messages,
        tokenize=False
    )
    model_inputs = tokenizer_code([text], return_tensors="pt").to(model_code.device)

    generated_ids = model_code.generate(
        **model_inputs,
        max_new_tokens=512
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer_code.batch_decode(generated_ids, skip_special_tokens=True)[0]

    return response

In [5]:
def get_label_function(name, key_nodes, trajectory, code_output_dir):
    if not os.path.exists(code_output_dir):
        os.makedirs(code_output_dir)

    file_path = os.path.join(code_output_dir, f"{name}.txt")
    with open(file_path, "w", encoding="utf-8") as file:  # 先清空文件
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        file.write(f"{current_time} - Task Name: {name}\n\n")

    for i, key_action in enumerate(key_nodes):
        code_list, result_list, errors = [], [], []
        print(key_action, flush=True)
        for _ in range(5):
            code, result, error = "", "", None
            try:
                response = get_code_response(key_action, prompt_code)
                code = extract_code_from_text(response)
                params = {"trajectory": trajectory}
                exec(code, params)
                result = params.get('result', None)
            except Exception as e:
                result = None
                error = str(e)  # 记录错误信息
                print(f"Error occurred: {error}", flush=True)

            print("Result = ", result, flush=True)
            print()
            code_list.append(code)
            result_list.append(result)
            errors.append(error)
            # 遇到 True 的直接退出
            if result == True:
                break

        # 追加写入当前 key_action 的结果
        with open(file_path, "a", encoding="utf-8") as file:
            file.write(f"Key Action: {key_action}\n\n")
            for i, (code, result, error) in enumerate(zip(code_list, result_list, errors), 1):
                file.write(f"Code Block {i}:\n")
                file.write(code + "\n")
                file.write("Result = " + str(result)+ "\n")
                if error is not None:
                    file.write(f"Error occurred: {error}" + "\n")
                file.write("="*20 + "\n\n")
            file.write("="*50 + "\n\n")  # 添加分隔线区分不同 key_action
        # break

In [6]:
def get_label_function_for_expert_demo():
    output_dir = "json_files/OS-Atlas/InnovAll/Iter3"
    # output_dir = "json_files/GUI-R1/InnovAll/Iter2"
    # success_tasks_path = f"{output_dir}/success_tasks_all.json"
    success_tasks_path = f"{output_dir}/success_tasks_new.json"
    code_output_dir = f"{output_dir}/code_lists"
    data_file = f"{output_dir}/tasks_and_key_nodes.json"

    with open(data_file, 'r') as f:
        tasks_and_key_nodes = json.load(f)
    with open(success_tasks_path, 'r', encoding='utf-8') as json_file:
        success_tasks = json.load(json_file)

    for name, details in tasks_and_key_nodes.items():
        # if name != "SystemWifiTurnOffVerify":
        #     continue
        file_path = success_tasks[name]
        trajectory = get_trajectory(file_path)

        print("===========")
        print(name)
        print("Objective:", trajectory['objective'])
        print("Template :", trajectory['task_template'])
        print(trajectory['map'])
        print("Action list:")
        for action in trajectory['action_description']:
            print(action)
        print("Key Actions:")
        key_nodes = details['key_nodes']
        for key_node in key_nodes:
            print(key_node)
        get_label_function(name, key_nodes, trajectory, code_output_dir)

In [None]:
# get_label_function_for_expert_demo()

SystemBluetoothTurnOn
Objective: Turn bluetooth on.
Template : Turn bluetooth on.
None
Action list:
Scroll up on the screen.
Click on a UI element 'Off' on the screen.
Set the task's status as 'complete'.
Key Actions:
Scroll up on the screen.
Click on a UI element 'Off' on the screen.
Set the task's status as 'complete'.
Scroll up on the screen.
Result =  True

Click on a UI element 'Off' on the screen.
Result =  True

Set the task's status as 'complete'.
Result =  True

SystemBrightnessMinVerify
Objective: Turn brightness to the min value.
Template : Turn brightness to the min value.
None
Action list:
Scroll down on the screen.
Click on a UI element 'Calendar' on the screen.
Set the task's status as 'infeasible'.
Key Actions:
Scroll down on the screen.
Click on a UI element 'Calendar' on the screen.
Set the task's status as 'infeasible'.
Scroll down on the screen.
Result =  True

Click on a UI element 'Calendar' on the screen.
Result =  True

Set the task's status as 'infeasible'.
Res

### 测试阶段

In [None]:
# filename = "MarkorCreateNoteAndSms"
# folder_path = "../runs/expert"
# file_path = os.path.join(folder_path, filename + "_0.pkl.gz")
# trajectory = get_trajectory(file_path)

# import importlib
# import Function_APIs
# # from help_functions import *
# importlib.reload(Function_APIs)
# from Function_APIs import *

# def verify_function(trajectory):
#     if not validate_type_action(trajectory, "Actions speak louder than words.", "Type a message…"):
#         return False
#     if not validate_keyboard_enter_action(trajectory):
#         return False
#     return True

# result = verify_function(trajectory)
# print(result)


objective:  Create a new note in Markor named fair_ant_XOdi.txt with the following text: Actions speak louder than words.. Share the entire content of the note with the phone number +13192815309 via SMS using Simple SMS Messenger
template :  Create a new note in Markor named {file_name} with the following text: {text}. Share the entire content of the note with the phone number {number} via SMS using Simple SMS Messenger
{'file_name': 'fair_ant_XOdi.txt', 'text': 'Actions speak louder than words.', 'number': '+13192815309'}
False
