# Deep RL Bootcamp Lab 1

https://gymnasium.farama.org/environments/toy_text/frozen_lake/

In [None]:
import gymnasium as gym
# Slippery 4x4 FrozenLake. render_mode="rgb_array" makes env.render() return
# the frame as an image array instead of opening a window.
env = gym.make(
    'FrozenLake-v1',
    desc=None,
    map_name="4x4",
    is_slippery=True,
    render_mode="rgb_array",
)
# gym.make() returns the environment wrapped in helper wrappers, so
# env.__doc__ would be the outermost wrapper's docstring. Unwrap first to
# print the FrozenLake description itself.
print(env.unwrapped.__doc__)


# Action codes, listed in numeric order (0..3).
ACTION_LEFT, ACTION_DOWN, ACTION_RIGHT, ACTION_UP = 0, 1, 2, 3

# Human-readable label for each action code, used when printing moves.
action_name = dict(
    zip(
        (ACTION_UP, ACTION_LEFT, ACTION_RIGHT, ACTION_DOWN),
        ('up', 'left', 'right', 'down'),
    )
)

### Play the game.

In [None]:
#from gymnasium.utils.play import play
import numpy as np

# WASD keyboard bindings for interactive play: pressed key -> action code.
mapping = dict(
    w=ACTION_UP,
    a=ACTION_LEFT,
    s=ACTION_DOWN,
    d=ACTION_RIGHT,
)

#play(env, keys_to_action=mapping, noop=2, fps=5)


## Step through the game

In [None]:
import matplotlib.pyplot as plt

def render_inline(env):
    """
    Show the frame returned by `env.render()` as a small image without axes.
    """
    frame = env.render()
    _, axes = plt.subplots(figsize=(2, 2))
    axes.imshow(frame)
    axes.axis('off')
    plt.show()

def simulate_random_episode():
    """
    Play one episode on the module-level `env` with random actions.

    Every state is rendered before the action is taken, and the final state
    is rendered once more after the loop. The episode stops when the
    environment reports it as terminated or truncated, and after 100 steps
    at the latest.
    """
    env.reset()
    for _step in range(100):
        render_inline(env)
        action = env.action_space.sample()
        print(f"Going {action_name[action]}")
        _obs, _reward, terminated, truncated, _info = env.step(action)
        # A truncated episode (e.g. a step limit) is over as well, so treat it
        # like termination instead of failing afterwards.
        if terminated or truncated:
            break
    render_inline(env)

simulate_random_episode()

## Making a Markov Decision Process from the game

In [None]:
class MDP:
    """
    Tabular Markov decision process backed by a transition table.

    `P[state][action]` must be an iterable of transition tuples whose first
    three fields are `(probability, next_state, reward)`. Any further fields
    (such as a "terminated" flag) are ignored. The same `next_state` may
    appear in several tuples of one `(state, action)` pair.
    """

    def __init__(self, P, num_states, num_actions, desc=None):
        self.P = P
        self.num_states = num_states
        self.num_actions = num_actions
        self.desc = desc  # optional description, stored unchanged

    def _outcomes(self, state, action, next_state):
        """
        All transition tuples of (`state`, `action`) that lead to `next_state`.
        """
        return [tup for tup in self.P[state][action] if tup[1] == next_state]

    def probability(self, state, action, next_state):
        """
        Probability of going to `next_state` if taking `action` in `state`.

        Duplicate entries for the same `next_state` are added up. Returns 0
        when `next_state` cannot be reached.
        """
        return sum(tup[0] for tup in self._outcomes(state, action, next_state))

    def reward(self, state, action, next_state):
        """
        Expected reward from going from `state` to `next_state` via `action`.

        When several transition tuples lead to the same `next_state`, their
        rewards are averaged, weighted by probability. Adding them up would
        count the reward once per duplicate entry. Returns 0.0 when
        `next_state` cannot be reached.
        """
        outcomes = self._outcomes(state, action, next_state)
        total_probability = sum(tup[0] for tup in outcomes)
        if not total_probability:
            return 0.0
        return sum(tup[0] * tup[2] for tup in outcomes) / total_probability


#state = 2
#action = ACTION_DOWN
#env.P[state][action]
# Sizes of the discrete state and action spaces.
num_states = env.observation_space.n
num_actions = env.action_space.n
# The transition table `P` belongs to the raw FrozenLake environment, not to
# the wrappers added by gym.make(), so read it through `env.unwrapped`.
mdp = MDP(env.unwrapped.P, num_states, num_actions)

# Reward for moving from state 14 into state 15 when going right.
mdp.reward(14, ACTION_RIGHT, 15)

```
env.P[14][ACTION_RIGHT] = 
[(0.3333333333333333, 14, 0.0, False),
 (0.3333333333333333, 15, 1.0, True),
 (0.3333333333333333, 10, 0.0, False)]
```
This means that when going right from state 14, the probability of reaching the goal (state 15) and receiving a reward of 1 is 1/3.

