# Grid World

###### https://towardsdatascience.com/reinforcement-learning-markov-decision-process-part-2-96837c936ec3

In [217]:
import matplotlib.pyplot as plt
from random import gauss, uniform, randint, random, choice
import math
import pprint
import matplotlib.pyplot as plt
import seaborn as sns; 

In [218]:
class GridWorld:
    
    def __init__(self, grid_size, A, B, A_prime, B_prime, A_goal, B_goal, gamma):
        self.grid_size = grid_size
        self.A = A
        self.B = B
        self.A_prime = A_prime
        self.B_prime = B_prime
        self.A_goal = A_goal
        self.B_goal = B_goal
        self.gamma = gamma
        self.terminal = []
        self.grid_world = [[0.0 for j in range(self.grid_size)] for i in range(self.grid_size)]
        self.directions = { 'east': (0, 1), 'west': (0, -1), 'north': (-1, 0), 'south': (1, 0)}
        self.policy = {}
        self.policy_values = {}
        self.reward = 0
        self.returns = []
        self.alpha = 0
        self.beta = 0
        self.probability = 1 / len(self.directions)
        self.explored = set()
        self.start = None
        self.actions = {}
        
    def set_grid_size(self, grid_size):
        self.grid_size = grid_size
        
    def get_grid_size(self):
        return self.grid_size
    
    def set_A(self, A):
        self.A = A
        
    def get_A(self):
        return self.A
    
    def set_A_prime(self, A_prime):
        self.A_prime = A_prime
        
    def get_A_prime(self):
        return self.A_prime
    
    def set_B(self, B):
        self.B = B
        
    def get_B(self):
        return self.B
    
    def set_B_prime(self, B_prime):
        self.B_prime = B_prime
        
    def get_B_prime(self):
        return self.B_prime
   
    def set_A_goal(self, goal):
        self.A_goal = goal
        
    def get_A_goal(self):
        return self.A_goal
    
    def set_B_goal(self, goal):
        self.B_goal = goal
        
    def get_B_goal(self):
        return self.B_goal
    
    def set_gamma(self, gamma):
        self.gamma = gamma
    
    def Gamma(self):
        return self.gamma
    
    def set_terminal(self, terminal):
        self.terminal = terminal
    
    def get_terminal(self):
        return self.terminal
    
    def set_policy(self, policy):
        self.policy = policy
        
    def get_policy(self):
        return self.policy
    
    def set_policy_values(self, policy_values):
        self.policy_values = policy_values
        
    def get_policy_values(self):
        return self.policy_values
    
    def set_world(self, grid_world):
        self.grid_world = grid_world
    
    def get_world(self):
        return self.grid_world
    
    def set_returns(self, returns):
        self.returns += returns
        
    def get_returns(self):
        return self.returns
    
    def set_alpha(self, alpha):
        self.alpha = alpha
        
    def get_alpha(self):
        return self.alpha
    
    def set_beta(self, beta):
        self.beta = beta
        
    def get_beta(self):
        return self.beta
    
    def set_probability(self, probability):
        self.probability = probability
    
    def get_probability(self):
        return self.probability
    
    def initial_state(self):
        return [[0.0 for j in range(self.grid_size)] for i in range(self.grid_size)]
    
    def get_directions(self):
        return self.directions
    
    def greedy_vk(self, state, actions):
        
        # store values to find argmax a
        values = []
        
        # moves
        moves = []
        
        # check neighbors in current state
        for action in range(len(actions)):
            
            # get values for p(s',r|s,a)
            s_prime, reward = self.move(state, actions[action])
            
            # compute q_pi(s,a)
            value = reward + self.Gamma() * self.get_world()[s_prime[0]][s_prime[1]]
            
            # add to values
            values.append(value)
            
            # add action to moves 
            moves.append(actions[action])
        
        # return argmax a
        return moves[values.index(max(values))]

    def move(self, state, action):
        """
        State-Value Function Returns: State s', Reward r: from taking Action a in State s 
        q(s,a) in the Bellman Equation
        """
        
        # get the +10 reward for A -> A' 
        if self.A == state:
            return self.A_prime, self.A_goal
        
        # get the +5 reward for B -> B' 
        elif self.B == state:
            return self.B_prime, self.B_goal
        
        # get the next state
        state_prime = [state[0] + action[0], state[1] + action[1]]
        
        # row boundary
        row_boundary = (state_prime[0] < 0 or state_prime[0] >= self.grid_size) 
        
        # column boundary
        column_boundary = (state_prime[1] < 0 or state_prime[1] >= self.grid_size) 
        
        # check boundaries
        if row_boundary or column_boundary:
            self.reward = -1.0
            state_prime = state
        
        # no reward in this state
        else:
            self.reward = 0
            
        return state_prime, self.reward
    
    

