In [1]:
import numpy as np

import gymnasium as gym
from gymnasium.spaces import Discrete, Box, Dict
from gymnasium.utils import seeding


def calculate_manhattan_distance(current_pos, goal_pos):
    return abs(current_pos[0] - goal_pos[0]) + abs(current_pos[1] - goal_pos[1])


def find_tile_position(grid, tile):
    row, col = np.where(grid == tile)
    return row[0], col[0]


def total_manhattan_distance(current_state):
    goal_state = np.arange(1, 17).reshape(4, 4)
    total_manhattan_distance = 0
    for num in range(1, 16):
        current_pos = find_tile_position(current_state, num)
        goal_pos = find_tile_position(goal_state, num)
        total_manhattan_distance += calculate_manhattan_distance(current_pos, goal_pos)
    return total_manhattan_distance


def is_solvable(state):
    permutation = state.flatten()
    inversions = 0
    for i in range(len(permutation)):
        for j in range(i + 1, len(permutation)):
            if permutation[i] == 0 or permutation[j] == 0:
                continue
            if permutation[i] > permutation[j]:
                inversions += 1
    num_rows = state.shape[0]
    if num_rows % 2 == 0:
        empty_row = np.where(state == 0)[0][0]
        return (inversions + empty_row) % 2 == 0
    else:
        return inversions % 2 == 0


class FifteenPuzzleEnv(gym.Env):
    def __init__(self, config=None):
        self.current_steps = 0
        self.max_episode_steps = 100
        self.action_space = Discrete(4)  # 0: left, 1: up, 2: right, 3: down
        self.grid_size = 4
        self.observation_space = Box(low=0, high=self.grid_size ** 2 - 1, shape=(self.grid_size, self.grid_size),
                                     dtype=np.int32)
        self.seed()
        self.reset()
        self.spec = gym.envs.registration.EnvSpec("fp", max_episode_steps=100, reward_threshold=100)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self, seed=None, options=None):
        self.state = np.arange(self.grid_size ** 2)
        self.np_random.shuffle(self.state)
        while not is_solvable(self.state):
            self.np_random.shuffle(self.state)
        self.current_steps = 0
        self.state = self.state.reshape((self.grid_size, self.grid_size))
        self.zero_pos = np.argwhere(self.state == 0)[0]
        return self.state, {}

    def step(self, action):
        self.current_steps += 1
        distance_1 = total_manhattan_distance(self.state)
        valid_move = self.move(action)
        d = self.is_solved() or self.current_steps >= self.max_episode_steps
        distance_2 = total_manhattan_distance(self.state)
        r = distance_1 - distance_2
        if not valid_move:
            r -= 5
        return self.state, r, d, False, {}

    def move(self, action):
        new_zero_pos = np.array(self.zero_pos)
        if action == 0:  # left
            new_zero_pos[1] -= 1
        elif action == 1:  # up
            new_zero_pos[0] -= 1
        elif action == 2:  # right
            new_zero_pos[1] += 1
        elif action == 3:  # down
            new_zero_pos[0] += 1

        if (0 <= new_zero_pos[0] < self.grid_size) and (0 <= new_zero_pos[1] < self.grid_size):
            self.state[self.zero_pos[0], self.zero_pos[1]], self.state[new_zero_pos[0], new_zero_pos[1]] = (
                self.state[new_zero_pos[0], new_zero_pos[1]],
                self.state[self.zero_pos[0], self.zero_pos[1]],
            )
            self.zero_pos = new_zero_pos
            return True
        else:
            return False

    def is_solved(self):
        goal_state = np.arange(1, 17).reshape((self.grid_size, self.grid_size))
        goal_state[3][3] = 0
        return np.array_equal(self.state, goal_state)


In [2]:
import gymnasium as gym
import torch, torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
import tianshou as ts
from ray.tune.registry import register_env

register_env("fp", lambda config: FifteenPuzzleEnv())

ModuleNotFoundError: No module named 'tianshou'

In [17]:
lr, epoch, batch_size = 1e-3, 10, 64
train_num, test_num = 10, 100
gamma, n_step, target_freq = 0.9, 3, 320
buffer_size = 20000
eps_train, eps_test = 0.1, 0.05
step_per_epoch, step_per_collect = 10000, 10
logger = ts.utils.TensorboardLogger(SummaryWriter('log/dqn'))  # TensorBoard is supported!
# For other loggers: https://tianshou.readthedocs.io/en/master/tutorials/logger.html

In [18]:
# you can also try with SubprocVectorEnv
train_envs = ts.env.DummyVectorEnv([lambda: FifteenPuzzleEnv() for _ in range(train_num)])
test_envs = ts.env.DummyVectorEnv([lambda: FifteenPuzzleEnv() for _ in range(test_num)])

TypeError: EnvSpec.__init__() missing 1 required positional argument: 'entry_point'

In [8]:
from tianshou.utils.net.common import Net
# you can define other net by following the API:
# https://tianshou.readthedocs.io/en/master/tutorials/dqn.html#build-the-network
env = FifteenPuzzleEnv()
state_shape = env.observation_space.shape or env.observation_space.n
action_shape = env.action_space.shape or env.action_space.n
net = Net(state_shape=state_shape, action_shape=action_shape, hidden_sizes=[128, 128, 128])
optim = torch.optim.Adam(net.parameters(), lr=lr)

In [9]:
policy = ts.policy.DQNPolicy(net, optim, gamma, n_step, target_update_freq=target_freq)
train_collector = ts.data.Collector(policy, train_envs, ts.data.VectorReplayBuffer(buffer_size, train_num), exploration_noise=True)
test_collector = ts.data.Collector(policy, test_envs, exploration_noise=True)  # because DQN uses epsilon-greedy method

In [13]:
print(env.spec)

None


In [14]:
result = ts.trainer.offpolicy_trainer(
    policy, train_collector, test_collector, epoch, step_per_epoch, step_per_collect,
    test_num, batch_size, update_per_step=1 / step_per_collect,
    train_fn=lambda epoch, env_step: policy.set_eps(eps_train),
    test_fn=lambda epoch, env_step: policy.set_eps(eps_test),
    stop_fn=lambda mean_rewards: mean_rewards >= env.spec.reward_threshold,
    logger=logger)
print(f'Finished training! Use {result["duration"]}')

Epoch #1:  10%|9         | 990/10000 [00:00<00:04, 1879.65it/s, env_step=990, len=0, loss=20.486, n/ep=0, n/st=10, rew=0.00]


AttributeError: 'NoneType' object has no attribute 'reward_threshold'

In [18]:
torch.save(policy.state_dict(), 'dqn.pth')
policy.load_state_dict(torch.load('dqn.pth'))

<All keys matched successfully>

In [19]:
policy.eval()
policy.set_eps(eps_test)
collector = ts.data.Collector(policy, env, exploration_noise=True)
collector.collect(n_episode=1, render=1 / 35)

  gym.logger.warn(


{'n/ep': 1,
 'n/st': 180,
 'rews': array([180.]),
 'lens': array([180]),
 'idxs': array([0]),
 'rew': 180.0,
 'len': 180.0,
 'rew_std': 0.0,
 'len_std': 0.0}