In [11]:
import argparse
import glob
import json
import os
import subprocess
import random
import sys
import time
import backoff
import openai
from openai import OpenAI

# FAST_DOWNWARD_ALIAS = "lama"
FAST_DOWNWARD_ALIAS = "seq-opt-fdss-1"
OPENAI_API_KEY=""


In [12]:
def postprocess(x):
    return x.strip()
# extract a cost value from a string x
def get_cost(x):
    splitted = x.split()
    counter = 0
    found = False
    cost = 1e5
    for i, xx in enumerate(splitted):
        if xx == "cost":
            counter = i
            found = True
            break
    if found:
        cost = float(splitted[counter+2])
    return cost

In [13]:
# This function populates a list of tasks based on the files present in the specified domain's directory.
def get_tasks(domain_name):
    domain_path = f"./domain/{domain_name}"
    task_files = []
    # Iterate over all .nl files in the domain directory
    for file_name in glob.glob(f"{domain_path}/*.nl"):
        base_file_name = os.path.basename(file_name)
        if "domain" not in base_file_name and "p_example" not in base_file_name:
            if os.path.exists(file_name.replace("nl", "pddl")):
                task_files.append(base_file_name)
                
    sorted_task_files = sorted(task_files)
    tasks = [(file, file.replace("nl", "pddl")) for file in sorted_task_files]
    return tasks

# Reads and returns the processed content of the natural language and PDDL files for a specific task.
def get_task(tasks, i,domain_name):
    nl_f, pddl_f = f"./domain/{domain_name}/{tasks[i][0]}", f"./domain/{domain_name}/{tasks[i][1]}"
    with open(nl_f, 'r') as f:
        nl = f.read()
    with open(pddl_f, 'r') as f:
        pddl = f.read()
    return postprocess(nl), postprocess(pddl)
    
# Reads and returns the processed content of the context files (NL, PDDL, SOL)
def get_context(domain_name):
    nl_f   = f"./domain/{domain_name}/p_example.nl"
    pddl_f = f"./domain/{domain_name}/p_example.pddl"
    sol_f  = f"./domain/{domain_name}/p_example.sol"
    with open(nl_f, 'r') as f:
        nl   = f.read()
    with open(pddl_f, 'r') as f:
        pddl = f.read()
    with open(sol_f, 'r') as f:
        sol  = f.read()
    return postprocess(nl), postprocess(pddl), postprocess(sol)

# Reads and returns the processed content of the domain PDDL file.
def get_domain_pddl(domain_name):
    domain_pddl_f = f"./domain/{domain_name}/domain.pddl"
    with open(domain_pddl_f, 'r') as f:
        domain_pddl = f.read()
    return postprocess(domain_pddl)

# Reads and returns the processed content of the domain NL file.
def get_domain_nl(domain_name):
    domain_nl_f = f"./domain/{domain_name}/domain.nl"
    try:
        with open(domain_nl_f, 'r') as f:
            domain_nl = f.read()
    except:
        domain_nl = "Nothing"
    return postprocess(domain_nl)

## Create Prompts for LLMs

In [14]:
def create_llm_prompt(task_nl, domain_nl):
    # Baseline (LLM-as-Planner): directly ask the LLM for plan
    prompt = f"{domain_nl} \n" + \
                f"Now consider a planning problem. " + \
                f"The problem description is: \n {task_nl} \n" + \
                f"Can you provide an optimal plan, in the way of a " + \
                f"sequence of behaviors, to solve the problem?"
    return prompt

def create_llm_ic_prompt(task_nl, domain_nl, context):
    # Baseline (LLM-as-Planner with context): directly ask the LLM for plan
    context_nl, context_pddl, context_sol = context
    prompt = f"{domain_nl} \n" + \
             f"An example planning problem is: \n {context_nl} \n" + \
             f"A plan for the example problem is: \n {context_sol} \n" + \
             f"Now I have a new planning problem and its description is: \n {task_nl} \n" + \
             f"Can you provide an optimal plan, in the way of a " + \
             f"sequence of behaviors, to solve the problem?"
    return prompt

