# minimal

In [4]:
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [5]:
default_maze = torch.tensor([
    [1, 0, 0, 0, 0],
    [1, 1, 1, 1, 1],
    [0, 1, 0, 1, 0],
    [1, 1, 0, 0, 0],
    [1, 1, 1, 1, -1],
])
MAZE_WIDTH = default_maze.shape[0]
INPUT_SIZE = MAZE_WIDTH * MAZE_WIDTH + 2 * MAZE_WIDTH
MOVES = {
    (-1, 0): torch.tensor(0).to(device), # up
    (1, 0):  torch.tensor(1).to(device), # down
    (0, -1): torch.tensor(2).to(device), # left
    (0, 1):  torch.tensor(3).to(device),  # right
 }

# policy
HIT_WALL_PENALTY = -1
MOVE_PENALTY = -0.04
WIN_REWARD = 10
 
# hyperparams
BATCH_SIZE = 128
EPOCH = 1000
LEARNING_RATE = 1e-3

def get_maze():
    maze = default_maze
    rewards = torch.zeros_like(maze)
    rewards[maze == 0] = HIT_WALL_PENALTY
    rewards[maze == 1] = MOVE_PENALTY
    rewards[maze == -1] = WIN_REWARD
    return maze, rewards

def get_reward(rewards, pos):
    x, y = pos
    a, b = rewards.shape
    if 0 <= x < a and 0 <= y < b:
        return rewards[x, y]
    return HIT_WALL_PENALTY

def get_next_pos(maze, rewards, pos):
    new_pos = pos # default to bouncing off a wall.
    reward = HIT_WALL_PENALTY # default to hitting a wall.
    move = random.choice(list(MOVES.keys()))
    x, y = pos
    a, b = maze.shape
    i, j = move
    if 0 <= x + i < a and 0 <= y + j < b:
        new_pos = (x + i, y + j)
        reward = get_reward(rewards, new_pos)
    return new_pos, reward, move

def get_batch():
    batch = []
    maze, rewards = get_maze()
    positions = random.choices((maze == 1).nonzero().tolist(), k=BATCH_SIZE)
    for pos in positions:
        new_pos, reward, move = get_next_pos(maze, rewards, pos)
        batch.append((pos, move, new_pos, reward))
    return maze, batch

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(INPUT_SIZE, INPUT_SIZE),
            nn.LayerNorm(INPUT_SIZE),
            nn.ReLU(),
            nn.Linear(INPUT_SIZE, INPUT_SIZE),
            nn.ReLU(),
            nn.Linear(INPUT_SIZE, INPUT_SIZE),
            nn.ReLU(),
            nn.Linear(INPUT_SIZE, len(MOVES)),
        )

    def forward(self, x):
        logits = self.linear_relu_stack(x)        
        return logits

def to_input(maze, pos):
    return torch.cat((
        maze.view(-1),
        F.one_hot(torch.tensor(pos), num_classes=MAZE_WIDTH).view(-1),
    )).float().to(device)

def train(model):
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
    losses = []
    for epoch in range(EPOCH):
        maze, batch = get_batch()
        # train non-vectorized
        # --------------------
        lefts, rights = [], []
        for pos, move, new_pos, reward in batch:
            qs = model(to_input(maze, pos))
            hot = F.one_hot(MOVES[move], num_classes=len(MOVES))
            bellman_left = (qs * hot).sum()

            new_q = model(to_input(maze, new_pos)).max()
            bellman_right = reward + new_q

            lefts.append(bellman_left)
            rights.append(bellman_right)
        
        bellman_left = torch.stack(lefts).to(device)
        bellman_right = torch.stack(rights).to(device)
        loss = F.mse_loss(bellman_left, bellman_right)
        losses.append(loss.item())
        if epoch % 50 == 0:
            print(f"epoch: {epoch: 5} loss: {torch.tensor(losses).mean():.8f}")
            losses = []
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

model = NeuralNetwork().to(device)
train(model)

epoch:     0 loss: 2.89412498
epoch:    50 loss: 2.29298115
epoch:   100 loss: 1.41007268
epoch:   150 loss: 0.77211845
epoch:   200 loss: 0.40292484
epoch:   250 loss: 0.25913852
epoch:   300 loss: 0.08322355
epoch:   350 loss: 0.01881363
epoch:   400 loss: 0.00497699
epoch:   450 loss: 0.00206507
epoch:   500 loss: 0.00091505
epoch:   550 loss: 0.00038059
epoch:   600 loss: 0.00015789
epoch:   650 loss: 0.00007466
epoch:   700 loss: 0.00003241
epoch:   750 loss: 0.00001914
epoch:   800 loss: 0.00001020
epoch:   850 loss: 0.00000523
epoch:   900 loss: 0.00000313
epoch:   950 loss: 0.00000185


In [6]:
i2move = {i.detach().item(): v for v, i in MOVES.items()}

def play(model, maze, pos=(0, 0)):
    print(maze)
    depth = 100
    while True:
        qs = model(to_input(maze, pos))
        move = i2move[qs.argmax().tolist()]
        new_pos = (pos[0] + move[0], pos[1] + move[1])
        print(f'chose {move} from {pos} to {new_pos}')
        if 0 <= new_pos[0] < MAZE_WIDTH and 0 <= new_pos[1] < MAZE_WIDTH:
            pos = new_pos
            if maze[pos] == -1:
                print("WIN")
                break
            elif maze[pos] == 0:
                print("LOSE: HIT WALL")
                break
        else:
            print("LOSE: OUTSIDE MAZE")
            break
        depth -= 1
        if depth == 0:
            print("LOSE: TOO DEEP")
            break

play(model, default_maze, pos=(0, 0))

tensor([[ 1,  0,  0,  0,  0],
        [ 1,  1,  1,  1,  1],
        [ 0,  1,  0,  1,  0],
        [ 1,  1,  0,  0,  0],
        [ 1,  1,  1,  1, -1]])
chose (1, 0) from (0, 0) to (1, 0)
chose (0, 1) from (1, 0) to (1, 1)
chose (1, 0) from (1, 1) to (2, 1)
chose (1, 0) from (2, 1) to (3, 1)
chose (1, 0) from (3, 1) to (4, 1)
chose (0, 1) from (4, 1) to (4, 2)
chose (0, 1) from (4, 2) to (4, 3)
chose (0, 1) from (4, 3) to (4, 4)
WIN


In [7]:
# train(model)

In [8]:
# backup to disk
# torch.save(model.state_dict(), 'default-maze.pt')