# Value Iteration

An example over 1000 iterations<br><br>
![](imgs/value_iteration_1.png)
![](imgs/value_iteration_2.png)
![](imgs/value_iteration_3.png)
![](imgs/value_iteration_4.png)
![](imgs/value_iteration_5.png)
![](imgs/value_iteration_6.png)

## Install Gymnasium

In [2]:
!pip install gymnasium



## Imports

In [3]:
import os
import random
import time

import gymnasium as gym
from gymnasium import spaces
import numpy as np

## Initialize 2D environment

In [4]:
# Custom 2D GridWorld environment
class GridWorld(gym.Env):
    """Deterministic 2D grid world with randomly placed obstacles.

    States are ``[row, col]`` arrays. The agent starts in the top-left cell
    ``[0, 0]`` and the goal is the bottom-right cell
    ``[height - 1, width - 1]``. A move that would leave the grid or enter an
    obstacle leaves the agent in its current cell.
    """

    # NOTE(review): Gymnasium documents this key as "render_modes"; the
    # legacy spelling is left untouched here — confirm before renaming.
    metadata = {'render.modes': ['console']}

    # Actions available
    UP = 0
    LEFT = 1
    DOWN = 2
    RIGHT = 3

    def __init__(self, width, height, verbose=False):
        """Build a ``height`` x ``width`` grid and scatter obstacles on it.

        Args:
            width: Number of columns.
            height: Number of rows.
            verbose: When True, print a message whenever a move is blocked
                by a wall or an obstacle.
        """
        super().__init__()
        self.ACTION_NAMES = ["UP", "LEFT", "DOWN", "RIGHT"]  # Indexed by action id
        self.num_actions = 4

        self.width = width
        self.height = height
        self.size = width * height                      # Total number of cells
        self.num_states = self.size                     # One state per cell
        self.num_obstacles = int((width + height) / 2)  # Number of obstacle draws
        self.end_state = np.array([height - 1, width - 1], dtype=np.uint8)  # Goal = bottom-right cell

        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.MultiDiscrete([self.height, self.width])

        self.obstacles = np.zeros((height, width))      # 1 marks an obstacle cell
        self.verbose = verbose

        # Draw `num_obstacles` random cells, re-drawing whenever the draw hits
        # the start or the goal cell so the task always stays solvable.
        # Draws may repeat, so fewer distinct obstacles can end up on the grid.
        # When the grid has no cell besides start/goal, no obstacle is placed
        # (the rejection loop would otherwise never terminate).
        reserved = {(0, 0), (height - 1, width - 1)}
        if self.size > len(reserved):
            for _ in range(self.num_obstacles):
                cell = random.randrange(height), random.randrange(width)
                while cell in reserved:
                    cell = random.randrange(height), random.randrange(width)
                self.obstacles[cell] = 1

        self.num_steps = 0
        self.max_steps = height * width

        self.current_state = np.zeros((2), np.uint8)    # Initial state = [0, 0]

        # Row/column offset applied by each action id
        self.directions = np.array([
            [-1, 0],  # UP
            [0, -1],  # LEFT
            [1, 0],   # DOWN
            [0, 1],   # RIGHT
        ])

    def transition_function(self, s, a):
        """Return the state reached from ``s`` with action ``a``.

        The input state ``s`` itself is returned when the move is blocked by
        the grid boundary or by an obstacle.
        """
        s_prime = s + self.directions[a, :]
        row, col = s_prime

        # Index 0 is the row, index 1 is the column.
        if not (0 <= row < self.height and 0 <= col < self.width):
            if self.verbose:
                print("Agent is going outside of the grid. Staying in the same cell")
            return s

        if self.obstacles[row, col] == 1:
            # Gated on `verbose` like the boundary message above.
            if self.verbose:
                print("Agent is hitting an obstacle. Staying in the same cell")
            return s

        return s_prime

    def transition_probabilities(self, s, a):
        """Return a ``(height, width)`` array of next-state probabilities.

        The environment is deterministic, so the single cell returned by
        ``transition_function`` gets probability 1.0.
        """
        prob_next_state = np.zeros((self.height, self.width))
        s_prime = self.transition_function(s, a)
        prob_next_state[s_prime[0], s_prime[1]] = 1.0

        return prob_next_state

    def reward_probabilities(self):
        """Return a flat array with the reward of every cell, in row-major order."""
        return np.array(
            [
                self.reward_function(np.array([r, c], dtype=np.uint8))
                for r in range(self.height)
                for c in range(self.width)
            ],
            dtype=float,
        )

    def reward_function(self, s):
        """Return 1 when ``s`` is the goal cell, 0 otherwise."""
        return 1 if (s == self.end_state).all() else 0

    def termination_condition(self, s):
        """Return ``(terminated, truncated)`` for state ``s``.

        ``truncated`` is True once ``num_steps`` has reached ``max_steps``.
        ``terminated`` is True when ``s`` is the goal and also whenever
        ``truncated`` is True. Both messages are printed regardless of
        ``verbose``.
        """
        truncated = self.num_steps >= self.max_steps
        terminated = False

        if truncated:
            print("Maximum steps reached. Exiting.")
            terminated = True
        if (s == self.end_state).all():
            print("Agent is in the goal state. Let's end the loop.")
            terminated = True

        return terminated, truncated

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, terminated, truncated, info)``.

        ``info`` is always None.
        """
        s_prime = self.transition_function(self.current_state, action)
        reward = self.reward_function(s_prime)

        self.current_state = s_prime
        # Count this step before checking the budget, so an episode is cut
        # after exactly `max_steps` steps rather than `max_steps + 1`.
        self.num_steps += 1
        terminated, truncated = self.termination_condition(s_prime)

        return self.current_state, reward, terminated, truncated, None

    def render(self):
        """Print the grid: ``A`` agent, ``G`` goal, ``///`` obstacle, ``___`` free cell."""
        agent_row = self.current_state[0]
        agent_col = self.current_state[1]

        for r in range(self.height):
            for c in range(self.width):
                if r == agent_row and c == agent_col:
                    cell = "| A "
                elif r == self.end_state[0] and c == self.end_state[1]:
                    cell = "| G "
                elif self.obstacles[r, c] == 1:
                    cell = '|///'
                else:
                    cell = '|___'
                print(cell, end='')
            print('|')
        print('\n')

    def reset(self):
        """Move the agent back to ``[0, 0]``, zero the step counter and return the state."""
        self.current_state = np.zeros((2), np.uint8)
        self.num_steps = 0
        return self.current_state

    def close(self):
        """Nothing to release."""
        pass


