In [1]:
import json

In [5]:
# Load the already-annotated trajectory that serves as the few-shot example.
# The context manager closes the file handle deterministically instead of
# leaving it to garbage collection.
# NOTE(review): this path starts with '../' while the raw trajectory loaded
# later in the notebook is read from './data-collection/' — confirm which
# working directory the notebook is expected to run from.
with open('../data-collection/2024-09-14-16:43:01.json') as trajectory_file:
    data = json.load(trajectory_file)

In [6]:
# Show the top-level fields stored in the loaded trajectory.
data.keys()

dict_keys(['goal', 'history', 'is_complete'])

In [7]:
# Print the goal text recorded for this trajectory.
print(data['goal'])

Who are the artists behind the top 5 streaming songs globally in 2023 and what are their birthplaces?


In [8]:
# Number of recorded steps in the trajectory.
len(data['history'])

38

In [9]:
# Peek at a single step. Each step is a 4-item sequence whose first item is the
# observation dict (url, scroll position, axtree text, ...).
data['history'][3]

[{'url': 'https://www.google.com/',
  'scroll_position': {'scrollTop': 0,
   'windowHeight': 720,
   'documentHeight': 720,
   'remainingPixels': 0},
  'raw_axtree_txt': 'RootWebArea \'Google\', focused\n\t[22] navigation \'\'\n\t\t[24] link \'About\', clickable\n\t\t[25] link \'Store\', clickable\n\t\t[33] link \'Gmail\', clickable\n\t\t[35] link \'Search for Images\', clickable\n\t\t[40] button \'Google apps\', clickable, expanded=False\n\t\t\t[41] image \'\'\n\t\t[42] link \'Sign in\', clickable\n\t\t[a] IframePresentational \'\'\n\t[51] image \'Google\'\n\t[81] search \'\'\n\t\t[91] image \'\'\n\t\t[95] combobox \'Search\' value=\'artists behind the top 5 streaming songs globally in 2023\', focused, autocomplete=\'both\', hasPopup=\'listbox\', expanded=True, owns=\'Alh6id\', controls=\'Alh6id\'\n\t\t[101] button \'Clear\', clickable\n\t\t\t[103] image \'\'\n\t\t[106] button \'Search by voice\', clickable\n\t\t\t[107] image \'\'\n\t\t[109] button \'Search by image\', clickable\n\t\t

In [10]:
def get_history_prompt(history):
    """Render already-annotated steps as the "History" section of a prompt.

    Parameters
    ----------
    history : iterable of 4-item sequences
        Each item is ``(obs, state, instruction, action)``. The observation is
        ignored; the other three values are interpolated into the text.

    Returns
    -------
    str
        One block per step (numbered from 1) listing its state, instruction
        and action, with blocks separated by a blank-line gap. Returns
        ``'Beginning of task'`` when there are no steps.
    """
    rendered_steps = [
        f'## Step {number}:\n'
        f'### State:\n{state}\n'
        f'### Instruction:\n{instruction}\n'
        f'### Action:\n{action}\n'
        for number, (_obs, state, instruction, action) in enumerate(history, start=1)
    ]
    if not rendered_steps:
        return 'Beginning of task'
    return '\n\n'.join(rendered_steps)

In [11]:
def get_future_actons(future):
    """Summarise the actions of the remaining steps as a single line.

    Parameters
    ----------
    future : iterable of 4-item sequences
        Each item is ``(obs, state, instruction, action)``; only the action
        string is used.

    Returns
    -------
    str
        The actions joined by ``' -> '``, or ``'No further actions'`` when the
        joined text is empty.
    """
    joined_actions = ' -> '.join(
        action for _obs, _state, _instruction, action in future
    )
    # The fallback is keyed on the joined text, not on the number of steps.
    return joined_actions or 'No further actions'

In [12]:
# Use the final recorded step as the "present" step of the few-shot example:
# every earlier step becomes history and, because it is the last step, the
# future slice is empty.
step_id = len(data['history']) - 1
history, present, future = (
    data['history'][:step_id],
    data['history'][step_id],
    data['history'][step_id + 1 :],
)

history_prompt, future_prompt = (
    get_history_prompt(history),
    get_future_actons(future),
)

In [14]:
# Display the goal string of the few-shot trajectory.
data['goal']

'Who are the artists behind the top 5 streaming songs globally in 2023 and what are their birthplaces?'

In [13]:
# Check the rendered history section built from the steps before `step_id`.
print(history_prompt)

## Step 1:
### State:
I am currently on about:blank. There is nothing in the webpage. I am at the beginning of the task.
### Instruction:
Navigate to the Google home page
### Action:
goto('https://www.google.com')


## Step 2:
### State:
I have moved to Google's frontpage. There is a search box available for entry and a button for "Google Search". The search box is blank, meaning it's available for entering my query. Google search responds well to individual questions, so I should break down my question into several subquestions.
### Instruction:
Enter the query "artists behind the top 5 streaming songs globally in 2023" into the search box
### Action:
fill('95', 'artists behind the top 5 streaming songs globally in 2023')


## Step 3:
### State:
My search query has been entered into the combobox. A list of autocomplete options have popped up, such as "2023 so far", "2023 youtube", "2023 spotify", etc. But none should be necessary for now. To proceed with the search, I can either click

In [22]:
# Check the future-actions line; it is the fallback text here because
# `step_id` points at the last recorded step.
print(future_prompt)

No further actions


In [23]:
# Unpack the step chosen as the few-shot example.
last_step = data['history'][step_id]
last_obs, last_state, last_instruction, last_action = last_step

# Start from the cleaned axtree text and put the error prefix on its own first
# line, unless there is no prefix or the text already begins with it.
last_axtree = last_obs['clean_axtree_txt']
if last_obs['error_prefix'] and not last_axtree.startswith(last_obs['error_prefix']):
    last_axtree = f"{last_obs['error_prefix']}\n{last_axtree}"

In [24]:
# Fully worked example ("Example 1") shown to the model before each new step:
# goal, rendered history, future actions, the last step's observation and
# action, and the reference <state>/<instruction> answer taken from that step.
# The step number is len(data['history']), i.e. step_id + 1.
few_shot_prompt = f"""\
Example 1:

# Goal:
{data['goal']}

# History:
{history_prompt}

# Future Actions:
{future_prompt}

# Step {len(data['history'])}:
## Observation:
{last_axtree}
## Action:
{last_action}
## Answer:
<state>
{last_state}
</state>
<instruction>
{last_instruction}
</instruction>
"""

In [None]:
import collections
import re
from warnings import warn

import yaml


def yaml_parser(message):
    """Parse a YAML message, in the (value, valid, retry_message) parser shape.

    Parameters
    ----------
    message : str
        Text expected to contain a YAML document.

    Returns
    -------
    value
        The object produced by ``yaml.safe_load``, or ``{}`` on failure.
    valid : bool
        Whether the text could be loaded as YAML.
    retry_message : str
        Empty on success; otherwise a fixed instruction asking for valid YAML.
    """
    # saves gpt-3.5 from some yaml parsing errors: a colon that ends a line is
    # joined with what follows, so "key:\nvalue" is read as "key: value".
    normalized = re.sub(r':\s*\n(?=\S|\n)', ': ', message)

    try:
        parsed = yaml.safe_load(normalized)
    except yaml.YAMLError as err:
        # Surface the parser error as a warning attributed to the caller.
        warn(str(err), stacklevel=2)
        return (
            {},
            False,
            "Your response is not a valid yaml. Please try again and be careful to the format. Don't add any apology or comment, just the answer.",
        )
    return parsed, True, ''


def _compress_chunks(text, identifier, skip_list, split_regex='\n\n+'):
    """Compress a string by replacing redundant chunks by identifiers. Chunks are defined by the split_regex."""
    text_list = re.split(split_regex, text)
    text_list = [chunk.strip() for chunk in text_list]
    counter = collections.Counter(text_list)
    def_dict = {}
    id = 0

    # Store items that occur more than once in a dictionary
    for item, count in counter.items():
        if count > 1 and item not in skip_list and len(item) > 10:
            def_dict[f'{identifier}-{id}'] = item
            id += 1

    # Replace redundant items with their identifiers in the text
    compressed_text = '\n'.join(text_list)
    for key, value in def_dict.items():
        compressed_text = compressed_text.replace(value, key)

    return def_dict, compressed_text


def compress_string(text):
    """Compress text by replacing repeated paragraphs and lines with identifiers.

    Returns a string made of a ``<definitions>`` section, listing every
    identifier with the text it stands for, followed by the compressed text.
    """
    # Pass 1: paragraphs (chunks separated by blank lines) get '§-n' labels.
    paragraph_defs, condensed = _compress_chunks(
        text, identifier='§', skip_list=[], split_regex='\n\n+'
    )

    # Pass 2: single lines get '¶-n' labels; paragraph labels already present
    # in the text are excluded from replacement.
    line_defs, condensed = _compress_chunks(
        condensed, '¶', list(paragraph_defs.keys()), split_regex='\n+'
    )

    # Paragraph definitions are listed first, then line definitions.
    merged_defs = {**paragraph_defs, **line_defs}
    entries = [f'{label}:\n{body}' for label, body in merged_defs.items()]

    return '\n'.join(['<definitions>', *entries, '</definitions>', condensed])


def extract_html_tags(text, keys):
    """Extract the content enclosed in ``<key>...</key>`` tags for a list of keys.

    Parameters
    ----------
    text : str
        The input string containing the HTML-style tags.
    keys : iterable of str
        The tag names to extract the content from.

    Returns
    -------
    dict
        A dictionary mapping each key found in `text` to the list of stripped
        contents of its tag pairs, in order of appearance. Keys without any
        match are omitted.

    Notes
    -----
    Matching is case-sensitive: neither `text` nor `keys` is lowercased.
    Keys are matched literally (regex metacharacters in a key are escaped),
    and only attribute-free tags of the exact form ``<key>`` are recognised.
    Content may span multiple lines.
    """
    content_dict = {}
    for key in keys:
        # Escape the key so characters such as '.' or '+' cannot widen the match.
        tag = re.escape(key)
        pattern = f'<{tag}>(.*?)</{tag}>'
        # DOTALL lets the non-greedy group cross line breaks.
        matches = re.findall(pattern, text, re.DOTALL)
        if matches:
            content_dict[key] = [match.strip() for match in matches]
    return content_dict


class ParseError(Exception):
    """Raised by parse_html_tags_raise when tag parsing fails; carries the retry message."""

    pass


def parse_html_tags_raise(text, keys=(), optional_keys=(), merge_multiple=False):
    """Exception-raising variant of parse_html_tags.

    Takes the same arguments as parse_html_tags and returns only the parsed
    dictionary. Raises ParseError, with the retry message as its text, when
    parse_html_tags reports the input as invalid.
    """
    parsed, ok, message = parse_html_tags(
        text, keys, optional_keys, merge_multiple=merge_multiple
    )
    if ok:
        return parsed
    raise ParseError(message)


def parse_html_tags(text, keys=(), optional_keys=(), merge_multiple=False):
    """Satisfy the parse api, extracts 1 match per key and validates that all keys are present

    Parameters
    ----------
    text : str
        The input string containing the HTML tags.
    keys : iterable of str
        The HTML tags to extract the content from. Each must be present.
    optional_keys : iterable of str
        The HTML tags to extract the content from, but are optional.
    merge_multiple : bool
        Controls what happens when a tag occurs more than once. If False, the
        first occurrence is kept and an error is reported; if True, all
        occurrences are joined with newlines and no error is reported.

    Returns
    -------
    dict
        A dictionary mapping each key to subset of `text` that match the key.
        Keys that do not occur in `text` are absent.
    bool
        Whether the parsing was successful.
    str
        A message to be displayed to the agent if the parsing was not successful
        (one line per problem, empty on success).
    """
    required_keys = tuple(keys)
    optional_keys = tuple(optional_keys)
    # De-duplicate while preserving order: processing a key twice would index
    # into the already-collapsed string value, truncating it to one character
    # and reporting a spurious "multiple instances" error.
    all_keys = tuple(dict.fromkeys(required_keys + optional_keys))
    content_dict = extract_html_tags(text, all_keys)
    retry_messages = []

    for key in all_keys:
        if key not in content_dict:
            # A key listed as optional is never reported as missing.
            if key not in optional_keys:
                retry_messages.append(f'Missing the key <{key}> in the answer.')
            continue

        val = content_dict[key]
        if len(val) > 1 and merge_multiple:
            # merge the multiple instances
            content_dict[key] = '\n'.join(val)
        else:
            content_dict[key] = val[0]
            if len(val) > 1:
                retry_messages.append(
                    f'Found multiple instances of the key {key}. You should have only one of them.'
                )

    valid = len(retry_messages) == 0
    retry_message = '\n'.join(retry_messages)
    return content_dict, valid, retry_message

In [None]:
def parser(text, output_keys):
    """Parse a model answer into a dict with one entry per required tag.

    Every key in `output_keys` is required; repeated tags are merged.

    Returns
    -------
    tuple
        ``(ans_dict, True, '')`` on success, or ``(None, False, message)``
        where `message` is the ParseError text describing what was wrong.
    """
    try:
        ans_dict = parse_html_tags_raise(text, keys=output_keys, merge_multiple=True)
    except ParseError as err:
        return None, False, str(err)
    else:
        return ans_dict, True, ''

In [None]:
def retry_openai(
    system_prompt,
    input_prompt,
    parser,
    output_keys,
    n_retry=4,
    min_retry_wait_time=60,
    rate_limit_max_wait_time=60 * 30,
    model='gpt-4o-2024-08-06',
):
    """Query the chat model until its answer parses, feeding errors back to it.

    Uses the module-level `client` to send the conversation. When the parser
    rejects an answer, the answer and the parser's retry message are appended
    to the conversation and the model is asked again.

    Parameters
    ----------
    system_prompt : str
        Content of the system message.
    input_prompt : str
        Content of the first user message.
    parser : callable
        Called as ``parser(answer, output_keys)``; must return
        ``(value, valid, retry_message)``.
    output_keys : list of str
        Passed through to `parser`.
    n_retry : int
        Maximum number of answers that may fail parsing.
    min_retry_wait_time : float
        Seconds to sleep after a rate-limit error before asking again. A value
        <= 0 disables waiting and the rate-limit error is re-raised at once.
    rate_limit_max_wait_time : float
        Upper bound on the total seconds spent waiting on rate limits.
    model : str
        Name of the chat model to query.

    Returns
    -------
    The `value` returned by `parser` for the first valid answer.

    Raises
    ------
    ValueError
        If no valid answer was obtained within `n_retry` attempts.
    openai.RateLimitError
        If rate limiting persists beyond the allowed waiting budget.
    """
    # Function-scope imports keep this definition usable on its own.
    import time

    from openai import RateLimitError

    messages = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'user', 'content': input_prompt},
    ]
    tries = 0
    rate_limit_total_delay = 0
    while tries < n_retry and rate_limit_total_delay < rate_limit_max_wait_time:
        try:
            completion = client.chat.completions.create(
                model=model,
                messages=messages,
            )
        except RateLimitError:
            # Rate limiting does not count as a parsing attempt; wait and ask
            # again until the waiting budget is used up.
            rate_limit_total_delay += min_retry_wait_time
            if (
                min_retry_wait_time <= 0
                or rate_limit_total_delay >= rate_limit_max_wait_time
            ):
                raise
            time.sleep(min_retry_wait_time)
            continue

        # `content` can be None; treat that as an empty answer so the parser
        # produces a retry message instead of crashing on `.strip()`.
        answer = (completion.choices[0].message.content or '').strip()
        messages.append({'role': 'assistant', 'content': answer})

        value, valid, retry_message = parser(answer, output_keys)
        if valid:
            return value

        tries += 1
        messages.append({'role': 'user', 'content': retry_message})

    raise ValueError(f'Could not parse a valid value after {n_retry} retries.')

