# Implementing simple pathfinding into random actor

In [None]:
import time
import gym
import nle

import numpy as np
import matplotlib.pyplot as plt

In [None]:
del gym.Wrapper.__getattr__

A wrapper that keeps track of the action history

In [None]:
from collections import deque


class RecentHistory(gym.Wrapper):
    """A pass-through wrapper that remembers the most recently taken actions.

    Every action given to `.step` is appended to `self.recent` before being
    forwarded to the wrapped env. The transition data returned by the env is
    passed back to the caller untouched.

    Parameters
    ----------
    env : gym.Env
        The environment to wrap.

    n_recent : int or None, default=0
        The `maxlen` of the `self.recent` deque. `None` keeps the complete
        history of the episode, while zero keeps nothing at all.
    """
    def __init__(self, env, *, n_recent=0):
        super().__init__(env)
        self.recent = deque([], n_recent)

    def reset(self, seed=None):
        """Reset the wrapped env and forget the actions of the past episode.

        `seed` is accepted for signature compatibility only: it is NOT
        forwarded to the wrapped env.
        """
        # actions taken in a finished episode are not `recent` for the new one
        self.recent.clear()
        return self.env.reset()

    def step(self, action):
        """Record the `action` and then pass it to the wrapped env."""
        self.recent.append(action)
        return self.env.step(action)

We hide the NLE under several layers of wrappers. From the core to the shell:
1. `ReplayToFile` saves the seeds and the taken actions into a file for later inspection and replay.
2. `NLEAtoN` maps ascii actions to opaque actions accepted by the NLE.
3. `NLEPatches` patches tty-screens, botched by the cr-lf misconfiguration of the NLE's tty term emulator and NetHack's display output (lf only).
4. `NLEFeatures` adds extra features generated on-the-fly from the current NLE's observation. 

In [None]:
from nle_toolbox.utils.replay import ReplayToFile, Replay
from nle_toolbox.utils.env.wrappers import NLEPatches, NLEAtoN, NLEFeatures

def gui_factory(seed=None):
    """Build the env stack used for scripted (ascii command) interaction.

    From the core to the shell: the `NetHackChallenge-v0` env, `ReplayToFile`
    recording into `./replays`, `NLEPatches`, and finally `NLEAtoN`. The
    `seed` is applied to the stack just before the `NLEAtoN` layer is added.
    """
    recorder = ReplayToFile(
        gym.make('NetHackChallenge-v0'),
        folder='./replays',
        sticky=True,
        save_on='done',
    )

    patched = NLEPatches(recorder)
    patched.seed(seed)

    return NLEAtoN(patched)
    

We start with implementing a simple command evaluator.

In [None]:
def gui_run(env, *commands):
    """Feed the commands into the env and yield the obs after each of them.

    Each command is an iterable of actions (e.g. a string of keys), which are
    stepped through one by one. The observation is yielded once the command
    has been played out, or as soon as the env reports termination, in which
    case the remaining actions and commands are dropped.
    """
    obs, done = env.reset(), False
    for cmd in commands:
        for key in cmd:
            obs, _, done, _ = env.step(key)
            if done:
                break

        yield obs
        if done:
            return

A simple proc to overlay a plan atop the tty characters.

In [None]:
def overlay_plan(plan, *, tty_chars, tty_colors, **ignore):
    """Stamp a bright magenta `X` onto the tty buffers for each plan tile.

    `plan` is an iterable of `(row, col)` pairs. Both `tty_chars` and
    `tty_colors` are modified IN-PLACE, one row below the given coordinates
    (presumably to skip the top message line of the tty -- TODO confirm).
    Any other keyword arguments are ignored, so that the whole observation
    dict can be splatted into the call.
    """
    bright_magenta = 128 + 13  # 0x8D
    glyph_x = 88  # ord('X')
    for row, col in plan:
        cell = row + 1, col
        tty_colors[cell] = bright_magenta
        tty_chars[cell] = glyph_x

A renderer

In [None]:
import pprint as pp

from time import sleep
from nle_toolbox.utils.env.render import render as tty_render

