The MIT License
Copyright (c) 2025 MEGA-GUI

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

In [1]:
import io
import os
import json
import time
from datetime import datetime

# For VLM
import re
import requests

import base64
from PIL import Image, ImageDraw, ImageFont
from mimetypes import guess_type


def local_image_to_data_url(image_path):
    """Return the file at ``image_path`` encoded as a base64 ``data:`` URL.

    The MIME type is guessed from the file name; when no type can be guessed,
    ``application/octet-stream`` is used instead.
    """
    guessed_type = guess_type(image_path)[0]
    mime_type = 'application/octet-stream' if guessed_type is None else guessed_type

    # Read the raw bytes first, then encode them outside the ``with`` block.
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    payload = base64.b64encode(raw_bytes).decode('utf-8')

    return "data:" + mime_type + ";base64," + payload

# Load credential information from a JSON file (path is relative to the working directory).
credentials_path = '../../../../.credentials.json'
with open(credentials_path) as credentials_file:
    credentials = json.load(credentials_file)

# Host and port of the vLLM server; used below to build the chat-completions URL.
VLLM_SERVER_IP = credentials['VLLM_SERVER_IP']
VLLM_SERVER_PORT = credentials['VLLM_SERVER_PORT']

# Load Dataset

In [2]:
import pandas as pd

### Load Benchmark Dataset
# Read the benchmark parquet file into a DataFrame and keep its index labels as a list.
problem_path =  "/Data/OSWorld_G/os_world_G_including_byte.parquet"
df = pd.read_parquet(problem_path)
idx_list = list(df.index)

# For vLLM

In [3]:
# HTTP headers sent with every request to the vLLM server.
HEADERS =  {
    'Authorization': 'TEST',
    'Content-Type': 'application/json'
}

# Passed as ``proxies=`` to requests.post for the vLLM calls.
PROXIES = {'http': None, 'https': None, 'no_proxy': VLLM_SERVER_IP}
# Number of attempts each get_*_coordinate function makes before giving up.
MAX_RETRY = 4

# response parsing

In [4]:

def parse_response(response):
    """Extract the first ``x, y`` integer pair from a model response.

    The pair is two runs of digits separated by commas and/or whitespace,
    e.g. ``"(120, 45)"`` or ``"120 45"``.

    Args:
        response: Text returned by the model.

    Returns:
        ``(x, y)`` as ints, or ``(-1, -1)`` when no pair is present or the
        response cannot be parsed (for example when it is ``None``). A tuple
        is always returned so callers can safely unpack the result.
    """
    try:
        match = re.search(r'(\d+)[,\s]+(\d+)', response)
        if match is None:
            print(f"The coordinates could not be found. ::: {response}")
            return -1, -1
        return int(match.group(1)), int(match.group(2))
    except (TypeError, ValueError) as e:
        # Non-text input (TypeError) or an absurdly long digit run (ValueError).
        # Report it and fall back to the same sentinel as "not found" instead
        # of returning None, which would break ``x, y = parse_response(...)``.
        print(f"Error during parsing response ::: {e}")
        return -1, -1

# Region-Of-Interest (ROI) Deduction

## UI-Tars-72B

In [5]:
def get_UI_TARS_72B_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask the ``ui-tars-72b`` model on the vLLM server for a click point.

    The parsed answer is treated as a point on a 0-1000 grid and rescaled to
    pixel coordinates of the image at ``image_path``.

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Offset into the list of prompt phrasings, so that repeated
            calls can start from a different phrasing.
        temperature: Sampling temperature forwarded to the server.

    Returns:
        ``{'answer_x': int, 'answer_y': int}`` in pixels, both ``-1`` when the
        reply contained no coordinates, or ``None`` when every attempt failed.
    """
    # Only the size is needed, so release the file handle immediately.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)
    user_prompt_templete = [f"{problem_instruction}", f"Please complete the following [{problem_instruction}] diligently", f"Select Coordinate for the following [{problem_instruction}]", f"Click for the following [{problem_instruction}]"]
    url = f"http://{VLLM_SERVER_IP}:{VLLM_SERVER_PORT}/v1/chat/completions"

    for i in range(MAX_RETRY):
        # Rotate through the phrasings so each retry asks slightly differently.
        prompt_idx = (i + try_idx) % len(user_prompt_templete)
        payload = {
            "model": "ui-tars-72b",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_prompt_templete[prompt_idx]
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"{encoded_image}"
                            }
                        }
                    ]
                }
            ],
            "temperature": temperature
        }
        try:
            time.sleep(1)
            response = requests.post(
                url,
                headers=HEADERS,
                data=json.dumps(payload),
                proxies=PROXIES,
                timeout=40
            )
            if response.status_code != 200:
                # Make the reason for the retry visible instead of looping silently.
                print(f"Failed to call LLM: HTTP {response.status_code}")
                continue

            x, y = parse_response(response.json()['choices'][0]['message']['content'])
            if x == -1 and y == -1:
                return {'answer_x': x, 'answer_y': y}
            x = int(image_width * (x / 1000))
            y = int(image_height * (y / 1000))
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, requests.exceptions.Timeout) as e:
            # requests raises its own Timeout class, which is not a subclass
            # of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## Qwen 72B

In [6]:
# Prompt template for the Qwen request below; filled with str.format using the
# fields ``instruction``, ``image_width`` and ``image_height``.
INSTRUCTION_TEMPLATE = """You are a GUI agent. You are given an instruction and a screenshot. Your job is to output the most relevant point in the screenshot corresponding to the instruction.
## Instruction
Instruction: {instruction}

## Output Format                    
(x1, y1)                    
where x1, y1 are the coordinates of the target element.

## Note
- Ensure the chosen coordinate is a valid clickable area
- Output only the coordinate of one point in your response.
- The screen's resolution is image_width = {image_width} x  image_height = {image_height}.
"""

def get_QWEN_72B_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask the ``qwen2.5-vl-72b`` model on the vLLM server for a click point.

    The prompt states the image resolution and the parsed coordinates are
    returned as-is (no rescaling).

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Offset into the list of prompt phrasings.
        temperature: Sampling temperature forwarded to the server.

    Returns:
        ``{'answer_x': int, 'answer_y': int}``, both ``-1`` when the reply
        contained no coordinates, or ``None`` when every attempt failed.
    """
    print("QWEN_72B [START]", end=" ")
    # Only the size is needed, so release the file handle immediately.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)
    user_prompt_templete_list = [INSTRUCTION_TEMPLATE, f"Please complete the following [{INSTRUCTION_TEMPLATE}] diligently", f"Select Coordinate for the following [{INSTRUCTION_TEMPLATE}]", f"Click for the following [{INSTRUCTION_TEMPLATE}]"]
    url = f"http://{VLLM_SERVER_IP}:{VLLM_SERVER_PORT}/v1/chat/completions"

    for i in range(MAX_RETRY):
        try:
            # Rotate through the phrasings so each retry asks slightly differently.
            prompt_idx = (i + try_idx) % len(user_prompt_templete_list)
            payload = {
                "model": "qwen2.5-vl-72b",
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": user_prompt_templete_list[prompt_idx].format(image_width=image_width,image_height=image_height,instruction=problem_instruction)
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"{encoded_image}"
                                }
                            }
                        ]
                    }
                ],
                "temperature": temperature
            }
            time.sleep(1)
            response = requests.post(
                url,
                headers=HEADERS,
                data=json.dumps(payload),
                proxies=PROXIES,
                timeout=100
            )
            if response.status_code != 200:
                # Make the reason for the retry visible instead of looping silently.
                print(f"Failed to call LLM: HTTP {response.status_code}")
                continue

            # Decode the body once and reuse it for parsing and for the log line.
            content = response.json()['choices'][0]['message']['content']
            x, y = parse_response(content)
            if x == -1 or y == -1:
                print(content)
                return {'answer_x': -1, 'answer_y': -1}
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, requests.exceptions.Timeout) as e:
            # requests raises its own Timeout class, which is not a subclass
            # of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## CUA