In [219]:
def bellman(grid_world):
    """
    Policy Iteration
    """
    
    # check if bounded
    bounded = False
    
    # set parameter vector of target policy
    theta = 0.01
    
    # V(s) for all s of S
    V = grid_world.initial_state()
    
    # iterate delta < theta 
    while not bounded:
        
        # set delta 
        delta = 0
        
        # set width for rows, cols
        width = len(V)
        
        # set length iterate grid
        length = width * width
        
        # iterate grid states
        for i in range(length):
            
            # create row 
            row = i // width 
            
            # create column
            column = i % width
            
            # v <-- V(s)
            v = V[row][column]
            
            # value for V(s)
            value = 0
            
            # test all actions
            for action in grid_world.directions.values():
                
                # get next state, reward
                s_prime, reward = grid_world.move([row, column], action)

                # computed by solving the system of linear equations BELLMAN EQUATION pi(a|s)p(s',r|s,a)[r + discount*Vpi(s)]
                value += grid_world.get_probability() * (reward + grid_world.Gamma() * V[s_prime[0]][s_prime[1]])
                
            # add value to state
            V[row][column] = value
       
            # absolute difference |v-V(S)|
            delta = max(delta, abs(v - V[row][column]))
                    
        # loop until convergence
        if delta < theta:
            bounded = True
   
        # keep track of state values
        grid_world.set_world(V.copy())
        
    # optimal policy
    return grid_world

In [220]:
def bellman_improvement(grid_world):
    """
    Policy Improvement
    """
    
    #####################
    # 1. Initialization #
    #####################
    
    # V(s) <- R for all s of S
    V = grid_world.get_world()
    
    # Pi(s) <- A(s) for all s of S
    policy = grid_world.get_policy()
    policy_values = grid_world.get_policy_values()
    
    ########################
    # 2. Policy Evaluation #
    ########################
    
    # check if bounded
    bounded = False
    
    # set parameter vector of target policy
    theta = 0.01
    
    # iterate until delta < theta 
    while not bounded:
        
        # set delta 
        delta = 0
        
        # set width for rows, cols
        width = len(V)
        
        # set length iterate grid
        length = width * width
        
        # iterate grid states
        for i in range(length):
            
            # create row 
            row = i // width 
            
            # create column
            column = i % width
            
            # v <-- V(s)
            v = V[row][column]
            
            # value for V(s)
            value = 0
            
            # store the action values for state
            actions = {}
            
            # test all actions
            for action in grid_world.directions.values():
                
                # get next state, reward
                s_prime, reward = grid_world.move([row, column], action)

                # computed by solving the system of linear equations BELLMAN EQUATION pi(a|s)p(s',r|s,a)[r + discount*Vpi(s)]
                value += (reward + grid_world.Gamma() * V[s_prime[0]][s_prime[1]])
                
                # keep track of actions and values
                actions[action] = value
                
            # add value to state
            V[row][column] = value
            
            # update policy with actions
            policy[(row, column)] = [action for action, value in actions.items()]
            
            # update policy with values
            policy_values[(row, column)] = [value for action, value in actions.items()]
            
            # set actions
            grid_world.actions = actions
       
            # absolute difference |v-V(S)|
            delta = max(delta, abs(v - V[row][column]))
                 
        # keep track of state values
        grid_world.set_world(V.copy())
        grid_world.set_policy(policy)
        grid_world.set_policy_values(policy_values)
                
        # loop until convergence
        if delta < theta:
   
        #########################
        # 3. Policy Improvement #
        #########################
            
            # policy stable <-- true
            policy_stable = True

            # set width for rows, cols
            width = len(V)

            # set length iterate grid
            length = width * width

            # iterate states of grid
            for i in range(length):

                # create row 
                row = i // width 

                # create column
                column = i % width 

                # test actions in policy
                for action in grid_world.policy[(row, column)]:

                    # old_action <-- pi(s)
                    old_action = action

                    # pi(s) <-- argmax a
                    new_action = grid_world.policy[(row, column)][grid_world.policy_values[(row, column)].index(max(grid_world.policy_values[(row, column)]))]

                    print(old_action, new_action)
                    # if old action != pi(s) policy stable <-- false 
                    if old_action != new_action:
                        policy_stable = False

            # if policy stable return v* and pi* 
            if policy_stable:
                bounded = True
                return grid_world


