In [1]:
import datetime
import time
import copy
import random
import sys
import numpy as np

# Problem description

This file include a toy example in RL. Given a matrix of NxM like following,

                            [0, 0, 0, 0, 0]

                            [ 5, #, 0, 0, 0]

                            [0, 0, 0, -9, 0]

                            [0, 0, #, 0, 5]

                            [0, 0, 0, 0, 0]
where each non-zero number represents a final state which is the end of game and # represent a wall which is not accessible. The user start from some point in the matrix and try to find the best policy to guide his/her move.

# Value Iteration

In [47]:
class VI:
    def __init__(self, world, walls, end_states, step_cost, \
                 start_state, rows, cols, num_end_states, num_walls, verbose = False):
        self.board = world
        self.walls = set(walls)
        self.tolerance = 0.01
        self.old_board = copy.deepcopy(self.board)
        self.start_state = start_state
        self.debug = verbose
        self.rows = rows
        self.cols = cols
        self.step_cost = step_cost
        self.original = copy.deepcopy(self.board)
        self.end_states = set(end_states)
        self.num_end_states = num_end_states
        self.num_walls = num_walls
        self.policy = [["0" for i in range(cols)] for j in range(rows)]
        self.discount_factor = 0.99
        
        if self.debug:
            print("Board = ", self.board)
            print(" rows,  cols = ",  self.rows,  self.cols)
            print("world = ", world)
            print("no of end states = ", self.num_end_states)
            print("End states  = ", self.end_states)
            print("no of end states = ", self.num_end_states)
            print("End states  = ", self.end_states)
            print("no of walls = ", self.num_walls)
            print("Walls = ", self.walls)
            print("Start state = ", self.start_state)

        self.init_world()
        self.value_iteration()
        self.get_policy()

    def is_valid(self, x, y):
        if not 0 <= x < self.rows or not 0 <= y < self.cols:
            return False
        
        return not ((x, y) in self.walls)
    
    def init_world(self):
        ''' WALLS initialsed to None '''
        for x, y in self.walls:
            self.board[x, y] = None
            self.policy[x][y] = None
        
        for x, y in self.end_states:
            self.policy[x][y] = "Bad" if self.board[x][y] < 0 else "Goal"
            
    def get_valid_pos(self, x, y):
        return (x, y) not in self.walls and (x, y) not in self.end_states
        
    def value_iteration(self):
        if self.debug:
            print("Iteration #0")
            print(self.board)
            
        iter_no = 0
        max_diff = float('inf')
        while max_diff > self.tolerance and iter_no < 50:
            for i in range(self.rows):
                for j in range(self.cols):
                    if not self.get_valid_pos(i, j):
                        continue
                        
                    self.board[i][j] = self.board_update(i, j)
            
            iter_no += 1
            max_diff = max(ele for ele in abs(self.board - self.old_board).ravel() if ele != None)
            if self.debug:
                print("Iteration #{}".format(iter_no))
                print("Current max difference: ", max_diff)
                print(self.board)
            self.old_board = copy.deepcopy(self.board)
        

    def board_update(self, x, y):
        directions = [[0, 1], [- 1, 0], [0, - 1], [1, 0]]
        neigh_utilis = [self.old_board[x + dx][y + dy] \
                        for [dx, dy] in directions \
                        if self.is_valid(x + dx, y + dy)]
        
        return max(neigh_utilis)*self.discount_factor + self.step_cost
    

    def get_policy(self):
        directions = [[0, 1], [1, 0], [0, - 1], [- 1, 0]]
        dir_dict = {idx:direct for idx, direct in enumerate(['E','S','W','N'])}
        for x in range(self.rows):
            for y in range(self.cols):
                if self.get_valid_pos(x, y):
                    neigh_utilis = [[self.old_board[x + dx][y + dy], idx] \
                                    for idx, [dx, dy] in enumerate(directions) \
                                    if self.is_valid(x + dx, y + dy)]
                    neigh_utilis.sort()
                    self.policy[x][y] = dir_dict[neigh_utilis[-1][1]]

In [48]:
# generate sample data
rows, cols = 5, 5
world = np.zeros((rows, cols))

num_end_states, num_walls = 4, 2
walls_and_end_states = np.random.choice(rows*cols - 1, num_end_states + num_walls)
walls_and_end_states = [((ele + 1) // cols, (ele + 1)  % cols) for ele in walls_and_end_states]
end_states = walls_and_end_states[:num_end_states]
walls = walls_and_end_states[num_end_states:]
for i, j in end_states:
    world[i][j] = np.random.normal(scale=500)

copy_world = copy.deepcopy(world)
    
step_cost = - 1
start_state = [0, 0]

In [50]:
vi = VI(world, walls, end_states, step_cost,
        start_state, rows, cols, num_end_states, num_walls, verbose = True)

Board =  [[   0.            0.            0.            0.            0.        ]
 [ 212.27373101    0.            0.            0.            0.        ]
 [   0.            0.            0.         -922.50502381    0.        ]
 [   0.            0.            0.            0.          155.92481697]
 [   0.            0.            0.            0.            0.        ]]
 rows,  cols =  5 5
world =  [[   0.            0.            0.            0.            0.        ]
 [ 212.27373101    0.            0.            0.            0.        ]
 [   0.            0.            0.         -922.50502381    0.        ]
 [   0.            0.            0.            0.          155.92481697]
 [   0.            0.            0.            0.            0.        ]]
no of end states =  4
End states  =  {(2, 3), (1, 0), (3, 4)}
no of end states =  4
End states  =  {(2, 3), (1, 0), (3, 4)}
no of walls =  2
Walls =  {(4, 3)}
Start state =  [0, 0]
Iteration #0
[[   0.            0.            0. 

In [51]:
vi.policy

[['S', 'W', 'W', 'W', 'W'],
 ['Goal', 'W', 'W', 'W', 'W'],
 ['N', 'N', 'N', 'Bad', 'N'],
 ['N', 'N', 'N', 'W', 'Goal'],
 ['N', 'N', 'N', None, 'N']]