In [None]:
#### FOR CUA
from openai import AzureOpenAI
import httpx
# NOTE(review): verify=False turns off TLS certificate verification for every
# request made through this client — confirm this is intended.
client_ssl= httpx.Client(verify=False)
# Connection settings for the CUA deployment, read from the credentials file.
base_url = f"{credentials['OPENAI_CUA_API_BASE']}/openai/v1/"
api_key = credentials['OPENAI_CUA_API_KEY']
api_type = credentials['OPENAI_API_TYPE']
deployment_name = credentials['OPENAI_API_ENGINE_CUA']

# Client used by get_CUA_coordinate to call the Responses API.
client = AzureOpenAI(
    api_key=api_key,
    api_version='preview',
    base_url=base_url,
    http_client=client_ssl
)

def get_CUA_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask the CUA deployment (Responses API, ``computer_use_preview`` tool) for a click point.

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Accepted for signature parity with the other
            ``get_*_coordinate`` functions; not used here.
        temperature: Accepted for signature parity; not sent with the request.

    Returns:
        ``{'answer_x': int, 'answer_y': int}``, both ``-1`` when the reply
        contained no coordinates, or ``None`` when every attempt failed.
    """
    # Local import keeps this block self-contained; the file already depends on openai.
    from openai import APITimeoutError

    print("CUA [START]", end=" ")

    # Only the size is needed, so release the file handle immediately.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)

    user_prompt =f"""You are a GUI agent. You are given an instruction and a screenshot. Your job is to output the most relevant point in the screenshot corresponding to the instruction.
## Instruction
Instruction: {problem_instruction}

## Output Format                    
(x1, y1)                    
where x1, y1 are the coordinates of the target element.

## Note
- Ensure the chosen coordinate is a valid clickable area
- Output only the coordinate of one point in your response."""
    messages = [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "input_text",
                            "text": user_prompt
                        },
                        {
                            "type": "input_image",
                            "image_url": f"{encoded_image}"
                        }
                    ]
                }
            ]

    for _ in range(MAX_RETRY):
        try:
            response = client.responses.create(
                model=deployment_name,
                tools=[{
                    "type": "computer_use_preview",
                    "display_width": image_width,
                    "display_height": image_height,
                    "environment": 'linux'
                }],
                input=messages,
                truncation="auto",
                timeout=60,
            )

            result = json.loads(response.model_dump_json())
            # Default that parses to "not found" when no output item carries a point.
            response_string = "(-1, -1)"
            for outputs in result['output']:
                # A tool action carries x/y directly; a text message is parsed below.
                if 'action' in outputs :
                    response_string = f"({outputs['action']['x']},{outputs['action']['y']})"
                if 'content' in outputs:
                    response_string = outputs['content'][0]['text']

            x, y = parse_response(response_string)
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, APITimeoutError) as e:
            # The OpenAI SDK signals request timeouts with APITimeoutError,
            # which is not a subclass of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## GTA1 7B

In [None]:
# System prompt sent as the "system" message by the GTA1 request function below.
SYS_PROMPT_GTA1 = f"""- given role
        You are an expert UI element locator..
        You should predict the x,y centor position of UI element for user request.
        Output format should be only the coordinate x,y single pair exactly:
        (x,y)
"""

def get_gta1_7b_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask the ``gta1-7b`` model on the vLLM server for a click point.

    The parsed coordinates are returned as-is (no rescaling).

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Offset into the list of prompt phrasings.
        temperature: Sampling temperature forwarded to the server.

    Returns:
        ``{'answer_x': int, 'answer_y': int}``, both ``-1`` when the reply
        contained no coordinates, or ``None`` when every attempt failed.
    """
    print("GTA1_7B [S]", end=" ")
    # The image is opened as in the sibling functions; its size is not used here.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)

    user_prompt_templete = [f"{problem_instruction}", f"Please complete the following [{problem_instruction}] diligently", f"Select Coordinate for the following [{problem_instruction}]", f"Click for the following [{problem_instruction}]"]
    url = f"http://{VLLM_SERVER_IP}:{VLLM_SERVER_PORT}/v1/chat/completions"

    for i in range(MAX_RETRY):
        # Rotate through the phrasings so each retry asks slightly differently.
        prompt_idx = (i + try_idx) % len(user_prompt_templete)
        chat_history = [
            {
                "role": "system",
                "content": [{"type": "text", "text": SYS_PROMPT_GTA1}],
            },
            {
                "role": "user",
                "content": [{
                                "type": "text",
                                "text": f"find x,y position on {user_prompt_templete[prompt_idx]}",
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"{encoded_image}"
                                }
                            }]
            },
        ]
        payload = {
            "model": "gta1-7b",
            "messages": chat_history,
            "temperature": temperature
        }
        try:
            time.sleep(1)
            response = requests.post(
                url,
                headers=HEADERS,
                data=json.dumps(payload),
                proxies=PROXIES,
                timeout=100
            )
            if response.status_code != 200:
                # Make the reason for the retry visible instead of looping silently.
                print(f"Failed to call LLM: HTTP {response.status_code}")
                continue

            # Decode the body once and reuse it for parsing and for the log line.
            content = response.json()['choices'][0]['message']['content']
            x, y = parse_response(content)
            if x == -1 or y == -1:
                print(content)
                return {'answer_x': -1, 'answer_y': -1}
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, requests.exceptions.Timeout) as e:
            # requests raises its own Timeout class, which is not a subclass
            # of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## GTA1 72B

In [None]:
# System prompt for the GTA1 72B request function below. This rebinds
# SYS_PROMPT_GTA1 with the same text as the assignment in the GTA1 7B cell.
SYS_PROMPT_GTA1 = f"""- given role
        You are an expert UI element locator..
        You should predict the x,y centor position of UI element for user request.
        Output format should be only the coordinate x,y single pair exactly:
        (x,y)
"""

