## Multi Prompts

VirtualHomeの初期化等

In [None]:
import sys
sys.path.append('../simulation/')
from unity_simulator.comm_unity import UnityCommunication
# Communication handle for the Unity simulator; every simulator call below goes through it.
comm = UnityCommunication()

# Lookup tables filled by environment_initialization():
# node id -> class name, and node id -> properties.
objId_dic = {}
objProp_dic = {}
# Class names that are treated as rooms when INSIDE edges are interpreted.
room_list = ["bathroom", "bedroom", "kitchen", "livingroom"]

def environment_initialization(scene_num, initial_room, initial_states):
    """Reset the simulator, apply the initial object states and add the agent.

    Args:
        scene_num: 1-based scene number; ``scene_num - 1`` is passed to
            ``comm.reset``.
        initial_room: Room name forwarded to ``comm.add_character``.
        initial_states: Iterable of dicts with ``"id"`` and ``"states"``
            entries. Every graph node whose id matches gets its states
            replaced.

    Side effects:
        Writes the class name and properties of every node of the resulting
        graph into the module-level ``objId_dic`` and ``objProp_dic``.
    """
    comm.reset(scene_num - 1)
    graph = comm.environment_graph()[1]

    # Overwrite the states of the nodes named in ``initial_states``.
    for wanted in initial_states:
        for node in graph["nodes"]:
            if node.get("id") != wanted.get("id"):
                continue
            node["states"] = wanted.get("states")

    # Push the modified graph back to the scene, then add the agent.
    comm.expand_scene(graph)
    comm.add_character('Chars/male1', initial_room=initial_room)

    # Re-read the graph and rebuild the id lookup tables from it.
    refreshed = comm.environment_graph()[1]
    for node in refreshed["nodes"]:
        node_id = node["id"]
        objId_dic[node_id] = node["class_name"]
        objProp_dic[node_id] = node["properties"]

def unity_simulation(script):
    """Render ``script`` in the simulator and return the first element of the result.

    Args:
        script: List of script lines handed to ``comm.render_script``.

    Returns:
        Element 0 of the value returned by ``comm.render_script``; the caller
        compares it with ``True`` to detect a successful execution.
    """
    # ``skip_execution=True`` was previously tried here and is left disabled.
    outcome = comm.render_script(
        script,
        find_solution=False,
        recording=True,
        camera_mode=['PERSON_FROM_BACK'],
    )
    return outcome[0]

シミュレータからの家庭環境知識の取得

In [None]:
import spacy
# spaCy English pipeline; extract_nouns() uses its part-of-speech tags.
nlp = spacy.load("en_core_web_sm")


import inflection as i
def extract_nouns(text):
    """Return the nouns of ``text`` followed by their missing singular forms.

    Args:
        text: Sentence to analyse (the task description in this notebook).

    Returns:
        A list containing every token tagged ``NOUN`` in document order,
        followed by the singularised form of each noun whose singular is not
        already among those nouns.
    """
    found = []
    for token in nlp(text):
        if token.pos_ == "NOUN":
            found.append(token.text)

    extra_singulars = []
    for word in found:
        singular = i.singularize(word)
        if singular not in found:
            extra_singulars.append(singular)

    return found + extra_singulars

