In [1]:
# =========================
# Config
# =========================
# Everything a reader might want to tune lives in this cell; the Run cell at the
# bottom of the notebook reads these names.
DO_EXPLORE = True                      # run environment exploration and write one JSON per seed
DO_DRAW = True                         # draw maps (PNG + PDF) from the JSONs matched by MAP_IN_GLOB
ENV_STEP_LIMIT = 200                   # passed as envStepLimit when the environment is constructed
# Keyword arguments forwarded to env.load(); "gameParams" is also embedded
# (sanitized) in every generated JSON filename.
ENV_PARAMS = {
    "gameName": "coin",
    "gameParams": "numLocations=11,numDistractorItems=0,includeDoors=1,limitInventorySize=0",
}
# Seeds to explore, one JSON file each.
# NOTE(review): 3, 9 and 13 are absent from this list; the reason is not recorded
# in this notebook - confirm the omission is intentional.
SEEDS = [0, 1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20, 21, 22]

EXPLORE_OUT_DIR = "json_output"        # directory the exploration JSONs are written to
EXPLORE_PREFIX = "coin_links"          # base filename prefix for generated JSONs

MAP_IN_GLOB = f"{EXPLORE_OUT_DIR}/*_seed*.json"  # glob to read JSONs for plotting
MAP_OUT_DIR = "output_maps"            # directory receiving the PNGs and the combined PDF
PDF_NAME = "coincollector_maps.pdf"    # filename of the multi-page PDF inside MAP_OUT_DIR

In [2]:
# =========================
# Imports
# =========================
import re, os, glob, json, math
from pathlib import Path
from collections import defaultdict, deque

import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.lines import Line2D


from textworld_express import TextWorldExpressEnv 

# =========================
# Constants for directions
# =========================
# The four compass directions, always handled in lower case.
DIRS = ["north", "east", "south", "west"]
# Direction that leads back the way you came.
OPPOSITE = {"north": "south", "south": "north", "east": "west", "west": "east"}
# Unit step on the drawing grid for each direction: east is +x, north is +y.
DIR_TO_DXY = {"north": (0, 1), "south": (0, -1), "east": (1, 0), "west": (-1, 0)}

# =========================
# Regex parsers
# =========================
# All patterns are case-insensitive and capture up to (not including) the next ".".
# "You are in the <room>."  -> group 1 = room name.
ROOM_RE = re.compile(r"You are in the ([^.]+)\.", re.I | re.S)
# "To the <Dir> you see the <room>."  -> (direction, room).
# Because of re.I this also matches the tail of the open-door sentence below,
# so callers must let the open-door match take precedence.
PASSAGE_RE = re.compile(r"To the (North|East|South|West)\s+you see the\s+([^.]+)\.", re.I)
# "Through an open <desc> door, to the <Dir> you see the <room>."
# -> (door description, direction, room).
OPEN_DOOR_LOOK_RE = re.compile(
    r"Through an open ([^,]*?) door,\s*to the (North|East|South|West)\s+you see the\s+([^.]+)\.",
    re.I,
)
# "To the <Dir> you see a closed <desc> door."  -> (direction, door description).
# The room behind a closed door is not named in this sentence.
CLOSED_DOOR_RE = re.compile(
    r"To the (North|East|South|West)\s+you see a closed ([^.]*?) door\.", re.I
)
# "... revealing the <room>."  -> group 1 = room name.
REVEAL_RE = re.compile(r"revealing the ([^.]+)\.", re.I)

def parse_current_room(text: str):
    """
    Pull the current room name out of a "You are in the <room>." sentence.

    Returns the stripped room name, or None when `text` is empty/None or
    contains no such sentence.
    """
    match = ROOM_RE.search(text or "")
    if not match:
        return None
    return match.group(1).strip()