def get_gta1_72b_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask the ``gta1-72b`` model on the vLLM server for a click point.

    The parsed coordinates are returned as-is (no rescaling).

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Offset into the list of prompt phrasings.
        temperature: Sampling temperature forwarded to the server.

    Returns:
        ``{'answer_x': int, 'answer_y': int}``, both ``-1`` when the reply
        contained no coordinates, or ``None`` when every attempt failed.
    """
    print("GTA1_72B [S]", end=" ")
    # The image is opened as in the sibling functions; its size is not used here.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)

    user_prompt_templete = [f"{problem_instruction}", f"Please complete the following [{problem_instruction}] diligently", f"Select Coordinate for the following [{problem_instruction}]", f"Click for the following [{problem_instruction}]"]
    url = f"http://{VLLM_SERVER_IP}:{VLLM_SERVER_PORT}/v1/chat/completions"

    for i in range(MAX_RETRY):
        # Rotate through the phrasings so each retry asks slightly differently.
        prompt_idx = (i + try_idx) % len(user_prompt_templete)
        chat_history = [
            {
                "role": "system",
                "content": [{"type": "text", "text": SYS_PROMPT_GTA1}],
            },
            {
                "role": "user",
                "content": [{
                                "type": "text",
                                "text": f"find x,y position on {user_prompt_templete[prompt_idx]}",
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"{encoded_image}"
                                }
                            }]
            },
        ]
        payload = {
            "model": "gta1-72b",
            "messages": chat_history,
            "temperature": temperature
        }
        try:
            time.sleep(1)
            response = requests.post(
                url,
                headers=HEADERS,
                data=json.dumps(payload),
                proxies=PROXIES,
                timeout=100
            )
            if response.status_code != 200:
                # Make the reason for the retry visible instead of looping silently.
                print(f"Failed to call LLM: HTTP {response.status_code}")
                continue

            # Decode the body once and reuse it for parsing and for the log line.
            content = response.json()['choices'][0]['message']['content']
            x, y = parse_response(content)
            if x == -1 or y == -1:
                print(content)
                return {'answer_x': -1, 'answer_y': -1}
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, requests.exceptions.Timeout) as e:
            # requests raises its own Timeout class, which is not a subclass
            # of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## Gemini 2.5 pro

In [None]:
def parse_response_for_gemini(response):
    """Extract a ``[ymin, xmin, ymax, xmax]`` box and return its centre.

    The first four digit runs separated by commas and/or whitespace are read
    in the order ``ymin, xmin, ymax, xmax``.

    Args:
        response: Text returned by the model.

    Returns:
        ``(x, y)`` centre of the box as floats, or ``(-1, -1)`` when no box is
        present or the response cannot be parsed (for example when it is
        ``None``). A tuple is always returned so callers can safely unpack it.
    """
    try:
        match = re.search(r'(\d+)[,\s]+(\d+)[,\s]+(\d+)[,\s]+(\d+)', response)
        if match is None:
            print(f"The coordinates could not be found. ::: {response}")
            return -1, -1

        y1, x1, y2, x2 = (int(group) for group in match.groups())
        return 0.5 * (x1 + x2), 0.5 * (y1 + y2)
    except (TypeError, ValueError) as e:
        # Non-text input (TypeError) or an absurdly long digit run (ValueError).
        # Report it and fall back to the same sentinel as "not found" instead
        # of returning None, which would break tuple unpacking in the caller.
        print(f"Error during parsing response ::: {e}")
        return -1, -1

def get_gemini_coordinate(image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Ask ``google/gemini-2.5-pro`` through OpenRouter for a click point.

    The model is asked for a ``[ymin, xmin, ymax, xmax]`` box normalised to
    0-1000; the box centre is rescaled to pixel coordinates of the image.

    Args:
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Offset into the list of prompt phrasings.
        temperature: Accepted for signature parity with the other
            ``get_*_coordinate`` functions; not sent with the request.

    Returns:
        ``{'answer_x': int, 'answer_y': int}`` in pixels, both ``-1`` when the
        reply contained no box, or ``None`` when every attempt failed.
    """
    print("G [S]", end=" ")
    # Only the size is needed, so release the file handle immediately.
    with Image.open(image_path) as temp_image:
        image_width, image_height = temp_image.size
    encoded_image = local_image_to_data_url(image_path)

    INSTRUCTION_TEMPLATE = """You are a GUI agent. You are given an instruction and a screenshot. Your job is to output the most relevant point in the screenshot corresponding to the instruction.
## Instruction
Instruction: {instruction}

## Output Format                    
The box_2d should be [ymin, xmin, ymax, xmax] normalized to 0-1000.

## Note
- Ensure the chosen coordinate is a valid clickable area
- Output only the coordinate of one point in your response.
- The screen's resolution is image_width = {image_width} x  image_height = {image_height}.
"""

    user_prompt_templete_list = [INSTRUCTION_TEMPLATE, f"Please complete the following [{INSTRUCTION_TEMPLATE}] diligently", f"Select Coordinate for the following [{INSTRUCTION_TEMPLATE}]", f"Click for the following [{INSTRUCTION_TEMPLATE}]"]

    for i in range(MAX_RETRY):
        try:
            # Rotate through the phrasings so each retry asks slightly differently.
            prompt_idx = (i + try_idx) % len(user_prompt_templete_list)
            response = requests.post(
                url="https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {credentials['OPENROUTER_API_KEY']}",
                    "Content-Type": "application/json",
                },
                data=json.dumps({
                    "model": "google/gemini-2.5-pro",
                    "messages": [
                    {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_prompt_templete_list[prompt_idx].format(image_width=image_width,image_height=image_height,instruction=problem_instruction)
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"{encoded_image}"
                            }
                        }
                    ]
                    }
                ],
                }),
                # Without a timeout requests waits indefinitely on a stalled connection.
                timeout=100,
            )

            if response.status_code != 200:
                # Make the reason for the retry visible instead of looping silently.
                print(f"Failed to call LLM: HTTP {response.status_code}")
                continue

            x, y = parse_response_for_gemini(response.json()['choices'][0]['message']['content'])
            if x == -1 and y == -1:
                return {'answer_x': x, 'answer_y': y}
            x = int(image_width * (x / 1000))
            y = int(image_height * (y / 1000))
            return {'answer_x': x, 'answer_y': y}
        except (TimeoutError, requests.exceptions.Timeout) as e:
            # requests raises its own Timeout class, which is not a subclass
            # of the built-in TimeoutError.
            time.sleep(1)
            print(f"Timeout occurred : {e}")
        except Exception as e:
            # Include the cause; the bare message made failures undiagnosable.
            print(f"Failed to call LLM: {e}")

    # Every attempt failed.
    return None

## Utils

In [6]:
def get_coordinate(vlm_model_name, image_path, problem_instruction, try_idx=0, temperature=0.0):
    """Dispatch a coordinate request to the backend named by ``vlm_model_name``.

    Args:
        vlm_model_name: One of ``"GEMINIPRO"``, ``"GTA1_7B"``, ``"GTA1_72B"``,
            ``"CUA"``, ``"UI_TARS_72B"`` or ``"QWEN_72B"``.
        image_path: Path of the screenshot sent to the model.
        problem_instruction: Instruction describing the target element.
        try_idx: Forwarded to the backend (prompt-phrasing offset).
        temperature: Forwarded to the backend.

    Returns:
        Whatever the selected ``get_*_coordinate`` function returns.

    Raises:
        ValueError: If ``vlm_model_name`` is not a known backend.
    """
    # The if/elif chain is kept (rather than a dict of functions) so that only
    # the selected backend function needs to be defined when this is called.
    # try_idx and temperature are forwarded; previously 0 and 0.0 were always sent.
    if vlm_model_name == "GEMINIPRO":
        return get_gemini_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    elif vlm_model_name == "GTA1_7B":
        return get_gta1_7b_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    elif vlm_model_name == "GTA1_72B":
        return get_gta1_72b_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    elif vlm_model_name == "CUA":
        return get_CUA_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    elif vlm_model_name == "UI_TARS_72B":
        return get_UI_TARS_72B_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    elif vlm_model_name == "QWEN_72B":
        return get_QWEN_72B_coordinate(image_path, problem_instruction, try_idx=try_idx, temperature=temperature)
    else:
        # raise("...") raised TypeError because a str is not an exception.
        raise ValueError(f"Check VLM (Vision-Language Model) model name: {vlm_model_name!r}")
        