def create_llm_pddl_prompt(
        task_nl, domain_nl):
    # Modified BaseLine (LLM+Planner w/o context), no context, create the problem PDDL
    prompt = f"{domain_nl} \n" + \
                f"Now consider a planning problem. " + \
                f"The problem description is: \n {task_nl} \n" + \
                f"Provide me with the problem PDDL file that describes " + \
                f"the planning problem directly without further explanations?" +\
                f"Keep the domain name consistent in the problem PDDL. Only return the PDDL file. Do not return anything else."
    return prompt

def create_llm_ic_pddl_prompt( task_nl, domain_pddl, context):
    # Modified BaseLine(LLM+Planner), create the problem PDDL given the context
    context_nl, context_pddl, context_sol = context
    prompt = f"I want you to solve planning problems. " + \
                f"An example planning problem is: \n {context_nl} \n" + \
                f"The problem PDDL file to this problem is: \n {context_pddl} \n" + \
                f"Now I have a new planning problem and its description is: \n {task_nl} \n" + \
                f"Provide me with the problem PDDL file that describes " + \
                f"the new planning problem directly without further explanations? Only return the PDDL file. Do not return anything else."
    return prompt


# Interact with LLMs

In [15]:
def query(prompt_text, api_key):
    # Create an OpenAI client
    client = OpenAI(api_key=api_key)

    # Define the parameters for the chat completion
    chat_params = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt_text},
        ],
    }

    # Create the chat completion
    response = client.chat.completions.create(**chat_params)

    # Extract the result text from the completion
    result_text = response.choices[0].message.content

    return result_text

def plan_to_language( plan, task_nl, domain_pddl,context_pddl,context_sol):
    domain_pddl_ = " ".join(domain_pddl.split())
    task_nl_ = " ".join(task_nl.split())
    prompt = f"An example PDDL plan is: \n {context_pddl} \n" + \
                f"The corresponding Natural Language Description of this PDDL plan is : \n {context_sol} \n" + \
                f"Now I have a new planning problem and its description is: \n {task_nl_} \n" + \
                f"The corresponding domain PDDL file is: \n {domain_pddl_} \n" + \
                f"The optimal PDDL plan is: \n {plan} \n" + \
                f"Transform the PDDL plan into a sequence of behaviors without further explanation."
    res = query(prompt,OPENAI_API_KEY).strip() + "\n"
    return res

# BaseLine Model: Only LLM

#### Without Context

In [16]:
def llm_planner(domain_name,time_limit,tasks,task_number):
    """
    Baseline method:
        The LLM will be asked to directly give a plan based on the task description.
    """
    domain_pddl      = get_domain_pddl(domain_name)
    domain_pddl_file = f"./domain/{domain_name}/domain.pddl"
    domain_nl        = get_domain_nl(domain_name)
    domain_nl_file   = f"./domain/{domain_name}/domain.nl"

    # create the tmp / result folders
    problem_folder = f"./experiments/problems/llm"
    plan_folder    = f"./experiments/plans/llm"
    result_folder  = f"./experiments/results/llm"

    if not os.path.exists(problem_folder):
        os.system(f"mkdir -p {problem_folder}")
    if not os.path.exists(plan_folder):
        os.system(f"mkdir -p {plan_folder}")
    if not os.path.exists(result_folder):
        os.system(f"mkdir -p {result_folder}")

    start_time = time.time()

    # A. generate problem pddl file
    # task_suffix        = f"{domain_name}/{tasks[task_number][1]}"
    task_suffix        = f"{tasks[task_number][1]}"
    task_nl, task_pddl = get_task(tasks,task_number,domain_name) 
    prompt             = create_llm_prompt(task_nl, domain_nl)
    text_plan          = query(prompt,OPENAI_API_KEY)

    # B. write the problem file into the problem folder
    text_plan_file_name = f"./experiments/results/llm/{task_suffix}"
    with open(text_plan_file_name, "w") as f:
        f.write(text_plan)
    end_time = time.time()
    print(f"[info] task {task_number} takes {end_time - start_time} sec")