def parse_exits_linktype(look_text: str):
    """
    Extract exits from a look/observation string.
    Returns: {dir: {"link": "passage"|"door", "room": name_or_None, "door_desc": optional}}

    Directions are lower-cased. Matches are applied in three passes and the
    order matters: plain passages first, then open doors (which overwrite a
    passage entry for the same direction), then closed doors (which only fill
    directions nothing else has claimed and carry no room name).
    """
    text = look_text or ""
    found = {}

    # Pass 1: open passages.
    for heading, target in PASSAGE_RE.findall(text):
        found[heading.lower()] = {"link": "passage", "room": target.strip()}

    # Pass 2: open doors; these win over a passage hit on the same direction.
    for door_kind, heading, target in OPEN_DOOR_LOOK_RE.findall(text):
        found[heading.lower()] = {
            "link": "door",
            "room": target.strip(),
            "door_desc": door_kind.strip() + " door",
        }

    # Pass 3: closed doors; the room behind them is not visible yet.
    for heading, door_kind in CLOSED_DOOR_RE.findall(text):
        key = heading.lower()
        if key in found:
            continue
        found[key] = {"link": "door", "room": None, "door_desc": door_kind.strip() + " door"}

    return found

def parse_revealed_room(text: str):
    """
    Pull the room name out of a "... revealing the <room>." sentence.

    Returns the stripped room name, or None when `text` is empty/None or
    contains no such sentence.
    """
    match = REVEAL_RE.search(text or "")
    if not match:
        return None
    return match.group(1).strip()

def ensure_look(env, infos):
    """
    Return a room description together with the infos dict it came from.

    Uses infos["look"], falling back to infos["observation"]. Only when both
    are missing/empty is a "look around" step issued on `env`; in that case the
    infos returned by that step replace the ones passed in, and the step's
    observation text is the fallback description.

    Returns: (look_text, infos)
    """
    description = infos.get("look") or infos.get("observation") or ""
    if description:
        return description, infos

    observation, _reward, _done, infos = env.step("look around")
    return (infos.get("look") or observation or ""), infos