def propose_candidate_point(state):
    """Query the VLM on the current ROI crop and record the proposed point.

    The last entry of ``state['roi_list']`` is cropped from the source image,
    saved to ``state['roi_cropped_image_path']`` and sent to the model named
    by ``state['vlm_model_name']`` (up to four tries). The answer is shifted
    by the ROI origin to obtain coordinates in the source image.

    Args:
        state: Dict with keys ``problem`` (``instruction``, ``image_bytes``),
            ``roi_list``, ``roi_cropped_image_path``, ``vlm_model_name``,
            ``candidate_point_list`` and ``valid_candidate_point_list``.

    Returns:
        Dict with the updated ``candidate_point_list`` and
        ``valid_candidate_point_list`` (both lists are appended to in place).
        A candidate is ``{'state': bool, 'x': int, 'y': int}``; ``state`` is
        True only when the point lies inside the ROI, and ``x``/``y`` are
        ``-1`` when the model gave no usable answer.
    """
    problem = state['problem']
    problem_instruction = problem['instruction']
    source_image = Image.open(io.BytesIO(problem['image_bytes'])).convert("RGBA")

    roi_x1, roi_y1, roi_x2, roi_y2 = state['roi_list'][-1]
    roi_cropped_image_path = state['roi_cropped_image_path']
    source_image.crop((roi_x1, roi_y1, roi_x2, roi_y2)).save(roi_cropped_image_path)

    result = None
    for try_idx in range(4):
        result = get_coordinate(state['vlm_model_name'], roi_cropped_image_path, problem_instruction, try_idx)
        if result is not None and result['answer_x'] != -1:
            break
        print(f"answer_x is -1 ")

    find_coordinate_state = False
    global_x = -1
    global_y = -1
    # -1 is the "not found" sentinel, not a coordinate: do not offset it by the
    # ROI origin, otherwise a failure is recorded as the point (roi_x1 - 1, roi_y1 - 1).
    if result is not None and result.get('answer_x', -1) != -1:
        global_x = result['answer_x'] + roi_x1
        global_y = result['answer_y'] + roi_y1
        find_coordinate_state = (
            roi_x1 <= global_x <= roi_x2 and roi_y1 <= global_y <= roi_y2
        )

    candidate_point = {'state': find_coordinate_state, 'x': global_x, 'y': global_y}
    candidate_point_list = state['candidate_point_list']
    candidate_point_list.append(candidate_point)

    valid_candidate_point_list = state['valid_candidate_point_list']
    if find_coordinate_state:
        valid_candidate_point_list.append(candidate_point)

    return {'candidate_point_list': candidate_point_list, "valid_candidate_point_list": valid_candidate_point_list}



def refine_search_area(state):
    """Derive the next ROI from the latest candidate point and append it.

    * Candidate inside the ROI: shrink the longer ROI side by ``step_size``,
      moving the edge that is farther from the candidate.
    * Candidate invalid and zoom-outs remain: grow the ROI by
      ``step_size // 2`` on every side, clamped to the image bounds, and
      consume one zoom-out.
    * Candidate invalid and no zoom-outs remain: shrink the ROI by
      ``step_size // 2`` on each side of every axis longer than ``step_size``.

    Args:
        state: Dict with keys ``roi_list``, ``problem`` (``image_bytes``),
            ``candidate_point_list``, ``step_size`` and ``max_zoom_out_count``.

    Returns:
        Dict with the updated ``roi_list`` (appended to in place) and
        ``max_zoom_out_count``.
    """
    roi_list = state['roi_list']
    problem = state['problem']
    # Only the size is needed, so release the decoder immediately.
    with Image.open(io.BytesIO(problem['image_bytes'])) as source_image:
        image_width, image_height = source_image.size

    candidate_point = state['candidate_point_list'][-1]
    cpx = candidate_point['x']
    cpy = candidate_point['y']

    step_size = state['step_size']
    max_zoom_out_count = state['max_zoom_out_count']
    half_step = step_size // 2

    roi_x1, roi_y1, roi_x2, roi_y2 = roi_list[-1]
    roi_width = roi_x2 - roi_x1
    roi_height = roi_y2 - roi_y1

    if candidate_point['state']:
        # Zoom-in: trim the edge farther from the candidate along the longer axis.
        if roi_width > roi_height:
            if abs(cpx - roi_x1) > abs(cpx - roi_x2):
                roi_x1 += step_size
            else:
                roi_x2 -= step_size
        else:
            if abs(cpy - roi_y1) > abs(cpy - roi_y2):
                roi_y1 += step_size
            else:
                roi_y2 -= step_size
    elif max_zoom_out_count == 0:
        # Correction zoom-in: no zoom-outs left, so tighten symmetrically.
        if step_size < roi_width:
            roi_x1 += half_step
            roi_x2 -= half_step
        if step_size < roi_height:
            roi_y1 += half_step
            roi_y2 -= half_step
    else:
        # Zoom-out, clamped so the ROI never extends past the image; an
        # unclamped ROI makes the later crop include pixels outside the image.
        print(f"\n ROI {roi_list[-1]} \n{candidate_point}")
        roi_x1 = max(0, roi_x1 - half_step)
        roi_y1 = max(0, roi_y1 - half_step)
        roi_x2 = min(image_width, roi_x2 + half_step)
        roi_y2 = min(image_height, roi_y2 + half_step)
        max_zoom_out_count -= 1

    roi_list.append((roi_x1, roi_y1, roi_x2, roi_y2))

    return {'roi_list': roi_list,'max_zoom_out_count':max_zoom_out_count}


def get_point_dist(candidate_point_a, candidate_point_b):
    """Return the Euclidean distance between two candidate points.

    Args:
        candidate_point_a: Dict with numeric ``'x'`` and ``'y'`` entries.
        candidate_point_b: Dict with numeric ``'x'`` and ``'y'`` entries.

    Returns:
        The distance as a float.
    """
    # ``math`` is not imported at the top of the file, so import it here;
    # without this the function raised NameError on first use.
    import math

    return math.hypot(
        candidate_point_a['x'] - candidate_point_b['x'],
        candidate_point_a['y'] - candidate_point_b['y'],
    )

def check_terminate_condition(state):
    """Compute the next step size and snap a small-enough ROI to the target size.

    The step size is ``step_ratio`` times the longer side of the latest ROI,
    truncated to an int. When both ROI sides are at most ``target_roi_size``,
    the latest ROI is replaced in place by a square around its centre that
    extends ``target_roi_size // 2`` in every direction.

    Returns:
        Dict with ``step_size`` and the (possibly modified) ``roi_list``.
    """
    rois = state['roi_list']
    left, top, right, bottom = rois[-1]
    width, height = right - left, bottom - top

    next_step = int(state['step_ratio'] * max(width, height))

    target = state['target_roi_size']
    fits_target = width <= target and height <= target
    if fits_target:
        half = target // 2
        mid_x = int(0.5 * (right + left))
        mid_y = int(0.5 * (bottom + top))
        rois[-1] = (mid_x - half, mid_y - half, mid_x + half, mid_y + half)

    return {'step_size': next_step, 'roi_list': rois}

# Fine_Grained Grounding

## setting

In [7]:
import traceback
import numpy as np

def _load_credentials(file_path: str = credentials_path) -> dict:
    """Read the UTF-8 JSON file at ``file_path`` and return its parsed content."""
    with open(file_path, "r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
    
def _ensure_openai_credentials() -> None:
    """Populate ``os.environ`` from the credentials file when needed.

    If any of the required OpenAI variables is missing from the environment,
    every key/value pair from the credentials file is copied into
    ``os.environ`` (overwriting existing entries). Nothing else is configured
    here; later code reads the values from the environment.
    """
    required_vars = (
        "OPENAI_API_BASE",
        "OPENAI_API_KEY",
        "OPENAI_API_TYPE",
        "OPENAI_API_VERSION",
        "OPENAI_API_ENGINE_GPT4",  # or whichever engine names you have
        "OPENAI_API_ENGINE_GPT3",
    )
    if all(var in os.environ for var in required_vars):
        return

    for key, value in _load_credentials().items():
        if value is None:
            # A JSON null has no meaningful environment representation.
            continue
        # os.environ only accepts str; JSON values such as a numeric port
        # would otherwise raise TypeError.
        os.environ[key] = value if isinstance(value, str) else str(value)
_ensure_openai_credentials()


## LVM 

In [8]:
import base64
from langchain.schema import HumanMessage
from langchain_openai import AzureChatOpenAI

# Chat model used by the find_application / generate_click_instruction helpers below.
# Endpoint and key come from the environment variables set by _ensure_openai_credentials().
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["OPENAI_API_BASE"],
    api_key=os.environ["OPENAI_API_KEY"],
    api_version="2024-08-01-preview",
    max_retries=2,
    temperature=0.1,
    model="gpt-4o-1120"
)

## Functions for Fine-Grained Grounding