In [None]:
# ans_dict = retry_openai(system_prompt, input_prompt, parser, ['state', 'instruction'])

In [None]:
# System message for the annotation model: explains the Observation / State /
# Instruction / Action fields, asks for <state> and <instruction> tags in the
# answer, and lists the 16 action signatures that may appear in a trajectory.
system_prompt = """\
The examples you see reflect the journey of an assistant taking actions in a browser environment to achieve a goal.
Below are the meaning of each field:

Observation: The browser's view of the webpage in one step of interaction. The assistant can only see the observation of the current step.
State: The assistant's description of the current webpage, information that the assistant must have memorized in order to take future actions, reflections on mistakes, reasoning about the progress, and two, three, four, or five options for what actions it can take next. Avoid mentioning just one specific action like fill, click, etc. Avoid descriptions of "I am about to" or "I am doing". The assistant will memorize any content in the state.
Instruction: A complete description of the action the assistant will select. The assistant will also memorize the instruction.
Action: An API call that the assistant will make to interact with the webpage.

Please fill in the state and instruction fields based on the observation and action provided.
Follow the example and surround the outputs with tags <state></state> and <instruction></instruction> respectively.

Action Space:

16 different types of actions are available:

noop(wait_ms: float = 1000)

send_msg_to_user(text: str)

scroll(delta_x: float, delta_y: float)

fill(bid: str, value: str)

select_option(bid: str, options: str | list[str])

click(bid: str, button: Literal['left', 'middle', 'right'] = 'left', modifiers: list[typing.Literal['Alt', 'Control', 'Meta', 'Shift']] = [])

dblclick(bid: str, button: Literal['left', 'middle', 'right'] = 'left', modifiers: list[typing.Literal['Alt', 'Control', 'Meta', 'Shift']] = [])

hover(bid: str)

press(bid: str, key_comb: str)

focus(bid: str)

clear(bid: str)

drag_and_drop(from_bid: str, to_bid: str)

upload_file(bid: str, file: str | list[str])

go_back()

go_forward()

goto(url: str)

Only a single action can be provided at once. Example:
    fill('a12', 'example with "quotes"')
"""

