In [1]:
import os 
import json
import re

In [2]:
# Name of the experiment folder under exp/ that the single-experiment cells analyse.
exp_name = "GPT4" # Gemini GPT4 MiniCPM Qwen2VL

In [None]:
def parse_execution_history(history_string):
    """Parse a "Historical Execution: ..." line into a list of executed steps.

    Each entry is expected to look like ``(1) [Pick, apple] (success)``: a
    parenthesised prefix, a bracketed comma-separated action, and a
    ``(success)``/``(fail)`` status.

    Args:
        history_string: The raw history line, with or without the
            "Historical Execution: " prefix.

    Returns:
        A list of ``[action, object_1, object_2, result]`` lists. The action
        keeps its original casing, the objects are lower-cased, missing
        objects are the string "None", and result is 'success' or 'fail'.
    """
    history_string = history_string.replace("Historical Execution: ", "")
    # The capturing group keeps the status tokens in the output, so the list
    # alternates: action text, status, action text, status, ..., remainder.
    # "(?:\s|$)" also accepts a status that ends the string, so the last step
    # is not dropped when the line has no trailing whitespace.
    history_splits = re.split(r'(\(fail\)|\(success\))(?:\s|$)', history_string)

    parsed_steps = []
    for action_text, status in zip(history_splits[0::2], history_splits[1::2]):
        result = 'success' if 'success' in status else 'fail'
        # Remove only the first parenthesised group (the step prefix), then
        # peel the surrounding brackets before splitting into fields.
        without_prefix = re.sub(r'\(.*?\)', '', action_text, count=1)
        action_parts = without_prefix.strip().lstrip('[').rstrip(']').split(',')

        parsed_steps.append([
            action_parts[0].strip(),
            action_parts[1].strip().lower() if len(action_parts) > 1 else "None",
            action_parts[2].strip().lower() if len(action_parts) > 2 else "None",
            result,
        ])

    return parsed_steps


In [None]:
def subtask_feedback_list_gen(exp_name,round):
    """Collect the executed subtask and its feedback for every step of every task.

    Reads ``exp/<exp_name>/<task>/<round>/conversation.json`` for each entry
    of ``exp/<exp_name>``; entries without that file are skipped.

    Args:
        exp_name: Experiment folder name under ``exp``.
        round: Round sub-folder to read.

    Returns:
        A list of dicts sorted by "index", each with:
          - "index": the task folder name converted to int,
          - "step_len": number of entries in the conversation,
          - "history": one dict per conversation entry after the first, with
            "step", "subtask" (last parsed history step) and "feedback".
    """
    root = os.path.join("exp", exp_name)
    collected = []

    for task_dir in os.listdir(root):
        json_path = os.path.join(root, task_dir, f"{round}/conversation.json")
        if os.path.exists(json_path):
            with open(json_path, 'r') as handle:
                turns = json.load(handle)

            steps = []
            record = {"index": int(task_dir), "step_len": len(turns), "history": steps}
            # Entry 0 is skipped; step numbers therefore start at 1.
            for step_no, turn in enumerate(turns[1:], start=1):
                turn_lines = turn.split('\n')
                steps.append({
                    "step": step_no,
                    # Line 3 holds the execution history; its last entry is
                    # the subtask this feedback refers to.
                    "subtask": parse_execution_history(turn_lines[3])[-1],
                    # Line 4 holds the feedback text.
                    "feedback": turn_lines[4].replace("Feedback: ", ""),
                })
            collected.append(record)

    collected.sort(key=lambda rec: rec["index"])
    return collected

In [None]:
def eval_success_list_gen(exp_name,round):
    """List the tasks of one round whose final conversation entry contains 'End'.

    Args:
        exp_name: Experiment folder name under ``exp``.
        round: Round sub-folder to read.

    Returns:
        A list of dicts sorted by "index", each with "index" (task folder name
        as int), "step_len" (number of conversation entries) and "history"
        (the parsed history of the final entry followed by a synthetic
        successful "End" step).
    """
    root = os.path.join("exp", exp_name)
    finished = []

    for task_dir in os.listdir(root):
        json_path = os.path.join(root, task_dir, f"{round}/conversation.json")
        if not os.path.exists(json_path):
            continue

        with open(json_path, 'r') as handle:
            turns = json.load(handle)

        final_turn = turns[-1]
        if 'End' not in final_turn:
            continue

        record = {"index": int(task_dir), "step_len": len(turns)}
        # Line 3 of the final entry carries the execution history; the closing
        # "End" action is appended explicitly as a successful step.
        executed = parse_execution_history(final_turn.split('\n')[3])
        record["history"] = executed + [["End", "None", "None", "success"]]
        finished.append(record)

    finished.sort(key=lambda rec: rec["index"])
    return finished

