In [None]:
import sys
# Extend the module search path with the sibling ../benchmark directory
# (presumably where benchmark, utils and openai_cache live - TODO confirm).
sys.path.append('../benchmark') 

In [None]:
from pathlib import Path
import yaml
import benchmark
import utils
from openai_cache import Completion

In [None]:
def construct_summarization_prompt(objects, receptacles, placements):
    """Build the few-shot prompt that asks the model to summarize placements.

    Three worked examples (objects, receptacles, ``pick_and_place`` calls and a
    ``# Summary:`` line) are followed by the caller's scenario, ending with an
    open ``# Summary:`` for the model to complete.

    Args:
        objects: Iterable of object names, rendered as a quoted list literal.
        receptacles: Iterable of receptacle names, rendered the same way.
        placements: Iterable of indexable pairs; element 0 is the object and
            element 1 the receptacle, rendered one ``pick_and_place`` per line.

    Returns:
        The complete prompt string.
    """
    few_shot_template = '''objects = ["dried figs", "protein bar", "cornmeal", "Macadamia nuts", "vinegar", "herbal tea", "peanut oil", "chocolate bar", "bread crumbs", "Folgers instant coffee"]
receptacles = ["top rack", "middle rack", "table", "shelf", "plastic box"]
pick_and_place("dried figs", "plastic box")
pick_and_place("protein bar", "shelf")
pick_and_place("cornmeal", "top rack")
pick_and_place("Macadamia nuts", "plastic box")
pick_and_place("vinegar", "middle rack")
pick_and_place("herbal tea", "table")
pick_and_place("peanut oil", "middle rack")
pick_and_place("chocolate bar", "shelf")
pick_and_place("bread crumbs", "top rack")
pick_and_place("Folgers instant coffee", "table")
# Summary: Put dry ingredients on the top rack, liquid ingredients in the middle rack, tea and coffee on the table, packaged snacks on the shelf, and dried fruits and nuts in the plastic box.

objects = ["yoga pants", "wool sweater", "black jeans", "Nike shorts"]
receptacles = ["hamper", "bed"]
pick_and_place("yoga pants", "hamper")
pick_and_place("wool sweater", "bed")
pick_and_place("black jeans", "bed")
pick_and_place("Nike shorts", "hamper")
# Summary: Put athletic clothes in the hamper and other clothes on the bed.

objects = ["Nike sweatpants", "sweater", "cargo shorts", "iPhone", "dictionary", "tablet", "Under Armour t-shirt", "physics homework"]
receptacles = ["backpack", "closet", "desk", "nightstand"]
pick_and_place("Nike sweatpants", "backpack")
pick_and_place("sweater", "closet")
pick_and_place("cargo shorts", "closet")
pick_and_place("iPhone", "nightstand")
pick_and_place("dictionary", "desk")
pick_and_place("tablet", "nightstand")
pick_and_place("Under Armour t-shirt", "backpack")
pick_and_place("physics homework", "desk")
# Summary: Put workout clothes in the backpack, other clothes in the closet, books and homeworks on the desk, and electronics on the nightstand.

objects = {objects_str}
receptacles = {receptacles_str}
{placements_str}
# Summary:'''
    quoted_objects = ', '.join(f'"{name}"' for name in objects)
    quoted_receptacles = ', '.join(f'"{name}"' for name in receptacles)
    # Index rather than unpack so entries only need to be indexable at 0 and 1.
    placement_lines = [
        f'pick_and_place("{entry[0]}", "{entry[1]}")' for entry in placements
    ]
    return few_shot_template.format(
        objects_str='[' + quoted_objects + ']',
        receptacles_str='[' + quoted_receptacles + ']',
        placements_str='\n'.join(placement_lines),
    )

In [None]:
def construct_placement_prompt(summary, objects, receptacles):
    """Build the few-shot prompt that asks the model to place each object.

    One worked example is followed by the caller's summary, objects and
    receptacles. The prompt ends with an opened ``pick_and_place`` call for the
    first object so the model continues from the receptacle argument.

    Args:
        summary: Text inserted after ``# Summary:``.
        objects: Sequence of object names; must be indexable and non-empty
            because ``objects[0]`` seeds the first call.
        receptacles: Iterable of receptacle names.

    Returns:
        The complete prompt string.

    Raises:
        IndexError: If ``objects`` is empty.
    """
    template = '''# Summary: Put clothes in the laundry basket and toys in the storage box.
objects = ["socks", "toy car", "shirt", "Lego brick"]
receptacles = ["laundry basket", "storage box"]
pick_and_place("socks", "laundry basket")
pick_and_place("toy car", "storage box")
pick_and_place("shirt", "laundry basket")
pick_and_place("Lego brick", "storage box")

# Summary: {summary}
objects = {objects_str}
receptacles = {receptacles_str}
pick_and_place("{first_object}",'''
    object_literal = '[' + ', '.join(f'"{name}"' for name in objects) + ']'
    receptacle_literal = '[' + ', '.join(f'"{name}"' for name in receptacles) + ']'
    return template.format(
        summary=summary,
        objects_str=object_literal,
        receptacles_str=receptacle_literal,
        first_object=objects[0],
    )