def ipynb_render(obs, clear=True, fps=None, plan=None):
    """Print the tty rendering of `obs` in the notebook and always return True.

    Rendering is skipped entirely when `fps` is None. Otherwise the cell
    output is optionally cleared, a non-empty `plan` is drawn over the tty
    buffers of `obs` (in-place), and the screen is printed.

    NOTE: despite its name, a positive `fps` is used as the pause duration
    in seconds after the frame, and not as a frame rate.
    """
    if fps is not None:
        from IPython.display import clear_output

        if clear:
            clear_output(wait=True)

        if plan:
            overlay_plan(plan, **obs)

        print(tty_render(**obs))

        if fps > 0:
            sleep(fps)

    # constant truthy result lets the call sit in a `while ... and ...` header
    return True

Below is a wrapper, which handles menus (unless an interaction is required) and
fetches all consecutive messages.

In [None]:
from nle_toolbox.bot.chassis import Chassis, ActionMasker

Let's test it in bulk.

In [None]:
# seed = None
# seed = 12513325507677477210, 18325590921330101247  # multi
# seed = 1251332550767747710, 18325590921330101247  # single
# seed = 125133255076774710, 18325590921330101247  # single
# seed = 13765371332493407478, 12246923801353953927
# seed = 12301533412141513004, 11519511065143048485
# seed = 1632082041122464284, 11609152793318129379
seed = 12604736832047991440, 12469632217503715839  # an aspirant
# seed = 5009195464289726085, 12625175316870653325
# seed = 8962210393456991721, 8431607288866012881

# Play a few scripted commands through the chassis and dump its meta state
#  after each of them. `n_recent=None` makes the action history unbounded.
with Chassis(RecentHistory(gui_factory(seed), n_recent=None), split=False) as env:
    for obs in gui_run(
        env,
        ';j:',         # a paragraph about a cat
        'acy',         # break a wand "of slow" and blow up
        '\033Zbyyy,',  # cast a sleep ray at a newt and pick up its corpse
#         # investigating illegal messages
#         # chassis does not fetch messages in GLANCE and JUMP actions, because
#         #  the NLE does not report them, and chassis does not check the `tty_chars` topl.
#         '\033bbhh', '\033;', 'u', 'll', '\033ê', 'u',
    ):
        # the chassis' view of the game vs. the raw top line of the tty screen
        pp.pprint(
            (
                env.messages, env.prompt, obs['tty_chars'][0].view('S80')[0].strip(),
                env.in_getlin, env.in_menu, env.in_yn_function, env.xwaitingforspace,
            )
        )

        # keep the earlier frames in the cell output (no clearing)
        ipynb_render(obs, clear=False, fps=0.01)  # dump(env.env, obs[0])

<br>

     y  k  u  
      \ | /   
    h - . - l 
      / | \   
     b  j  n  

Dijkstra algorithm for shortest paths on +ve weighted graphs (here the graph is the regular mesh).

In [None]:
from math import isfinite
from collections import namedtuple

from heapq import heappop, heappush
from collections import defaultdict

# the eight (row, col) moves on the mesh and their vi-key ascii commands
# XXX the insertion order is the order in which `dij` inspects the neighbours
dir_to_ascii = {
    # diagonal
    (-1, -1): 'y',
    (-1, +1): 'u',
    (+1, -1): 'b',
    (+1, +1): 'n',

    # parallel
    (-1, 0): 'k',
    (+1, 0): 'j',
    (0, -1): 'h',
    (0, +1): 'l',
}

# a heap record: the tentative value `v` of reaching the position `p`
DijNode = namedtuple('DijNode', ['v', 'p'])