def load_name_mapping():
    """Load the alias -> canonical-name table from name_dict.txt.

    The file is read in groups of three lines: the first line is the
    canonical name (surrounding whitespace and ':' are stripped), the second
    line is a '/'-separated list of aliases, and the third line is ignored.

    Returns:
        dict mapping every alias to its canonical name.
    """
    with open('hab-mobile-manipulation/name_dict.txt', 'r') as file:
        lines = file.read().split('\n')

    name_dict = {}
    # Stop at len(lines) - 1 so a group start always has an alias line after
    # it; an extra trailing newline in the file no longer raises IndexError.
    for start in range(0, len(lines) - 1, 3):
        value = lines[start].strip().strip(':')
        for alias in lines[start + 1].strip().split('/'):
            name_dict[alias] = value

    return name_dict
    # mapping item name
# Alias table loaded once at cell execution; match() reads it as a global.
mapping_dict = load_name_mapping()

def match_action(action, keypoint, mapping_dict):
    """Tell whether one executed step satisfies one keypoint.

    Args:
        action: Parsed step ``[action, object_1, object_2, result]``.
        keypoint: Keypoint string such as ``"[Put, apple, fridge]"``.
        mapping_dict: Alias -> canonical-name table used to compare objects.

    Returns:
        True when the step succeeded, the action names are equal and both
        objects resolve to the same canonical names (an "End" keypoint only
        needs the action name); False otherwise.
    """
    # A failed execution never satisfies a keypoint.
    if action[3] == 'fail':
        return False

    wanted = [part.strip() for part in keypoint.strip('[').strip(']').split(',')]
    # Two-field keypoints have no second object; pad to line up with `action`.
    if len(wanted) == 2:
        wanted.append("None")

    if action[0] != wanted[0]:
        return False
    if wanted[0] == "End":
        return True

    if mapping_dict[action[1]] != mapping_dict[wanted[1]]:
        return False

    def canonical(name):
        # "None" marks an absent object and is not looked up in the table.
        return "None" if name == "None" else mapping_dict[name]

    return canonical(action[2]) == canonical(wanted[2])

def match(list1, list2):
    """Check that the keypoints in list2 occur, in order, among the steps of list1.

    Args:
        list1: Parsed execution history (list of steps).
        list2: Keypath, a list of keypoint strings.

    Returns:
        False for an empty keypath; otherwise True when every keypoint is
        matched by some step, with matches appearing in keypath order.
    """
    if len(list2) == 0:
        return False

    # A single shared iterator means each keypoint resumes the scan right
    # after the step that satisfied the previous keypoint.
    remaining_steps = iter(list1)
    return all(
        any(match_action(step, keypoint, mapping_dict) for step in remaining_steps)
        for keypoint in list2
    )

def success_list_gen(eval_success_list):
    """Return the ids of tasks whose history satisfies at least one keypath.

    Args:
        eval_success_list: Records with "index" and "history" keys, as built
            by eval_success_list_gen.

    Returns:
        List of task ids, in input order, for which some keypath stored in
        the task's keypath.json is matched by the history.
    """
    passed = []

    for record in eval_success_list:
        task_id = record['index']
        history = record['history']
        # Ids 1..90 are read from the train folder, all others from test.
        data_dir = "EMMOE-100/data/train" if 1 <= int(task_id) <= 90 else "EMMOE-100/data/test"

        keypath_file = os.path.join(data_dir, str(task_id), "keypath.json")
        with open(keypath_file, 'r') as handle:
            keypaths = json.load(handle)

        # any() stops at the first matching keypath, so an id is added once.
        if any(match(history, keypath) for keypath in keypaths):
            passed.append(task_id)

    return passed