In [None]:
def construct_category_prompt(summary):
    """Build the few-shot prompt that asks the model to list object categories.

    Two worked examples map a summary to an ``objects`` list of category names.
    The prompt ends with ``objects = ["`` so the model continues inside the
    first quoted category.

    Args:
        summary: Text inserted after the final ``# Summary:``.

    Returns:
        The complete prompt string.
    """
    prompt_lines = [
        '# Summary: Put shirts on the bed, jackets and pants on the chair, and bags on the shelf.',
        'objects = ["shirt", "jacket or pants", "bag"]',
        '',
        '# Summary: Put pillows on the sofa, clothes on the chair, and shoes on the rack.',
        'objects = ["pillow", "clothing", "shoe"]',
        '',
        '# Summary: {summary}',
        'objects = ["',
    ]
    return '\n'.join(prompt_lines).format(summary=summary)

In [None]:
def construct_summarization_primitive_prompt(objects, primitives):
    """Build the few-shot prompt that asks the model to summarize primitives.

    One worked example (objects, ``pick_and_<primitive>`` calls, summary) is
    followed by the caller's objects and calls, ending with an open
    ``# Summary:`` for the model to complete.

    Args:
        objects: Iterable of object names, rendered as a quoted list literal.
        primitives: Iterable of indexable pairs; element 0 is the object and
            element 1 the primitive suffix appended to ``pick_and_``.

    Returns:
        The complete prompt string.
    """
    template = '''objects = ["granola bar", "hat", "toy car", "Lego brick", "fruit snacks", "shirt"]
pick_and_toss("granola bar")
pick_and_place("hat")
pick_and_place("toy car")
pick_and_place("Lego brick")
pick_and_toss("fruit snacks")
pick_and_place("shirt")
# Summary: Pick and place clothes and toys, pick and toss snacks.

objects = {objects_str}
{primitives_str}
# Summary:'''
    quoted_names = [f'"{name}"' for name in objects]
    call_lines = [f'pick_and_{entry[1]}("{entry[0]}")' for entry in primitives]
    return template.format(
        objects_str='[' + ', '.join(quoted_names) + ']',
        primitives_str='\n'.join(call_lines),
    )

In [None]:
def construct_primitive_prompt(summary, objects):
    """Build the few-shot prompt that asks the model to pick a primitive per object.

    Two worked examples are followed by the caller's summary and object list.
    The prompt ends right after the ``objects = [...]`` line, so the model is
    expected to continue with ``pick_and_<primitive>(...)`` calls.

    Args:
        summary: Text inserted after the final ``# Summary:``.
        objects: Iterable of object names, rendered as a quoted list literal.

    Returns:
        The complete prompt string.
    """
    template = '''# Summary: Pick and place clothes, pick and toss snacks.
objects = ["granola bar", "hat", "toy car", "Lego brick", "fruit snacks", "shirt"]
pick_and_toss("granola bar")
pick_and_place("hat")
pick_and_place("toy car")
pick_and_place("Lego brick")
pick_and_toss("fruit snacks")
pick_and_place("shirt")

# Summary: Pick and place granola bars, hats, toy cars, and Lego bricks, pick and toss fruit snacks and shirts.
objects = ["clothing", "snack"]
pick_and_place("clothing")
pick_and_toss("snack")

# Summary: {summary}
objects = {objects_str}'''
    object_literal = '[' + ', '.join(f'"{name}"' for name in objects) + ']'
    return template.format(summary=summary, objects_str=object_literal)

In [None]:
def parse_categories(categories_completion):
    """Parse the model's continuation of ``objects = ["`` into category names.

    The completion is expected to look like ``shirt", "bag"]``: comma-separated
    entries terminated by a closing bracket. Anything after the first ``]`` is
    discarded, so trailing text generated past the end of the list (for example
    the start of another ``# Summary:`` block) does not end up in the result.

    Args:
        categories_completion: Raw completion text.

    Returns:
        List of category names with surrounding whitespace and all double
        quotes removed. An empty completion yields ``['']``.
    """
    # Keep only the list body; text past the closing bracket is not part of it.
    list_body = categories_completion.split(']', 1)[0]
    return [entry.strip().replace('"', '') for entry in list_body.split(',')]