def dij(cost, /, source):
    """Shortest paths from `source` to every tile of an 8-connected mesh.

    Parameters
    ----------
    cost : 2d array
        The cost of stepping ONTO each tile. Tiles with infinite cost
        are never entered.

    source : tuple of int
        The `(row, col)` position to start from. Its value is zero
        regardless of its own cost.

    Returns
    -------
    value : 2d float array
        The total cost of the cheapest path to each tile, `inf` if the tile
        cannot be reached.

    paths : int array of shape `cost.shape + (2,)`
        The `(row, col)` of the preceding tile on the cheapest path, with
        `(-1, -1)` at the source and at tiles that were never reached.
    """
    n_rows, n_cols = cost.shape

    # One extra row and column of +inf serves as a border on all four sides:
    #  stepping off the top or the left edge yields index -1, which wraps
    #  around to the very same padding.
    weight = np.full((n_rows + 1, n_cols + 1), np.inf)
    weight[:n_rows, :n_cols] = cost

    value = np.full_like(weight, np.inf)
    paths = np.full(weight.shape + (2,), -1, dtype=int)

    # the search starts at the source
    value[source] = 0.
    heap = [DijNode(0., source)]
    while heap:
        dist, here = heappop(heap)

        # a record is stale if the tile has been improved since it was pushed
        best = value[here]
        if best < dist:
            continue

        r, c = here
        for dr, dc in dir_to_ascii:
            there = r + dr, c + dc

            # the border and impassable tiles yield an infinite value, which
            #  can never be a strict improvement
            through = best + weight[there]
            if through < value[there]:
                value[there] = through
                paths[there] = r, c
                heappush(heap, DijNode(through, there))

    # drop the padding
    return value[:-1, :-1], paths[:-1, :-1]

Backing up through the optimal value surface to recover the shortest path to the specified destination.

In [None]:
def backup(paths, destination):
    """Yield the tiles from `destination` back along the predecessor links.

    `paths[r, c]` holds the `(row, col)` of the tile preceding `(r, c)`. The
    walk stops upon reaching a position with a negative coordinate, which is
    not yielded.
    """
    row, col = destination
    while True:
        if row < 0 or col < 0:
            return

        yield row, col
        row, col = paths[row, col]

The factory for collecting random exploration rollouts

In [None]:
# from nle_toolbox.utils import seeding
from nle_toolbox.bot.chassis import get_wrapper

def factory(seed=None, folder=None):
    """Build the env stack for the exploration rollouts.

    From the core to the shell: the tty-patched `NetHackChallenge-v0` env,
    a replay layer (`ReplayToFile` if `folder` is given, otherwise the
    in-memory `Replay`), `RecentHistory` with the last 32 actions,
    `Chassis`, and finally `ActionMasker`. The `seed` is applied at
    the replay layer.
    """
    # the base env with tty patches
    patched = NLEPatches(gym.make('NetHackChallenge-v0'))

    # ascii char -> index of the action in the base env's action list
    ctoa = {
        chr(action): index
        for index, action in enumerate(patched.unwrapped._actions)
    }

    # seeded run capabilities, optionally saving the replays to the folder
    if folder is not None:
        replay = ReplayToFile(
            patched, sticky=True, folder=folder, save_on='done',
        )

    else:
        replay = Replay(patched, sticky=True)

    replay.seed(seed)

    # XXX an alternative way of seeding, kept for reference
    # if not isinstance(seed, tuple):
    #     seed = seeding.generate(seed)
    # seeding.pyroot(env).set_initial_seeds(*seed, False)

    # the chassis is told which action is the space key
    history = RecentHistory(replay, n_recent=32)
    chassis = Chassis(history, space=ctoa[' '], split=False)
    return ActionMasker(chassis)

Level and dungeon mapper

In [None]:
from nle.nethack import (
    NLE_BL_X,
    NLE_BL_Y,
    NLE_BL_DNUM,
    NLE_BL_DLEVEL,
    # NLE_BL_DEPTH,  # derived from DNUM and DLEVEL
    # XXX does not uniquely identify floors,
    #  c.f. [`depth`](./nle/src/dungeon.c#L1086-1084)
    DUNGEON_SHAPE,
    MAX_GLYPH,
)

from nle_toolbox.utils.env.defs import \
    glyph_is, dt_glyph_ext, ext_glyphlut

from nle_toolbox.bot.level import Level, DungeonMapper

Determine the walkability of the observed tiles

In [None]:
from nle_toolbox.utils.env.defs import symbol, GLYPH_CMAP_OFF, glyph_group, get_group
from nle_toolbox.utils.env.defs import glyphlut, ext_glyphlut