In [7]:
def error_analysis(exp_name):
    """Count feedback error types over rounds 1-3, split by final task outcome.

    Every step feedback is classified by the first matching substring in
    ``feedback_codes``. The counter is booked under "success" when the task
    id is in the round's success list, otherwise under "fail". Code prefixes
    group the messages as D (target distance), F (unknown action/object),
    L (hand/container state or invalid target) and E (execution failures).

    Args:
        exp_name: Experiment folder name under ``exp``.

    Returns:
        A 24-tuple: (success, fail) pairs for L1, L2, L3, L4, D1, D2, F1, F2,
        E1, E2, E3 in that order, followed by step_success and step_fail
        (sum of ``step_len - 1`` over successful / failed tasks).
    """
    # Order matters: the first substring found wins, so the specific
    # 'does not exist! ...' message must precede the generic
    # 'Please choose another object'.
    feedback_codes = (
        ('the target is far away', 'D1'),
        ('the target is too close', 'D2'),
        ('is not in the action list! You should only choose actions in the list', 'F1'),
        ('does not exist! Please choose another object', 'F2'),
        ('the hand is full', 'L1'),
        ('the hand is empty', 'L2'),
        ('is closed, you should open it first', 'L3'),
        ('Please choose another object', 'L4'),
        ('the subtask is too difficult to perform', 'E1'),
        ('and the object is missing', 'E2'),
        ('time out', 'E3'),
    )
    # Order in which the counters appear in the returned tuple.
    report_order = ('L1', 'L2', 'L3', 'L4', 'D1', 'D2', 'F1', 'F2', 'E1', 'E2', 'E3')
    outcomes = ('success', 'fail')

    counts = {(code, outcome): 0 for code in report_order for outcome in outcomes}
    steps = {outcome: 0 for outcome in outcomes}

    for round_id in range(1, 4):
        eval_success_list = eval_success_list_gen(exp_name, round_id)
        # A set gives constant-time membership tests in the loops below.
        succeeded = set(success_list_gen(eval_success_list))

        for task in subtask_feedback_list_gen(exp_name, round_id):
            outcome = 'success' if task['index'] in succeeded else 'fail'
            steps[outcome] += task['step_len'] - 1

            for step in task['history']:
                feedback = step['feedback']
                for needle, code in feedback_codes:
                    if needle in feedback:
                        counts[(code, outcome)] += 1
                        break

    flat = tuple(counts[(code, outcome)] for code in report_order for outcome in outcomes)
    return flat + (steps['success'], steps['fail'])

# Format error
if action not in ["go to", "open", "close", "pick", "put", "end"]:
        return 'fail', f'{action} is not in the action list! You should only choose actions in the list.', state_info, None
return 'fail', f'{input_item} does not exist! Please choose another object', state_info, None
# Logical error
    if inventory != 'None' and action in ['pick', 'open', 'close']:
        return 'fail', f'Unable to {action}, the hand is full', state_info, None
    if inventory == 'None' and action == 'put':
        return 'fail', f'Unable to {action}, the hand is empty', state_info, None
    if action == 'put' and state_info['art_objs_qpos']['fridge_:0000'][1] < 0.8  and input_item == 'fridge': 
        return 'fail', f'Unable to {action}, the {input_item} is closed, you should open it first', state_info, None
    if action == 'put' and state_info['art_objs_qpos']['kitchen_counter_:0000'][drawer_cnt] < 0.25  and  'drawer' in input_item:
        return 'fail', f'Unable to {action}, the {input_item} is closed, you should open it first', state_info, None
    if action in ['open','close'] and 'drawer' not in input_item and 'fridge' not in input_item:
        return 'fail', f'Can not {action} {input_item}! Please choose another object', state_info, None



# Distance error
if distance > 2:
            return 'fail', f'Unable to {action}, the target is far away', state_info, None
if distance < 0.1:
            return 'fail', f'Unable to {action}, the target is too close', state_info, None




# LLE error
if signal == 'fail':
        return 'fail', f'Unable to {action}, time out', state_info, video_images


if 'time out' in feedback:
                    if retry == 2:
                        feedback = f'Unable to {action}, the subtask is too difficult to perform'

if 'time out' in feedback and state_info['grasped_obj'] is None:
                        feedback = f'Unable to {action}, and the object is missing'

