In [77]:
import torch
import torch.nn as nn
import gym
import gym_sokoban
import numpy as np
import torch.nn.functional as F
import torchvision.transforms as T
from collections import namedtuple
from itertools import count
from PIL import Image

env = gym.make('Sokoban-small-v1')
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [78]:
class DQN(nn.Module):

    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)
       

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.fc1 = nn.Linear(linear_input_size, linear_input_size)
        self.head = nn.Linear(linear_input_size, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.fc1(x.view(x.size(0), -1)))
        return self.head(x.view(x.size(0), -1))

In [79]:
resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),T.ToTensor()])
def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)

In [80]:
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape
n_actions = env.action_space.n
policy_net = DQN(screen_height, screen_width, n_actions).to(device)

In [81]:
def test_model(episodes):
    iteration_count = 0
    reward_sum = 0
    episode_len_sum = 0
    for i_episode in range(episodes):
        # Initialize the environment and state
        env.reset()
        last_screen = get_screen()
        current_screen = get_screen()
        for t in count():
            iteration_count += 1
            current_screen = get_screen()
            # Select and perform an action
            action = policy_net(current_screen).max(1)[1].view(1, 1).item()
            _, reward, done, _ = env.step(action)
            reward_sum += reward
            if done:
                episode_len_sum += t
                break
    env.close()
    average_reward = reward_sum / iteration_count
    average_ep_len = episode_len_sum / episodes
    return average_reward, average_ep_len
    

In [82]:
snapshots = torch.load('./model/snapshots')
for snapshot in snapshots:
    print(snapshot)
for snapshot in snapshots:
    policy_net.load_state_dict(snapshots[snapshot])
    print(test_model(5))
    
    

model_snapshot_@0
model_snapshot_@10
model_snapshot_@20
model_snapshot_@30
[SOKOBAN] Retry . . .
[SOKOBAN] Retry . . .
(-0.09899999999999866, 199.0)
[SOKOBAN] Retry . . .
(-0.09799999999999866, 199.0)
[SOKOBAN] Retry . . .
(-0.09899999999999866, 199.0)
[SOKOBAN] Retry . . .
(-0.09799999999999864, 199.0)