# glyphs of the closed doors and raised drawbridges
# NOTE(review): assumes `get_group` offsets the symbols by `GLYPH_CMAP_OFF`
#  and returns an iterable of glyph ids -- confirm against `nle_toolbox`
closed_doors = get_group(symbol, GLYPH_CMAP_OFF, *[
    'S_vcdoor', 'S_hcdoor',
    'S_vcdbridge', 'S_hcdbridge',
])

# glyphs of the doorways, open doors and lowered drawbridges
open_doors = get_group(symbol, GLYPH_CMAP_OFF, *[
    'S_ndoor',
    'S_vodoor', 'S_hodoor',
    'S_vodbridge', 'S_hodbridge',
])

# Boolean lookup tables, which are meant to be indexed by arrays of glyphs,
#  e.g. `is_walkable[obs['glyphs']]`.
is_closed_door = np.isin(ext_glyphlut.id.value, np.array(list(closed_doors)))
is_actor = np.isin(ext_glyphlut.id.group, np.array(list(glyph_group.ACTORS)))
is_pet = ext_glyphlut.id.group == glyph_group.PET

is_open_door = np.isin(ext_glyphlut.id.value, np.array(list(open_doors)))
is_object = np.isin(ext_glyphlut.id.group, np.asarray(list(glyph_group.OBJECTS)))

# a tile can be walked onto if it is accessible, an open door, or has an object
is_walkable = ext_glyphlut.is_accessible | is_open_door | is_object

In [None]:
# the symbols which are treated as traps by the path planner
# XXX the list also includes the magic portal and the vibrating square
traps = get_group(symbol, GLYPH_CMAP_OFF, *[
    'S_arrow_trap',
    'S_dart_trap',
    'S_falling_rock_trap',
    'S_squeaky_board',
    'S_bear_trap',
    'S_land_mine',
    'S_rolling_boulder_trap',
    'S_sleeping_gas_trap',
    'S_rust_trap',
    'S_fire_trap',
    'S_pit',
    'S_spiked_pit',
    'S_hole',
    'S_trap_door',
    'S_teleportation_trap',
    'S_level_teleporter',
    'S_magic_portal',
    'S_web',
    'S_statue_trap',
    'S_magic_trap',
    'S_anti_magic_trap',
    'S_polymorph_trap',
    'S_vibrating_square',
])

# boolean lookup table, meant to be indexed by arrays of glyphs
is_trap = np.isin(ext_glyphlut.id.value, np.array(list(traps)))

The core of the "smart" dungeon explorer

In [None]:
from scipy.special import softmax