In [None]:
from openai import OpenAI

# Module-level client that retry_openai reads as a global.
# NOTE(review): no credentials are passed here, so they are presumably taken
# from the environment — confirm before running.
client = OpenAI()

In [25]:
# Load the trajectory that still needs state/instruction annotations. The
# context manager closes the file handle deterministically instead of leaving
# it to garbage collection.
with open('./data-collection/2024-09-14-19:34:04.json') as raw_file:
    raw_data = json.load(raw_file)

In [26]:
# Display the goal string of the trajectory to be annotated.
raw_data['goal']

'What is the highest rated (according to IMDB) Daniel Craig movie that is less than 150 minutes and is available on Netflix (US)?'

In [27]:
from tqdm import tqdm

In [56]:
# Annotate the raw trajectory one step at a time. Each step's state and
# instruction are generated by the model and appended to `raw_history`, so
# later steps see the generated annotations of earlier steps as their history.
# Only the observation and action of each raw step are used.
raw_history = []
for i, (obs, _, _, action) in tqdm(
    enumerate(raw_data['history']), total=len(raw_data['history'])
):
    # History section: everything annotated so far (empty on the first step).
    raw_history_prompt = get_history_prompt(raw_history)

    # Future section: the recorded actions of all steps after this one.
    future = raw_data['history'][i + 1 :]
    future_prompt = get_future_actons(future)

    # Put the error prefix on its own first line unless it is empty or the
    # axtree text already starts with it.
    if len(obs['error_prefix']) > 0 and not obs['clean_axtree_txt'].startswith(
        obs['error_prefix']
    ):
        axtree = f"{obs['error_prefix']}\n" + obs['clean_axtree_txt']
    else:
        axtree = obs['clean_axtree_txt']

    # Same layout as the few-shot example, but ending at "## Answer:" so the
    # model supplies the <state> and <instruction> content.
    raw_example_prompt = f"""\
Example 2:

# Goal:
{raw_data['goal']}

# History:
{raw_history_prompt}

# Future Actions:
{future_prompt}

# Step {len(raw_history) + 1}:
## Observation:
{axtree}
## Action:
{action}
## Answer:
"""
    input_prompt = few_shot_prompt + '\n\n' + raw_example_prompt
    ans_dict = retry_openai(
        system_prompt, input_prompt, parser, ['state', 'instruction']
    )
    # Store the step in the same (obs, state, instruction, action) layout that
    # get_history_prompt and get_future_actons unpack.
    raw_history.append((obs, ans_dict['state'], ans_dict['instruction'], action))
    # break  (uncomment to stop after the first step while debugging)