In [9]:
import base64
import json
import os
import io
from PIL import Image
import requests
import re


def parse_response(response):
    """Extract an ``(x, y)`` integer pair from a model response.

    Two formats are recognised, in this order:

    * a parenthesised pair such as ``(120, 45)``;
    * a JSON-style object such as ``{ "x": 120, "y": 45 }``, which is the
      format the system prompt in ``request_coordinates`` asks the model for.

    NOTE: this definition rebinds the name ``parse_response`` that an earlier
    cell also defines; unlike that one, it returns ``None`` on failure.

    Args:
        response: Text returned by the model.

    Returns:
        ``(x, y)`` as ints, or ``None`` when the response is empty, contains
        no coordinates, or cannot be parsed.
    """
    try:
        if not response:
            print("Empty response")
            raise Exception("Empty response")

        match = re.search(r'\((\d+),\s*(\d+)\)', response)
        if match is None:
            # Fallback for the JSON reply format requested by the system prompt.
            match = re.search(r'"x"\s*:\s*(\d+)\s*,\s*"y"\s*:\s*(\d+)', response)
        if match is None:
            print("No coordinates found")
            raise Exception("No coordinates found")

        return int(match.group(1)), int(match.group(2))

    except Exception as e:
        print(f"‚ùó Error: {e}")
        return None
        
def generate_prompt(step_action):
    """Build the user prompt asking for the click position of ``step_action``."""
    template = 'Output only the precise coordinates (x, y). Find the exact position to click for the action: "{}".'
    return template.format(step_action)
        

def request_coordinates(instruction, encoded_image):
    """Ask the ``ui-tars-72b`` model on the vLLM server for the position of a UI element.

    Args:
        instruction: Description of the target UI element; inserted into both
            the system prompt and the user prompt.
        encoded_image: Base64 text of the screenshot. The
            ``data:image/png;base64,`` prefix is added here, so the value must
            not already carry it.

    Returns:
        ``{'answer_x': ..., 'answer_y': ...}`` when ``parse_response`` yields
        coordinates; ``None`` on a non-200 status, when nothing is parsed, or
        on any exception (which is printed).
    """
    prompt = generate_prompt(instruction)
    
    # NOTE(review): the system prompt asks for a JSON object while the user
    # prompt (generate_prompt) asks for "(x, y)" — confirm parse_response
    # accepts whichever format the model actually returns.
    data = {
        "model": "ui-tars-72b",
        "messages": [
            {
                "role": "system",
                "content": 
                f"""
                You are given a UI screenshot and a precise UI element description.
                ### üîç UI Element Description:
                {instruction}
                ### üìå Instructions:
                - ONLY one coordinate pair must be returned.
                - The coordinate must be the **center** of the visual UI element (not label text unless it is the clickable part).
                - The output format MUST be strictly:
                {{ "x": <integer>, "y": <integer> }}
                ### ‚ö†Ô∏è Format Rules:
                - Return ONLY valid JSON.
                - No extra text, comments, or markdown.
                - If the element is not found, return: {{ "x": null, "y": null }}
                Now return the precise coordinates.
                """.strip()
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{encoded_image}"
                        }
                    }
                ]
            }
        ],
        "temperature": 0.2,
        "max_tokens": 1024
    }

    try:
        # Single attempt; there is no retry loop in this function.
        response = requests.post(
            url=f"http://{VLLM_SERVER_IP}:{VLLM_SERVER_PORT}/v1/chat/completions",
            headers=HEADERS,
            json=data,
            proxies=PROXIES,
            timeout=30
        )

        if response.status_code == 200:
            response_text = response.json()['choices'][0]['message']['content'].strip()
            coordinates = parse_response(response_text)
            # A falsy parse result (e.g. None) is passed on as None.
            return {'answer_x': coordinates[0], 'answer_y': coordinates[1]} if coordinates else None
        else:
            # Print the server's error body for diagnosis.
            print(response.text)
            return None
    except Exception as e:
        print(f"‚ö†Ô∏è exception: {e}")
        return None



## LVM for Fine-Grained grounding

In [10]:
from PIL import Image, ImageDraw

def image_bytes_to_data_url(image_bytes: bytes, mime_type: str = 'image/png') -> str:
    """Return ``image_bytes`` as a base64 ``data:`` URL with the given MIME type."""
    encoded = base64.b64encode(image_bytes)
    return "data:{};base64,{}".format(mime_type, encoded.decode('utf-8'))

def find_application(image_bytes: bytes, raw_instruction: str):
    """Ask the chat model which application a screenshot belongs to.

    Sends the screenshot (as a data URL) together with the instruction to
    ``llm`` and returns the content of its reply. If the call raises, the
    error and traceback are printed and ``None`` is returned.
    """
    image_url = image_bytes_to_data_url(image_bytes)

    prompt_text = f"""
                    You are given a screenshot of a user interface along with a user instruction.
                    Your task is to identify which **application or software** this interface belongs to, based on visual clues and the instruction content.
                    Please follow these rules:
                    1. Look at visible UI elements (e.g., toolbars, menus, icons, window titles) in the screenshot to identify the application.
                    2. Use the instruction context to help disambiguate the purpose of the UI.
                    3. Be specific. For example, say "Google Chrome", "Microsoft Word", "Windows Settings", "Adobe Photoshop", etc.
                    4. If you cannot confidently determine the exact application, respond with "Unknown".
                    Output Format:
                    {{
                      "application": "<inferred application name>"
                    }}
                    Instruction:
                    {raw_instruction}
                    """

    text_part = {"type": "text", "text": prompt_text}
    image_part = {"type": "image_url", "image_url": {"url": image_url}}
    messages = [HumanMessage(content=[text_part, image_part])]

    try:
        return llm.invoke(messages).content
    except Exception as e:
        print("Error while find instruction:", str(e))
        traceback.print_exc()
        return None
        

def generate_click_instruction(image_bytes: bytes, raw_instruction: str, app_name: str):
    """Ask the LLM to rewrite an instruction into a precise click-target description.

    Args:
        image_bytes: Encoded screenshot bytes, sent to the model as a data URL.
        raw_instruction: The original user instruction.
        app_name: Application name interpolated into the prompt.

    Returns:
        The ``content`` of the LLM response (the prompt requests a JSON object
        with "clarified_instruction", "element_description" and
        "target_action"), or ``None`` when the call raises.
    """
    screenshot_url = image_bytes_to_data_url(image_bytes)

    # Prompt text is sent verbatim, including its indentation.
    prompt_text = f"""
                    You are given a screenshot of a user interface and an instruction.
                    This screenshot comes from the following application: **{app_name}**
                    Your task is to identify the **most relevant and precisely located UI element** (e.g. button, dropdown, input box, icon) that matches the instruction **for performing a click or action**. Focus only on **visible** UI elements.
                    To ensure accurate position prediction, follow these detailed rules when generating the description:
                    1. Be specific and unambiguous. Avoid vague references like "icon", "window", or just "button". Clearly identify **one unique** UI element.
                    2. Describe the UI element in detail:
                       - The **element type** (e.g. button, textfield, checkbox, toggle, dropdown).
                       - Its **visual characteristics**: shape (e.g. rectangle, circle), color, border style, text label (if any), icons, and other distinctive traits.
                       - Its **spatial relationships**: position relative to other visible elements (e.g., "to the left of the 'Settings' gear icon", "below the search bar").
                       - Its **general screen location**: upper/lower/left/right/center part of the screen.
                       - The target UI element is guaranteed to be present in the screenshot, usually in the form of single icon.\n
                    3. Clearly distinguish between **similar-looking elements** by referring to nearby labels, icons, or arrangement.
                    4. The given instruction does not implicate the name represented on element directly.
                    5. Identify a UI element that best matches in order to follow my instruction, specifically focusing to the user action or clickable gui element of the instruction.
                    6. Do not speculate or describe elements not visible in the screenshot.
                    7. Translate and recognize the Chinese characters that appear on the screen into English.
                    8. Only one UI element should be described in the output.
                    Return the result in the following JSON format:
                    {{
                      "clarified_instruction": "<rewritten instruction in one sentence, with ambiguity removed>",
                      "element_description": "<clear, specific description of the UI element including visual and positional info>",
                      "target_action": "click"                      
                    }}
                    Instruction:
                    {raw_instruction}
                    """

    text_part = {"type": "text", "text": prompt_text}
    image_part = {"type": "image_url", "image_url": {"url": screenshot_url}}
    messages = [HumanMessage(content=[text_part, image_part])]

    try:
        return llm.invoke(messages).content
    except Exception as e:
        print("Error while generating click instruction:", str(e))
        traceback.print_exc()
        return None