# Collect household knowledge, limited to the agent, the related objects and the nodes adjacent to them
def extract_environment_knowledge(related_objects):
    """Extract object and agent knowledge from the current environment graph.

    Args:
        related_objects: Collection of class names; every node whose
            ``class_name`` is in it becomes a tracked object.

    Returns:
        A tuple ``(object_knowledge, agent_knowledge)``.

        ``object_knowledge`` maps node id -> dict with the keys ``"states"``,
        ``"on"``, ``"inside"``, ``"location"`` and ``"hold"``. It covers the
        related objects plus the objects they are ON / INSIDE (rooms become
        ``"location"`` instead of being added as objects).

        ``agent_knowledge`` has the keys ``"close_to"``, ``"hold_rh"``,
        ``"hold_lh"`` and ``"location"`` and is built from the edges that
        start at node id 1.
    """
    graph = comm.environment_graph()[1]
    nodes = graph["nodes"]
    edges = graph["edges"]

    def blank_entry(node):
        # Fresh record for one object; relations are filled in from the edges.
        return {
            "states": node["states"],
            "on": None,
            "inside": None,
            "location": None,
            "hold": []
        }

    # ---- pass 1: objects named in the task --------------------------------
    knowledge = {
        node["id"]: blank_entry(node)
        for node in nodes
        if node["class_name"] in related_objects
    }

    def record_placement(edge, neighbours=None):
        # Store where the edge's source object is (on / inside / room).
        # When ``neighbours`` is given, the supporting object is remembered so
        # that it can be added to the knowledge in the second pass.
        src = edge["from_id"]
        dst = edge["to_id"]
        relation = edge["relation_type"]
        # Relations with the floor are ignored.
        # NOTE(review): "walk" may be a typo for "wall" -- confirm.
        if relation == "ON" and objId_dic[dst] not in ["walk", "floor"]:
            knowledge[src]["on"] = dst
            if neighbours is not None:
                neighbours.add(dst)
        elif relation == "INSIDE":
            if objId_dic[dst] in room_list:
                knowledge[src]["location"] = dst
            else:
                knowledge[src]["inside"] = dst
                if neighbours is not None:
                    neighbours.add(dst)

    neighbour_ids = set()
    for edge in edges:
        src = edge["from_id"]
        dst = edge["to_id"]
        if src in knowledge:
            record_placement(edge, neighbour_ids)

        if dst in knowledge and src in knowledge:
            if edge["relation_type"] in ("ON", "INSIDE"):
                knowledge[dst]["hold"].append(src)

    # ---- pass 2: objects that support / contain the related objects -------
    # An id that is already tracked gets a fresh record here; its relations
    # and "hold" list are rebuilt by the loop below.
    for node in nodes:
        if node["id"] in neighbour_ids:
            knowledge[node["id"]] = blank_entry(node)

    for edge in edges:
        src = edge["from_id"]
        dst = edge["to_id"]
        if src in neighbour_ids:
            record_placement(edge)

        if dst in neighbour_ids and src in knowledge:
            if edge["relation_type"] in ("ON", "INSIDE"):
                knowledge[dst]["hold"].append(src)

    # ---- agent -------------------------------------------------------------
    agent = {
        "close_to": [],
        "hold_rh": None,
        "hold_lh": None,
        "location": None
    }
    for edge in edges:
        # Only edges that start at node id 1 describe the agent.
        if edge["from_id"] != 1:
            continue
        relation = edge["relation_type"]
        dst = edge["to_id"]
        if relation == "CLOSE" and dst in knowledge:
            agent["close_to"].append(dst)
        elif relation == "HOLDS_RH":
            agent["hold_rh"] = dst
        elif relation == "HOLDS_LH":
            agent["hold_lh"] = dst
        elif relation == "INSIDE" and objId_dic[dst] in room_list:
            agent["location"] = dst

    return knowledge, agent

# Convert the extracted household knowledge into natural language
def return_nlp(a, b):
    """Render object and agent knowledge as an English prompt.

    Args:
        a: Mapping of object id -> knowledge dict with the keys ``"states"``,
            ``"on"``, ``"inside"``, ``"location"`` and ``"hold"``.
        b: Agent knowledge dict with the keys ``"close_to"``, ``"hold_rh"``,
            ``"hold_lh"`` and ``"location"``.

    Returns:
        The prompt text: a header line, one sentence per object that has a
        room location, one sentence per object that holds other objects, and
        the sentences describing the agent.

    Notes:
        The clauses of an object sentence are joined with a single " and ".
        Earlier the sentence was assembled with a trailing " and" that was
        followed by another " and is INSIDE ...", which produced
        "... and and is INSIDE ..." (or "The x (1) and is INSIDE ..." when
        the object had neither a state nor a support).

        Only the first entry of ``"states"`` is reported.
    """
    def label(obj_id):
        # "<class name> (<id>)", the notation used throughout the prompt.
        return f"{objId_dic[obj_id]} ({obj_id})"

    env_prompt = "The current states in the home are as follows: \n"
    for obj_id, knowledge in a.items():
        name = label(obj_id)
        text = ""

        # An object is only described when the room it is in is known.
        if knowledge["location"]:
            clauses = []
            if knowledge["states"]:
                clauses.append(f"is {knowledge['states'][0]}")
            if knowledge["on"]:
                clauses.append(f"is ON the {label(knowledge['on'])}")
            elif knowledge["inside"]:
                clauses.append(f"is INSIDE the {label(knowledge['inside'])}")
            clauses.append(f"is INSIDE the {label(knowledge['location'])}")
            text = f"The {name} " + " and ".join(clauses) + ".\n"

        held = knowledge["hold"]
        if held:
            # Containers hold things INSIDE, everything else holds them ON.
            receptacle_type = "INSIDE" if "CONTAINERS" in objProp_dic[obj_id] else "ON"
            verb = "are" if len(held) > 1 else "is"
            text += " and ".join(label(h) for h in held)
            text += f" {verb} {receptacle_type} the {name}.\n"

        env_prompt += text

    agent_prompt = f"You are INSIDE the {label(b['location'])}.\n"
    if b["hold_rh"] and b["hold_lh"]:
        agent_prompt += (
            f"You are holding the {label(b['hold_rh'])} in your right hand"
            f" and the {label(b['hold_lh'])} in your left hand.\n"
        )
    elif b["hold_rh"]:
        agent_prompt += f"You are holding the {label(b['hold_rh'])} in your right hand.\n"
    elif b["hold_lh"]:
        agent_prompt += f"You are holding the {label(b['hold_lh'])} in your left hand.\n"

    if b["close_to"]:
        agent_prompt += "You are close to the "
        agent_prompt += " and the ".join(label(c) for c in b["close_to"])
        agent_prompt += ".\n"

    env_prompt += agent_prompt

    return env_prompt