100%|██████████| 44/44 [01:35<00:00,  2.17s/it]


In [57]:
# Inspect the annotation generated for the final step of the loop.
ans_dict

{'state': 'I am on the search results page for "Sylvia 2003". I have been exploring Daniel Craig\'s movies to find the highest-rated one under 150 minutes, which is available on Netflix US. Despite my efforts, I couldn\'t find any movie that meets all these criteria. I found some high-rated Daniel Craig movies like Casino Royale, Skyfall, and Defiance, but none of them are available on Netflix US according to my searches.',
 'instruction': "Send a message to the user informing them that I couldn't locate a Daniel Craig movie that is less than 150 minutes, highly rated according to IMDB, and available on Netflix (US). Provide the titles and IMDB ratings of some of the movies I found that meet most of the criteria, except for Netflix availability."}

In [50]:
# Step count of the few-shot trajectory (`data`, not `raw_data`).
len(data['history'])

38

In [58]:
# Shallow copy of the top-level dict, so rebinding a key on the copy does not
# modify `raw_data`.
annotated_data = dict(raw_data)

In [59]:
# Swap in the steps that now carry generated state/instruction annotations.
annotated_data['history'] = raw_history

In [28]:
# annotated_data

In [60]:
# Write the annotated trajectory next to the raw one. The context manager
# guarantees the file is flushed and closed once the dump completes, instead
# of relying on garbage collection of an anonymous file object.
with open(
    './data-collection/2024-09-14-19:34:04-annotated-d4.json', 'w'
) as annotated_file:
    json.dump(annotated_data, annotated_file)

In [35]:
# completion = client.chat.completions.create(
#   model="gpt-4o-2024-08-06",
#   messages=[
#     {"role": "system", "content": system_prompt},
#     {"role": "user", "content": input_prompt}
#   ]
# )

# message = completion.choices[0].message.content

In [36]:
# print(message)