# Wrappers

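pddlgym expects actions to be instances of its own structured classes (e.g. `Literal` built from a `Predicate` and typed objects) rather than plain strings.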

`StringWrapper` allows us to give this as a string. E.g., `"move(tree1_7:tree1, crafting_table1_5:crafting_table1)"`

`PyperWrapper` translates between the pddlgym and pyperplan representations, for pyperplan compatibility.

In [None]:
# String Wrappers 
## For stringifying stuff to pddlgym

import gym
from pddlgym.structs import Predicate, Literal, TypedEntity, Type

class StringWrapper(gym.Wrapper):
    """
    Allows for providing actions as strings rather than the pddlgym representation.

    An action string has the form ``name(const:type, const:type, ...)``, e.g.
    ``"move(r1:r,q3:q)"``. Spaces inside the argument list are ignored and an
    empty argument list (``"noop()"``) yields a zero-arity action.
    """
    def __init__(self, env):
        super().__init__(env)
        self.env = env
        # Most recent observation returned by the wrapped env (None before reset).
        self.current_state = None

    def reset(self, **kwargs):
        """Reset the wrapped env, remember its observation and return it."""
        obs = self.env.reset(**kwargs)
        self.current_state = obs
        return obs

    def step(self, action_str):
        """
        Convert ``action_str`` into a pddlgym ``Literal`` and step the wrapped env.

        Returns the wrapped env's ``(next_state, reward, done, info)`` tuple.
        Raises ValueError if ``action_str`` is malformed.
        """
        action = self._str_to_action(action_str)
        next_state, reward, done, info = self.env.step(action)
        self.current_state = next_state
        return next_state, reward, done, info

    def observe(self):
        """Return the observation stored by the last ``reset``/``step``."""
        return self.current_state

    def _str_to_action(self, action_str):
        """
        Parse ``"name(const:type, ...)"`` into a pddlgym ``Literal``.

        Raises:
            ValueError: if the string has no ``name(`` prefix or an argument is
                not of the form ``const:type``.
        """
        name, paren, rest = action_str.partition("(")
        name = name.strip()
        if not paren or not name:
            raise ValueError(
                f"Malformed action string {action_str!r}; expected 'name(const:type, ...)'"
            )

        # Everything up to the first ")" is the argument list; spaces carry no meaning.
        arg_text = rest.split(")")[0].replace(" ", "")
        args = arg_text.split(",") if arg_text else []

        objs = []
        for arg in args:
            parts = arg.split(":")
            if len(parts) < 2 or not parts[0] or not parts[1]:
                raise ValueError(
                    f"Malformed argument {arg!r} in action {action_str!r}; expected 'const:type'"
                )
            obj_const_str, obj_type_str = parts[0], parts[1]
            # Calling a pddlgym Type with a name builds the typed object.
            obj_type = Type(obj_type_str)
            objs.append(obj_type(obj_const_str))

        actionP = Predicate(name, len(objs))
        return Literal(actionP, objs)
     

In [None]:
import pddlgym

# Demo: drive the Treasure environment with a string action.
env = pddlgym.make('PDDLEnvTreasure-v0')
env = StringWrapper(env)
env.reset()
# observe() returns the observation stored by the reset above.
obs = env.observe()
print(obs)
# Action string format is name(const:type,...).
action = "move(r1:r,q3:q)"
# Only the next observation is kept; reward, done and info are discarded.
obs,_,_,_ = env.step(action)
print(obs)


In [51]:
# pyperplan wrapper 

import gym
import pddlgym.structs as pgym 
import pyperplan.pddl.pddl as pyper


# pyperplan wrapper
class PyperWrapper(gym.Wrapper):
    """
    Allows for translating between pddlgym and pyperplan representations.
    Can run pyperplan solutions in pddlgym
    Can read pddlgym output in pyperplan state representation

    ``current_state`` stores whatever the wrapped env's ``reset``/``step``
    returned, unchanged; its first element is read as the pddlgym state.
    """
    # Accepted values for the ``source`` argument of ``observe``.
    _OBSERVE_SOURCES = ("pyperpred", "pddlgym", "pypertask")

    def __init__(self, env):
        super().__init__(env)
        self.env = env
        self.current_state = None  # going to be in default pddlgym representation
        self.root_object_type = pyper.Type("object", None)
        self.history = []  # In pyperplan format

    def reset(self, **kwargs):
        """Reset the wrapped env and return the default (``'pypertask'``) observation."""
        obs = self.env.reset(**kwargs)
        self.current_state = obs
        return self.observe()

    def observe(self, source='pypertask'):
        """
        Return the current state in the requested representation.

        Args:
            source: ``'pypertask'`` for a frozenset of ``"(name arg ...)"``
                strings, ``'pyperpred'`` for a list of pyperplan ``Predicate``
                objects, or ``'pddlgym'`` for the stored pddlgym result as-is.

        Raises:
            ValueError: if ``source`` is not one of the values above.
        """
        if source == 'pyperpred':
            return self._obs_pddlgym_to_pyper(self.current_state)
        if source == 'pddlgym':
            return self.current_state
        if source == 'pypertask':
            return self._obs_pddlgym_to_pypertask(self.current_state)
        raise ValueError(
            f"source must be one of {self._OBSERVE_SOURCES}, got {source!r}"
        )

    def objects(self):
        """
        Returns all the objects in the current state

        The result maps each object name to a pyperplan ``Type`` whose parent
        is the root ``object`` type.
        """
        objects = {}
        for o in set(self.current_state[0].objects):
            objects[o.name] = pyper.Type(str(o.var_type), self.root_object_type)
        return objects

    def step(self, action):
        """
        Run a pyperplan-style action string such as ``"(move p3 k1)"``.

        The action is appended to the history and the wrapped env's full step
        result is stored as ``current_state``. Only the default observation is
        returned (no reward/done/info).
        """
        action_pddlgym = self._action_pyper_to_pddlgym(action)
        next_state = self.env.step(action_pddlgym)
        self.history.append(action)
        self.current_state = next_state

        return self.observe()

    def get_history(self):
        """Return the list of action strings passed to ``step`` so far."""
        return self.history

    def _action_pyper_to_pddlgym(self, action):
        """
        (move p3 k1) --> pddlgym representation

        Raises:
            ValueError: if an argument does not name an object of the current state.
        """
        tokens = action.replace("(", "").replace(")", "").split(" ")
        name, args = tokens[0], tokens[1:]
        objs = set(self.current_state[0].objects)
        typed_args = []
        for arg in args:
            # Relies on pddlgym objects comparing equal to their bare name string.
            matches = [o for o in objs if o == arg]
            if not matches:
                raise ValueError(f"Unknown object {arg!r} in action {action!r}")
            typed_args.append(matches[0])
        return pgym.Literal(pgym.Predicate(name, len(args)), typed_args)

    @staticmethod
    def _split_literal(item):
        """Split ``str(item)`` of the form ``name(a:t,b:u)`` into ``(name, ["a:t", "b:u"])``."""
        name, _, rest = str(item).partition("(")
        arg_text = rest.split(")")[0]
        # Dropping empty pieces makes a zero-arity literal "name()" yield no arguments.
        args = [piece.strip() for piece in arg_text.split(",") if piece.strip()]
        return name, args

    def _obs_pddlgym_to_pyper(self, obs):
        """Convert the literals of ``obs[0]`` into a list of pyperplan ``Predicate`` objects."""
        predicates = []
        for item in set(obs[0].literals):
            name, typed_args = self._split_literal(item)
            signature = []
            for typed_arg in typed_args:
                parts = typed_arg.split(":")
                signature.append((parts[0], [pyper.Type(parts[1], self.root_object_type)]))
            predicates.append(pyper.Predicate(name, signature))
        return predicates

    def _obs_pddlgym_to_pypertask(self, obs):
        """Convert the literals of ``obs[0]`` into a frozenset of ``"(name arg ...)"`` strings."""
        literals = []
        for item in set(obs[0].literals):
            name, typed_args = self._split_literal(item)
            constants = [typed_arg.split(":")[0] for typed_arg in typed_args]
            # Joining name and arguments together avoids "(name )" for zero-arity literals.
            literals.append("(" + " ".join([name] + constants) + ")")
        return frozenset(literals)

    # DEPRECATED
    def __get_type(self, constant):
        """
        Return the type name of ``constant`` as found in the current state.

        Raises:
            ValueError: if the constant matches no object, or more than one.
        """
        obj_strs = [str(x) for x in set(self.current_state[0].objects)]
        # Match the whole name before ":" so that "r1" does not also match "xr1".
        obj_types = [s.split(":")[1] for s in obj_strs if s.split(":")[0] == constant]
        if len(obj_types) == 1:
            return obj_types[0]
        raise ValueError(
            f"Expected exactly one object named {constant!r}, found {len(obj_types)}"
        )
    

In [52]:
import pddlgym

# Demo: run pyperplan-style actions in the Treasure environment.
env = pddlgym.make('PDDLEnvTreasure-v0')
env = PyperWrapper(env)
# reset() returns the default ('pypertask') observation.
obs = env.reset()
print(f"Init: {obs}\n")

print(f"OBJECTS = {env.objects()}")

# step() takes a pyperplan-style action string and returns only the observation.
action = '(move r1 b3)'
obs= env.step(action)
action = '(move b3 q4)'
obs= env.step(action)
print(f"Final: {obs}\n")

# The same state in each supported representation.
obs = env.observe()
print(f"Observe: {obs}\n")

obs = env.observe(source="pddlgym")
print(f"Observe (pddlgym): {obs}\n")

obs = env.observe(source="pypertask")
print(f"Observe (task): {obs}\n")

obs = env.observe(source="pyperpred")
print(f"Observe (pred): {obs}\n")

objs = env.objects()
print(f"Objects: {objs}")

# History holds the action strings passed to step() above.
h = env.get_history()
print(f"History: {h}")


Init: frozenset({'(inworld b2)', '(inworld r2)', '(inworld r1)', '(inworld p1)', '(at r1)', '(inworld p5)', '(in s1 b1)', '(in s2 b2)', '(inworld p3)', '(inworld b3)', '(inworld q1)', '(inworld q3)', '(inworld p2)', '(inworld q5)', '(inworld q2)', '(inworld b1)', '(inworld q4)', '(inworld p4)'})

OBJECTS = {'q4': q, 'p2': p, 'b3': b, 'p1': p, 'b1': b, 'q3': q, 'p3': p, 'q2': q, 'r1': r, 'p4': p, 's2': s, 'q5': q, 'b2': b, 's1': s, 'q1': q, 'r2': r, 'p5': p}
Final: frozenset({'(inworld b2)', '(inworld r2)', '(inworld r1)', '(inworld p1)', '(in s1 b1)', '(inworld p5)', '(inworld b3)', '(in s2 b2)', '(inworld p3)', '(at q4)', '(inworld q1)', '(inworld q3)', '(inworld p2)', '(inworld q5)', '(inworld q2)', '(inworld b1)', '(inworld q4)', '(inworld p4)'})

Observe: frozenset({'(inworld b2)', '(inworld r2)', '(inworld r1)', '(inworld p1)', '(in s1 b1)', '(inworld p5)', '(inworld b3)', '(in s2 b2)', '(inworld p3)', '(at q4)', '(inworld q1)', '(inworld q3)', '(inworld p2)', '(inworld q5)', '(in

# Checking predicates in states

In [146]:
from pyperplan.pddl.pddl import Predicate, Type

root_object_type = Type("object",None)
def _parse_predicate(pred_str):
    """
    Parse a predicate string into a pyperplan ``Predicate``.

    Accepted forms: "(have ?x-t)", "(have p1-p)", "(on a-block b-block)" or
    "(inworld p2)". An argument written ``symbol-type`` carries its own type;
    a bare constant is looked up among the environment's objects.

    Raises:
        ValueError: if the string holds no predicate name, or a variable
            (``?x``) is given without a type.
        KeyError: if a bare constant is not an object of the environment.
    """
    # split() without a separator tolerates repeated or surrounding spaces.
    tokens = pred_str.replace("(", "").replace(")", "").split()
    if not tokens:
        raise ValueError("Predicate string must contain a predicate name")
    name, args = tokens[0], tokens[1:]

    signature = []
    objs = None  # environment objects, fetched at most once and only if needed
    for arg in args:
        if "-" in arg:
            parts = arg.split("-")
            sym, typing = parts[0], parts[1]
            signature.append((sym, [Type(typing, root_object_type)]))
        else:
            if "?" in arg:
                raise ValueError("Must provide at least one of constant or a type")
            if objs is None:
                objs = env.objects() #CHANGE TO self.env
            signature.append((arg, [objs[arg]]))
    return Predicate(name, signature)


In [151]:
# Parse a predicate with a typed variable and show its name, signature and string form.
out = _parse_predicate("(inworld ?x-p)")
print(out.name)
print(out.signature)
print(str(out))

inworld
[('?x', [p])]
inworld[('?x', [p])]


In [163]:
import random
import uuid
import copy

def _assign_symbol(predicate): # ADD SELF
    """
    Return a deep copy of ``predicate`` whose ``?variable`` terms are grounded.

    A variable whose type has objects in the environment is replaced by one of
    them, chosen at random. Otherwise it gets a made-up name of the form
    ``<type>_<8 hex chars>``. Terms without "?" are kept as they are.
    """
    grounded = copy.deepcopy(predicate)

    # Invert the objects mapping: type name -> names of objects of that type
    names_by_type = {}
    for obj_name, obj_type in env.objects().items():  # CHANGE TO SELF.ENV
        names_by_type.setdefault(obj_type.name, []).append(obj_name)
    print(names_by_type)

    def _resolve(term, term_type):
        # Constants pass through untouched
        if "?" not in term:
            return term
        # Prefer an existing object of the right type
        if term_type.name in names_by_type:
            return random.choice(names_by_type[term_type.name])
        # No such object: invent a hypothetical name
        return term_type.name+"_"+str(uuid.uuid4())[:8]

    grounded.signature = [
        (_resolve(term, typing[0]), [typing[0]])
        for term, typing in grounded.signature
    ]
    return grounded
                
    
    

In [164]:
# Ground the predicate parsed earlier (`out`); the function also prints its type-to-objects map.
_assign_symbol(out)

{'p': ['p4', 'p3', 'p5', 'p2', 'p1'], 'r': ['r1'], 's': ['-r', 's2', 's1', 'r2'], 'b': ['b3', 'b2', 'b1'], 'q': ['q3', 'q4', 'q2', 'q1', 'q5'], 'k': ['k1', 'k2'], 't': ['t1']}


inworld[('p4', [p])]

# Alternative approach 

grounding goal from strings

In [26]:
import random

def _ground_goal(atom_str):
    """
    Ground an atom string such as (have ?x-t), (have t23), (on ?x-t y24-y)
    or (have ?t34-t), returning e.g. (have t23).

    Typed variables become a random environment object of that type, or a
    made-up ``<type>_<8 hex chars>`` name when no such object exists. Typed
    constants lose their ``-type`` suffix; bare constants are kept. A variable
    without a type raises ValueError.
    """
    # type name -> names of the environment objects of that type
    names_by_type = {}
    for obj_name, obj_type in env.objects().items():  # CHANGE TO SELF.ENV
        names_by_type.setdefault(obj_type.name, []).append(obj_name)

    tokens = atom_str.replace("(","").replace(")","").split(" ")
    name, args = tokens[0], tokens[1:]

    grounded_args = []
    for arg in args:
        is_variable = "?" in arg
        has_type = "-" in arg

        if is_variable and not has_type:
            raise ValueError("Must provide at least one of constant or a type")

        if not is_variable:
            # Constant: drop the type suffix if there is one
            grounded_args.append(arg.split("-")[0] if has_type else arg)
            continue

        typing = arg.split("-")[1]
        if typing in names_by_type:
            # Pick an existing object of that type
            picked = random.choice(names_by_type[typing])
            grounded_args.append(f"{picked}")
        else:
            # Invent a hypothetical object name
            grounded_args.append(f"{typing}_{str(uuid.uuid4())[:8]}")

    return "("+name+" "+" ".join(grounded_args)+")"



In [35]:
# Ground a goal atom with a typed variable (atom written with both parentheses).
_ground_goal("(have ?x-p)")

'(have p4)'

In [40]:
# Show the stored pddlgym result without any translation.
env.observe(source="pddlgym")

(State(literals=frozenset({inworld(p1:p), inworld(q2:q), inworld(p3:p), inworld(q1:q), inworld(q4:q), inworld(r2:r), inworld(q3:q), inworld(b3:b), inworld(b1:b), in(s2:s,b2:b), inworld(b2:b), inworld(q5:q), in(s1:s,b1:b), inworld(r1:r), inworld(p2:p), inworld(p5:p), inworld(p4:p), at(q4:q)}), objects=frozenset({p1:p, q4:q, s1:s, p2:p, t1:t, k1:k, s2:s, q5:q, b3:b, k2:k, p5:p, q1:q, r2:r, p3:p, b2:b, q3:q, b1:b, r1:r, q2:q, p4:p}), goal=have(t1:t)),
 0.0,
 False,
 {'problem_file': '/home/vsarathy/.cache/pypoetry/virtualenvs/construct-VivzdgmF-py3.8/lib/python3.8/site-packages/pddlgym/pddl/treasure/problem01.pddl',
  'domain_file': '/home/vsarathy/.cache/pypoetry/virtualenvs/construct-VivzdgmF-py3.8/lib/python3.8/site-packages/pddlgym/pddl/treasure.pddl'})

In [48]:
# Joining a set: element order is not the literal's written order (see output below).
a = {"a", "b", "c"}
" ".join(a)

'a c b'

In [55]:
# Render the environment's objects as space-separated "name - type" pairs.
strings = []
for k,v in env.objects().items():
    strings.append(f"{k} - {v}")
" ".join(strings)

'p1 - p q4 - q k2 - k p5 - p s1 - s q1 - q p2 - p t1 - t k1 - k r2 - r p3 - p b2 - b s2 - s q5 - q q3 - q b1 - b r1 - r q2 - q b3 - b p4 - p'

# Crafting trees

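Each node has a `type` attribute (`tnode` for item types, `anode` for crafting actions); `tnode`s also carry a `value` label (here `have`).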


In [2]:
import networkx as nx

In [8]:
# Crafting graph for the treasure domain: edges run type -> action -> type.
tree = nx.DiGraph()

# Item-type nodes, all labelled 'have'
tree.add_nodes_from(['k', 'p', 'q', 't', 's'], type='tnode', value='have')
# Crafting-action nodes
tree.add_nodes_from(['craftt', 'craftk'], type='anode')

tree.add_edges_from([
    # craftk: p + q -> k
    ('p', 'craftk'),
    ('q', 'craftk'),
    ('craftk', 'k'),
    # craftt: k + s -> t
    ('k', 'craftt'),
    ('s', 'craftt'),
    ('craftt', 't'),
])

In [16]:
# Look up the attribute dict stored on node "k".
tree.nodes.get("k")

{'type': 'tnode', 'value': 'have'}

In [17]:
# Export the crafting graph to "test.gml" (path relative to the working directory).
nx.write_gml(tree, "test.gml")

In [8]:
# Read a gml 

# Load a stored crafting graph; the path is relative to the working directory.
filename = "../agents/biplex/bias/crafting_knowledge.gml"
read_in_tree = nx.read_gml(filename)

# `node` holds the attribute dict of 'k' (or None if absent); it is not used further in this cell.
node = read_in_tree.nodes.get('k')
print(read_in_tree.nodes)


['k', 'p', 'q', 't', 's', 'craftt', 'craftk']


In [13]:
# Nodes with an edge into 'k'.
list(read_in_tree.predecessors('k'))


['craftk']

In [49]:
# creating a new domain file with craft action
import uuid 

def _create_new_domain_file(graph, anode, current_domain):
    action_symbol = f"\t(:action {anode}\n"
    precon_types = list(graph.predecessors(str(anode)))
    precons = []
    effects = []
    params = []
    for p in precon_types:
        variable = "?x"+str(uuid.uuid4())[:3]
        param = f"{variable}-{p}"
        pred = f"(have {param})"
        effp = f"(not {pred})"
        precons.append(pred)
        effects.append(effp)
        params.append(param)
    
    eff_types = list(graph.successors(str(anode)))
    for e in eff_types:
        variable = "?y"+str(uuid.uuid4())[:3]
        param = f"{variable}-{e}"
        effp = f"(have {variable}-{e})"
        effects.append(effp)
        params.insert(0,param)
        
    param_line = f"\t\t:parameters ({' '.join(params)})\n"
    precon_line = f"\t\t:precondition (and {' '.join(precons)})\n"
    effects_line = f"\t\t:effect (and {' '.join(effects)}))"
    
    action_entry = action_symbol+param_line+precon_line+effects_line
    
    new_domain_filename = current_domain.split(".pddl")[0]+"_gen.pddl"
    with open(current_domain,'r') as current, open(new_domain_filename,'w') as secondfile:
        lines = current.readlines()
        for line in lines[:-1]:
            secondfile.write(line)
        secondfile.write("\n")
        secondfile.write(action_entry)
        secondfile.write("\n")
        secondfile.write(")")

# Generate a domain file containing the 'craftk' action from the loaded crafting graph.
current_domain = "../agents/biplex/bias/treasure.pddl"
_create_new_domain_file(read_in_tree, "craftk", current_domain)

In [25]:
import uuid 
# First three characters of a random UUID's string form.
str(uuid.uuid4())[:3]

'7d5'

In [50]:
# Needs `env` from the wrapper demo cells; raises NameError (as in the output below) if they have not been run in this session.
env.objects()

NameError: name 'env' is not defined

# Generating large problems for TreasureWorld

See the PDDLGym notebooks folder for this.

In [1]:
from pyperplan.pddl.parser import Parser

In [15]:
# Given a domain file, create a bare version (removing all craft actions)

# Parse the treasure domain with pyperplan's Parser; the path is relative to the working directory.
treasure = "../domains/treasure.pddl"
parser = Parser(treasure)
domain = parser.parse_domain()

# Crafting trees for Pancake

In [1]:
import networkx as nx

In [2]:
# Crafting graph for the pancake domain: edges run type -> action -> type.
tree = nx.DiGraph()

# Item-type nodes ('tnode'), all labelled 'have'.
list_of_types = [
        'spw',
        'apw',
        'pw',
        'wl',
        'vpw',
        'ly',
        'yfl',
        'tl',
        'sl',
        'yftl',
        'syftl',
        'pwva',
        'spwva',
        'yvaftl',
        'syvaftl',
        'vaftyu',
        'svaftyu'
]

for item in list_of_types:
    tree.add_node(item, type='tnode', value='have')

# Crafting-action nodes ('anode').
list_of_actions = [
       'mixeggbatter1',
     'mixeggbatter2',
     'mixdryingredients1',
     'mixdryingredients2',
     'mixpancakebatter1',
     'mixpancakebatter2',
     'mixpancakebatter3',
     'mixpancakebatter3a',
     'mixpancakebatter4',
     'mixpancakebatter4a',
     'mixpancakebatter5',
     'mixpancakebatter5a',  
     'cookpancake1',
     'cookpancake2',
     'cookpancake3',
     'drizzlepancake1',
     'drizzlepancake2'
]

for item in list_of_actions:
    tree.add_node(item, type='anode')

# One entry per action: (action, produced type, consumed types).
recipes = [
    # EGG BATTER
    ('mixeggbatter1', 'yftl', ['yfl', 'tl', 'wl']),
    ('mixeggbatter2', 'syftl', ['yfl', 'tl', 'wl', 'spw']),

    # DRY INGREDIENTS
    ('mixdryingredients1', 'pwva', ['pw', 'vpw', 'apw']),
    ('mixdryingredients2', 'spwva', ['pw', 'vpw', 'apw', 'spw']),

    # PANCAKE BATTER
    ('mixpancakebatter1', 'yvaftl', ['pwva', 'yftl']),
    ('mixpancakebatter2', 'syvaftl', ['pwva', 'yftl', 'spw']),
    ('mixpancakebatter3', 'syvaftl', ['spwva', 'yftl']),
    ('mixpancakebatter3a', 'syvaftl', ['spwva', 'yftl', 'spw']),
    ('mixpancakebatter4', 'syvaftl', ['spwva', 'syftl']),
    ('mixpancakebatter4a', 'syvaftl', ['spwva', 'syftl', 'spw']),
    ('mixpancakebatter5', 'syvaftl', ['pwva', 'syftl']),
    ('mixpancakebatter5a', 'syvaftl', ['pwva', 'syftl', 'spw']),

    # COOKING
    ('cookpancake1', 'vaftyu', ['yvaftl', 'ly']),
    ('cookpancake2', 'svaftyu', ['syvaftl', 'ly']),
    ('cookpancake3', 'svaftyu', ['yvaftl', 'ly', 'spw']),

    # DRIZZLING
    ('drizzlepancake1', 'svaftyu', ['vaftyu', 'sl']),
    # NOTE(review): drizzlepancake2 both consumes and produces 'svaftyu';
    # confirm that this is the intended output type.
    ('drizzlepancake2', 'svaftyu', ['svaftyu', 'sl']),
]

for action, produced, consumed in recipes:
    # Output edge first, then the inputs in their listed order.
    tree.add_edge(action, produced)
    for ingredient in consumed:
        tree.add_edge(ingredient, action)


In [3]:
# Export the pancake graph; "test.gml" is the same filename the treasure-tree export uses.
nx.write_gml(tree, "test.gml")