def crawler(obs, mask, *, dir, seed=None):
    """A generator policy, which alternates random moves with planned walks.

    The generator yields an action and expects the resulting `(obs, mask)`
    pair to be sent back to it. It is a two-state machine:

    * `linger`: takes 16 random actions among those not forbidden by `mask`,
      then picks a random destination on the current level, plans the
      shortest path to it with `dij`, and switches to `crawl`;

    * `crawl`: plays the planned moves back one by one, and goes back to
      `linger` when the plan is exhausted.

    Parameters
    ----------
    obs : dict
        The initial observation.

    mask : array
        Flags of the currently forbidden actions (nonzero means forbidden).
        NOTE(review): assumed to be one-dimensional, since the indices are
        star-unpacked into `rng.choice` -- confirm.

    dir : mapping
        Maps the ascii direction keys (values of `dir_to_ascii`) to actions.

    seed : optional
        The seed of the generator's own random number generator.

    Notes
    -----
    The locals `plan` and `dng` are read from the outside of the generator
    via `inspect.getgeneratorlocals`, hence their names must be kept.

    The walk is open-loop: the plan is neither verified nor revised while
    crawling.
    """
    dng = DungeonMapper()

    # own random number generator
    rng = np.random.default_rng(seed)

    # a simple state machine: linger <<-->> crawler
    state, n_linger, stack = 'linger', 16, []
    while True:
        # NOTE(review): both `continue`-s below jump back here, and thus
        #  the mapper is updated with the very same `obs` once again.
        dng.update(obs)
        pos = dng.level.trace[-1]

        if state == 'crawl':
            if stack:
                # `plan` is bound by the time we get here, since the machine
                #  starts in `linger`, which is the only way into `crawl`
                plan.pop()
                act = dir[stack.pop()]

            else:
                state, n_linger = 'linger', 16
                continue

        elif state == 'linger':
            if n_linger > 0:
                n_linger -= 1

                # if we're in LINGER state, pick a random non-forbidden action
                # XXX whelp... tilde on int8 is `two's complement`, not the `logical not`
                act = rng.choice(*np.logical_not(mask).nonzero())

            else:
                lvl = dng.level

                # we've run out linger moves, time to pick a random destination
                # and go to it
                state = 'crawl'

                # get the walkability cost
                cost = np.where(
                    # is_walkable[lvl.bg_tiles.glyph]
                    (is_walkable | is_pet)[lvl.bg_tiles.glyph]
                    , .334, np.inf)
                # XXX adjust `cost` for hard-to-pass objects?
                cost[is_trap[lvl.bg_tiles.glyph]] = 10.

                # get the shortest paths from the current position
                value, path = dij(cost, pos)

                # draw a destination, the further the better
                # NOTE(review): closed doors get the logit of 100 whether or
                #  not `value` is finite at them. If the drawn door cannot be
                #  reached, the plan is empty and we fall back to `linger`.
                #  Confirm that closed doors are reachable under `cost`.
                prob = softmax(np.where(
                    is_closed_door[lvl.bg_tiles.glyph],
                    100.,
                    np.where(
                        np.logical_and(
                            np.isfinite(value),
                            np.logical_not(
                                is_trap[lvl.bg_tiles.glyph]
                            )
                        ), value, -np.inf
                    ))
                )
                # flat index -> (row, col)
                dest = divmod(rng.choice(prob.size, p=prob.flat), prob.shape[1])

                # reconstruct the path to the destination in reverse order
                #  The moves are stacked from the destination to the source,
                #  so `.pop` hands them out in the order of the walk.
                plan = list(backup(path, dest))
                for (r1, c1), (r0, c0) in zip(plan, plan[1:]):
                    stack.append(dir_to_ascii[r1-r0, c1-c0])

                # drop the current position from the tiles left to visit
                plan.pop()
                continue

        obs, mask = yield act

How do we want to explore?
* open closed doors
* explore tunnels

In [None]:
dng = getgeneratorlocals(gen).get('dng')
# dng.level.trace[-1]

In [None]:
plt.imshow(dng.level.bg_tiles.info.is_accessible)

Implementing the random dungeon crawler

In [None]:
from inspect import getgeneratorlocals
from nle_toolbox.bot.genfun import is_suspended

# env = factory(seed=(8962210393456991721, 8431607288866012881))
env = factory()

# handles to the layers which expose the meta state and the special actions
cha, msk = get_wrapper(env, Chassis), get_wrapper(env, ActionMasker)

# `buffer` keeps the eight most recent screens for post-mortem inspection
(obs, mask), fin, buffer = env.reset(), False, deque([], 8)
gen = crawler(obs, mask, dir=msk.directions)

# the renderer always returns True, and thus the loop runs until `fin`
#  `plan` is None until the crawler plans its first walk
while ipynb_render(
    obs,
    clear=True,
    fps=0.01,
    plan=getgeneratorlocals(gen).get('plan'),
) and not fin:
    # if any(b'illegal' in msg for msg in cha.messages):
    # stop at the first screen which mentions anything `illegal`
    buffer.append(obs['tty_chars'].view('S80')[:, 0].copy())
    if any(b'illegal' in line for line in buffer[-1]):
        break

    # default to immediately escaping from any menu or prompt
    if cha.in_menu or cha.prompt:
        (obs, mask), rew, fin, info = env.step(msk.escape)
        continue

    # we do not protect against `StopIteration`, since we
    #  expect the crawler to be an infinite loop.
    # A generator, which has just been created, must be primed with None.
    if is_suspended(gen):
        act = gen.send((obs, mask))
    else:
        act = gen.send(None)

    (obs, mask), rew, fin, info = env.step(act)

gen.close()

     y  k  u  
      \ | /   
    h - . - l 
      / | \   
     b  j  n  

In [None]:
# A one-shot version of the crawler's planning step, which uses the raw
#  glyphs of the last observation instead of the level map.