# =========================
# Exploration: build JSON with link type (passage/door)
# =========================
def explore_env_links_only(env, seed: int):
    """
    Fully explore the environment and output {"room_info": ..., "items": ...}.
    Distinguishes "door" vs "passage" only (ignores door open/closed state).

    The walk is a depth-first search driven by the text of each room's look
    description: exits are parsed from the text, closed doors are opened once
    to learn the room behind them, and every unvisited neighbour is entered,
    explored recursively and then left again by the opposite direction.

    Args:
        env: environment exposing reset(seed=..., gameFold=..., generateGoldPath=...)
            -> (obs, infos) and step(action) -> (obs, reward, done, infos).
            infos is read for "look", "observation" and "validActions".
        seed: seed passed to env.reset().

    Returns:
        {"room_info": {room: {dir: {"room": neighbour, "link": "door"|"passage",
                                    "door_desc"?: str}}},
         "items": {room: ["coin"]}}  (only rooms whose look text mentions a coin)

    Raises:
        RuntimeError: if the starting room name cannot be parsed.
    """
    obs, infos = env.reset(seed=seed, gameFold="train", generateGoldPath=True)
    look, infos = ensure_look(env, infos)
    start_room = parse_current_room(look) or parse_current_room(obs)
    if not start_room:
        raise RuntimeError("Failed to parse current room name.")

    room_info = defaultdict(dict)   # room_info[room][dir] = {"room": v, "link": "door"|"passage", "door_desc"?}
    items = defaultdict(list)
    visited = set()

    def record_items(look_text: str, room: str):
        # A room is flagged as soon as the word "coin" appears in its description.
        if re.search(r"\bcoin\b", look_text or "", re.I):
            items[room] = ["coin"]

    def set_edge(u: str, d: str, v: str, link: str, door_desc: str = None):
        # Record the link in both directions; the reverse edge uses the opposite heading.
        for src, heading, dst in ((u, d, v), (v, OPPOSITE[d], u)):
            meta = {"room": dst, "link": link}
            if door_desc and link == "door":
                meta["door_desc"] = door_desc
            room_info[src][heading] = meta

    def has_action(infos, action: str) -> bool:
        acts = [a.lower() for a in (infos.get("validActions") or [])]
        return action.lower() in acts

    def ensure_open_then_move(dir_: str):
        # Open the door first whenever the environment offers that action, then move.
        nonlocal obs, infos
        if has_action(infos, f"open door to {dir_}"):
            obs, reward, done, infos = env.step(f"open door to {dir_}")
        obs, reward, done, infos = env.step(f"move {dir_}")
        return obs, infos

    def move_back(dir_: str):
        # Return leg: move directly when possible, otherwise try opening the door first.
        nonlocal obs, infos
        if not has_action(infos, f"move {dir_}") and has_action(infos, f"open door to {dir_}"):
            obs, reward, done, infos = env.step(f"open door to {dir_}")
        obs, reward, done, infos = env.step(f"move {dir_}")

    def current_room():
        # Room name according to the most recent step, or None if it cannot be parsed.
        return parse_current_room(infos.get("look") or "") or parse_current_room(obs)

    def visit(room: str):
        nonlocal obs, infos
        if room in visited:
            return
        visited.add(room)

        look, infos = ensure_look(env, infos)
        record_items(look, room)
        exits = parse_exits_linktype(look)

        # known neighbors from look
        for d, meta in exits.items():
            if meta["room"]:
                set_edge(room, d, meta["room"], meta["link"], meta.get("door_desc"))

        # closed doors with unknown neighbor: open briefly to reveal neighbor name
        for d, meta in exits.items():
            if meta["link"] == "door" and meta.get("room") is None:
                if has_action(infos, f"open door to {d}"):
                    obs, reward, done, infos = env.step(f"open door to {d}")
                    nb = parse_revealed_room(obs) or parse_revealed_room(infos.get("observation", ""))
                    look2, infos = ensure_look(env, infos)
                    if not nb:
                        # Fall back to the refreshed look text, which now names the room.
                        nb = parse_exits_linktype(look2).get(d, {}).get("room")
                    if nb:
                        set_edge(room, d, nb, "door", meta.get("door_desc"))

        # DFS to neighbors (snapshot the exits: recursion adds edges to room_info)
        for d, m in list(room_info[room].items()):
            nb = m["room"]
            if nb in visited:
                continue
            ensure_open_then_move(d)
            if current_room() == room:
                # The move did not take us anywhere. Exploring now would file this
                # room's description under the neighbour's name, and the "move back"
                # below would walk us out of position, so skip this exit.
                continue
            visit(nb)
            move_back(OPPOSITE[d])
            # Keep the refreshed infos so the next has_action() check sees current state.
            _, infos = ensure_look(env, infos)

    visit(start_room)

    # Convert the defaultdicts into plain nested dicts for JSON serialisation.
    room_info = {r: {d: dict(meta) for d, meta in ds.items()} for r, ds in room_info.items()}
    items = dict(items)
    return {"room_info": room_info, "items": items}

def sanitize_game_params(s: str):
    """
    Make a gameParams string filename-friendly.

    Every "," becomes "_" and every "=" is dropped, e.g.
    "numLocations=11,includeDoors=1" -> "numLocations11_includeDoors1".
    """
    without_commas = "_".join(s.split(","))
    return "".join(without_commas.split("="))

def explore_and_save(env_params, seeds, out_dir, prefix):
    """
    Explore one game per seed and write each result as a JSON file.

    A single environment is created (step limit taken from ENV_STEP_LIMIT),
    loaded with `env_params`, and reused for every seed. Files are named
    "<prefix>_<sanitized gameParams>_seed<seed>.json" inside `out_dir`, which
    is created if needed.

    Returns: list of the written file paths (as strings), in seed order.
    Raises: RuntimeError if TextWorldExpressEnv is None.
    """
    if TextWorldExpressEnv is None:
        raise RuntimeError("TextWorldExpressEnv not available. Fix the import and rerun.")

    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    env = TextWorldExpressEnv(envStepLimit=ENV_STEP_LIMIT)
    env.load(**env_params)
    params_tag = sanitize_game_params(env_params.get("gameParams", ""))

    written = []
    for seed in seeds:
        graph = explore_env_links_only(env, seed)
        json_path = target_dir / f"{prefix}_{params_tag}_seed{seed}.json"
        with open(json_path, "w", encoding="utf-8") as handle:
            json.dump(graph, handle, ensure_ascii=False, indent=2)
        written.append(str(json_path))
    return written