In [8]:
# Unpack the 22 per-type counters plus the two step totals for `exp_name`.
(L1_success, L1_fail, L2_success, L2_fail, L3_success, L3_fail, L4_success, L4_fail,
 D1_success, D1_fail, D2_success, D2_fail,
 F1_success, F1_fail, F2_success, F2_fail,
 E1_success, E1_fail, E2_success, E2_fail, E3_success, E3_fail,
 step_success, step_fail) = error_analysis(exp_name)

# Totals per error family.
L_success = sum((L1_success, L2_success, L3_success, L4_success))
L_fail = sum((L1_fail, L2_fail, L3_fail, L4_fail))
D_success = sum((D1_success, D2_success))
D_fail = sum((D1_fail, D2_fail))
F_success = sum((F1_success, F2_success))
F_fail = sum((F1_fail, F2_fail))
E_success = sum((E1_success, E2_success, E3_success))
E_fail = sum((E1_fail, E2_fail, E3_fail))

# Share of each family among all classified errors, per task outcome.
L_success_per, D_success_per, F_success_per, E_success_per = (
    family / sum((L_success, D_success, F_success, E_success))
    for family in (L_success, D_success, F_success, E_success)
)
L_fail_per, D_fail_per, F_fail_per, E_fail_per = (
    family / sum((L_fail, D_fail, F_fail, E_fail))
    for family in (L_fail, D_fail, F_fail, E_fail)
)

# Classified errors relative to the number of counted steps.
All_success_per = sum((L_success, D_success, F_success, E_success)) / step_success
All_fail_per = sum((L_fail, D_fail, F_fail, E_fail)) / step_fail

In [9]:
# Label every per-type counter with its variable name for display.
str_list = 'L1_success, L1_fail, L2_success, L2_fail, L3_success, L3_fail, L4_success, L4_fail, D1_success, D1_fail, D2_success, D2_fail, F1_success, F1_fail, F2_success, F2_fail, E1_success, E1_fail, E2_success, E2_fail, E3_success, E3_fail'.split(', ')
resulttt = dict(zip(str_list, (
    L1_success, L1_fail, L2_success, L2_fail, L3_success, L3_fail, L4_success, L4_fail,
    D1_success, D1_fail, D2_success, D2_fail,
    F1_success, F1_fail, F2_success, F2_fail,
    E1_success, E1_fail, E2_success, E2_fail, E3_success, E3_fail,
)))
resulttt

{'L1_success': 5,
 'L1_fail': 228,
 'L2_success': 1,
 'L2_fail': 4,
 'L3_success': 1,
 'L3_fail': 23,
 'L4_success': 0,
 'L4_fail': 121,
 'D1_success': 56,
 'D1_fail': 279,
 'D2_success': 0,
 'D2_fail': 2,
 'F1_success': 2,
 'F1_fail': 19,
 'F2_success': 22,
 'F2_fail': 2152,
 'E1_success': 20,
 'E1_fail': 464,
 'E2_success': 2,
 'E2_fail': 17,
 'E3_success': 17,
 'E3_fail': 8}

In [10]:
# Per-family error totals as (success, fail) pairs for L, D, F, E, then the step totals.
L_success, L_fail, D_success, D_fail, F_success, F_fail, E_success, E_fail, step_success, step_fail

(7, 376, 56, 281, 24, 2171, 39, 489, 416, 4506)

In [11]:
# Total classified errors in successful tasks and in failed tasks.
# NOTE(review): `erroor_fail` looks like a typo for `error_fail`; the name is
# kept because cells outside this view may reference it.
error_success = L_success + D_success + F_success + E_success
erroor_fail = L_fail + D_fail + F_fail + E_fail
error_success, erroor_fail

(126, 3317)

In [12]:
# Total counted steps over successful and failed tasks together.
step_success + step_fail

4922

In [13]:
# Counted steps split by task outcome: (successful tasks, failed tasks).
step_success,step_fail

(416, 4506)

Table 6: low-level execution (LLE) error analysis per skill