# get the walkability cost
cost = np.where((
    is_walkable
    | is_pet
)[obs['glyphs']], 1., np.inf)
# XXX adjust `cost` for hard-to-pass objects?
cost[is_trap[obs['glyphs']]] = 10.

# get the shortest paths from the current position
bls = obs['blstats']
value, path = dij(cost, (bls[NLE_BL_Y], bls[NLE_BL_X]))

# the further a reachable non-trap tile is, the more likely it is to be drawn
prob = softmax(np.where(
    np.logical_and(
        np.isfinite(value),
        np.logical_not(
            is_trap[obs['glyphs']]
        )
    ), value, -np.inf
))

plt.imshow(value)

<hr>

Test the algo

In [None]:
# the source of the search
r, c = 12, 12

rng = np.random.default_rng()  #248675)

# random positive costs, with about a half of the tiles made impassable
cost = -np.log(rng.random((21, 79)))
# cost = np.ones((21, 79))
cost[rng.random(cost.shape) < .5] = np.inf

value, path = dij(cost, (r, c))


# mask = is_walkable[lvl.bg_tiles.glyph] | is_walkable[lvl.stg_tiles.glyph]
# only the reachable tiles are eligible as destinations
mask = np.isfinite(value)
mask[r, c] = False  # mask the current position

from scipy.special import softmax

# uniform over the reachable tiles with path cost greater than five
value = np.where(value > 5, 0., -np.inf)
prob = softmax(np.where(mask, value, -np.inf))

Play around with the shortest path.

In [None]:
# draw a random destination: flat index -> (row, col)
r, c = divmod(rng.choice(prob.size, p=prob.flat, ), prob.shape[1])

# paint the path over the cost map
displ = cost.copy()
plan = list(backup(path, (r, c)))
for ij in plan:
    displ[ij] = 10
# the source of the search is hard-coded, since `r, c` is now the destination
displ[12, 12] = 11


fig, ax = plt.subplots(1, 1, dpi=300)
ax.imshow(displ)

In [None]:
# `plan` runs from the destination back to the source, so each consecutive
#  pair is (tile, its predecessor), and the moves come out in reverse order
commands = []
for (r1, c1), (r0, c0) in zip(plan, plan[1:]):
    commands.append(dir_to_ascii[r1-r0, c1-c0])

# the keys to press in order to walk from the source to the destination
''.join(reversed(commands))

<br>

In [None]:
assert False

A non-illegal random action exploration.

In [None]:
from copy import deepcopy
from nle_toolbox.bot.chassis import get_wrapper


def random_explore(seed=None, n_steps=1000, *, auto=False, fps=None, copy=False):
    """A non-illegal random action explorer.

    A generator, which plays random actions not forbidden by the action mask
    and yields the observation BEFORE each step. Menus and prompts are
    escaped from immediately.

    Parameters
    ----------
    seed : optional
        The entropy for `np.random.SeedSequence`, from which independent
        seeds for the policy and the env are spawned.

    n_steps : int, default=1000
        The total number of steps to take, counted across the episodes.

    auto : bool, default=False
        Whether to reset the env automatically when an episode finishes.

    fps : float, optional
        Passed to `ipynb_render`. Nothing is rendered if None.

    copy : bool, default=False
        Whether to yield deep copies of the observations.

    Notes
    -----
    The local `dng` holds the `DungeonMapper` and may be fetched from the
    outside via `inspect.getgeneratorlocals`.
    """
    ss_pol, ss_env = np.random.SeedSequence(seed).spawn(2)

    # XXX `n_linger` and `pf` are not used below
    rng, j, n_linger, pf = np.random.default_rng(ss_pol), 0, 0, None
    # NOTE(review): the env is seeded with a `SeedSequence` object here --
    #  confirm that the replay wrapper's `.seed` accepts it.
    with factory(seed=ss_env) as env:
        # we need access to the Chassis for additional meta state variables
        cha = get_wrapper(env, Chassis)

        # ActionMasker caches the escape action id
        ESC = get_wrapper(env, ActionMasker).escape

        # setup the dungeon mapper
        dng = DungeonMapper()

        # launch the episode
        (obs, mask), fin = env.reset(), False
        while (
            ipynb_render(obs, clear=True, fps=fps)
            and not (fin or j >= n_steps)
        ):
            # nle reuses its buffers, so the observations are deep copied
            #  only on demand, and otherwise it is up to the downstream user
            yield deepcopy(obs) if copy else obs

            # default to immediately escaping from any menu or prompt
            act = ESC
            if not (cha.in_menu or cha.prompt):
                dng.update(obs)

                # pick a random non-forbidden action
                # XXX whelp... tilde on int8 is `two's complement`, not the `logical not`
                act = rng.choice(*np.logical_not(mask).nonzero())

            (obs, mask), rew, fin, info = env.step(act)
            j += 1

            # show the final screen of the episode before starting a new one
            if fin and auto:
                ipynb_render(obs, clear=True, fps=fps)
                (obs, mask), fin = env.reset(), False