# =========================
# Map drawing from JSON
# =========================
def standardize_room_info(room_info_raw):
    """
    Normalise a raw room_info mapping into the shape the plotting code expects.

    Direction keys are lower-cased and every exit becomes
    {"room": ..., "link": ..., "door_desc"?: ...}. Back-compat: entries without
    a "link" get one derived from the older "door_state" field - "closed"
    becomes "door", anything else (including "open"/"opened" or no state at
    all) becomes "passage". None inputs at any level are treated as empty.
    """
    normalized = {}
    for room_name, exits in (room_info_raw or {}).items():
        per_room = {}
        normalized[room_name] = per_room
        for heading, raw_meta in (exits or {}).items():
            fields = dict(raw_meta or {})

            link_kind = fields.get("link")
            if link_kind is None:
                # Legacy files only stored the door state.
                state = (fields.get("door_state") or "").lower()
                link_kind = "door" if state == "closed" else "passage"

            entry = {"room": fields.get("room"), "link": link_kind}
            if "door_desc" in fields:
                entry["door_desc"] = fields["door_desc"]
            per_room[heading.lower()] = entry
    return normalized

def load_json(path):
    """
    Read one exploration JSON file.

    Returns: (room_info, coin_rooms) where room_info has been passed through
    standardize_room_info() and coin_rooms is the set of rooms whose item list
    contains "coin" (compared case-insensitively).
    """
    with open(path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)

    rooms = standardize_room_info(payload.get("room_info", {}))

    coin_rooms = set()
    for room_name, contents in (payload.get("items", {}) or {}).items():
        for thing in contents:
            if str(thing).lower() == "coin":
                coin_rooms.add(room_name)
                break
    return rooms, coin_rooms

def build_edge_types(room_info):
    """
    Collapse directed exits into undirected edges with a single link type.

    Returns {(room_a, room_b): "door"|"passage"|...} with the pair sorted so
    both directions share one key. The first type seen for a pair is kept,
    except that "door" always wins if either direction reports it. Exits with
    no target room are ignored; a missing "link" counts as "passage".
    """
    kinds = {}
    for source, exits in (room_info or {}).items():
        for meta in (exits or {}).values():
            target = meta.get("room")
            if not target:
                continue
            pair = tuple(sorted((source, target)))
            kind = meta.get("link", "passage")
            # New pair: take whatever we saw. Known pair: only "door" may overwrite.
            if pair not in kinds or kind == "door":
                kinds[pair] = kind
    return kinds

def compute_positions(room_info):
    """
    Lay rooms out on an integer grid by breadth-first search.

    The search starts at "kitchen" when that room exists, otherwise at the
    alphabetically first room, which is placed at (0, 0). Each neighbour is
    placed one DIR_TO_DXY step away from the room it was first reached from;
    a room keeps the first position it is given. Rooms not reachable from the
    start room do not appear in the result.

    Returns: {room: (x, y)}; an empty dict for empty input.
    """
    if not room_info:
        return {}

    if "kitchen" in room_info:
        origin = "kitchen"
    else:
        origin = min(room_info.keys())

    coords = {origin: (0, 0)}
    frontier = deque((origin,))
    while frontier:
        here = frontier.popleft()
        here_x, here_y = coords[here]
        for heading, meta in (room_info.get(here) or {}).items():
            # Skip unknown headings and exits whose target room is not known.
            if heading not in DIR_TO_DXY or not meta.get("room"):
                continue
            neighbour = meta["room"]
            if neighbour in coords:
                continue
            step_x, step_y = DIR_TO_DXY[heading]
            coords[neighbour] = (here_x + step_x, here_y + step_y)
            frontier.append(neighbour)
    return coords