取り扱うアクションの一覧と、出力形式

In [None]:
# Prompt fragment listing the actions the model may use and the exact output
# format of each one; run() places it in the action-generation prompt.
allowed_actions = """
Allowed actions: 
Walk, Grab, Switch on, Switch off, Open, Close, Put, Put in
Output Format: 
[WALK] <Object> (ID)
[GRAB] <Object> (ID)
[SWITCHON] <Object> (ID)
[SWITCHOFF] <Object> (ID)
[OPEN] <Object> (ID)
[CLOSE] <Object> (ID)
[PUT] <Object1> (ID) <Object2> (ID)
[PUTIN] <Object1> (ID) <Object2> (ID)

"""

実行（テスト）するタスクの選択

In [None]:
# Task family to run; it selects the dataset directory and the demo task.
#task_type = "state_change_task"    # state-change tasks
task_type = "placement_task"      # placement tasks

import json
# Test tasks that the evaluation loop iterates over.
with open(f"dataset/{task_type}/test.json") as f:
    test_dataset = json.load(f)

# The examples are embedded into the prompt, so they are loaded up front
with open(f"dataset/{task_type}/example.json") as f:
    example_dataset = json.load(f)

例のタスクの埋め込みと、センテンス間の類似度計算（最も類似するタスクを選択）

In [None]:
from sentence_transformers import SentenceTransformer, util
# Sentence-embedding model used to compare task descriptions.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Group the example action scripts by task name and collect the distinct names.
example_scripts = {}
all_tasknames = set()
for example in example_dataset.values():
    example_task = example["task"]
    example_scripts.setdefault(example_task, []).append(example["action_scripts"])
    all_tasknames.add(example_task)

# Fix an order for the names; embeddings[k] belongs to all_tasknames[k].
all_tasknames = list(all_tasknames)
embeddings = model.encode(all_tasknames)


def sentence_cosine_similarity(sentence, top_k=1):
    """Return the example task names most similar to ``sentence``.

    Args:
        sentence: Task description to compare against the example tasks.
        top_k: Number of task names to return. Defaults to 1, which keeps
            the original one-shot behaviour.

    Returns:
        A list of at most ``top_k`` entries of ``all_tasknames``, ordered
        from most to least similar by cosine similarity between the
        embedding of ``sentence`` and the precomputed ``embeddings``. Ties
        keep the order of ``all_tasknames``.
    """
    cos_sim = util.cos_sim(model.encode(sentence), embeddings)[0]
    # ``sorted`` is stable, so equal scores stay in their original order.
    ranked = sorted(
        range(len(cos_sim)),
        key=lambda idx: cos_sim[idx],
        reverse=True,
    )
    return [all_tasknames[idx] for idx in ranked[:max(top_k, 0)]]

few-shot用のプロンプト作成

