In [None]:
import pandas as pd

In [None]:
import json
import os
import re
from pathlib import Path
from tqdm import tqdm


# Signed integer or decimal literal, e.g. 100, -5, 12.5
_COORD_NUMBER = r'-?\d+(?:\.\d+)?'

# pyautogui convenience helpers mapped to the (button, clicks) they stand for
_SPECIAL_CLICKS = {
    'rightClick': ('right', 1),
    'middleClick': ('middle', 1),
    'doubleClick': ('left', 2),
    'tripleClick': ('left', 3),
}


def _extract_click_coordinates(args: str) -> tuple:
    """Return literal (x, y) coordinates found in a call's argument text.

    Keyword form (``x=100, y=200``) is tried first, then the positional form
    where the two leading arguments are numbers. Coordinates are returned as
    the strings that appeared in the source; ``(None, None)`` is returned when
    no literal pair is present (e.g. no arguments, or variables were used).
    """
    kw_x = re.search(rf'\bx\s*=\s*({_COORD_NUMBER})', args)
    kw_y = re.search(rf'\by\s*=\s*({_COORD_NUMBER})', args)
    if kw_x and kw_y:
        return kw_x.group(1), kw_y.group(1)

    positional = re.match(rf'({_COORD_NUMBER})\s*,\s*({_COORD_NUMBER})', args)
    if positional:
        return positional.group(1), positional.group(2)

    return None, None


def normalize_click_action(action_str: str) -> str:
    """
    Normalize various click operations to a unified format

    ``rightClick``/``middleClick``/``doubleClick``/``tripleClick`` calls are
    rewritten as ``pyautogui.click(...)`` with explicit ``button`` (and
    ``clicks`` when greater than one). A plain ``pyautogui.click(X, Y)`` with
    exactly two numeric positional arguments is rewritten to the keyword form
    with ``button='left'``. Anything else is returned unchanged.

    Only literal numeric coordinates are carried over; other arguments of a
    special click call (e.g. ``interval``) are not preserved. When a rewrite
    happens, only the rewritten call is returned, without any text that
    surrounded it in ``action_str``.

    Args:
        action_str: Original pyautogui click call string

    Returns:
        Normalized click call string
    """
    # Match rightClick, middleClick, doubleClick, tripleClick
    special_click_pattern = r'pyautogui\.(rightClick|middleClick|doubleClick|tripleClick)\(([^)]*)\)'
    match = re.search(special_click_pattern, action_str)

    if match:
        button, clicks = _SPECIAL_CLICKS[match.group(1)]
        x, y = _extract_click_coordinates(match.group(2).strip())

        # Build the argument list in a fixed order: coordinates, clicks, button
        parts = []
        if x is not None and y is not None:
            parts.append(f"x={x}, y={y}")
        if clicks != 1:
            parts.append(f"clicks={clicks}")
        parts.append(f"button='{button}'")
        return f"pyautogui.click({', '.join(parts)})"

    # If it's a normal click, check if normalization is needed
    normal_click_pattern = r'pyautogui\.click\(([^)]*)\)'
    match = re.search(normal_click_pattern, action_str)

    if match:
        args = match.group(1).strip()

        # If only two numeric parameters, convert to x=, y= format
        simple_coords = re.fullmatch(
            rf'({_COORD_NUMBER})\s*,\s*({_COORD_NUMBER})', args
        )
        if simple_coords:
            x, y = simple_coords.group(1), simple_coords.group(2)
            return f"pyautogui.click(x={x}, y={y}, button='left')"

    return action_str

# Start of a pyautogui call: captures the function name, ends on the opening '('
_PYAUTOGUI_CALL_START = re.compile(r'pyautogui\.(\w+)\(')


def _find_call_end(code: str, open_idx: int) -> int:
    """Return the index just past the ')' that closes the '(' at ``open_idx``.

    Parentheses inside single-, double- or triple-quoted string literals are
    ignored, and nested parentheses are balanced. Returns -1 when the call is
    never closed.
    """
    depth = 0
    quote = None  # the active string delimiter while inside a literal
    i = open_idx
    n = len(code)
    while i < n:
        ch = code[i]
        if quote is not None:
            if ch == '\\':
                # Skip the escaped character so \" or \' does not end the literal
                i += 2
                continue
            if code.startswith(quote, i):
                i += len(quote)
                quote = None
                continue
        elif ch in ('"', "'"):
            quote = ch * 3 if code.startswith(ch * 3, i) else ch
            i += len(quote)
            continue
        elif ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
            if depth == 0:
                return i + 1
        i += 1
    return -1