# label collision handling
# Candidate (dx, dy) label offsets from a room marker, in data units.
# The first entry doubles as the default when a room has no net edge direction.
CAND_OFFSETS = [
    (0.00, 0.22),     # above
    (0.22, 0.00),     # right
    (0.00, -0.26),    # below
    (-0.24, 0.00),    # left
    (0.18, 0.18),     # upper right
    (-0.18, 0.18),    # upper left
    (0.18, -0.18),    # lower right
    (-0.18, -0.18),   # lower left
]

def preferred_offset(room, pos, room_info):
    """
    Choose the label offset that points away from a room's edges.

    The unit steps of all exits with a known target are summed; the candidate
    in CAND_OFFSETS with the smallest dot product against that sum is returned
    (the earliest candidate wins ties). When the steps cancel out, or the room
    has no usable exits, the first candidate is returned.

    Raises: KeyError if `room` has no entry in `pos`.
    """
    _room_x, _room_y = pos[room]   # coordinates are not needed; the room must be placed

    pull_x = pull_y = 0.0
    for heading, meta in (room_info.get(room) or {}).items():
        if heading not in DIR_TO_DXY or not meta.get("room"):
            continue
        step_x, step_y = DIR_TO_DXY[heading]
        pull_x += step_x
        pull_y += step_y

    if pull_x == 0 and pull_y == 0:
        return CAND_OFFSETS[0]

    # min() keeps the first of equally good candidates.
    return min(CAND_OFFSETS, key=lambda cand: cand[0] * pull_x + cand[1] * pull_y)

def repel_texts(fig, ax, texts, max_iter=80, pad_px=3):
    """
    Nudge overlapping text labels apart until none overlap or max_iter is hit.

    Overlap is tested in display (pixel) space on each label's window extent,
    grown by `pad_px` on every side. For every overlapping pair the two labels
    are pushed in opposite directions along a single axis, and the pixel push
    is converted to a data-space displacement before being applied to the
    label positions. Extents are measured once per pass, and the canvas is
    redrawn after each pass in which something moved.

    Args:
        fig: figure owning the labels (used for its canvas/renderer).
        ax: axes whose transData maps label positions to pixels.
        texts: sequence of text artists; positions are modified in place.
        max_iter: maximum number of passes.
        pad_px: padding in pixels added around every label before testing.

    Returns:
        {text: [total_dx, total_dy]} - accumulated data-space displacement
        applied to each label.
    """
    renderer = fig.canvas.get_renderer()

    def pix_to_data(dx, dy):
        # Convert a pixel displacement into a data displacement by sending both
        # ends of the pixel vector through the inverse (display -> data) transform.
        # The previous code sent the vector through data -> display -> data, which
        # is the identity, so pixel-sized pushes were applied as data units.
        inv = ax.transData.inverted()
        x0, y0 = inv.transform((0, 0))
        x1, y1 = inv.transform((dx, dy))
        return x1 - x0, y1 - y0

    def padded_extent(t):
        bb = t.get_window_extent(renderer=renderer)
        if bb.width <= 0 or bb.height <= 0:
            # Degenerate extent (e.g. an empty label): cannot be scaled, leave as is.
            return bb
        return bb.expanded((bb.width + 2*pad_px)/bb.width,
                           (bb.height + 2*pad_px)/bb.height)

    moved_total = {t: [0.0, 0.0] for t in texts}
    for _ in range(max_iter):
        moved = False
        # Measured once per pass; moves made during the pass are picked up next pass.
        bboxes = [padded_extent(t) for t in texts]
        for i in range(len(texts)):
            for j in range(i+1, len(texts)):
                bb1, bb2 = bboxes[i], bboxes[j]
                if not bb1.overlaps(bb2):
                    continue
                cx1 = 0.5*(bb1.x0 + bb1.x1); cy1 = 0.5*(bb1.y0 + bb1.y1)
                cx2 = 0.5*(bb2.x0 + bb2.x1); cy2 = 0.5*(bb2.y0 + bb2.y1)
                dxp, dyp = cx2 - cx1, cy2 - cy1
                if abs(dxp) < 1e-3 and abs(dyp) < 1e-3:
                    # Coincident centres have no direction; pick one arbitrarily.
                    dxp, dyp = 1.0, 0.0
                # Overlap depth along each axis, in pixels.
                ovx = (bb1.width/2 + bb2.width/2) - abs(dxp)
                ovy = (bb1.height/2 + bb2.height/2) - abs(dyp)
                # Push along one axis only; the sign moves label i away from label j.
                if ovx > ovy:
                    pushx, pushy = (ovx/2 + 1) * (-1 if dxp > 0 else 1), 0.0
                else:
                    pushx, pushy = 0.0, (ovy/2 + 1) * (-1 if dyp > 0 else 1)
                pdx, pdy = pix_to_data(pushx, pushy)
                t1, t2 = texts[i], texts[j]
                x1, y1 = t1.get_position(); x2, y2 = t2.get_position()
                t1.set_position((x1 + pdx, y1 + pdy))
                t2.set_position((x2 - pdx, y2 - pdy))
                moved_total[t1][0] += pdx; moved_total[t1][1] += pdy
                moved_total[t2][0] -= pdx; moved_total[t2][1] -= pdy
                moved = True
        if not moved:
            break
        # Redraw so the next pass measures the labels at their new positions.
        fig.canvas.draw()
    return moved_total