#### With Context

In [17]:
def llm_ic_planner(domain_name,time_limit,tasks,task_number):
    """
    Baseline method:
        The LLM will be asked to directly give a plan based on the task description.
    """
    context          = get_context(domain_name)
    domain_pddl      = get_domain_pddl(domain_name)
    domain_pddl_file = f"./domain/{domain_name}/domain.pddl"
    domain_nl        = get_domain_nl(domain_name)
    domain_nl_file   = f"./domain/{domain_name}/domain.nl"

    # create the tmp / result folders
    problem_folder = f"./experiments/problems/llm_ic"
    plan_folder    = f"./experiments/plans/llm_ic"
    result_folder  = f"./experiments/results/llm_ic"

    if not os.path.exists(problem_folder):
        os.system(f"mkdir -p {problem_folder}")
    if not os.path.exists(plan_folder):
        os.system(f"mkdir -p {plan_folder}")
    if not os.path.exists(result_folder):
        os.system(f"mkdir -p {result_folder}")


    start_time = time.time()

    # A. generate problem pddl file
    task_suffix        = f"{tasks[task_number][1]}"
    task_nl, task_pddl = get_task(tasks,task_number,domain_name) 
    prompt             = create_llm_ic_prompt(task_nl, domain_nl, context)
    text_plan          = query(prompt,OPENAI_API_KEY)

    # B. write the problem file into the problem folder
    text_plan_file_name = f"./experiments/results/llm_ic/{task_suffix}"
    with open(text_plan_file_name, "w") as f:
        f.write(text_plan)
    end_time = time.time()
    print(f"[info] task {task_number} takes {end_time - start_time} sec")

# Improved Model: LLM + Planner Combination

#### Without Context

In [18]:
def llm_pddl_planner(domain_name,time_limit,tasks,task_number):
    """
    Baseline method:
        Same as ours, except that no context is given. In other words, the LLM
        will be asked to directly give a problem PDDL file without any context.
    """
    domain_pddl      = get_domain_pddl(domain_name)
    domain_pddl_file = f"./domain/{domain_name}/domain.pddl"
    domain_nl        = get_domain_nl(domain_name)
    domain_nl_file   = f"./domain/{domain_name}/domain.nl"

    # create the tmp / result folders
    problem_folder = f"./experiments/problems/llm_pddl"
    plan_folder    = f"./experiments/plans/llm_pddl"
    result_folder  = f"./experiments/results/llm_pddl"

    if not os.path.exists(problem_folder):
        os.system(f"mkdir -p {problem_folder}")
    if not os.path.exists(plan_folder):
        os.system(f"mkdir -p {plan_folder}")
    if not os.path.exists(result_folder):
        os.system(f"mkdir -p {result_folder}")

    start_time = time.time()

    # A. generate problem pddl file
    task_suffix= f"{tasks[task_number][1]}"
    task_nl, task_pddl = get_task(tasks,task_number,domain_name) 
    prompt = create_llm_pddl_prompt(task_nl, domain_pddl)

    task_pddl_file_name = f"./experiments/problems/llm_pddl/{task_suffix}"
    raw_result= query(prompt,OPENAI_API_KEY)

    # B. write the problem file into the problem folder
    with open(task_pddl_file_name, "w") as f:
        f.write(raw_result)
    time.sleep(1)

    ## C. run fastforward to plan
    plan_file_name = f"./experiments/plans/llm_pddl/{task_suffix}"
    sas_file_name  = f"./experiments/plans/llm_pddl/{task_suffix}.sas"
    validate_option = "--validate"  # "--debug" or "--validate" if you want to explicitly enable validation
    os.system(f"python ../downward/fast-downward.py --alias {FAST_DOWNWARD_ALIAS} " + \
              f"--search-time-limit {time_limit} --plan-file {plan_file_name} " + \
              f"--sas-file {sas_file_name} {validate_option} " + \
              f"{domain_pddl_file} {task_pddl_file_name}")

    # D. collect the least cost plan
    best_cost = 1e10
    best_plan = None
    for fn in glob.glob(f"{plan_file_name}"):
        with open(fn, "r") as f:
            plans = f.readlines()
            cost = get_cost(plans[-1])
            if cost < best_cost:
                best_cost = cost
                best_plan = "\n".join([p.strip() for p in plans[:-1]])

    # E. translate the plan back to natural language, and write it to result
    # '''
    if best_plan:
        plans_nl = plan_to_language(best_plan, task_nl, domain_nl, domain_pddl)
        plan_nl_file_name = f"./experiments/results/llm_pddl/{task_suffix}"
        with open(plan_nl_file_name, "w") as f:
            f.write(plans_nl)
    # '''
    end_time = time.time()
    if best_plan:
        print(f"[info] task {task_number} takes {end_time - start_time} sec, found a plan with cost {best_cost}")
    else:
        print(f"[info] task {task_number} takes {end_time - start_time} sec, no solution found")