## Utils for Fine-Grained grounding

In [None]:
from collections import defaultdict
from io import BytesIO

# Integer counter whose unseen keys start at 0 (defaultdict(int)).
# NOTE(review): no use of this counter is visible in this part of the file — confirm it is still needed.
instruction_counter = defaultdict(int)

def sanitize_filename(text):
    """Normalize *text* so it can be used as a filename.

    The text is stripped and lower-cased, every character that is not a word
    character, whitespace or hyphen is dropped, and each whitespace run is
    replaced by a single underscore.
    """
    lowered = text.strip().lower()
    kept = re.sub(r'[^\w\s-]', '', lowered)
    return re.sub(r'\s+', '_', kept)


def get_area_size(target_roi):
    """Return the longer side of an ``(x1, y1, x2, y2)`` box."""
    left, top, right, bottom = target_roi
    return max(right - left, bottom - top)

def final_roi_extraction(state):
    """Build the final square ROI around the most recent valid candidate point.

    Reads ``state['candidate_point_list']`` (dicts with 'x', 'y' and 'state'
    keys) and ``state['target_roi_size']``. The newest point whose 'state' is
    truthy becomes the centre; if no point is flagged valid, the newest point
    is used instead of failing.

    Returns:
        ``{'final_roi': [x1, y1, x2, y2]}`` extending ``target_roi_size // 2``
        from the centre in each direction. The box is not clamped to any
        image bounds.

    Raises:
        IndexError: if the candidate list is empty.
    """
    candidate_points = state['candidate_point_list']

    # Walk backwards from the newest point; the default keeps the scan from
    # running off the front of the list when nothing is marked valid.
    center = next(
        (point for point in reversed(candidate_points) if point['state']),
        candidate_points[-1],
    )
    x, y = center['x'], center['y']

    half = state['target_roi_size'] // 2
    return {'final_roi': [x - half, y - half, x + half, y + half]}
        
def fine_grained_grounding(state):
    """Stage 2: predict the final click point inside the Stage-1 ROI.

    Steps:
      1. Crop ``state['final_roi']`` out of the problem image.
      2. Upscale the crop so its long side is at least 3000 px (never downscale).
      3. Ask ``find_application`` for the application name (best effort).
      4. Ask ``generate_click_instruction`` to rewrite the instruction.
      5. Ask ``request_coordinates`` for the target on the upscaled crop and map
         the answer back to original-image coordinates.

    Args:
        state: Graph state; reads ``state['problem']`` ('instruction' and
            'image_bytes') and ``state['final_roi']`` as ``[x1, y1, x2, y2]``.

    Returns:
        ``{'final_target_prediction': [x, y]}`` in original-image pixels, or
        ``[-1, -1]`` when the ROI is empty, the grounding model gives no usable
        coordinates, or any step raises.
    """
    failure = {'final_target_prediction': [-1, -1]}

    problem = state['problem']
    instruction = problem['instruction']
    x1_roi, y1_roi, x2_roi, y2_roi = state['final_roi']

    org_image = Image.open(BytesIO(problem['image_bytes']))
    roi_image = org_image.crop((x1_roi, y1_roi, x2_roi, y2_roi))
    roi_wid, roi_hei = roi_image.size

    long_side = max(roi_wid, roi_hei)
    if long_side <= 0:
        # Degenerate ROI: nothing to ground on, and the scale would divide by zero.
        return failure

    scale_v = max(3000 / long_side, 1.0)
    up_width = int(roi_wid * scale_v)
    up_height = int(roi_hei * scale_v)
    upscale_image = roi_image.resize((up_width, up_height), Image.BICUBIC)

    # ROI → PNG bytes (original scale), used by the application and rewrite agents
    buf_raw = BytesIO()
    roi_image.save(buf_raw, format="PNG")
    roi_png = buf_raw.getvalue()

    # Upscaled ROI → base64, used by the grounding agent
    buf_up = BytesIO()
    upscale_image.save(buf_up, format="PNG")
    encoded_up = base64.b64encode(buf_up.getvalue()).decode("utf-8")

    try:
        # Application lookup is only a hint; any failure degrades to "unknown"
        # instead of aborting the whole stage.
        app_name = "unknown"
        app_info = find_application(roi_png, instruction)
        if isinstance(app_info, str):
            try:
                app_info = json.loads(app_info)
            except json.JSONDecodeError:
                print(f"⚠️ Invalid JSON in app_info: {app_info}")
                app_info = {}
        if isinstance(app_info, dict):
            app_name = app_info.get("application", "unknown")

        # Rewrite Agent
        new_instruction = generate_click_instruction(roi_png, instruction, app_name)
        if new_instruction is None:
            # Rewrite failed; ground on the raw instruction rather than on None.
            # NOTE(review): assumes request_coordinates accepts the plain instruction text — confirm.
            new_instruction = instruction

        # Grounding Agent
        coords4 = request_coordinates(new_instruction, encoded_up)
        if not isinstance(coords4, dict):
            return failure
        x4n = coords4.get('answer_x', -1)
        y4n = coords4.get('answer_y', -1)
        if x4n is None or y4n is None or x4n < 0 or y4n < 0:
            # Missing coordinates must not be turned into a point near the ROI corner.
            return failure

        # The answer is divided by 1000, i.e. treated as per-mille of the upscaled
        # crop; convert to crop pixels, then shift by the ROI origin.
        x4 = int((x4n * up_width / 1000) / scale_v)
        y4 = int((y4n * up_height / 1000) / scale_v)

        return {'final_target_prediction': [x4 + x1_roi, y4 + y1_roi]}

    except Exception as e:
        print(f"⚠️ Case 4 error: {e}")

    return failure
        
    

# Refusal

In [12]:
from openai import OpenAI

# OpenAI-compatible client pointed at the OpenRouter endpoint; the API key is
# read from the `credentials` mapping under 'OPENROUTER_API_KEY'.
client = OpenAI(
  base_url="https://openrouter.ai/api/v1",
  api_key=credentials['OPENROUTER_API_KEY'],
)

def call_prompt(messages, params):
    """Send a chat completion request and return the first choice's text.

    Args:
        messages: Chat messages in the OpenAI chat-completions format.
        params: Optional sampling overrides. Recognised keys are 'max_tokens'
            (default 4096), 'temperature' (default 0.1) and 'top_p' (sent only
            when present). ``None`` or an empty dict uses the defaults.

    Returns:
        ``message.content`` of the first completion choice.
    """
    params = params or {}

    # Previously `params` was accepted but ignored; honour it, falling back to
    # the values that used to be hard-coded.
    request_kwargs = {
        "max_tokens": params.get("max_tokens", 4096),
        "temperature": params.get("temperature", 0.1),
    }
    if "top_p" in params:
        request_kwargs["top_p"] = params["top_p"]

    completion = client.chat.completions.create(
        model="google/gemini-2.5-pro",
        messages=messages,
        **request_kwargs,
    )

    return completion.choices[0].message.content