Get a random episode

In [None]:
# from inspect import getgeneratorlocals
# Roll out one random episode and collect the glyphs of every observation.
episode = random_explore(
    seed=None,
    n_steps=256,
    auto=False,
    copy=True,
    fps=0.01,
)


# The generator yields whole observation dicts. The first one is fetched
#  separately, so that the generator is started and its locals can be peeked
#  at, but only its `glyphs` are kept, like for the rest of the episode.
glyphs = [next(episode)['glyphs']]
# dng = getgeneratorlocals(episode).get('dng')

glyphs.extend(obs['glyphs'] for obs in episode)

In [None]:
assert False

In [None]:
from scipy.special import softmax

def dstination_prob(lvl, pos):
    """Probability of picking each tile of the level as the next destination.

    The logit of a tile is its king-move (Chebyshev) distance from `pos`,
    capped at five. Tiles which are walkable neither by their `bg_tiles`
    nor by their `stg_tiles` glyph, as well as `pos` itself, get zero
    probability.
    """
    r, c = pos

    # the greater of the row and the column offsets
    d_row = abs(lvl.bg_tiles.rc.r - r)
    d_col = abs(lvl.bg_tiles.rc.c - c)
    dist = np.maximum(d_row, d_col)

    eligible = (
        is_walkable[lvl.bg_tiles.glyph]
        | is_walkable[lvl.stg_tiles.glyph]
    )
    eligible[r, c] = False  # never pick the current position

    logits = np.where(eligible, dist, -np.inf)
    return softmax(np.minimum(logits, 5))

rng = np.random.default_rng()
prob = dstination_prob(dng.level, dng.level.trace[-1])
cost = np.where(prob > 0, 1., float('inf'))

plt.imshow(prob)

In [None]:
plt.imshow(cost)

In [None]:
def backup(path, dest):
    """Yield the nodes from `dest` back to the root of the shortest-path tree.

    Parameters
    ----------
    path : mapping or ndarray
        Either a mapping from a node to its predecessor, with `None` as the
        predecessor of the root, or the integer array of predecessor
        coordinates returned by `dij`, in which the root is marked by
        negative coordinates.

    dest : node
        The node to start from. It is always yielded first.

    Yields
    ------
    The nodes on the way from `dest` to the root, both included.
    """
    node = dest
    while True:
        parent = path[node]
        yield node
        if parent is None:
            return

        if isinstance(parent, np.ndarray):
            # A row of the predecessor array must be turned into a tuple,
            #  otherwise it acts as a fancy index, which picks whole rows,
            #  instead of a single cell, and the walk never terminates.
            parent = tuple(parent.tolist())
            if min(parent) < 0:
                return

        node = parent

#         (r0, c0), (r1, c1) = p0, p1
#         yield directions[r1-r0, c1-c0]
        

In [None]:
value, path = dij(cost, dng.level.trace[-1])

In [None]:
val = value.copy()
r, c = rng.choice(dng.level.bg_tiles.rc.flat, p=prob.flat)

fig, ax = plt.subplots(1, 1, dpi=300)
for i, j in backup(path, (r, c)):
    val[i, j] = 0.

val[r, c] = np.inf

ax.imshow(val[:, 10:40])

In [None]:
# ''.join(reversed())
list()

<br>

<br>