### Value Iteration

Value Iteration is a dynamic programming algorithm for solving Markov Decision Processes (MDPs). It repeatedly applies the Bellman optimality update to every state until the value estimates converge to the optimal value function, from which an optimal policy is then read off greedily.

#### Key Concepts:
1. **Value Function (V)**: Represents the maximum expected cumulative (discounted) reward an agent can achieve starting from a given state and following the optimal policy.
2. **Bellman Optimality Equation**: The core of value iteration. It expresses the value of a state in terms of the best action available there and the values of the resulting successor states.

#### Algorithm Steps:
1. **Initialization**: Start with an arbitrary value function $V_0(s)$ for all states $s$ (e.g., $V_0(s) = 0$).
2. **Update Rule**: For each state $s$, compute the next estimate from the current one:
    $$
    V_{k+1}(s) = \max_a \sum_{s'} P(s' | s, a) \left[ R(s, a, s') + \gamma V_k(s') \right]
    $$
    where:
    - $P(s' | s, a)$: Transition probability from state $s$ to $s'$ under action $a$.
    - $R(s, a, s')$: Reward received when transitioning from $s$ to $s'$ under action $a$.
    - $\gamma$: Discount factor ($0 \leq \gamma \leq 1$) that weights immediate rewards against future rewards. Convergence is guaranteed for $\gamma < 1$; $\gamma = 1$ is meaningful only for episodic tasks in which terminal states yield no further reward.
3. **Convergence**: Repeat the update rule until the value function converges, i.e., the change between successive estimates (for example $\max_s |V_{k+1}(s) - V_k(s)|$) is smaller than a predefined threshold.
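
The steps above can be sketched with NumPy on a small hand-made MDP. The two-state, two-action model (`P`, `R`) and the helper name `bellman_backup` are illustrative assumptions and do not come from any library.

```python
import numpy as np

# Hypothetical MDP: P[s, a, s'] are transition probabilities,
# R[s, a, s'] are the rewards for the same transitions.
P = np.array([[[0.8, 0.2], [0.1, 0.9]],
              [[1.0, 0.0], [0.0, 1.0]]])
R = np.array([[[0.0, 1.0], [0.0, 1.0]],
              [[0.0, 0.0], [0.0, 0.0]]])
gamma = 0.9

def bellman_backup(V):
    """Apply one sweep of the update rule to every state."""
    # Q[s, a] = sum over s' of P[s, a, s'] * (R[s, a, s'] + gamma * V[s'])
    Q = np.einsum('sat,sat->sa', P, R + gamma * V[None, None, :])
    return Q.max(axis=1)

V = np.zeros(2)  # initialization
for k in range(1000):
    V_new = bellman_backup(V)
    converged = np.max(np.abs(V_new - V)) < 1e-10
    V = V_new
    if converged:
        break

print(V)
```

Because $\gamma < 1$ here, the loop stops well before the iteration cap, and the final `V` is (numerically) a fixed point of `bellman_backup`.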

#### Optimal Policy:
Once the value function has converged to $V$, an optimal policy $\pi^*(s)$ is obtained by acting greedily with respect to it:
$$
\pi^*(s) = \arg\max_a \sum_{s'} P(s' | s, a) \left[ R(s, a, s') + \gamma V(s') \right]
$$
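
As a rough illustration of the formula above, the greedy action for each state can be computed in a few lines. The model arrays, the value vector `V`, and the function name `greedy_policy` are assumptions made up for this sketch.

```python
import numpy as np

# Hypothetical model: P[s, a, s'] and R[s, a, s'] for 2 states and 2 actions
P = np.array([[[0.8, 0.2], [0.1, 0.9]],
              [[1.0, 0.0], [0.0, 1.0]]])
R = np.array([[[0.0, 1.0], [0.0, 1.0]],
              [[0.0, 0.0], [0.0, 0.0]]])
gamma = 0.9

# Assumed, approximately converged state values for this model
V = np.array([4.97, 4.48])

def greedy_policy(V, P, R, gamma):
    """Return the index of the best action in every state."""
    # Q[s, a] = sum over s' of P[s, a, s'] * (R[s, a, s'] + gamma * V[s'])
    Q = np.sum(P * (R + gamma * V[None, None, :]), axis=2)
    return np.argmax(Q, axis=1)

policy = greedy_policy(V, P, R, gamma)
print(policy)  # [1 0]
```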

#### Advantages:
- For a finite MDP with $\gamma < 1$, it is guaranteed to converge to the optimal value function, and the greedy policy with respect to that function is optimal.
- Simple and effective for small state and action spaces.
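
The convergence guarantee can be made concrete. For $\gamma < 1$ the update is a contraction in the max norm, so, writing $V_k$ for the estimate after $k$ sweeps and $V^*$ for the optimal value function (notation assumed here), each sweep shrinks the error by at least a factor $\gamma$:

$$
\| V_{k+1} - V^* \|_\infty \leq \gamma \, \| V_k - V^* \|_\infty
$$

Combining this with the triangle inequality bounds the remaining error by the size of the last change, which is what justifies a threshold-based stopping rule:

$$
\| V_{k+1} - V^* \|_\infty \leq \frac{\gamma}{1 - \gamma} \, \| V_{k+1} - V_k \|_\infty
$$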

#### Limitations:
- Computationally expensive for large state or action spaces: every sweep evaluates all states and actions, which costs $O(|S|^2 |A|)$ when the transition model is dense.
- Requires knowledge of the transition probabilities and rewards, which may not always be available.

Value Iteration is widely used in reinforcement learning and decision-making problems where the environment can be modeled as an MDP.

In [1]:
import numpy as np
import gym

def value_iteration(env, gamma = 1.0):
    # Initialize the value function for all states to zero
    v = np.zeros(env.observation_space.n)
    
    # Set the maximum number of iterations and the convergence threshold
    max_iterations = 200000
    eps = 1e-5
    
    # Iterate to update the value function
    for i in range(max_iterations):
        # Make a copy of the current value function to track changes
        v_ = np.copy(v)
        
        # Loop through each state in the environment
        for state in range(env.observation_space.n):
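            # Compute the value of each action by summing over all possible transitions.
            # Each entry of P[state][action] is a (prob, next_state, reward, done) tuple.
            # Note: the done flag is not used, so with gamma = 1.0 convergence relies on
            # terminal states being absorbing with zero reward (as in FrozenLake).
            q = [sum([prob * (reward + gamma * v_[state_])
                      for prob, state_, reward, _ in env.unwrapped.P[state][action]])
                 for action in range(env.action_space.n)]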
            
            # Update the value of the current state to the maximum value of all actions
            v[state] = max(q)
        
        # Check for convergence by comparing the change in value function
        if (np.sum(np.fabs(v_ - v)) <= eps):
            print('Value-iteration converged at iteration %d.' % (i + 1))
            break

    # Return the optimal value function
    return v
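
When terminal states are not absorbing with zero reward, one common remedy is to stop bootstrapping across transitions flagged as `done`. The following is a minimal sketch of that variant, not the function used in this notebook. It assumes a transition table in the same `P[state][action] -> [(prob, next_state, reward, done), ...]` layout, and the three-state chain and the name `value_iteration_masked` are made up for illustration.

```python
import numpy as np

def value_iteration_masked(P, n_states, n_actions, gamma=1.0,
                           eps=1e-8, max_iterations=10_000):
    """Value iteration that ignores the successor value on terminal transitions."""
    v = np.zeros(n_states)
    for i in range(max_iterations):
        v_prev = v.copy()
        for s in range(n_states):
            q = [sum(p * (r + gamma * v_prev[s2] * (not done))
                     for p, s2, r, done in P[s][a])
                 for a in range(n_actions)]
            v[s] = max(q)
        # Stop once the largest change in any state is below the threshold
        if np.max(np.abs(v - v_prev)) <= eps:
            return v, i + 1
    return v, max_iterations

# Hypothetical chain 0 -> 1 -> 2 (goal). Action 0 = stay, action 1 = move right.
# Entering the goal gives reward 1 and ends the episode, even from the goal itself.
P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 1, 0.0, False)]},
    1: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 2, 1.0, True)]},
    2: {0: [(1.0, 2, 1.0, True)], 1: [(1.0, 2, 1.0, True)]},
}

v, iterations = value_iteration_masked(P, n_states=3, n_actions=2, gamma=1.0)
print(v, iterations)
```

Without the `(not done)` factor, the value of state 2 in this example would increase by 1 on every sweep when `gamma = 1.0`, and the loop would only stop at the iteration cap.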


In [2]:
def policy_extraction(env, v, gamma = 1.0):
    # Initialize the policy with zeros for all states
    policy = np.zeros(env.observation_space.n)
    
    # Loop through each state in the environment
    for state in range(env.observation_space.n):
        # Initialize an array to store the value of each action
        q = np.zeros(env.action_space.n)
        
        # Loop through each action available in the current state
        for action in range(env.action_space.n):
            # Compute the value of the action by summing over all possible transitions;
            # each transition is a (prob, next_state, reward, done) tuple
            for prob, state_, reward, _ in env.unwrapped.P[state][action]:
                q[action] += prob * (reward + gamma * v[state_])
        
        # Select the action with the highest value as the optimal action for the current state
        policy[state] = np.argmax(q)

    # Return the extracted policy
    return policy

In [3]:
def run_episode(env, policy, gamma = 1.0, render = False):
    # Reset the environment to the initial state
    observation = env.reset()

    # Newer gym versions return (observation, info) from reset()
    if isinstance(observation, tuple):
        observation = observation[0]  # Extract the initial observation


    total_reward = 0  # Initialize the total reward
    step_idx = 0  # Initialize the step counter

    while True:
        if render:
            env.render()  # Render the environment if specified
        # Take the action dictated by the policy for the current state
        observation, reward, done, truncated, _ = env.step(int(policy[observation]))
        # Accumulate the discounted reward
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1  # Increment the step counter
        # Exit only when the episode terminates; the `truncated` flag is ignored,
        # so an episode can run past the environment's time limit
        if done:
            break

    return total_reward  # Return the total reward for the episode

def test_episode(env, policy):
    # Reset the environment to the initial state
    observation = env.reset()

    # Newer gym versions return (observation, info) from reset()
    if isinstance(observation, tuple):
        observation = observation[0]  # Extract the initial observation

    while True:
        env.render()  # Render the environment
        # Take the action dictated by the policy for the current state
        observation, _, done, truncated, _ = env.step(int(policy[observation]))
        # Exit only when the episode terminates; the `truncated` flag is ignored
        if done:
            break

def evaluate_policy(env, policy, gamma = 1.0, n = 100):
    # Run multiple episodes and calculate the average score
    scores = [run_episode(env, policy, gamma, False) for _ in range(n)]
    return np.mean(scores)  # Return the mean score across episodes
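
The score accumulated in `run_episode` is the discounted return of one episode. As a small standalone sketch (the helper name `discounted_return` and the reward sequence are assumptions for illustration), the same quantity can be computed from a list of per-step rewards:

```python
def discounted_return(rewards, gamma=1.0):
    """Sum of gamma**t * r_t over the steps of one episode."""
    total = 0.0
    for t, r in enumerate(rewards):
        total += (gamma ** t) * r
    return total

# A three-step episode that only pays off on the last step
print(discounted_return([0, 0, 1], gamma=1.0))
print(discounted_return([0, 0, 1], gamma=0.9))
```

With `gamma = 1.0` the return is simply the sum of rewards, which is why a FrozenLake episode scores either 0 or 1 under the settings used here.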


In [4]:
env_name = 'FrozenLake8x8-v1' 
env = gym.make(env_name)
optimal_v = value_iteration(env, gamma = 1.0)
policy = policy_extraction(env, optimal_v, gamma = 1.0)
score = evaluate_policy(env, policy, gamma = 1.0)
print('Average scores = ', np.mean(score))

Value-iteration converged at iteration 813.
Average scores =  1.0


In [5]:
env = gym.make(env_name)
test_episode(env, policy)


In [6]:
policy.reshape(8, 8)

array([[3., 2., 2., 2., 2., 2., 2., 2.],
       [3., 3., 3., 3., 3., 3., 3., 2.],
       [0., 0., 0., 0., 2., 3., 3., 2.],
       [0., 0., 0., 1., 0., 0., 2., 2.],
       [0., 3., 0., 0., 2., 1., 3., 2.],
       [0., 0., 0., 1., 3., 0., 0., 2.],
       [0., 0., 1., 0., 0., 0., 0., 2.],
       [0., 1., 0., 0., 1., 2., 1., 0.]])

In [7]:
from gym import Env
from gym import spaces


class GridWorldEnv(Env):
    def __init__(self, grid_size=4, holes=None, mode = None):
        super(GridWorldEnv, self).__init__()
        self.grid_size = grid_size
        self.holes = holes if holes else []  # List of hole states
        self.observation_space = spaces.Discrete(grid_size * grid_size)  # Total states
        self.action_space = spaces.Discrete(4)  # Actions: 0=Left, 1=Down, 2=Right, 3=Up

        self.mode = mode

        # Define the transition probabilities and rewards
        self.P = self._build_transition_probabilities()
        self.env = self  # Make the environment accessible via self.env

    def _build_transition_probabilities(self):
        P = {}
        for state in range(self.observation_space.n):
            P[state] = {action: [] for action in range(self.action_space.n)}
            x, y = divmod(state, self.grid_size)

            for action in range(self.action_space.n):
                if action == 0:  # Left
                    next_x, next_y = x, max(y - 1, 0)
                elif action == 1:  # Down
                    next_x, next_y = min(x + 1, self.grid_size - 1), y
                elif action == 2:  # Right
                    next_x, next_y = x, min(y + 1, self.grid_size - 1)
                elif action == 3:  # Up
                    next_x, next_y = max(x - 1, 0), y

                next_state = next_x * self.grid_size + next_y
                if next_state in self.holes:
                    reward = -1  # Negative reward for falling into a hole
                    done = True  # Episode ends if the agent falls into a hole
                elif next_state == self.observation_space.n - 1:
                    reward = 1  # Positive reward for reaching the goal
                    done = True
                else:
                    reward = 0  # No reward for other states
                    done = False

                P[state][action].append((1.0, next_state, reward, done))
        return P

    def reset(self):
        self.state = 0
        return self.state

    def step(self, action):
        transitions = self.P[self.state][action]
        prob, next_state, reward, done = transitions[0]
        self.state = next_state

        return next_state, reward, done, False, {}

    def render(self, mode='human'):
        grid = np.zeros((self.grid_size, self.grid_size), dtype=str)
        grid[:] = '.'
        for hole in self.holes:
            x, y = divmod(hole, self.grid_size)
            grid[x, y] = 'H'  # Mark holes
        x, y = divmod(self.state, self.grid_size)
        grid[x, y] = 'A'  # Mark agent's position
        grid[-1, -1] = 'G'  # Mark goal
        print("\n".join([" ".join(row) for row in grid]))
        print()

In [8]:

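env = GridWorldEnv(grid_size=4, holes=[5, 10, 13])
# With gamma = 1.0 the goal state keeps collecting reward through its own
# transitions, so the values grow without bound and the loop stops at
# max_iterations without printing a convergence message
optimal_v = value_iteration(env, gamma = 1.0)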
optimal_policy = policy_extraction(env, optimal_v, gamma = 1.0)
score = evaluate_policy(env, optimal_policy, gamma = 1.0)
print('Average scores = ', np.mean(score))

Average scores =  1.0


In [9]:
optimal_policy.reshape(4, 4)

array([[2., 2., 1., 1.],
       [1., 2., 2., 1.],
       [1., 1., 1., 1.],
       [2., 2., 2., 1.]])
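
A numeric policy grid is easier to read as directions. The sketch below maps the action indices used by `GridWorldEnv` (0=Left, 1=Down, 2=Right, 3=Up) to ASCII symbols. The helper `policy_to_arrows` and the symbol table are hypothetical names introduced only for this example, and the policy values are copied from the output above.

```python
import numpy as np

# Action indices as defined in GridWorldEnv: 0=Left, 1=Down, 2=Right, 3=Up
ACTION_SYMBOLS = {0: '<', 1: 'v', 2: '>', 3: '^'}

def policy_to_arrows(policy, grid_size):
    """Format a flat policy array as a grid of direction symbols."""
    grid = np.asarray(policy).astype(int).reshape(grid_size, grid_size)
    return "\n".join(" ".join(ACTION_SYMBOLS[int(a)] for a in row)
                     for row in grid)

policy = np.array([2., 2., 1., 1.,
                   1., 2., 2., 1.,
                   1., 1., 1., 1.,
                   2., 2., 2., 1.])
print(policy_to_arrows(policy, 4))
# > > v v
# v > > v
# v v v v
# > > > v
```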

In [10]:

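# Note: this layout has a hole at state 9 instead of state 10, so it differs
# from the environment on which optimal_policy was computed
env = GridWorldEnv(grid_size=4, holes=[5, 9, 13], mode='human')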
test_episode(env, optimal_policy)
print(optimal_policy.reshape(env.grid_size, env.grid_size))


A . . .
. H . .
. H . .
. H . G

. A . .
. H . .
. H . .
. H . G

. . A .
. H . .
. H . .
. H . G

. . . .
. H A .
. H . .
. H . G

. . . .
. H . A
. H . .
. H . G

. . . .
. H . .
. H . A
. H . G

[[2. 2. 1. 1.]
 [1. 2. 2. 1.]
 [1. 1. 1. 1.]
 [2. 2. 2. 1.]]