def extract_pyautogui_actions(action_code: str) -> list[str]:
    """
    Extract all pyautogui function calls from action code

    Each call is returned with its complete argument list, including
    multi-line arguments, nested parentheses and parentheses that appear
    inside string literals. Calls whose function name contains "click"
    (case-insensitive) are passed through ``normalize_click_action``.
    Non-pyautogui statements (e.g. ``time.sleep``) are ignored.

    Args:
        action_code: Code string containing pyautogui calls

    Returns:
        List of pyautogui function calls, in order of appearance
    """
    if not action_code:
        return []

    actions = []
    pos = 0
    while True:
        start = _PYAUTOGUI_CALL_START.search(action_code, pos)
        if start is None:
            break

        open_idx = start.end() - 1
        end = _find_call_end(action_code, open_idx)
        if end == -1:
            # Unbalanced call (e.g. an unterminated string): fall back to
            # cutting at the first ')' so malformed code still yields a result
            close = action_code.find(')', open_idx)
            if close == -1:
                break  # no ')' left at all, so no further call can be complete
            end = close + 1

        call = action_code[start.start():end]
        pos = end

        # Normalize click-type operations, decided by the function name only
        if 'click' in start.group(1).lower():
            call = normalize_click_action(call)
        actions.append(call)

    return actions

def _read_traj_steps(traj_file: Path) -> list:
    """Load every JSON record from a traj.jsonl file, skipping blank lines."""
    steps = []
    with open(traj_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                steps.append(json.loads(line))
    return steps


def _build_trajectory(sample_folder: Path, steps: list) -> list:
    """Turn raw step records into {step_idx, observation, action} entries.

    The first step is not emitted: every emitted step uses the screenshot
    recorded by the step before it as its observation, and the first step has
    no predecessor.
    """
    trajectory = []
    for i, (prev_step, step) in enumerate(zip(steps, steps[1:]), start=1):
        # Observation is the previous step's screenshot, or "empty" when the
        # record has none or the file is missing on disk
        observation = "empty"
        prev_screenshot = prev_step.get("screenshot", "")
        if prev_screenshot:
            obs_file = sample_folder / prev_screenshot
            if obs_file.exists():
                observation = str(obs_file.absolute())

        # Extract action - extract pyautogui function calls from code
        trajectory.append({
            "step_idx": step.get("step_num", i + 1),
            "observation": observation,
            "action": extract_pyautogui_actions(step.get("action", "")),
        })
    return trajectory


def transform_osworld_trajectories(base_dir: str, output_file: str):
    """
    Read all trajectory files from OSWorld dataset and convert to specified format

    Expected layout is ``base_dir/<task>/<sample>/traj.jsonl``. An optional
    ``base_dir/all_result.json`` is read as ``{task: {sample: success}}``;
    samples without an entry get ``success = 0.0``. Samples without a
    ``traj.jsonl`` or with an empty one are skipped. Task and sample folders
    are processed in sorted order so the output is reproducible.

    One JSON record per sample is written to ``output_file`` (UTF-8), and a
    short summary is printed.

    Args:
        base_dir: Base directory path; its last component is used as model name
        output_file: Output jsonl file path (parent directories are created)

    Returns:
        List of the trajectory records that were written
    """
    base_path = Path(base_dir)
    model_name = base_path.name  # claude-3-7-sonnet-20250219

    # Read all_result.json
    all_result_path = base_path / "all_result.json"
    all_results = {}
    if all_result_path.exists():
        with open(all_result_path, 'r', encoding='utf-8') as f:
            all_results = json.load(f)

    all_trajectories = []

    # Iterate through all task folders
    for task_folder in tqdm(sorted(base_path.iterdir()), desc="Processing tasks"):
        if not task_folder.is_dir():
            continue

        task_name = task_folder.name
        # Get result dictionary for this task
        task_results = all_results.get(task_name, {})

        # Iterate through all sample folders under each task
        for sample_folder in sorted(task_folder.iterdir()):
            if not sample_folder.is_dir():
                continue

            traj_file = sample_folder / "traj.jsonl"
            if not traj_file.exists():
                continue

            steps = _read_traj_steps(traj_file)
            if not steps:
                continue

            # Build complete trajectory record
            all_trajectories.append({
                "dataset": "worldagentarena",
                "model": model_name,
                "task": task_name,
                "data_dir": str(sample_folder.absolute()),
                "success": task_results.get(sample_folder.name, 0.0),
                "trajectory": _build_trajectory(sample_folder, steps),
            })

    # Save to jsonl file
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # ensure_ascii=False emits raw non-ASCII text, so the encoding must be
    # explicit rather than left to the platform's locale default
    with open(output_path, 'w', encoding='utf-8') as f:
        for traj in all_trajectories:
            f.write(json.dumps(traj, ensure_ascii=False) + '\n')

    successful = sum(1 for t in all_trajectories if t['success'])
    print(f"\nTotal trajectories processed: {len(all_trajectories)}")
    print(f"Successful trajectories: {successful}")
    print(f"Failed trajectories: {len(all_trajectories) - successful}")
    print(f"Results saved to: {output_path.absolute()}")

    return all_trajectories



In [None]:
# Run directory to convert; the output file name is derived from it
base_dir = "waa/qwen3_vl_235b_step_50"
output_file = f"{base_dir}_transformed_trajectories.jsonl"

# Convert every trajectory under base_dir and keep the records for inspection
trajectories = transform_osworld_trajectories(
    base_dir=base_dir,
    output_file=output_file,
)


In [None]:

# Pretty-print the first converted record as a sample (skipped when empty)
if trajectories:
    first_record = trajectories[0]
    print("\nFirst trajectory example:")
    print(json.dumps(first_record, indent=2, ensure_ascii=False))

In [None]:
# Number of trajectory records returned by transform_osworld_trajectories
len(trajectories)