In [None]:
def fewshot_prompt_generation(task): # one-shot
    """Build the example section of the prompt for ``task``.

    Args:
        task: Description of the task that is about to be solved.

    Returns:
        For every task name returned by ``sentence_cosine_similarity``: an
        ``Example Task:`` line, the numbered steps of the longest action
        script stored for that task, and a blank line.
    """
    pieces = []
    for example_name in sentence_cosine_similarity(task):
        pieces.append(f"Example Task: {example_name}\n")
        # Several scripts may exist for one task; the longest one is shown.
        longest_script = max(example_scripts[example_name], key=len)
        for number, line in enumerate(longest_script, start=1):
            pieces.append(f"Step{number}: {line}\n")
        pieces.append("\n")

    return "".join(pieces)

LLMおよび、LangChainの設定

In [None]:
# API key passed to ChatOpenAI below.
# NOTE(review): the key is written into the notebook here; consider loading
# it from the environment so that it is never committed.
API_KEY = ""

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain.memory import ChatMessageHistory

# One model instance (temperature 0.0) shared by all chains; the output
# parser turns every response into a plain string.
llm = ChatOpenAI(api_key=API_KEY, model="gpt-4o-mini", temperature=0.0)
output_parser = StrOutputParser()

# Chain used to decide whether the task is complete (end check)
llm_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a life support robot."),
    ("user", "{input}")
])
chain = llm_prompt | llm | output_parser

# Chain used to verify the preconditions of an action (no system message)
pc_prompt = ChatPromptTemplate.from_messages([
    ("user", "{input}")
])
pc_chain = pc_prompt | llm | output_parser

# Chain for chat and action-step generation; it receives the dialogue history
# through the "messages" placeholder
multi_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a life support robot. "),
    MessagesPlaceholder(variable_name="messages"),
    ("user", "{input}")
])
multi_chain = multi_prompt | llm | output_parser

実行の前提条件の検証と、アクションの再生成

In [None]:
# Precondition descriptions keyed by action name; read by precondition_checking().
with open("precondition.json") as precondition_file:
    preconditions = json.load(precondition_file)

def precondition_checking(action_step, home_env):
    """Ask the LLM whether the preconditions of ``action_step`` hold.

    Args:
        action_step: Generated action, expected as
            ``[ACTION] <Object> (ID)`` or
            ``[ACTION] <Object1> (ID) <Object2> (ID)``.
        home_env: Natural-language description of the current environment.

    Returns:
        ``"Satisfied"`` when the model's answer contains ``"Yes"``;
        ``"Incorrect output format"`` when the step does not have one of the
        two shapes above or its action has no entry in ``preconditions``;
        otherwise a message that quotes the step and the preconditions the
        model reports as unsatisfied.

    Notes:
        ``action_step`` is free-form model output, so it is split on any
        whitespace and its shape is validated before tokens are indexed.
        Splitting on a single space and indexing blindly raised IndexError
        for short outputs and treated steps with stray tokens as one-object
        actions.
    """
    checking_prompt = """
If the current status satisfies the preconditions, output "Yes"; otherwise, output "No".
"""
    split_data = action_step.split()

    # Exactly 3 tokens (one object) or 5 tokens (two objects) are valid.
    if len(split_data) not in (3, 5):
        return "Incorrect output format"

    head = split_data[0]
    if not (head.startswith("[") and head.endswith("]")):
        return "Incorrect output format"

    action = head[1:-1]
    if action not in preconditions:
        return "Incorrect output format"

    # Fill the object placeholders of the precondition text.
    preconditions_prompt = str(preconditions[action])
    if len(split_data) == 5:
        preconditions_prompt = preconditions_prompt.replace(
            "<Object1>", f"{split_data[1]} {split_data[2]}"
        ).replace(
            "<Object2>", f"{split_data[3]} {split_data[4]}"
        )
    else:
        preconditions_prompt = preconditions_prompt.replace(
            "<Object>", f"{split_data[1]} {split_data[2]}"
        )

    question = home_env + checking_prompt + preconditions_prompt
    res = pc_chain.invoke({"input": question})

    if "Yes" in res:
        return "Satisfied"

    # Extract the preconditions that are not satisfied: replay the exchange
    # and ask the model to name them.
    chat_history = ChatMessageHistory()
    chat_history.add_user_message(question)
    chat_history.add_ai_message(res)

    prec = multi_chain.invoke(
        {
            "messages": chat_history.messages,
            "input": "Which the preconditions are not satisfied?\nOutput only that." + preconditions_prompt
        }
    )
    return f"'{action_step}' can not execute, because it is not satisfied the following precondition.\n{prec}"


    
