In [1]:
# imports
import SokobanEnv
import numpy as np
import random
from collections import deque


# QTable
Let's implement a Q_table for Sokoban

In [2]:
game = SokobanEnv.SokobanEnv()
obs = game.reset()
game.render()

##########
#O       #
#        #
#  $     #
#        #
#   @    #
#        #
##########



### Write a function that maps the possible states to an ID

In [3]:
# Data is an iterable
def base_to_decimal(data, base = 6):
    decimal = 0
    power = len(data) - 1
    for digit in data:
        if digit < base:
            decimal += int(digit) * (base ** power)
            power -= 1
        else:
            print(f'Digit {digit} is bigger than base {base}: Interuption!')
            break;
    return decimal

# Decimal is an int
def decimal_to_base(decimal, base = 6):
    if decimal == 0:
        return '0'
    data = ''
    while decimal > 0:
        remainder = decimal % base
        data = str(remainder) + data
        decimal //= base
    return list(data)
    

def map_to_id(map_int):
    # Flatten the tiles
    map_base_6 = map_int.ravel()
    id = base_to_decimal(map_base_6, base = 6)
    return id

In [4]:
map_to_id(game.map_to_int(game.map))

35737982114874677137749112743971628081119804271605776626953011

Too many states!
### Let's try to refine this into relevant states for the given map. 

In [5]:
player_position = game.find_player_position()
box_positions = game.find_box_positions()
nb_achieved_targets = game.number_achieved_targets()


player_position, box_positions, nb_achieved_targets

(array([5, 4], dtype=int64),
 array([[3],
        [3]], dtype=int64),
 0)

In [6]:
def map_to_id(env):
    player_position = env.find_player_position()
    player_state = player_position[0] * env.height + player_position[1]
    
    # We will only process 1 box instances for now
    box1_position = env.find_box_positions()[:,0]
    box1_state = box1_position[0] * env.height + box1_position[1]

    return base_to_decimal([player_state, box1_state], base=env.height*env.width)

map_to_id(game)

3547

In [7]:
class Q_table:
    def __init__(self, env: SokobanEnv.SokobanEnv, alpha, gamma):
        state_space = 80*80
        self.table = np.zeros((state_space, env.action_space.n))
        self.alpha = alpha
        self.gamma = gamma
    
    def choose_action(self, env: SokobanEnv.SokobanEnv):
        map_id = map_to_id(env)
        return np.argmax(self.table[map_id])
    
    def learn(self, state, action, reward, new_state):
        self.table[state, action] += self.alpha * (reward + self.gamma * np.max(self.table[new_state]) - self.table[state, action])

We now have our Q table

In [8]:
# def experience_replay(model, batch_size, gamma, memory, obs_count, action_count, epoch_count, loss):
#     for _ in range(epoch_count):
#         batch = random.sample(memory, batch_size)      #sample a batch from the experience memory using batch_size
#         batch_vector = np.array(batch, dtype=object)   #vectorise the batch

#         obs_t = np.zeros((batch_size,) + batch_vector[0,0].shape)     #observation at t: create a numpy array of zeros with dimensions batch_size, state_count
#         obs_t_next = np.zeros((batch_size,) + batch_vector[0,3].shape) #observation at t+1: create a numpy array of zeros with dimensions batch_size, state_count
#         for i in range(len(batch_vector)):             #loop through the batch collecting the obs at t and obs at t+1
#             obs_t[i] = batch_vector[i,0]               #store the observations at t in the relevant array
#             obs_t_next[i] = batch_vector[i,3]          #store the observations at t+1 in the relevant array

#         prediction_at_t = model.predict(obs_t, verbose=0)         #Use the model to predict an action using observations at t
#         prediction_at_t_next = model.predict(obs_t_next, verbose=0)  #Use the model to predict an action using observations at t+1

#         #Create the features(X) and lables(y)
#         X = []                   #This is our feature vector, the most recent observation
#         y = []                   #This is our label calculated using the long term discounted reward
#         i = 0
#         for obs_t, action, reward, _, done in batch_vector: #get a row from our batch

#             X.append(obs_t)                          #append the most recent observation to X

#             if done:                                 #if the episode was over
#                 target = reward                      #the target value is just the reward
#             else:                                    #otherwise
#                 #the target value is the discounted optimal reward (Bellman optimality equation)
#                 #Remember we use the max action_value from the state(observation) at time t+1
#                 target = reward + gamma * np.max(prediction_at_t_next[i])

#             #now we update the action value for the original state(observation) given the action that
#             #was taken, and we use the target value as the update.
#             prediction_at_t[i,action] = target
#             #the updated action values are used as the label
#             y.append(prediction_at_t[i])    #Remember the update will be used as the label to update the ANN weights
#                                             #by backpropagating the mean squared error.

#             i += 1                          #increment i

