In [107]:
import glob
import gzip
import io
import os
import pickle
import time
from PIL import Image, ImageDraw, ImageFont
import json
from tqdm import tqdm
# font = ImageFont.load_default()



def _unzip_and_read_pickle(file_path: str):
  """Reads a gzipped pickle file using 'with open', unzips, and unpickles it.

  Args:
      file_path: The path to the gzipped pickle file.

  Returns:
      The original Python object that was pickled and gzipped.
  """
  with open(file_path, 'rb') as f:
    compressed = f.read()

  with gzip.open(io.BytesIO(compressed), 'rb') as f_in:
    return pickle.load(f_in)


def merge_image_horizon(image_list):

    total_width = 0
    max_height = 0
    for image in image_list:
        total_width += image.size[0]
        max_height = max(max_height, image.size[1])

    new_image = Image.new('RGB', (total_width, max_height))
    x, y = 0, 0
    for image in image_list:
        new_image.paste(image, (x, y))
        x += image.size[0]
    
    return new_image


def create_gif(images, output_path, duration=2000):
    # 保存为GIF
    images[0].save(
        output_path,
        save_all=True,
        append_images=images[1:],
        duration=duration,
        loop=0
    )


def text_with_wrap(text, max_width):
    output_text = ""
    cur_idx = 0
    while cur_idx < len(text):
        output_text += text[cur_idx: cur_idx+max_width]
        output_text += '\n'
        cur_idx += max_width
    return output_text


font_path = "/usr/share/fonts/truetype/ubuntu/Ubuntu-BI.ttf"  # Replace with the actual path to a .ttf file
font_size = 48
font = ImageFont.truetype(font_path, font_size)

# tasks = ["SystemBluetoothTurnOffVerify","SystemBrightnessMaxVerify","SystemBrightnessMinVerify"]
# run_path = "/home/xieck13/workspace/android_workspace/runs/m3a4_llava_ft/run"

# task_paths = [os.path.join(run_path, f"{task}_0.pkl.gz") for task in tasks]

# data = [_unzip_and_read_pickle(path) for path in task_paths]

In [2]:
!ls ../runs/train_m3a_gpt4o

run_20241015T174951  run_20241016T002521  run_20241016T020526
run_20241015T182858  run_20241016T005107  run_20241016T020930
run_20241015T202638  run_20241016T005311  run_20241016T091120
run_20241015T210454  run_20241016T010653  run_20241016T091552
run_20241015T220821  run_20241016T013057  run_20241016T100039
run_20241015T231844  run_20241016T020121  run_20241016T115806
run_20241015T233308  run_20241016T020339  run_20241016T132905


In [75]:
path_list = glob.glob("../runs/train_m3a_gpt4o/*/*.pkl.gz")
success_path_list = []

for path in path_list:
    ins = _unzip_and_read_pickle(path)[0]
    if ins['is_successful'] == 1.0:
        success_path_list.append(path)

In [109]:
def get_show_case(path):
    ins = _unzip_and_read_pickle(path)[0]
    goal = ins['goal']
    task_template = ins['task_template']
    episode_data = ins['episode_data']
    episode_length = ins['episode_length']
    train_data = episode_data['train_data']
    raw_screenshot = episode_data['raw_screenshot']
    action_output_json = episode_data['action_output_json']
    screenshot_list = []
    action_list = []
    for step in range(episode_length):
        action = action_output_json[step]
        if train_data[step] is not None:
            screenshot = Image.fromarray(train_data[step][1])
            x_max, x_min, y_max, y_min = train_data[step][3:]
            x, y = (x_max + x_min) // 2, (y_max + y_min) // 2
            # print(x, y)
            draw = ImageDraw.Draw(screenshot)

            # Define the radius of the circle
            radius = 10

            # Draw the circle on the image
            draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill="red", outline="red")

            
            action_type = action.action_type
            if action_type == "input_text":
                text = "input_text:\n" +  text_with_wrap(action.text, 30)
            else:
                text = action_type
        
            # Calculate text position, adjust line spacing if needed
            text_position = x + 20, y + 20
            draw.text(text_position, text, fill="red", font=font)


        else:
            
            screenshot = Image.fromarray(raw_screenshot[step])
            x, y = screenshot.size[0] // 4, screenshot.size[1] // 2
            draw = ImageDraw.Draw(screenshot)
            action_type = action.action_type
            if action_type == "open_app":
                text = "open_app:\n" + action.app_name
            elif action_type == "answer":
                text = "answer:\n" + action.text
            elif action_type == "status":
                text = "status:\n" + action.goal_status
            elif action_type == "scroll":
                text = "scroll:\n" + action.direction
            else:
                text = action_type
            text_position = (x, y)
            draw.text(text_position, text, fill="red", font=font)

        

        screenshot_list.append(screenshot)
        action_list.append(action)


    # save all screenshot as gif
    gif_dir = "../show_case/gpt4o/gif"
    os.makedirs(gif_dir, exist_ok=True)
    gif_path = os.path.join(gif_dir, f"{task_template}.gif")
    create_gif(screenshot_list, gif_path)

    # merge all screenshot
    show_case_image = merge_image_horizon(screenshot_list)
    png_dir = "../show_case/gpt4o/png"
    os.makedirs(png_dir, exist_ok=True)
    show_case_image.save(os.path.join(png_dir, f"{task_template}.png"))

    # save goal
    goal_dir = "../show_case/gpt4o/goal"
    os.makedirs(goal_dir, exist_ok=True)
    with open(os.path.join(goal_dir, f"{task_template}.txt"), "w") as f:
        f.write(goal)

    return show_case_image

for path in tqdm(success_path_list):
    try:
        get_show_case(path)
    except Exception as e:
        print(e)


 36%|███▌      | 15/42 [00:34<00:59,  2.21s/it]

'NoneType' object has no attribute 'action_type'


 48%|████▊     | 20/42 [00:45<00:47,  2.14s/it]

In [103]:
!rm -r ../show_case/gpt4o