class NonDeterministicGridWorld(GridWorld):
    """Grid world in which the chosen action may slip into a diagonal move.

    With probability ``p`` the agent moves in the requested direction. With
    probability ``1 - p`` the move is combined with one of the two
    neighbouring directions in ``ACTION_NAMES`` order (``a + 1`` or
    ``a - 1``, each equally likely), which yields a diagonal step. If the
    resulting cell is outside the grid or is an obstacle, the agent stays
    where it is.
    """

    def __init__(self, width, height, verbose=False, p=0.8):
        """Build the grid.

        Args:
            width: Number of columns.
            height: Number of rows.
            verbose: When True, print details of each attempted move.
            p: Probability that the requested action is executed as-is.
        """
        super().__init__(width, height, verbose)
        self.probability_right_action = p
        self.verbose = verbose

    def _is_free(self, cell):
        """Return True when ``cell`` lies inside the grid and holds no obstacle."""
        row, col = cell[0], cell[1]
        return bool(
            0 <= row < self.height
            and 0 <= col < self.width
            and self.obstacles[row, col] == 0
        )

    def transition_function(self, s, a):
        """Sample the state reached from ``s`` when action ``a`` is requested."""
        if self.verbose:
            print("Original action to perform:", self.ACTION_NAMES[a])
        s_prime = s + self.directions[a, :]

        # With probability 1 - p the move slips: a neighbouring direction is
        # added on top of the requested one, producing a diagonal step.
        if random.random() <= 1 - self.probability_right_action:
            if random.random() < 0.5:
                side_action = (a + 1) % self.num_actions
            else:
                side_action = (a - 1) % self.num_actions
            if self.verbose:
                print(F"Actual action performed: {self.ACTION_NAMES[a]} + {self.ACTION_NAMES[side_action]}")
            s_prime = s_prime + self.directions[side_action, :]

        # Blocked moves (wall or obstacle) leave the agent in place.
        return s_prime if self._is_free(s_prime) else s

    def transition_probabilities(self, s, a):
        """Return a ``(height, width)`` array of next-state probabilities.

        The intended cell receives ``p`` and each of the two diagonal cells
        receives ``(1 - p) / 2``, but only when the cell is free. Whatever
        probability is left over is assigned to the current cell ``s``.
        """
        prob_next_state = np.zeros((self.height, self.width))

        # The leftover 1 - p is split evenly between the two diagonal slips.
        slip_probability = (1 - self.probability_right_action) / 2
        next_action = (a + 1) % self.num_actions
        previous_action = (a - 1) % self.num_actions

        # Both diagonals are built from the intended cell, not from each other.
        intended_cell = s + self.directions[a, :]
        attempts = [
            (a, intended_cell, self.probability_right_action),
            (next_action, intended_cell + self.directions[next_action, :], slip_probability),
            (previous_action, intended_cell + self.directions[previous_action, :], slip_probability),
        ]

        probs = []
        for action_id, cell, probability in attempts:
            if self.verbose:
                print(F"Trying to go: {self.ACTION_NAMES[action_id]}")
            if self._is_free(cell):
                if self.verbose:
                    print("Success!")
                prob_next_state[cell[0], cell[1]] = probability
                probs.append(probability)
            else:
                if self.verbose:
                    print("Obstacle present.")
            if self.verbose:
                print("--------------------")

        # Normalization: the mass of every blocked outcome stays on `s`.
        prob_next_state[s[0], s[1]] = 1 - sum(probs)

        return prob_next_state