In [None]:
def parse_primitives(primitive_completion):
    """Parse ``pick_and_<primitive>("<object>")`` lines from a completion.

    Parsing stops, with a warning, at the first blank or whitespace-only line
    inside the stripped completion. Each line is split at its first ``(`` and
    last ``)``, so object names that contain parentheses are kept intact.

    Args:
        primitive_completion: Raw completion text, one call per line.

    Returns:
        List of ``[object, primitive]`` pairs in the order encountered, where
        ``primitive`` is the call name with ``pick_and_`` removed.

    Raises:
        ValueError: If a non-blank line contains no ``(`` and therefore cannot
            be interpreted as a call.
    """
    primitives = []
    for line in primitive_completion.strip().split('\n'):
        # Whitespace-only lines (e.g. a stray '\r') count as blank lines too.
        if not line.strip():
            print('Warning: Stopping since newline was encountered')
            break
        call_name, open_paren, arguments = line.partition('(')
        if not open_paren:
            raise ValueError(f'Cannot parse primitive call from line: {line!r}')
        primitive = call_name.strip().replace('pick_and_', '')
        # Drop the closing parenthesis of the call (and anything after it),
        # then the quotes around the object name.
        obj = arguments.rsplit(')', 1)[0].strip().replace('"', '')
        primitives.append([obj, primitive])
    return primitives

In [None]:
def load_scenario(scenario_name):
    """Load ``scenarios/<scenario_name>.yml`` into a ``benchmark.Scenario``.

    The YAML's ``seen_primitives`` entry is attached to the returned object as
    an extra ``seen_primitives`` attribute, and a series of assertions checks
    that objects, placements and primitives are mutually consistent.

    Args:
        scenario_name: File stem of the scenario under ``scenarios/``.

    Returns:
        The populated ``benchmark.Scenario``.

    Raises:
        AssertionError: If any sanity check fails.
    """
    with open(f'scenarios/{scenario_name}.yml', 'r', encoding='utf8') as scenario_file:
        raw = yaml.safe_load(scenario_file)

    # Ignore receptacles that are only used as a prop in this scenario (such as the sofa)
    usable_receptacles = [
        name for name, info in raw['receptacles'].items() if 'primitive_names' in info
    ]
    scenario = benchmark.Scenario(
        room=None,
        receptacles=usable_receptacles,
        seen_objects=raw['seen_objects'],
        seen_placements=raw['seen_placements'],
        unseen_objects=raw['unseen_objects'],
        unseen_placements=raw['unseen_placements'],
        annotator_notes=raw['annotator_notes'],
        tags=None,
    )
    scenario.seen_primitives = raw['seen_primitives']

    # Sanity checks
    # NOTE(review): zip() stops at the shorter input, so the pairwise order
    # checks below do not detect a length mismatch between the two sequences.

    # Seen objects: unique, placed in known receptacles, with known primitives.
    assert len(scenario.seen_objects) == len(set(scenario.seen_objects))
    for placed_obj, receptacle in scenario.seen_placements:
        assert placed_obj in scenario.seen_objects
        assert receptacle in scenario.receptacles
    for handled_obj, primitive_name in scenario.seen_primitives:
        assert handled_obj in scenario.seen_objects
        assert primitive_name in ('place', 'toss')
    # Placements and primitives must be listed in the same order as the objects.
    for expected, (actual, _) in zip(scenario.seen_objects, scenario.seen_placements):
        assert expected == actual
    for expected, (actual, _) in zip(scenario.seen_objects, scenario.seen_primitives):
        assert expected == actual

    # Unseen objects: unique, placed in known receptacles, same ordering.
    assert len(scenario.unseen_objects) == len(set(scenario.unseen_objects))
    for placed_obj, receptacle in scenario.unseen_placements:
        assert placed_obj in scenario.unseen_objects
        assert receptacle in scenario.receptacles
    for expected, (actual, _) in zip(scenario.unseen_objects, scenario.unseen_placements):
        assert expected == actual

    return scenario

In [None]:
def _complete_and_echo(completion, prompt, verbose):
    """Request a completion for ``prompt`` and return the first choice's text.

    When ``verbose`` is true, the prompt is printed, followed by the completion
    in blue and a short dashed separator.
    """
    completion_text = completion.create(prompt)['choices'][0]['text']
    if verbose:
        print(prompt, end='')
        utils.print_colored(completion_text, 'blue')
        print('\n' + 10 * '-' + '\n')
    return completion_text