In [221]:
def bellman_optimal(grid_world):
    """
    Value Iteration
    """
    
    # check if bounded
    bounded = False
    
    # set parameter vector of target policy
    theta = 0.0000001
    
    # V(s) for all s of S
    V = grid_world.initial_state()
    
    # iterate delta < theta 
    while not bounded:
        
        # set delta 
        delta = 0
        
        # set width for rows, cols
        width = len(V)
        
        # set length it iterate 2d array
        length = width * width
        
        # iterate grid
        for i in range(length):
            
            # create row 
            row = i // width 
            
            # create column
            column = i % width 
            
            # store the action values for state
            actions = {}
            
            # v <-- V(s)
            v = V[row][column]
            
            # test all actions
            for action in grid_world.directions.values():
                
                # get next state, reward
                s_prime, reward = grid_world.move([row, column], action)
                
                # UPDATE USING BELLMAN EQUATION
                value = grid_world.get_probability() * (reward + grid_world.Gamma() * V[s_prime[0]][s_prime[1]])
                
                # update actions
                actions[action] = value
            
            # place the optimal value in state
            V[row][column] = max(actions.values()) 
            
            # update policy with optimal actions
            grid_world.policy[(row, column)] = [action for action, value in actions.items() if value == max(actions.values())]
            
            # update policy with optimal values
            grid_world.policy_values[(row, column)] = [value for action, value in actions.items() if value == max(actions.values())]
            
        # absolute difference |v-V(S)|
        delta = max(delta, abs(sum(list(map(sum, v))) - sum(list(map(sum, V)))))
        
        # loop until difference is less than parameter
        if  delta < theta:
            bounded = True
            
        # keep track of previous grid state values
        grid_world.set_world(V)
        
    # optimal policy
    return grid_world

In [222]:
def monte_carlo(grid_world):
    """
    Performs estimate prediction per monte carlo chapter 5.1
    First-visit MC prediction, for estimating V = v_pi
    """
    
    # grid with zeros to keep track of difference
    grid = grid_world.initial_state()
    
    # set width for rows, cols
    width = len(grid)
    
    # set runs for horizon
    runs = 2000
    
    # set time steps
    times = 1000
    
    # returns
    returns = {}
    
    # iterate until horizon 
    for i in range(runs):
        
        # generate episodes
        episodes = []
        
        # set return gets 0
        G = 0

        # test all actions
        for j in range(times, 0, -1):
            
            # create row 
            row = randint(0, width-1)

            # create column
            column = randint(0, width-1) 
        
            # set state 
            state = [row, column]
            
            # check if state exists in Returns(St)
            if tuple(state) not in returns:
                returns[tuple(state)] = []
            
            # choose a random action
            action = choice(list(grid_world.directions.values()))
            
            # get next state, reward
            s_prime, reward = grid_world.move(state, action)

            # First-visit MC prediction, for estimating V
            G = grid_world.Gamma() * G + reward

            # Unless St appears in S0, S1,...,St1:
            if state not in episodes:

                # append G to Returns(St)
                returns[tuple(state)].append(G)

                # V(s) <- average(Returns(St))
                grid[row][column] = sum(returns[tuple(state)])/len(returns[tuple(state)])

                # keep track of episodes for appending G
                episodes.append(state)

            # set world
            grid_world.set_world(grid)
        
        
    return grid