def plot_map(ax, room_info, coin_rooms, title):
    """
    Draw one room map onto `ax`.

    Edges are drawn first (solid for "passage", dashed for anything else),
    then plain rooms as circles and coin rooms as stars, then one label per
    room. Labels are de-overlapped with repel_texts(); a dotted leader line is
    added for any label that ends up more than 0.28 data units from its room.
    Coin rooms that have no computed position are parked on the first free
    cell of the y = 0 row. Finally title, axes labels, grid, legend and
    margins are applied.
    """
    coords = compute_positions(room_info)
    links = build_edge_types(room_info)

    # --- edges ---
    for (room_a, room_b), kind in links.items():
        if room_a not in coords or room_b not in coords:
            continue
        (a_x, a_y), (b_x, b_y) = coords[room_a], coords[room_b]
        dash = "-" if kind == "passage" else (0, (5, 5))
        ax.plot([a_x, b_x], [a_y, b_y], linestyle=dash, linewidth=2)

    # --- room markers: plain rooms first, coin rooms on top ---
    for name, (px, py) in coords.items():
        if name not in coin_rooms:
            ax.scatter([px], [py], s=220, marker="o", zorder=3)

    for name in coin_rooms:
        if name in coords:
            px, py = coords[name]
        else:
            # Unplaced coin room: take the first unused cell along y = 0.
            taken = set(coords.values())
            px, py = 0, 0
            while (px, py) in taken:
                px += 1
            coords[name] = (px, py)
        ax.scatter([px], [py], s=300, marker="*", zorder=4)

    # --- labels ---
    labels = []
    anchors = {}
    for name, (px, py) in coords.items():
        off_x, off_y = preferred_offset(name, coords, room_info)
        caption = name + (" ★" if name in coin_rooms else "")
        label = ax.text(px + off_x, py + off_y, caption,
                        ha="center", va="bottom", fontsize=10, zorder=5)
        labels.append(label)
        anchors[label] = (px, py)

    # Extents are only available after a draw.
    ax.figure.canvas.draw()
    repel_texts(ax.figure, ax, labels, max_iter=100, pad_px=2)

    # Leader lines for labels that drifted away from their room.
    for label in labels:
        px, py = anchors[label]
        label_x, label_y = label.get_position()
        dx, dy = label_x - px, label_y - py
        if (dx*dx + dy*dy) ** 0.5 > 0.28:
            ax.plot([px, label_x], [py, label_y], linestyle=":", linewidth=1, zorder=4)

    # --- decoration ---
    ax.set_title(title, fontsize=12)
    ax.set_aspect("equal", adjustable="box")
    ax.grid(True, linestyle=":", linewidth=0.8)
    ax.set_xlabel("W ↔ E")
    ax.set_ylabel("S ↔ N")

    legend_specs = (
        dict(linestyle="-", linewidth=2, label="Passage"),
        dict(linestyle=(0, (5, 5)), linewidth=2, label="Door"),
        dict(marker="o", linestyle="None", markersize=10, label="Room"),
        dict(marker="*", linestyle="None", markersize=12, label="Room with coin"),
    )
    handles = [Line2D([0], [0], **spec) for spec in legend_specs]
    ax.legend(handles=handles, loc="best", frameon=True)
    ax.margins(0.2)