def regeneration(prompts, action_step, pc_output):
    """Ask the model for a replacement of a rejected action step.

    Args:
        prompts: The prompt parts used for the original generation; each one
            is replayed as a user message.
        action_step: The rejected action, replayed as the model's answer.
        pc_output: The reason for the rejection, replayed as a user message.

    Returns:
        The model's answer to the last prompt part with every "Generate"
        replaced by "Regenerate".
    """
    history = ChatMessageHistory()
    for part in prompts:
        history.add_user_message(part)
    history.add_ai_message(action_step)
    history.add_user_message(pc_output)

    request = {
        "messages": history.messages,
        "input": str(prompts[-1]).replace("Generate", "Regenerate"),
    }
    return multi_chain.invoke(request)

アクションステップ生成（プロンプト分割）

In [None]:
def action_step_generation(prompts):
    """Generate the next action step from a prompt that is split into parts.

    Args:
        prompts: Sequence of prompt parts. All but the last are sent as
            earlier user messages; the last one is the actual input.

    Returns:
        The model's answer as returned by ``multi_chain``.
    """
    *context_parts, final_part = prompts

    history = ChatMessageHistory()
    for part in context_parts:
        history.add_user_message(part)

    request = {
        "messages": history.messages,
        "input": final_part,
    }
    return multi_chain.invoke(request)

システム全体の処理

In [None]:
def run(task, ablation):
    """Solve one task step by step in the simulator and score the result.

    Args:
        task: Task dict with the keys ``"task"``, ``"scene"``,
            ``"initial_room"``, ``"initial_states"``, ``"action_scripts"``
            and ``"goal_states"``.
        ablation: When ``False`` every generated action goes through the LLM
            precondition check (and is regenerated once if rejected); any
            other value skips that check.

    Returns:
        A tuple ``(score, result, action_script)``: ``score`` is 1.0 or 0.0,
        ``result`` is one of "Success", "Reaching Maximum Attempts",
        "Execution Failure" or "Erroneous Terminate", and ``action_script``
        lists the actions that were executed successfully.
    """
    instruct_prompt = "You need to generate a next action step for completing a household task.\n"

    task_name = task["task"]
    environment_initialization(task["scene"], task["initial_room"], task["initial_states"])

    rel_objects = extract_nouns(task_name)
    fewshot_prompt = fewshot_prompt_generation(task_name)
    # Executed steps are appended to this prompt so the model sees its history.
    task_prompt = f"""\
Generate a only next action step to complete the following task and output only that.
Task: {task_name}
"""
    end_check_prompt = f"""
If the following task has already completed based on the current status, output "End"; otherwise, output "Continue".
Task: {task_name}
"""
    # Budget: twice the length of the reference script (0 for an empty script).
    maximum_attempts = len(task["action_scripts"]) * 2
    count = 1

    action_script = []

    while True:
        #print("attempts:" + str(count))

        # extract environment knowledge
        a, b = extract_environment_knowledge(rel_objects)
        home_environment_prompt = return_nlp(a, b)
        #print(home_environment_prompt + end_check_prompt)

        # end check
        check = chain.invoke({"input": home_environment_prompt + end_check_prompt})
        #print(check)

        # NOTE(review): the answer is compared by exact equality, so a reply
        # such as "End." matches neither branch -- confirm this is intended.
        if "End" == check:
            # Nothing had to be done and the model agrees: immediate success.
            if count == 1 and maximum_attempts == 0:
                return 1.0, "Success", action_script
            break
        elif "Continue" == check and maximum_attempts == 0:
            return 0.0, "Reaching Maximum Attempts", action_script

        # generate action
        merge_propmt = [instruct_prompt, allowed_actions, fewshot_prompt, home_environment_prompt, task_prompt + f"Step{count}: "]
        action = action_step_generation(merge_propmt)
        #print(merge_propmt)
        #print(action)

        # -------------------- precondition check with llm ----------------------
        if ablation == False:

            #print("--- precondition check ---")
            pc_output = precondition_checking(action, home_environment_prompt)
            #print(pc_output)
            if "Satisfied" == pc_output:
                pass
            # regenerate action
            elif "Incorrect output format" == pc_output:
                action = regeneration(merge_propmt, action, f"'{action}' is incorrect output format.")
                #print("--- action regeneration ---")
                #print(action)
            else:
                # The regenerated action is executed without a second check.
                action = regeneration(merge_propmt, action, pc_output)
                #print("--- action regeneration ---")
                #print(action)
        # ------------------------------------------------------------------------

        # simulation
        success = unity_simulation(["<char0> " + action])
        if success is True:
            task_prompt += f"Step{count}: {action}\n"
            action_script.append(action)
        else:
            return 0.0, "Execution Failure", action_script
        
        # check whether the maximum number of attempts has been reached
        # (this returns before another end check is made)
        if count >= maximum_attempts:
            return 0.0, "Reaching Maximum Attempts", action_script

        count += 1



    # compute the success rate from the final environment graph
    g = comm.environment_graph()[1]
    num_of_achieves = 0
    num_of_goals = len(task["goal_states"])

    # state goals: recognised by having exactly two keys ("id" and "states")
    if len(task["goal_states"][0]) == 2:
        for goal_states in task["goal_states"]:
            for node in g["nodes"]:
                # The node's state list must equal the goal list exactly.
                if node["id"] == goal_states["id"] and node["states"] == goal_states["states"]:
                    num_of_achieves += 1
    # relation goals: an edge with the same endpoints and relation type must exist
    else:
        for goal_states in task["goal_states"]:
            for e in g["edges"]:
                if e["from_id"] == goal_states["from_id"] and e["to_id"] == goal_states["to_id"] and e["relation_type"] == goal_states["relation_type"]:
                    num_of_achieves += 1

    # Only a fully achieved goal set counts; partial scores are reported as 0.0.
    score = num_of_achieves/num_of_goals
    if score == 1.0:
        return score, "Success", action_script
    
    return 0.0, "Erroneous Terminate", action_script