On the slippery lake the agent never moves opposite to the chosen direction, but it can slip sideways: the chosen direction and the two perpendicular directions each occur with probability 1/3.

```
env.P[7][ACTION_RIGHT] = 
[(1.0, 7, 0, True)]
```
This means that a hole is absorbing: any action taken from a hole ends up in the same hole.

Transitions at the edge can end up in the same state for some actions, while transitions in the middle of the board (except holes) always end up in another state.

## Value iteration without policy updates
This is a first check that the approach works. It evaluates a static policy of "always go right", which turns out to have a success rate of about 3% from state 0.

In [None]:
import numpy as np

def value_iteration_version1(mdp, gamma, num_iterations):
    """
    Evaluate the fixed policy "always go right" with repeated Bellman backups.

    Parameters:
        mdp: object providing `num_states`, `probability(s, a, s_next)` and
            `reward(s, a, s_next)`.
        gamma: discount factor applied to the value of the next state. Pass
            1.0 for undiscounted returns.
        num_iterations: number of synchronous sweeps over all states.

    Returns a NumPy array mapping state -> value after `num_iterations`
    sweeps (all zeros for 0 sweeps).
    """
    go_right = 2  # action code for "right"
    V = np.zeros(mdp.num_states)  # maps state -> value
    # Integer dtype so the entries are usable as action codes.
    policy = np.full(mdp.num_states, go_right, dtype=int)  # maps state -> action

    for _ in range(num_iterations):
        V_prev = V
        V = np.zeros(mdp.num_states)

        for s in range(mdp.num_states):
            action = int(policy[s])
            V[s] = sum(
                mdp.probability(s, action, s_next)
                * (mdp.reward(s, action, s_next) + gamma * V_prev[s_next])
                for s_next in range(mdp.num_states)
            )

    return V
        
# gamma=1.0 (no discounting): each value is then the probability of reaching
# the goal within 20 steps, which is how the result is read as a success rate.
value_iteration_version1(mdp, 1.0, 20)

## Value iteration

In [None]:
import numpy as np

def _action_values(mdp, state, V, gamma):
    """
    One-step lookahead value of every action in `state`, given values `V`.
    """
    q_values = np.zeros(mdp.num_actions)
    for action in range(mdp.num_actions):
        q_values[action] = sum(
            mdp.probability(state, action, s_next)
            * (mdp.reward(state, action, s_next) + gamma * V[s_next])
            for s_next in range(mdp.num_states)
        )
    return q_values


def value_iteration_version2(mdp, gamma, num_iterations):
    """
    Value iteration: repeatedly back up every state with its best action.

    Parameters:
        mdp: object providing `num_states`, `num_actions`,
            `probability(s, a, s_next)` and `reward(s, a, s_next)`.
        gamma: discount factor applied to the value of the next state. Pass
            1.0 for undiscounted returns.
        num_iterations: number of synchronous sweeps over all states.

    Returns `(V, policy)`: `V` maps state -> value and `policy` maps
    state -> greedy action (integer array). Both come from the same sweep, so
    `V[s]` is the lookahead value of `policy[s]`. Ties go to the lowest action
    code. With 0 sweeps both arrays are all zeros.
    """
    V = np.zeros(mdp.num_states)  # maps state -> value
    policy = np.zeros(mdp.num_states, dtype=int)  # maps state -> action

    for _ in range(num_iterations):
        V_prev = V
        V = np.zeros(mdp.num_states)
        policy = np.zeros(mdp.num_states, dtype=int)

        for s in range(mdp.num_states):
            # One lookahead per state yields both the greedy action and its
            # value, so the backup is not computed twice.
            q_values = _action_values(mdp, s, V_prev, gamma)
            policy[s] = np.argmax(q_values)
            V[s] = q_values[policy[s]]

    return V, policy
        
# gamma=1.0 (no discounting): V[0] is then the probability of reaching the
# goal from the start state, which is what is reported as the win chance.
V, policy = value_iteration_version2(mdp, 1.0, 1000)
print("win chance: ", V[0])

In [None]:

def simulate_episode(policy):
    """
    Play one episode on the module-level `env`, choosing actions from `policy`.

    `policy` is indexed by the current observation (state) and must yield an
    action code. Every state is rendered before the action is taken, and the
    final state is rendered once more after the loop. The episode stops when
    the environment reports it as terminated or truncated, and after 100
    steps at the latest.
    """
    # Start from the state the environment actually reset to instead of
    # assuming it is state 0.
    observation, _info = env.reset()
    for _step in range(100):
        render_inline(env)
        action = int(policy[observation])
        print(f"Going {action_name[action]}")
        observation, _reward, terminated, truncated, _info = env.step(action)
        # A truncated episode (e.g. a step limit) is over as well. A cautious
        # policy may simply not have reached the goal yet.
        if terminated or truncated:
            break
    render_inline(env)

simulate_episode(policy)

It works!