def extract_content_from_response(content):
    """Parse the first ```json fenced block of an LLM reply.

    Args:
        content: Reply text expected to contain a ```json ... ``` block.

    Returns:
        The decoded JSON value of the first fenced block. When *content* is not
        a string, has no ```json fence, or the fenced text is not valid JSON,
        the conservative default ``{"answer": "no"}`` is returned.
    """
    if not isinstance(content, str):
        return {"answer": "no"}

    _, fence, remainder = content.partition("```json")
    if not fence:
        return {"answer": "no"}

    # Keep only the text up to the closing fence so trailing commentary after
    # the block cannot break parsing.
    json_string = remainder.split("```", 1)[0]
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        return {"answer": "no"}

def refusal_choice(state):
    """Stage 0: ask the LLM whether the instruction can be executed on the screenshot.

    Reads ``state['problem']`` ('instruction' and 'image_bytes'), re-encodes the
    image as PNG, sends it with the instruction and a judging system prompt
    through ``call_prompt``, and parses the reply with
    ``extract_content_from_response``.

    Returns:
        ``{'refusal': <the reply's "answer" field>}``.
    """
    problem = state['problem']
    instruction = problem['instruction']

    # Re-encode the screenshot as PNG and base64 it for the data URL.
    png_buffer = io.BytesIO()
    Image.open(BytesIO(problem['image_bytes'])).save(png_buffer, format="png")
    encoded_image = base64.b64encode(png_buffer.getvalue()).decode("utf-8")

    system_prompt = """
            - given role
                You are an judge expert to predict whether the next gui action on the given image can excute or not.
                So You must analyze whether you have an area to execute user instruction from the given screen.
            - given rules to process
                You must state the reason for your judgment in the reasoning field.
                You must answer 'yes' or 'no' considering reasoning field.
            - given rules to judge
                If there is even tiny information indicating that user instruction can be executed from the given screen, you must answer 'yes'.
                If logically user instruction can't be executed from the given screen, you must answer 'no'.
            - given rules to output format
                You should return only json format with embrace ```json``` without any comments.
                ## format example
            ```json
            {   
                "reasoning": "#your judgement",
                "answer": "#your answer from reasoning"
            }    ```
    """

    system_message = {
        "role": "system",
        "content": [{"type": "text", "text": system_prompt}],
    }
    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": f"{instruction}"},
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
            },
        ],
    }
    sampling = {'max_tokens': 4096, 'top_p': 0.9, 'temperature': 0.0}

    # gemini pro llm call
    reply = call_prompt([system_message, user_message], sampling)
    verdict = extract_content_from_response(reply)

    return {'refusal': verdict['answer']}

# Graph Definition

In [13]:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class GraphState(TypedDict):
    """
    Shared state passed between the nodes of the grounding graph.

    problem: the problem being solved (nodes read its 'instruction' and
        'image_bytes' entries).

    roi_list: regions of interest (ROIs) visited so far, starting with the
        original full-image ROI and followed by each refined ROI.
    candidate_point_list: coordinate results from the VLM for each ROI.
    valid_candidate_point_list: the subset of VLM coordinate results that were
        judged valid.

    step_size: the amount by which to shrink when creating a new ROI.
    target_roi_size: the search stops once the ROI size is less than or equal
        to this value; it is also the side length of the final ROI.
    max_zoom_out_count: the maximum amount of zooming out.

    step_ratio: the ratio at which to reduce the ROI.

    vlm_model_name: the VLM model name.
    roi_cropped_image_path: temporary file name used while reducing the ROI.

    final_roi: Stage 1's ROI result, as [x1, y1, x2, y2].
    final_target_prediction: Stage 2's final target prediction, as [x, y].

    refusal: the answer stored by the refusal_choice node; the graph proceeds
        to Stage 1 only when it equals 'yes'.
    """
    problem: dict

    roi_list: list    
    candidate_point_list: list
    valid_candidate_point_list: list

    step_size: int
    target_roi_size: int
    max_zoom_out_count: int

    step_ratio: float

    vlm_model_name: str
    roi_cropped_image_path: str

    final_roi: list
    final_target_prediction: list

    refusal: str



# Graph builder whose nodes share a GraphState.
graph_builder = StateGraph(GraphState)

### Stage 1: candidate proposal / ROI refinement loop and its termination check
graph_builder.add_node("propose_candidate_point", propose_candidate_point)
graph_builder.add_node("refine_search_area", refine_search_area)
graph_builder.add_node("check_terminate_condition", check_terminate_condition)

# Turns the Stage 1 search result into the final ROI.
graph_builder.add_node("final_roi_extraction", final_roi_extraction)

### Stage 2: final target prediction inside the final ROI
graph_builder.add_node("fine_grained_grounding", fine_grained_grounding)

### Stage 0: decide whether the instruction can be executed at all
graph_builder.add_node("refusal_choice", refusal_choice)


def get_search_converged(state):
    """Report whether the three most recent valid candidate points agree.

    Reads ``state['valid_candidate_point_list']``. The search counts as
    converged when at least three valid points exist and neither of the two
    points before the newest one is more than 50 away from it, as measured by
    ``get_point_dist``.

    Returns:
        ``True`` when converged, otherwise ``False`` (including when fewer than
        three valid points exist).
    """
    valid_points = state['valid_candidate_point_list']
    if len(valid_points) < 3:
        # Not enough history to judge; return an explicit bool instead of None.
        return False

    newest = valid_points[-1]
    return all(
        not get_point_dist(earlier, newest) > 50
        for earlier in valid_points[-3:-1]
    )

def router(state):
    """Choose the branch taken after the terminate-condition check.

    Returns 'condition_met' when the newest ROI's long side is at most
    ``state['target_roi_size']`` or when the candidate search has converged;
    otherwise 'condition_not_met'.
    """
    newest_roi = state['roi_list'][-1]

    # The size test runs first; convergence is consulted only if it fails.
    small_enough = get_area_size(newest_roi) <= state['target_roi_size']
    if small_enough or get_search_converged(state):
        return 'condition_met'

    return "condition_not_met"

def router_refusal(state):
    """Return 'next' when ``state['refusal']`` is exactly 'yes', otherwise 'end'."""
    return 'next' if state['refusal'] == 'yes' else 'end'
        
# Entry point: every run starts with the refusal check.
graph_builder.add_edge(START,  "refusal_choice")
# router_refusal returns 'next' to continue into Stage 1 or 'end' to stop.
graph_builder.add_conditional_edges("refusal_choice", router_refusal,
        {
            "next": "propose_candidate_point",
            "end":END
         }
)

# Stage 1 loop: propose -> check -> (refine -> propose ...) until router
# returns 'condition_met'.
graph_builder.add_edge("propose_candidate_point", "check_terminate_condition")
graph_builder.add_conditional_edges("check_terminate_condition", router,
        {
            "condition_met": "final_roi_extraction",
            "condition_not_met": "refine_search_area"
         }
)
graph_builder.add_edge("refine_search_area", "propose_candidate_point")

# Hand the final ROI to Stage 2.
# NOTE(review): no explicit edge from "fine_grained_grounding" to END is added here — confirm this is intended.
graph_builder.add_edge("final_roi_extraction", "fine_grained_grounding")

gui_grounding = graph_builder.compile()


# Print the graph as ASCII art, a separator, and then a Mermaid definition.
print(gui_grounding.get_graph().draw_ascii())
print("=+=*-*"*10)
# Keep only the text after the last '%%' marker and after the last 'graph '
# keyword of the Mermaid output, then put the 'graph ' prefix back.
graph_mmd = 'graph ' + gui_grounding.get_graph().draw_mermaid().split('%%')[-1].strip().split('graph ')[-1]
print(graph_mmd)

              +-----------+                                        
              | __start__ |                                        
              +-----------+                                        
                    *                                              
                    *                                              
                    *                                              
            +----------------+                                     
            | refusal_choice |                                     
            +----------------+                                     
            ...            ...                                     
          ..                  ..                                   
        ..                      ..                                 