#         h, w = batch_vector[0,0].shape
#         X_train = np.array(X).reshape(batch_size, h, w)          #reshape X
#         y_train = np.array(y)                                   #create a numpy array from y
#         hist = model.fit(X_train, y_train, epochs = 1, verbose = 0) #fit the model with X,y and epochs
#         for i in range(1):                             #loop over the epoch_count
#             loss.append(hist.history['loss'][i])                 #record the loss for analysis

#     return loss                                              #return the loss

In [68]:
rewards = []                                              #store the rewards in a list for analysis only
loss = []                                                 #store the losses in a list for analysis only
episodes = 10000                                             #We will start with 50 episodes and see how it goes
gamma = 0.99                                               #This is the discount rate
beta = 0.01                                                #This is the epsilon decay rate
batch_size = 8                                           #The batch size for solving the IID problem
memory = deque([], maxlen=250)                           #The memory replay buffer

In [10]:
BEST_ACTIONS = [2]*5+[1]+[2]+[0]*2

In [73]:
game = SokobanEnv.SokobanEnv(level=2,
                             empty_space_reward=-.1,
                             wall_reward=-1,
                             cannot_move_box_reward=0,
                             moved_box_reward=1,
                             end_of_game_reward=10)
game.max_steps = 25
q_table = Q_table(game, alpha=1e-3, gamma=gamma)

# CAUTION! Do not forget to do temperal differencing! TD(2)

In [74]:
for episode in range(episodes):
    game.reset()
    state_id = map_to_id(game)

    total_reward = 0
    # epsilon = .1
    epsilon = 1 / (1 + beta * (episode / game.action_space.n)) # Decreasing level of exploration over episodes

    i = 0

    done = False
    while not done:
        if episode%1==-1:
          # Follows the "best actions"
          if i < len(BEST_ACTIONS):
              action = BEST_ACTIONS[i]
              i += 1
          else:
              action = game.action_space.sample()

        else:
          # Epsilon-greedy policy
          rand_num = np.random.random()
          if rand_num <= epsilon:
              action = game.action_space.sample() # Random action
          else:
              action = q_table.choose_action(game)

        _, reward, done, info = game.step(action)
        new_state_id = map_to_id(game)
        total_reward += reward
        # memory.append((state_id, action, reward, new_state_id, done))

        if done:
            rewards.append(total_reward)
            print(f'episode: {episode}/{episodes}, score: {total_reward}, epsilon: {epsilon}')
        
        q_table.learn(state_id, action, reward, new_state_id)

        # if len(memory) > batch_size:
        #     loss = experience_replay(model, batch_size, gamma, memory, obs_count, action_count, 2, loss)

        state_id = new_state_id

    rewards.append(total_reward)

episode: 0/10000, score: -8.799999999999997, epsilon: 1.0
episode: 1/10000, score: -10.6, epsilon: 0.9975062344139651
episode: 2/10000, score: -3.400000000000001, epsilon: 0.9950248756218907
episode: 3/10000, score: -10.599999999999994, epsilon: 0.9925558312655086
episode: 4/10000, score: -10.599999999999994, epsilon: 0.9900990099009901
episode: 5/10000, score: -14.199999999999998, epsilon: 0.9876543209876544
episode: 6/10000, score: -7.899999999999999, epsilon: 0.9852216748768474
episode: 7/10000, score: -6.999999999999997, epsilon: 0.9828009828009827
episode: 8/10000, score: -6.099999999999999, epsilon: 0.9803921568627451
episode: 9/10000, score: -12.399999999999995, epsilon: 0.9779951100244499
episode: 10/10000, score: -9.699999999999996, epsilon: 0.9756097560975611
episode: 11/10000, score: -7.6999999999999975, epsilon: 0.97323600973236
episode: 12/10000, score: -11.499999999999996, epsilon: 0.970873786407767
episode: 13/10000, score: -6.9999999999999964, epsilon: 0.968523002421307

In [75]:
test_episodes = 1
all_rewards = []
for episode in range(test_episodes):
    game.reset()
    total_reward = 0
    done = False

    while not done:
        action = q_table.choose_action(game)
        _, reward, done, _ = game.step(action)
        total_reward += reward

        if done:
            all_rewards.append(total_reward)
            print(f'episode: {episode}/{test_episodes}, Total Reward: {total_reward}')

episode: 0/1, Total Reward: 14.2


In [76]:
game.play_memory(framerate=4)

Frame 15/15
##########
#!   #   #
#@   #   #
#    #   #
#        #
#    #   #
#    #   #
##########

Replay over!


# Draft

In [21]:
game.reset()
game.step(0)
game.step(0)
for _ in range(6):
    game.step(2)
game.render()

##########
#O       #
#        #
#$@      #
#        #
#        #
#        #
##########



In [19]:
q_table.table[map_to_id(game)]

array([-0.00247882, -0.00257652, -0.00598994, -0.00248065])

In [51]:
s_id = map_to_id(game)
print(q_table.table[s_id])
q_table.choose_action(game)

[-0.52491117 -0.52482709 -0.52481062 -0.52549827]


2