def process_scenario(scenario_name, verbose=False):
    """Run the full prompting pipeline for one scenario and save the result.

    Steps, in order: summarize the seen placements, re-place the seen objects
    from that summary, extract object categories, place the categories,
    summarize the seen primitives, re-select primitives for the seen objects,
    and select primitives for the categories. The categories with their
    placements and primitives are then written to
    ``preferences/<scenario_name>.yml``; the output directory is created if it
    does not exist yet.

    Args:
        scenario_name: File stem of the scenario under ``scenarios/``.
        verbose: If true, print every prompt/completion pair and an accuracy
            analysis of the seen-object predictions.

    Returns:
        None.
    """
    completion = Completion()
    scenario = load_scenario(scenario_name)
    if verbose:
        print(f'Scenario: {scenario_name}\n')

    # Summarization
    summarization_prompt = construct_summarization_prompt(
        scenario.seen_objects, scenario.receptacles, scenario.seen_placements)
    summarization_completion = _complete_and_echo(completion, summarization_prompt, verbose)

    # Seen object placement
    summary = benchmark.parse_summary(summarization_completion)
    placement_prompt = construct_placement_prompt(summary, scenario.seen_objects, scenario.receptacles)
    placement_completion = _complete_and_echo(completion, placement_prompt, verbose)

    # Object categories
    category_prompt = construct_category_prompt(summary)
    category_completion = _complete_and_echo(completion, category_prompt, verbose)
    categories = parse_categories(category_completion)

    # Object category placement
    category_placement_prompt = construct_placement_prompt(summary, categories, scenario.receptacles)
    category_placement_completion = _complete_and_echo(completion, category_placement_prompt, verbose)
    category_placements = benchmark.parse_placements(category_placement_completion, categories)

    # Summarization for primitive selection
    summarization_primitive_prompt = construct_summarization_primitive_prompt(
        scenario.seen_objects, scenario.seen_primitives)
    summarization_primitive_completion = _complete_and_echo(
        completion, summarization_primitive_prompt, verbose)

    # Seen object primitive selection
    summary_primitive = benchmark.parse_summary(summarization_primitive_completion)
    primitive_prompt = construct_primitive_prompt(summary_primitive, scenario.seen_objects)
    primitive_completion = _complete_and_echo(completion, primitive_prompt, verbose)

    # Object category primitive selection
    category_primitive_prompt = construct_primitive_prompt(summary_primitive, categories)
    category_primitive_completion = _complete_and_echo(completion, category_primitive_prompt, verbose)
    category_primitives = parse_primitives(category_primitive_completion)

    # Analysis
    predicted_placements = benchmark.parse_placements(placement_completion, scenario.seen_objects)
    placement_corrects, placement_accuracy = benchmark.check_placements(predicted_placements, scenario.seen_placements)
    predicted_primitives = parse_primitives(primitive_completion)
    # check_placements is reused here to compare [object, primitive] pairs.
    primitive_corrects, primitive_accuracy = benchmark.check_placements(predicted_primitives, scenario.seen_primitives)
    if verbose:
        print(f'Annotator notes: {scenario.annotator_notes}\n')

        print('\nSeen placements:')
        for placement in scenario.seen_placements:
            print(placement)
        print('\nParsed placements:')
        for placement, correct in zip(predicted_placements, placement_corrects):
            utils.print_colored(placement, 'green' if correct else 'red')
        print(f'\nSeen placement accuracy: {placement_accuracy:.2f}')

        print('\nSeen primitives:')
        for primitive in scenario.seen_primitives:
            print(primitive)
        print('\nParsed primitives:')
        for primitive, correct in zip(predicted_primitives, primitive_corrects):
            utils.print_colored(primitive, 'green' if correct else 'red')
        print(f'\nSeen primitive accuracy: {primitive_accuracy:.2f}')

        print('\nCategories:', categories)
        print('\nCategory placements:')
        for placement in category_placements:
            print(placement)
        print('\nCategory primitives:')
        for primitive in category_primitives:
            print(primitive)
        print('\n' + 80 * '-' + '\n')

    # YAML
    preferences = {
        'categories': categories,
        'placements': dict(category_placements),
        'primitives': dict(category_primitives),
    }
    output_path = Path(f'preferences/{scenario_name}.yml')
    # Create the output directory up front so the results of the completions
    # above are not lost to a FileNotFoundError on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding='utf8') as f:
        yaml.dump(preferences, f)

In [None]:
# Run the pipeline on the 'test' scenario; pass verbose=True to print the prompts, completions, and analysis.
process_scenario('test')

In [None]:
# Process scenario-01 through scenario-08 (names are 1-based and zero-padded to two digits).
for scenario_idx in range(8):
    # Pass verbose=True here to print the prompts, completions, and analysis for each scenario.
    process_scenario(f'scenario-{scenario_idx + 1:02}')