#### With Context

In [19]:
def llm_ic_pddl_planner(domain_name,time_limit,tasks,task_number):
    """
    Our method:
        context: (task natural language, task problem PDDL)
        Condition on the context (task description -> task problem PDDL),
        LLM will be asked to provide the problem PDDL of a new task description.
        Then, we use a planner to find the near optimal solution, and translate
        that back to natural language.
    """
    context          = get_context(domain_name)
    domain_pddl      = get_domain_pddl(domain_name)
    domain_pddl_file = f"./domain/{domain_name}/domain.pddl"
    domain_nl        = get_domain_nl(domain_name)
    domain_nl_file   = f"./domain/{domain_name}/domain.nl"

    # create the tmp / result folders
    problem_folder = f"./experiments/problems/llm_ic_pddl"
    plan_folder    = f"./experiments/plans/llm_ic_pddl"
    result_folder  = f"./experiments/results/llm_ic_pddl"

    if not os.path.exists(problem_folder):
        os.system(f"mkdir -p {problem_folder}")
    if not os.path.exists(plan_folder):
        os.system(f"mkdir -p {plan_folder}")
    if not os.path.exists(result_folder):
        os.system(f"mkdir -p {result_folder}")

    start_time = time.time()

    # A. generate problem pddl file
    task_suffix= f"{tasks[task_number][1]}"
    task_nl, task_pddl = get_task(tasks,task_number,domain_name) 
    prompt = create_llm_ic_pddl_prompt(task_nl, domain_pddl, context)

    task_pddl_file_name = f"./experiments/problems/llm_ic_pddl/{task_suffix}"
    
    raw_result= query(prompt,OPENAI_API_KEY)

    # B. write the problem file into the problem folder
    with open(task_pddl_file_name, "w") as f:
        f.write(raw_result)
    time.sleep(1)

    ## C. run fastforward to generate plan
    plan_file_name = f"./experiments/plans/llm_ic_pddl/{task_suffix}"
    sas_file_name  = f"./experiments/plans/llm_ic_pddl/{task_suffix}.sas"
    validate_option = "--validate"  # "--debug" or "--validate" if you want to explicitly enable validation
    os.system(f"python ../downward/fast-downward.py --alias {FAST_DOWNWARD_ALIAS} " + \
              f"--search-time-limit {time_limit} --plan-file {plan_file_name} " + \
              f"--sas-file {sas_file_name} {validate_option} " + \
              f"{domain_pddl_file} {task_pddl_file_name}")

    # D. collect the least cost plan
    best_cost = 1e10
    best_plan = None
    for fn in glob.glob(f"{plan_file_name}"):
        with open(fn, "r") as f:
            plans = f.readlines()
            cost = get_cost(plans[-1])
            if cost < best_cost:
                best_cost = cost
                best_plan = "\n".join([p.strip() for p in plans[:-1]])

    # E. translate the plan back to natural language, and write it to result
    # '''
    if best_plan:
        plans_nl = plan_to_language(best_plan, task_nl, domain_nl, domain_pddl)
        plan_nl_file_name = f"./experiments/results/llm_ic_pddl/{task_suffix}"
        with open(plan_nl_file_name, "w") as f:
            f.write(plans_nl)
    # '''
    end_time = time.time()
    if best_plan:
        print(f"[info] task {task_number} takes {end_time - start_time} sec, found a plan with cost {best_cost}")
    else:
        print(f"[info] task {task_number} takes {end_time - start_time} sec, no solution found")