In [223]:
def monte_carlo_convergence(grid_world):
    """
    Random policy 
    """
    
    # check if bounded
    bounded = False
    
    # set parameter vector of target policy
    theta = 0.00001
    
    # returns
    returns = {}
    
    # iterate delta < theta 
    while not bounded:
        
        # set delta 
        delta = 0
        
        # grid with equal probability
        grid = grid_world.get_world()
        
        # set width for rows, cols
        width = len(grid)
        
        # set length it iterate 2d array
        length = width * width
        
        # generate episodes
        episodes = []
        
        # set return gets 0
        G = 0
        
        # iterate grid
        for i in range(length):
            
            # create row 
            row = i // width 
            
            # create column
            column = i % width 
            
            # current state
            state = [row, column]
            
            # check if state is in Returns(St)
            if tuple(state) not in returns:
                returns[tuple(state)] = []
            
            # test all actions
            for action in grid_world.directions.values():
                
                # get next state, reward
                s_prime, reward = grid_world.move(state, action)
                
                # First-visit MC prediction, for estimating V
                G = grid_world.Gamma() * G + reward

                # Unless St appears in S0, S1,...,St1:
                if state not in episodes:

                    # append G to Returns(St)
                    returns[tuple(state)].append(G)

                    # V(s) <- average(Returns(St))
                    grid[row][column] = sum(returns[tuple(state)])/len(returns[tuple(state)])

                    # keep track of episodes for appending G
                    episodes.append(state)
                
                          
        # sum of grid state values to see if bounded
        delta = max(delta, abs(sum(list(map(sum, grid_world.get_world()))) - sum(list(map(sum, grid)))))
        
        # loop until difference is less than parameter
        if  delta < theta:
            bounded = True
            
        # keep track of previous grid state values
        grid_world.set_world(grid)
        
    # optimal policy
    return grid

In [224]:
def monte_bello(grid_world):
    """
    Performs updates of grid values using a random policy
    """

    # check if bounded
    bounded = False
    
    # set parameter vector of target policy
    theta = 0.0001
    
    # iterate delta < theta 
    while not bounded:
        
        # set delta 
        delta = 0
        
        # grid with equiprobable random policy
        grid = grid_world.get_world()
    
        # create row 
        row = randint(0, 4) 

        # create column
        column = randint(0, 4) 

        # test all actions
        for action in grid_world.directions.values():

            # get next state, reward
            s_prime, reward = grid_world.move([row, column], action)

            # computed by solving the system of linear equations BELLMAN EQUATION pi(a|s)p(s',r|s,a)[r + discount*Vpi(s)]
            grid[row][column] += grid_world.get_probability() * (reward + grid_world.Gamma() * grid[s_prime[0]][s_prime[1]])

#                 print(f"action: {action} taken in state: {(row, column)} with reward: {reward} weighted by {grid_world.Gamma()**i} and move to state: {s_prime} and update state value: {grid[row][column]}")
        
        # sum of grid state values to see if bounded
        delta = max(delta, abs(sum(list(map(sum, grid_world.get_world()))) - sum(list(map(sum, grid)))))
       
        # loop until difference is less than parameter
        if  delta < theta:
            bounded = True
            
        # keep track of previous grid state values
        grid_world.set_world(grid)
        
    # optimal policy
    return grid

In [225]:
 if __name__ == '__main__':
        
        # create terminal printer instance
        pp = pprint.PrettyPrinter(width=160, compact=True)
        
        # create the grid world object
        grid_world = GridWorld(5, [0, 1], [0, 3], [4, 1], [2, 3], 10, 5, 0.9)
        
        # create the grid world object
        grid_world1 = GridWorld(5, [0, 1], [0, 3], [4, 1], [2, 3], 10, 5, 0.9)

        # initialize random algorithm
        b = bellman(grid_world)
        print("random policy")
        pp.pprint(b.get_world())
        
        # initialize improved algorithms
        b1 = bellman_improvement(grid_world1)
        print("Improved Optimal policy")
        pp.pprint(b1.get_policy())
  

random policy
[[3.362486775241769, 8.834659309360454, 4.471501905905386, 5.363176926234546, 1.533956921644943],
 [1.571000984726717, 3.035910592348995, 2.290863928710077, 1.946436976692719, 0.5859122065442595],
 [0.098722795581945, 0.7806930738022357, 0.7126059736710069, 0.39602772675179676, -0.36591308664296684],
 [-0.9260154548630697, -0.39332129185147424, -0.3157776047095224, -0.5481194156586933, -1.1462548418986562],
 [-1.8101295438052738, -1.3031313050747872, -1.1902602939304843, -1.385529460093907, -1.9384673291472465]]
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 1) (0, 1)
(0, -1) (0, 1)
(-1, 0) (0, 1)
(1, 0) (0, 1)
(0, 

KeyboardInterrupt: 