# 6.882 HW 1.1 Starter Code

See the problem set handout for instructions and deliverables.

###  Installing Dependencies (PDDLGym)
The main dependency that we will use in this and some future problem sets is [PDDLGym](https://github.com/tomsilver/pddlgym). PDDLGym is a package developed by us. The bad news is that it almost certainly has bugs. The good news is that we can fix those bugs quickly when you find them. If you encounter any strange behavior, please contact course staff or open an issue through GitHub.

**Important:** Since PDDLGym will be updating throughout the course, it is important that you _install from source_ at the beginning of each problem set, rather than via ~pip install pddlgym~. The recommended way to install from source is: `pip install --upgrade git+https://github.com/tomsilver/pddlgym`. We take care of this for you at the top of this notebook.

In [1]:
# Install dependencies (run this once every 12 hours)
!pip install --upgrade git+https://github.com/tomsilver/pddlgym # Install most recent PDDLGym (must be from source!)

Collecting git+https://github.com/tomsilver/pddlgym
  Cloning https://github.com/tomsilver/pddlgym to /tmp/pip-req-build-kx6lzjms
  Running command git clone -q https://github.com/tomsilver/pddlgym /tmp/pip-req-build-kx6lzjms
Building wheels for collected packages: pddlgym
  Building wheel for pddlgym (setup.py) ... [?25l[?25hdone
  Created wheel for pddlgym: filename=pddlgym-0.0.2-cp36-none-any.whl size=5560759 sha256=e04067c46b06a3a97e7e2ddbd10f93440e7aa76a653b508617bf41e23b9113ac
  Stored in directory: /tmp/pip-ephem-wheel-cache-94a61uty/wheels/50/e8/3a/c26982decc88172ada05f2040d7fba539da5b1ceaff9b505a2
Successfully built pddlgym
Installing collected packages: pddlgym
Successfully installed pddlgym-0.0.2


### Key Environment Functions

In [2]:
import pddlgym
import time
import random

# Walk through the environment calls used by the rest of the notebook.
# Note: `num_problems` defined here is read later by `run_tests`.

# Create the Search-and-Rescue (level 1) environment
env = pddlgym.make("SearchAndRescueLevel1-v0")
# Count how many problem instances the environment provides
num_problems = len(env.problems)
# Pin the environment to the first problem so that reset() is deterministic
env.fix_problem_index(0)
# Reset the environment; returns the initial state plus debug information
state, debug_info = env.reset()
# List the actions that can be passed to step() / get_successor_state()
actions = env.get_possible_actions()
# Compute a successor state (without advancing the env)
next_state = env.get_successor_state(state, actions[0])
# Check whether that successor state satisfies the goal
goal_satisfied = env.check_goal(next_state)
# Advance the environment itself by one action
state, reward, done, info = env.step(actions[0])

### Rendering Utilities

In [3]:
import matplotlib.pyplot as plt

def display_image(img, title=None):
    """Show an image inline in a fresh matplotlib figure with the axes hidden.

    Args:
        img: Image data in any form accepted by ``plt.imshow``.
        title: Optional figure title. Skipped when falsy (``None`` or ``""``).
    """
    plt.figure()
    plt.imshow(img)
    if title:
        plt.title(title)
    # Hide ticks and the frame so that only the image itself is shown.
    plt.axis('off')

In [4]:
from heapq import heappop, heappush

def random_plan(ctx, env, state, actions, max_steps=250):
    """Build a plan of ``max_steps`` actions, each drawn at random from ``actions``.

    Args:
        ctx: Unused by this planner.
        env: Unused by this planner.
        state: Unused by this planner.
        actions: Non-empty sequence of candidate actions (unless ``max_steps`` is 0).
        max_steps: Number of actions in the returned plan.

    Returns:
        A list of ``max_steps`` actions chosen with ``random.choice``.
    """
    return [random.choice(actions) for _ in range(max_steps)]

def heuristic_search(ctx, env, init_state, actions, a_star=True, heuristic_func=lambda x: 0, timeout=10):
    """Search for a sequence of actions leading from ``init_state`` to a goal.

    Every action is treated as having unit cost. In A* mode the queue is
    ordered by ``path length + heuristic``; otherwise (greedy best-first) it is
    ordered by the heuristic alone.

    In A* mode a state is re-opened whenever a strictly shorter path to it is
    discovered. Closing states as soon as they are first generated would
    discard such improvements and could return a longer-than-necessary plan
    even with an admissible heuristic. Greedy mode keeps the first path found
    to each state.

    Args:
        ctx: Dict used to report statistics; ``ctx['expanded_states']`` is set
            to the number of states popped and processed.
        env: Object providing ``check_goal(state)`` and
            ``get_successor_state(state, action)``.
        init_state: Hashable start state.
        actions: Iterable of actions tried from every expanded state.
        a_star: ``True`` for A*, ``False`` for greedy best-first search.
        heuristic_func: Maps a state to an estimated remaining cost.
        timeout: Wall-clock budget in seconds, checked before each expansion.

    Returns:
        The plan as a list of actions (empty if ``init_state`` is already a
        goal), or ``None`` if the queue is exhausted or the timeout elapses
        before a goal state is popped.
    """
    # Heap entries are (priority, path length, index into `plans`, state).
    # Plan indices are unique, so ties never fall through to comparing states.
    q = [(0, 0, 0, init_state)]
    # plans[i] == (parent plan index, action taken); entry 0 is the root sentinel.
    plans = [(None, None)]
    # Shortest path length recorded so far for every generated state.
    best_cost = {init_state: 0}

    start_time = time.time()

    def gen_plan(idx):
        """Follow parent links from plan node ``idx`` back to the root."""
        result = []
        idx, a = plans[idx]
        while a is not None:
            result.append(a)
            idx, a = plans[idx]
        return list(reversed(result))

    ctx['expanded_states'] = 0

    while len(q) > 0:
        if time.time() - start_time > timeout:
            break
        _f, distance, plan_idx, u = heappop(q)

        # Stale entry: a shorter path to `u` was queued after this one.
        if distance > best_cost[u]:
            continue

        ctx['expanded_states'] += 1

        if env.check_goal(u):
            return gen_plan(plan_idx)

        new_distance = distance + 1
        for a in actions:
            v = env.get_successor_state(u, a)

            known = best_cost.get(v)
            # Greedy mode: skip any state seen before.
            # A* mode: skip unless this path is strictly shorter.
            if known is not None and (not a_star or known <= new_distance):
                continue
            best_cost[v] = new_distance

            h = heuristic_func(v)
            if a_star:
                f = new_distance + h
            else:
                f = h
            plans.append((plan_idx, a))
            heappush(q, (f, new_distance, len(plans) - 1, v))

    return None


def l1_distance(a, b):
    """Return the L1 (Manhattan) distance between ``a`` and ``b``.

    Only the first two components of each argument are used.
    """
    delta_first = a[0] - b[0]
    delta_second = a[1] - b[1]
    return abs(delta_first) + abs(delta_second)


def my_heuristics(state):
    """Estimate the remaining cost from ``state`` using Manhattan distances.

    ``state`` must be convertible with ``dict(...)`` and provide the keys
    ``'robot0'``, ``'hospital0'``, ``'carrying'`` and, when nothing is being
    carried, ``'person0'``.

    Returns:
        * Carrying something: robot-to-hospital distance plus 1.
        * Otherwise: robot-to-person plus person-to-hospital distance, plus 2.

        NOTE(review): the +1 / +2 constants presumably account for the
        drop-off and pick-up actions -- confirm against the domain.
    """
    facts = dict(state)
    robot_loc = facts['robot0']
    hospital_loc = facts['hospital0']

    if facts['carrying'] is not None:
        return l1_distance(robot_loc, hospital_loc) + 1

    person_loc = facts['person0']
    to_person = l1_distance(robot_loc, person_loc)
    to_hospital = l1_distance(person_loc, hospital_loc)
    return to_person + to_hospital + 2

### Example Code Snippets

In [5]:
def run_plan_execution(problem_idx, algo, max_steps=250):
    """Plan for one problem with the chosen algorithm, then execute the plan.

    Args:
        problem_idx: Index of the problem to fix the environment to.
        algo: One of ``'random'``, ``'a*_uniform'``, ``'a*'`` or
            ``'best_first'``.
        max_steps: Maximum number of environment steps to execute; also the
            length of the plan produced by the random planner.

    Returns:
        A dict of statistics with keys ``'start_time'``, ``'end_time'``,
        ``'time'`` (planning time in seconds), ``'env_steps'`` and
        ``'success'``. The search-based planners additionally record
        ``'expanded_states'``.

    Raises:
        ValueError: If ``algo`` is not one of the supported names.
    """
    env = pddlgym.make("SearchAndRescueLevel1-v0")
    env.fix_problem_index(problem_idx)
    state, _ = env.reset()
    # NOTE: You should not render/display images when you are collecting final statistics.
    # Rendering is only included for your convenience during development/debugging.
    # display_image(env.render_from_state(state), "Initial state")

    # Keep the actions as one collection: unpacking them into six named
    # variables would fail whenever the action count is not exactly six.
    actions = env.get_possible_actions()

    ctx = dict()
    ctx['start_time'] = time.time()
    if algo == 'random':
        plan = random_plan(ctx, env, state, actions, max_steps=max_steps)
    elif algo == 'a*_uniform':
        plan = heuristic_search(ctx, env, state, actions)
    elif algo == 'a*':
        plan = heuristic_search(ctx, env, state, actions, heuristic_func=my_heuristics)
    elif algo == 'best_first':
        plan = heuristic_search(ctx, env, state, actions, a_star=False, heuristic_func=my_heuristics)
    else:
        raise ValueError(f"Unknown algorithm: {algo!r}")
    ctx['end_time'] = time.time()

    ctx['env_steps'] = 0
    ctx['success'] = False

    # heuristic_search returns None when it times out or finds no plan;
    # report that as an unsuccessful run with zero steps.
    if plan is None:
        plan = []

    for action in plan[:max_steps]:
        # Advance the state of the environment
        state, reward, done, debug_info = env.step(action)
        # NOTE: You should not render/display images when you are collecting final statistics.
        # Rendering is only included for your convenience during development/debugging.
        # display_image(env.render_from_state(state), f"Took action {action}. Goal reached? {reward == 1}")
        ctx['env_steps'] += 1
        if done:
            ctx['success'] = True
            break
    ctx['time'] = ctx['end_time'] - ctx['start_time']
    return ctx

In [6]:
from collections import defaultdict
def run_tests():
    """Run every planner on every problem and print per-planner averages.

    For each algorithm name, the statistics returned by ``run_plan_execution``
    are summed over all ``num_problems`` problems, divided by
    ``num_problems``, and printed next to the algorithm name.
    """
    algorithms = ('random', 'a*_uniform', 'a*', 'best_first')
    for algo in algorithms:
        print(algo)
        totals = defaultdict(float)
        for problem_idx in range(num_problems):
            result = run_plan_execution(problem_idx, algo)
            for key, value in result.items():
                totals[key] += float(value)
        # Turn the sums into means, in place.
        for key in list(totals):
            totals[key] = totals[key] / num_problems
        print(algo, totals)

In [7]:
# Benchmark all four planners on every problem and print the averaged statistics.
run_tests()

random
random defaultdict(<class 'float'>, {'start_time': 1600319649.5977159, 'end_time': 1600319649.5979035, 'env_steps': 250.0, 'success': 0.0, 'time': 0.00018743276596069335})
a*_uniform
a*_uniform defaultdict(<class 'float'>, {'start_time': 1600319682.5412564, 'expanded_states': 147.5, 'end_time': 1600319685.7004638, 'env_steps': 11.8, 'success': 1.0, 'time': 3.159207856655121})
a*
a* defaultdict(<class 'float'>, {'start_time': 1600319726.3113477, 'expanded_states': 40.25, 'end_time': 1600319727.1577299, 'env_steps': 11.8, 'success': 1.0, 'time': 0.8463821887969971})
best_first
best_first defaultdict(<class 'float'>, {'start_time': 1600319740.0554764, 'expanded_states': 13.95, 'end_time': 1600319740.3433115, 'env_steps': 11.8, 'success': 1.0, 'time': 0.2878352522850037})


In [10]:
def run_get_successor_example():
    """Example demonstrating how to get successors and check goals.

    Resets the environment to problem 0, renders and prints the initial
    state, then for every possible action computes the successor of the
    initial state (without stepping the environment), checks whether it
    satisfies the goal, and renders it.
    """
    env = pddlgym.make("SearchAndRescueLevel1-v0")
    env.fix_problem_index(0)
    initial_state, _ = env.reset()
    # NOTE: You should not render/display images when you are collecting final statistics.
    # Rendering is only included for your convenience during development/debugging.
    display_image(env.render_from_state(initial_state), "Initial state")
    
    # `actions` holds the full collection; the same value is also unpacked
    # into six individually named actions (this requires exactly six actions).
    actions = dropoff, move_down, move_left, move_right, move_up, pickup_person0 = env.get_possible_actions()

    print(initial_state)
    
    for action in actions:
        # Every successor is computed from the same initial state, so the
        # candidates are alternatives rather than a sequence of steps.
        state = env.get_successor_state(initial_state, action)
        goal_reached = env.check_goal(state)
        # NOTE: You should not render/display images when you are collecting final statistics.
        # Rendering is only included for your convenience during development/debugging.
        display_image(env.render_from_state(state), f"Candidate action: {action}. Goal reached? {goal_reached}")