# Main Function

In [24]:
domain_name="blocksworld"
methods=["llm_ic_pddl_planner","llm_pddl_planner","llm_planner","llm_ic_planner"]
method=methods[0]
time_limit=200

task_number=14
tasks=[] # should be list of tuples like (descritpion, ground_truth_pddl)

# Call the function to populate the tasks list
tasks = get_tasks(domain_name)

# 3. execute the llm planner
method_function = {
    "llm_ic_pddl_planner"   : llm_ic_pddl_planner,
    "llm_pddl_planner"      : llm_pddl_planner,
    "llm_planner"           : llm_planner,
    "llm_ic_planner"        : llm_ic_planner, 
}[method]

method_function(domain_name,time_limit,tasks,task_number)

INFO     planner time limit: None
INFO     planner memory limit: None

INFO     Running translator.
INFO     translator stdin: None
INFO     translator time limit: None
INFO     translator memory limit: None
INFO     translator command line string: /home/jainav/python-environments/pe-llm/bin/python /home/jainav/Work/PE-LLM/downward/builds/release/bin/translate/translate.py ./domain/blocksworld/domain.pddl ./experiments/problems/llm_ic_pddl/p15.pddl --sas-file ./experiments/plans/llm_ic_pddl/p15.pddl.sas
Parsing...
Parsing: [0.000s CPU, 0.001s wall-clock]
Normalizing task... [0.000s CPU, 0.000s wall-clock]
Instantiating...
Generating Datalog program... [0.000s CPU, 0.000s wall-clock]
Normalizing Datalog program...
Normalizing Datalog program: [0.000s CPU, 0.001s wall-clock]
Preparing model... [0.010s CPU, 0.000s wall-clock]
Generated 21 rules.
Computing model... [0.000s CPU, 0.002s wall-clock]
339 relevant atoms
227 auxiliary atoms
566 final queue length
944 total queue pushes
Completin

In [None]:
# domain_name="blocksworld"
# methods=["llm_ic_pddl_planner","llm_pddl_planner","llm_planner","llm_ic_planner"]
# for method in methods:
#     time_limit=200
#     for task_number in range(1,21):
#         tasks=[] # should be list of tuples like (descritpion, ground_truth_pddl)

#         # Call the function to populate the tasks list
#         tasks = get_tasks(domain_name)

#         # 3. execute the llm planner
#         method_function = {
#             "llm_ic_pddl_planner"   : llm_ic_pddl_planner,
#             "llm_pddl_planner"      : llm_pddl_planner,
#             "llm_planner"           : llm_planner,
#             "llm_ic_planner"        : llm_ic_planner, 
#         }[method]

#         method_function(domain_name,time_limit,tasks,task_number)

In [39]:
print(tasks)

[('p01.nl', 'p01.pddl'), ('p02.nl', 'p02.pddl'), ('p03.nl', 'p03.pddl'), ('p04.nl', 'p04.pddl'), ('p05.nl', 'p05.pddl'), ('p06.nl', 'p06.pddl'), ('p07.nl', 'p07.pddl'), ('p08.nl', 'p08.pddl'), ('p09.nl', 'p09.pddl'), ('p10.nl', 'p10.pddl'), ('p11.nl', 'p11.pddl'), ('p12.nl', 'p12.pddl'), ('p13.nl', 'p13.pddl'), ('p14.nl', 'p14.pddl'), ('p15.nl', 'p15.pddl'), ('p16.nl', 'p16.pddl'), ('p17.nl', 'p17.pddl'), ('p18.nl', 'p18.pddl'), ('p19.nl', 'p19.pddl'), ('p20.nl', 'p20.pddl')]


# Extras