In [14]:
def LLE_error_analysis(exp_name):
    """Tally execution failures and per-round success rates for each skill.

    Steps whose feedback carries one of the ``not_lle`` messages are ignored.
    A remaining step counts as a success when its feedback contains 'None'
    and as a failure otherwise, and is attributed to a skill by the action
    name found in the parsed subtask.

    Args:
        exp_name: Experiment folder name under ``exp``.

    Returns:
        (Nav, Pick, Place, Open, Close, SR_range): failure totals over rounds
        1-3 for each skill, and a dict holding one success rate per round
        for each skill.

    NOTE(review): the ``max(0.001, ...)`` guard makes a round with no
    attempts for a skill report a success rate of 0.0, which then feeds any
    min/max taken over SR_range.
    """
    # (action name in the parsed subtask, skill label), checked in this order.
    skills = (
        ('Go to', 'Nav'),
        ('Pick', 'Pick'),
        ('Put', 'Place'),
        ('Open', 'Open'),
        ('Close', 'Close'),
    )
    # Feedback messages that exclude a step from this analysis.
    not_lle = (
        'the target is far away',
        'the target is too close',
        'is not in the action list! You should only choose actions in the list',
        'does not exist! Please choose another object',
        'the hand is full',
        'the hand is empty',
        'is closed, you should open it first',
        'Please choose another object',
    )

    total_fail = {label: 0 for _, label in skills}
    SR_range = {label: [] for _, label in skills}

    for round_id in range(1, 4):
        ok = dict.fromkeys(total_fail, 0)
        bad = dict.fromkeys(total_fail, 0)

        for task in subtask_feedback_list_gen(exp_name, round_id):
            for step in task['history']:
                feedback = step['feedback']
                if any(marker in feedback for marker in not_lle):
                    continue
                # Membership test against the parsed subtask, first hit wins.
                label = next((name for verb, name in skills if verb in step['subtask']), None)
                if label is None:
                    continue
                tally = ok if 'None' in feedback else bad
                tally[label] += 1

        for label in total_fail:
            total_fail[label] += bad[label]
            SR_range[label].append(ok[label]/max(0.001,(ok[label]+bad[label])))

    return (total_fail['Nav'], total_fail['Pick'], total_fail['Place'],
            total_fail['Open'], total_fail['Close'], SR_range)

In [15]:
# Failure totals per skill and the per-round success rates for `exp_name`.
Nav, Pick, Place, Open, Close, SR_range = LLE_error_analysis(exp_name)

# Share of each skill among all execution failures.
Nav_per, Pick_per, Place_per, Open_per, Close_per = (
    failures / sum((Nav, Pick, Place, Open, Close))
    for failures in (Nav, Pick, Place, Open, Close)
)

# [lowest, highest] per-round success rate of each skill.
Nav_SR, Pick_SR, Place_SR, Open_SR, Close_SR = (
    [min(SR_range[skill]), max(SR_range[skill])]
    for skill in ('Nav', 'Pick', 'Place', 'Open', 'Close')
)

In [16]:
# Failure totals per skill over the three rounds, then the per-round success rates.
Nav, Pick, Place, Open, Close, SR_range

(107,
 362,
 44,
 15,
 0,
 {'Nav': [0.9444444444444444, 0.88, 0.8631284916201117],
  'Pick': [0.453781512605042, 0.475, 0.41743119266055045],
  'Place': [0.84, 0.8089887640449438, 0.872093023255814],
  'Open': [0.6875, 0.6, 0.7894736842105263],
  'Close': [0.0, 0.0, 1.0]})

In [17]:
# Total execution failures across the five skills.
Nav+ Pick+ Place+ Open+ Close

528

In [18]:
# [min, max] per-round success rate for each skill.
Nav_SR, Pick_SR, Place_SR, Open_SR, Close_SR

([0.8631284916201117, 0.9444444444444444],
 [0.41743119266055045, 0.475],
 [0.8089887640449438, 0.872093023255814],
 [0.6, 0.7894736842105263],
 [0.0, 1.0])