Demo

In [None]:
# One sample task per task family, keyed by the value of ``task_type``.
# Each entry has the keys that run() reads from a task dict.
demo_task = {
    # State-change goals are given as {"id", "states"} pairs.
    "state_change_task": {
        "task": "Turn on all lightswitches",
        "scene": 1,
        "initial_room": "bathroom",
        "initial_states": [
            {
                "id": 71,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 173,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 261,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 427,
                "states": [
                    "OFF"
                ]
            }
        ],
        "goal_states": [
            {
                "id": 71,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 173,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 261,
                "states": [
                    "ON"
                ]
            },
            {
                "id": 427,
                "states": [
                    "ON"
                ]
            }
        ],
        "action_scripts": [
            "[WALK] <lightswitch> (427)",
            "[SWITCHON] <lightswitch> (427)"
        ],
        "comment": ""
    },
    # Placement goals are given as {"from_id", "to_id", "relation_type"} edges.
    "placement_task": {
        "task": "Put all chips on the kitchentable",
        "scene": 1,
        "initial_room": "kitchen",
        "initial_states": [],
        "goal_states": [
            {
                "from_id": 328,
                "to_id": 231,
                "relation_type": "ON"
            },
            {
                "from_id": 329,
                "to_id": 231,
                "relation_type": "ON"
            }
        ],
        "action_scripts": [
            "[WALK] <chips> (328)",
            "[GRAB] <chips> (328)",
            "[WALK] <kitchentable> (231)",
            "[PUT] <chips> (328) <kitchentable> (231)",
            "[WALK] <chips> (329)",
            "[GRAB] <chips> (329)",
            "[WALK] <kitchentable> (231)",
            "[PUT] <chips> (329) <kitchentable> (231)"
        ],
        "comment": ""
    }
}

In [None]:
run(demo_task[task_type], False)    # pass True as the second argument for the ablation study (it skips the LLM precondition check)

評価

In [None]:
# Evaluate the test tasks and collect one result record per task key.
result_dic = {}
for task_key, task_entry in test_dataset.items():
    # Pass True as the second argument to run the ablation study.
    score, outcome, executed_steps = run(task_entry, False)

    print(task_key, score, outcome)

    record = {
        "score": score,
        "result": outcome,
        "action_script": executed_steps,
        "attempts": len(executed_steps)
    }
    result_dic[task_key] = record

    # NOTE(review): this unconditional break stops the evaluation after the
    # first test task -- confirm whether that is intended.
    break


In [None]:
# Write the collected evaluation results to disk as indented JSON.
with open("result_.json", "w") as result_file:
    json.dump(result_dic, result_file, indent=4)