def draw_all(in_glob=MAP_IN_GLOB, out_dir=MAP_OUT_DIR, pdf_name=PDF_NAME):
    """
    Render every exploration JSON matched by `in_glob` as a PNG and a PDF page.

    Each map is drawn once and the same figure is saved both as
    "<json stem>_map.png" in `out_dir` and as a page of the combined PDF
    `out_dir`/`pdf_name`. Files are processed in natural order, so "seed2"
    comes before "seed10". The seed shown in the title is whatever follows the
    last "seed" in the file stem (the whole stem if "seed" does not occur).

    Args:
        in_glob: glob pattern selecting the JSON files.
        out_dir: output directory; created if missing.
        pdf_name: filename of the multi-page PDF inside `out_dir`.

    Returns:
        {"pdf": path of the PDF, "png_dir": output directory, "count": number of maps}

    Raises:
        FileNotFoundError: if `in_glob` matches nothing.
    """
    def natural_key(path):
        # re.split with a capturing group alternates text, digits, text, ...
        # so odd positions are always digit runs and compare as integers.
        tokens = re.split(r"(\d+)", os.path.basename(path))
        return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)]

    files = sorted(glob.glob(in_glob), key=natural_key)
    if not files:
        raise FileNotFoundError(f"No files matched: {in_glob}")

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    pdf_path = out_dir / pdf_name

    with PdfPages(str(pdf_path)) as pdf:
        for path in files:
            room_info, coin_rooms = load_json(path)
            base = os.path.splitext(os.path.basename(path))[0]
            # rsplit: a prefix that happens to contain "seed" must not leak into the label.
            seed = base.rsplit("seed", 1)[1] if "seed" in base else base
            title = f"CoinCollector Map (seed {seed})"
            png_path = out_dir / f"{base}_map.png"

            fig, ax = plt.subplots(figsize=(8, 6))
            try:
                plot_map(ax, room_info, coin_rooms, title)
                fig.tight_layout()
                # One render, two outputs: laying the map out a second time for the
                # PDF only repeated the work.
                fig.savefig(png_path, dpi=160)
                pdf.savefig(fig, dpi=160)
            finally:
                # Always release the figure, even if drawing or saving failed.
                plt.close(fig)

    return {"pdf": str(pdf_path), "png_dir": str(out_dir), "count": len(files)}


In [3]:
# =========================
# Run
# =========================
# Stage 1 (optional): explore every seed in SEEDS and write one JSON per seed.
if DO_EXPLORE:
    # explore_and_save() creates the directory as well, so this mkdir is
    # redundant but harmless.
    Path(EXPLORE_OUT_DIR).mkdir(parents=True, exist_ok=True)
    outputs = explore_and_save(ENV_PARAMS, SEEDS, EXPLORE_OUT_DIR, EXPLORE_PREFIX)
    print("Saved JSON:", len(outputs), "files →", EXPLORE_OUT_DIR)

# Stage 2 (optional): draw every JSON matching MAP_IN_GLOB. This reads whatever
# is on disk, so it also picks up files left over from earlier runs.
if DO_DRAW:
    result = draw_all(MAP_IN_GLOB, MAP_OUT_DIR, PDF_NAME)
    print("Saved PDF:", result["pdf"])
    print("Saved PNGs in:", result["png_dir"], f"({result['count']} files)")

Saved JSON: 20 files → json_output
Saved PDF: output_maps/coincollector_maps.pdf
Saved PNGs in: output_maps (20 files)