In [19]:
def LLE_error_analysis_overall():
    """Tally per-skill execution successes and failures over all experiments.

    Iterates over the six hard-coded experiment folders and rounds 1-3.
    Steps whose feedback carries one of the ``not_lle`` messages are ignored.
    A remaining step counts as a success when its feedback contains 'None'
    and as a failure otherwise.

    Returns:
        A 10-tuple of (success, fail) pairs for Nav, Pick, Place, Open and
        Close, in that order.
    """
    # (action name in the parsed subtask, skill label), checked in this order.
    skills = (
        ('Go to', 'Nav'),
        ('Pick', 'Pick'),
        ('Put', 'Place'),
        ('Open', 'Open'),
        ('Close', 'Close'),
    )
    # Feedback messages that exclude a step from this analysis.
    not_lle = (
        'the target is far away',
        'the target is too close',
        'is not in the action list! You should only choose actions in the list',
        'does not exist! Please choose another object',
        'the hand is full',
        'the hand is empty',
        'is closed, you should open it first',
        'Please choose another object',
    )

    tallies = {label: {'success': 0, 'fail': 0} for _, label in skills}

    for exp_name in ['GPT4','Gemini','Qwen2VL','MiniCPM','sft','sft+dpo']:
        for round_id in range(1, 4):
            for task in subtask_feedback_list_gen(exp_name, round_id):
                for step in task['history']:
                    feedback = step['feedback']
                    if any(marker in feedback for marker in not_lle):
                        continue
                    # Membership test against the parsed subtask, first hit wins.
                    label = next((name for verb, name in skills if verb in step['subtask']), None)
                    if label is None:
                        continue
                    outcome = 'success' if 'None' in feedback else 'fail'
                    tallies[label][outcome] += 1

    flat = []
    for _, label in skills:
        flat.extend((tallies[label]['success'], tallies[label]['fail']))
    return tuple(flat)

In [20]:
# Per-skill success/fail counts pooled over all experiments and rounds;
# the cell displays only the failure counts.
Nav_success,Nav_fail,Pick_success,Pick_fail,Place_success,Place_fail,Open_success,Open_fail,Close_success,Close_fail = LLE_error_analysis_overall()
Nav_fail,Pick_fail,Place_fail,Open_fail,Close_fail

(938, 1213, 178, 81, 27)

In [21]:
# Total execution failures pooled over all experiments and skills.
Nav_fail+Pick_fail+Place_fail+Open_fail+Close_fail

2437

In [22]:
# Nav: successes divided by a weighted total in which each failure counts 3
# and each success counts 1, 2 or 3.
# NOTE(review): the weights presumably model execution attempts per step — confirm.
Nav_success/(Nav_fail*3 + Nav_success), Nav_success/(Nav_fail*3 + Nav_success*2), Nav_success/(Nav_fail*3 + Nav_success*3)

(0.828936170212766, 0.4532340623545835, 0.31187960294588535)

In [23]:
# Pick: successes divided by a weighted total in which each failure counts 3
# and each success counts 1, 2 or 3.
# NOTE(review): the weights presumably model execution attempts per step — confirm.
Pick_success/(Pick_fail*3 + Pick_success), Pick_success/(Pick_fail*3 + Pick_success*2), Pick_success/(Pick_fail*3 + Pick_success*3)

(0.289535337758688, 0.22452687358062073, 0.18335806132542037)

In [24]:
# Place: successes divided by a weighted total in which each failure counts 3
# and each success counts 1, 2 or 3.
# NOTE(review): the weights presumably model execution attempts per step — confirm.
Place_success/(Place_fail*3 + Place_success), Place_success/(Place_fail*3 + Place_success*2), Place_success/(Place_fail*3 + Place_success*3)

(0.6939828080229227, 0.40967523680649526, 0.2906167506599472)

In [25]:
# Open: successes divided by a weighted total in which each failure counts 3
# and each success counts 1, 2 or 3.
# NOTE(review): the weights presumably model execution attempts per step — confirm.
Open_success/(Open_fail*3 + Open_success), Open_success/(Open_fail*3 + Open_success*2), Open_success/(Open_fail*3 + Open_success*3)

(0.7584493041749503, 0.4313171283210854, 0.3013428120063191)

In [26]:
# Close: successes divided by a weighted total in which each failure counts 3
# and each success counts 1, 2 or 3.
# NOTE(review): the weights presumably model execution attempts per step — confirm.
Close_success/(Close_fail*3 + Close_success), Close_success/(Close_fail*3 + Close_success*2), Close_success/(Close_fail*3 + Close_success*3)

(0.5736842105263158, 0.36454849498327757, 0.26715686274509803)