### Applying Value iteration
To apply Value Iteration, we need the **transition probabilities** and the **reward function**.<br>
The cell below prints the probability distribution over next states for a single state-action pair in a non-deterministic GridWorld.

In [5]:
# Build a 3-column, 5-row stochastic grid and show its starting layout
env = NonDeterministicGridWorld(width=3, height=5, verbose=False)
state = env.reset()
env.render()

# Distribution over next cells when the agent:
# - starts from state [0, 0]
# - requests the action "DOWN"
action = "DOWN"
action_index = env.ACTION_NAMES.index(action)  # Position of "action" in ACTION_NAMES
next_state_prob = env.transition_probabilities(state, action_index)

print("Probabilities:")
print(next_state_prob)

| A |___|___|
|///|___|___|
|///|___|___|
|___|___|___|
|///|___| G |


Probabilities:
[[0.9 0.  0. ]
 [0.  0.1 0. ]
 [0.  0.  0. ]
 [0.  0.  0. ]
 [0.  0.  0. ]]


### Reward values

In [6]:
# Flat reward array, one entry per cell
rewards = env.reward_probabilities()
print(rewards)


[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]


# Value Iteration algorithm

### Pseudocode

![](imgs/value_iteration.png)

## Implementation

In [7]:
def value_iteration(env, gamma=0.99, iters=100):
    """Compute state values and a greedy policy with synchronous value iteration.

    Every sweep applies, to each non-goal state ``s``::

        V(s) <- max_a  sum_s' T(s, a, s') * (R(s') + gamma * V_old(s'))

    where ``V_old`` holds the values from the previous sweep.

    Args:
        env: Environment exposing ``height``, ``width``, ``num_states``,
            ``num_actions``, ``max_steps``, ``end_state``,
            ``transition_probabilities(state, action)`` (a
            ``height x width`` array) and ``reward_probabilities()`` (a flat
            array with one reward per cell, in row-major order).
        gamma: Discount factor.
        iters: Requested number of sweeps.

    Returns:
        ``(values, best_actions)``, both reshaped to ``(height, width)``.
        The goal cell is never updated, so it keeps value 0 and action 0.

    Note:
        At most ``env.max_steps`` sweeps are effective: from sweep index
        ``env.max_steps`` onwards every state is skipped, so the sweep count
        is capped up front instead of looping without doing any work.
        NOTE(review): this ties the number of sweeps to the episode step
        budget — confirm that the cap is intended.
    """
    values = np.zeros((env.num_states))
    best_actions = np.zeros((env.num_states), dtype=int)
    rewards = env.reward_probabilities()

    # Populate the states in row-major order, so index k <-> cell [k // width, k % width]
    states = np.array(
        [[r, c] for r in range(env.height) for c in range(env.width)],
        dtype=np.uint8,
    ).reshape(-1, 2)

    # The goal state is terminal: no action is taken there, its value is never updated.
    non_terminal = [
        s for s in range(env.num_states)
        if not (states[s] == env.end_state).all()
    ]

    sweeps = min(iters, env.max_steps)
    if sweeps <= 0 or not non_terminal:
        return values.reshape((env.height, env.width)), best_actions.reshape((env.height, env.width))

    # T(s, a, .) is queried once per (state, action) and reused across sweeps.
    # This assumes `transition_probabilities` depends only on (state, action),
    # which holds for both grid worlds in this file.
    transitions = {
        s: [
            env.transition_probabilities(states[s], a).flatten()
            for a in range(env.num_actions)
        ]
        for s in non_terminal
    }

    for _ in range(sweeps):
        # R(s') + gamma * V_old(s') is shared by every state and action of
        # this sweep. It is a fresh array, so the in-place writes to `values`
        # below do not leak into the current sweep.
        target = rewards + gamma * values

        for s in non_terminal:
            max_va = -np.inf
            best_a = 0

            for a, next_state_prob in enumerate(transitions[s]):
                # Expected return of action `a`: weight every successor's
                # target by its probability and sum over successors.
                va = (next_state_prob * target).sum()

                if va > max_va:
                    max_va = va  # max over the actions (v*_i+1)
                    best_a = a   # argmax over the actions (pi*_i+1)

            values[s] = max_va
            best_actions[s] = best_a

    return values.reshape((env.height, env.width)), best_actions.reshape((env.height, env.width))

