In [1]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from enviornment import WumpusEnv

  from pkg_resources import resource_stream, resource_exists


pygame 2.6.1 (SDL 2.28.4, Python 3.10.19)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [10]:
def state_to_tensor(state, H, W):
    """
    Single H x W integer grid flattened to length H*W.
    Codes (ordered low->high as you wanted):
      0 : pit
      1 : wumpus_up
      2 : wumpus_down
      3 : wumpus_left
      4 : wumpus_right
      5 : exit
      6 : gold
      7 : agent (no gold)
      8 : agent (has gold)
    Returns: 1D torch.float32 tensor of length H*W
    """
    grid = np.full((H, W), 2.0, dtype=np.float32)  # default = empty (use 2 to sit between wumpus and exit)

    # pits
    for pr, pc in state["pits"]:
        grid[pr, pc] = 0.0

    # wumpuses with facing preserved
    face_code = {"up": 1.0, "down": 2.0, "left": 3.0, "right": 4.0}
    for pos in state["wumpus"]:
        fr, fc = pos
        fac = state["wumpus_facing"].get(pos, "down")
        grid[fr, fc] = face_code.get(fac, 2.0)

    # exit
    er, ec = state["exit"]
    grid[er, ec] = 5.0

    # gold (if present)
    if state["gold"] is not None:
        gr, gc = state["gold"]
        grid[gr, gc] = 6.0

    ar, ac = state["agent"]
    if state.get("have_gold", False):
        grid[ar, ac] = 8.0
    else:
        grid[ar, ac] = 7.0

    return torch.from_numpy(grid.ravel()).float()  # shape (H*W,)


class PolicyNet(nn.Module):
    def __init__(self, input_dim, hidden=256, n_actions=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions),
        )

    def forward(self, x):
        return self.net(x)


def run_episode(env, model, device=None):
    """
    Run one episode using the current stochastic policy.
    Returns: trajectory list[(state_tensor_cpu, action_int)], total_reward
    - Uses state_to_flat_grid_with_facing(state, H, W) as encoder.
    - Auto-detects device from model if device is None.
    """
    if device is None:
        device = next(model.parameters()).device

    traj = []
    state = env.get_state()
    done = state["done"]
    total = 0

    while not done:
        # encode state as flat H*W tensor (keeps wumpus facing)
        inp = state_to_tensor(state, env.h, env.w).to(device)  # (H*W,)

        with torch.no_grad():
            logits = model(inp.unsqueeze(0))  # (1,4)
            probs = torch.softmax(logits, dim=-1).squeeze(0)  # (4,)
            action = torch.multinomial(probs, num_samples=1).item()

        reward, done = env.step(action)
        total += reward

        traj.append((inp.cpu(), action))  # store CPU tensor for batching later
        state = env.get_state()

    return traj, total

In [12]:
model = PolicyNet(8 * 8)
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=1e-3)

epochs = 10
train_epochs = 100
batch_size = 64
elite_frac = 0.3


env = WumpusEnv(w=8, h=8, n_pits=6, n_wumpus=3, seed=42, wumpus_orientation="down")

for epoch in range(epochs):
    # 1) Collect batch_size episodes using current policy
    episodes = []
    rewards = []

    for _ in range(batch_size):
        env.reset()
        traj, total = run_episode(env, model)
        episodes.append(traj)
        rewards.append(total)

    rewards = np.array(rewards)

    k = int(batch_size * elite_frac)
    elite_idx = rewards.argsort()[-k:]
    elite_trajs = [episodes[i] for i in elite_idx]

    states_list = []
    actions_list = []
    for traj in elite_trajs:
        for s, a in traj:
            states_list.append(s.numpy())
            actions_list.append(a)
    if len(states_list) == 0:
        print(f"Epoch {epoch}: no elite data, skipping")
        continue
    states = torch.tensor(np.stack(states_list), dtype=torch.float32)  # (N, input_dim)
    actions = torch.tensor(actions_list, dtype=torch.long)  # (N,)

    for i in range(train_epochs):
        opt.zero_grad()
        a_predict = model(states)  # (N, 4)
        loss = criterion(a_predict, actions)
        loss.backward()
        opt.step()
        if  i % 10 == 0:
            print(f"loss: {loss}")

    mean_r = rewards.mean()
    best_r = rewards.max()
    print(f"Epoch {epoch:03d}  meanR {mean_r:.1f}  bestR {best_r:.1f}  elite_steps {len(states_list)}")

loss: 1.3660863637924194
loss: 1.2563667297363281
loss: 1.1404964923858643
loss: 1.0120081901550293
loss: 0.9068971276283264
loss: 0.8261556029319763
loss: 0.7783100605010986
loss: 0.7341163158416748
loss: 0.7168760895729065
loss: 0.6945194602012634
Epoch 000  meanR -98.5  bestR 97.0  elite_steps 249


KeyboardInterrupt: 