+---------+           +-------------------------+                  
| __end__ |           | propose_candidate_point |                  
+---------+           +-------------------------

# Inference

In [None]:
import math


# estimate accuracy
pass_check_result = dict()
# List of ROI(region of interest)
roi_list_result = dict()
# List of coordinate information for each ROI(region of interest)
candidate_point_list_result = dict()

# List of final coordinate
final_coordinate_result = dict()

#VLM_MODEL_NAME = "GEMINIPRO"
VLM_MODEL_NAME = "UI_TARS_72B"

step_ratio = 0.1 
target_roi_size = 1000
max_zoom_out_count = 5
Image.MAX_IMAGE_PIXELS = None
prefix = f'OSWORLD_full_{VLM_MODEL_NAME}_{step_ratio}_{target_roi_size}_mzoc_{max_zoom_out_count}'
roi_cropped_image_path = f'{prefix}_roi_cropped_image.png'

# Run the grounding graph once per problem index and collect the results.
# `idx_list` and `df` are defined outside this part of the file.
full_count = len(idx_list)
start_time = datetime.now()
for row_count, idx in enumerate(idx_list):
    # Elapsed time since the loop started (only used by the disabled progress print).
    end_time = datetime.now()
    elapsed_time = end_time - start_time
    #print(f"\nidx : {idx} ::: row_id : {row_count} :: progress ({100*row_count/full_count}%) :: elasped_time: {elapsed_time}",  end="\t")
    
    # Load each Problem
    problem = df.loc[idx]
    source_image = Image.open(io.BytesIO(problem['image_bytes']))
    width, height = source_image.size

    # Initial state: the first ROI is the whole image and the prediction
    # defaults to [-1, -1], which is what remains if the graph ends early.
    step_size = int( step_ratio * max(width, height) )
    result = gui_grounding.invoke({
        "vlm_model_name" : VLM_MODEL_NAME,
        "problem": problem, 
        "candidate_point_list": list(), 
        "valid_candidate_point_list": list(), 
        "roi_list": [(0, 0, width, height)], 
        "step_size": step_size,
        "step_ratio": step_ratio,
        "max_zoom_out_count": max_zoom_out_count,
        "target_roi_size": target_roi_size,
        "roi_cropped_image_path": roi_cropped_image_path,
        "final_target_prediction": [-1, -1]},        
        {"recursion_limit": 100000})

    # Ground-truth box is scaled by the image size here, i.e. treated as
    # normalized [x1, y1, x2, y2].
    gt_box = problem['bbox']
    gt_x1 = gt_box[0] * width
    gt_y1 = gt_box[1] * height
    gt_x2 = gt_box[2] * width
    gt_y2 = gt_box[3] * height

    # for negative sampling
    bbox = [gt_x1, gt_y1, gt_x2, gt_y2]

    roi_list = result['roi_list']

    # NOTE(review): find_zoom_area_check_list and pass_check_list are built but
    # never stored or read after the loop body — confirm whether they are needed.
    find_zoom_area_check_list = list()
    pass_check_list = list()

    # For every valid candidate point, record 1 if it lies inside the
    # ground-truth box and 0 otherwise; invalid points are skipped.
    candidate_point_list = result['candidate_point_list']
    for c_p_idx, candidate_point in enumerate(candidate_point_list):        
        cpx, cpy = candidate_point['x'], candidate_point['y']

        if candidate_point['state'] == False:
            continue

        if gt_x1 <= cpx <= gt_x2 and gt_y1 <= cpy <= gt_y2:
            pass_check_list.append(1)
        else:
            pass_check_list.append(0)
        

    #print(f"\n{pass_check_list}", end="")


    # Store this problem's results under a run-specific key.
    key_name = f"{prefix}_{idx}"

    #print(result['refusal'])
    roi_list_result[key_name] = roi_list
    candidate_point_list_result[key_name] = candidate_point_list
    final_coordinate_result[key_name] = result['final_target_prediction']
    print(f"idx : {idx}    final coordinate: {result['final_target_prediction']}")

    #if idx > 1:
        #break



# Evaluation

In [15]:
hit_cnt = 0  # number of problems whose final prediction lies inside the ground-truth box
fine_grained_grounding_result = dict()  # idx -> True/False for the final prediction
ROI_result = dict()  # idx -> True/False for the last ROI
ROI_hit_cnt = 0  # number of problems whose last ROI covers at least 80% of the ground-truth box

def calculate_overlap_ratio(box_a, box_b):
    """Return the fraction of *box_a*'s area that is covered by *box_b*.

    Both boxes are ``(x1, y1, x2, y2)``. The result is ``0`` when the boxes do
    not overlap; otherwise it is the intersection area divided by the area of
    *box_a*.
    """
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b

    # Width and height of the intersection rectangle, floored at zero.
    shared_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    shared_h = max(0, min(ay2, by2) - max(ay1, by1))
    shared_area = shared_w * shared_h

    if shared_area == 0:
        return 0

    area_a = (ax2 - ax1) * (ay2 - ay1)
    return shared_area / area_a

# Score every problem: (1) is the final predicted point inside the ground-truth
# box, and (2) does the last Stage-1 ROI cover at least 80% of that box.
for idx in idx_list:
    #if idx > 2:
        #break

    key_name = f"{prefix}_{idx}"
    problem = df.loc[idx]

    # Scale the normalized ground-truth box by THIS problem's image size.
    # Reusing the width/height left over from the last inference iteration is
    # wrong whenever the images differ in size.
    img_width, img_height = Image.open(io.BytesIO(problem['image_bytes'])).size

    gt_box = problem['bbox']
    gt_x1 = gt_box[0] * img_width
    gt_y1 = gt_box[1] * img_height
    gt_x2 = gt_box[2] * img_width
    gt_y2 = gt_box[3] * img_height
    gt_bbox = [gt_x1, gt_y1, gt_x2, gt_y2]

    ### answer evaluation
    pred_x, pred_y = final_coordinate_result[key_name][0], final_coordinate_result[key_name][1]
    answer_hit = gt_x1 <= pred_x <= gt_x2 and gt_y1 <= pred_y <= gt_y2
    fine_grained_grounding_result[idx] = answer_hit
    if answer_hit:
        hit_cnt += 1

    ### ROI evaluation
    roi_region = list(roi_list_result[key_name][-1])
    roi_hit = calculate_overlap_ratio(gt_bbox, roi_region) >= 0.8
    ROI_result[idx] = roi_hit
    if roi_hit:
        ROI_hit_cnt += 1


print(f"anser: {fine_grained_grounding_result}") ### True or False for each problem
print(f"Number of correct answer: {hit_cnt}")  ### Number of correct answer
print()
print(f"ROI: {ROI_result}") ### True or False for each problem
print(f"Number of correct ROI: {ROI_hit_cnt}")  ### Number of correct ROI


anser: {0: True, 1: False, 2: True}
Number of correct answer: 2

ROI: {0: True, 1: True, 2: True}
Number of correct ROI: 3


### Save ROI_LIST & CANDIDATE_POINT_LIST

In [None]:
import json

def _dump_json(path, payload):
    """Write *payload* as JSON to *path* (UTF-8, overwriting any existing file)."""
    with open(path, 'w', encoding='utf-8') as file:
        json.dump(payload, file)


# Stage 1 results: visited ROIs and candidate points per problem
_dump_json(f'roi_list_result_{prefix}.json', roi_list_result)
_dump_json(f'candidate_point_list_result_{prefix}.json', candidate_point_list_result)

# Stage 2 results: per-problem hit/miss of the final prediction
_dump_json(f'fine_grained_grounding_result_{prefix}.json', fine_grained_grounding_result)