### Estimate values

In [8]:
# Run value iteration with its default settings on the environment built above
values, best_actions = value_iteration(env)

for label, grid in (("Best values: \n", values), ("Best actions: \n", best_actions)):
    print(label, grid)

Best values: 
 [[0.94825225 0.95777204 0.96217993]
 [0.95819816 0.9678848  0.97361562]
 [0.96815382 0.97791984 0.98537072]
 [0.97791984 0.98781902 0.99750623]
 [0.98537072 0.99750623 0.        ]]
Best actions: 
 [[3 2 2]
 [3 2 2]
 [2 2 2]
 [3 2 2]
 [3 3 0]]


### Simulate the Optimal policy


In [9]:
# Roll out the greedy policy from the start cell until the episode ends
state = env.reset()
done = False
print("Initial state: [0, 0]\n")
env.render()
while not done:
    row, col = state[0], state[1]
    action = best_actions[row, col]  # Greedy action stored for the current cell

    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    env.render()

Initial state: [0, 0]

| A |___|___|
|///|___|___|
|///|___|___|
|___|___|___|
|///|___| G |


|___| A |___|
|///|___|___|
|///|___|___|
|___|___|___|
|///|___| G |


|___|___|___|
|///| A |___|
|///|___|___|
|___|___|___|
|///|___| G |


|___|___|___|
|///|___|___|
|///| A |___|
|___|___|___|
|///|___| G |


|___|___|___|
|///|___|___|
|///|___|___|
|___| A |___|
|///|___| G |


|___|___|___|
|///|___|___|
|///|___|___|
|___|___|___|
|///| A | G |


Agent is in the goal state. Let's end the loop.
|___|___|___|
|///|___|___|
|///|___|___|
|___|___|___|
|///|___| A |


