# Reinforcement Learning in Finite MDPs

In [0]:
!git clone https://github.com/rlgammazero/mvarl_hands_on.git > /dev/null 2>&1
!cd mvarl_hands_on && git pull

In [5]:
!pip3 install gridworld



## MDPs

In [7]:
import sys
sys.path.insert(0, './mvarl_hands_on/utils')
import numpy as np
import matplotlib.pyplot as plt
import json
import math
# from gridworld import GridWorldWithPits

In [10]:
!pip3 install finite_env

Collecting finite_env
[31mCould not install packages due to an EnvironmentError: 404 Client Error: Not Found for url: https://pypi.org/simple/finite-env/
[0m


## Define the environment

In [9]:
from finite_env import FiniteEnv
import numpy as np

# Note: You do not need to read this Class. It only redefines the clearning robots


class RobotEnv(FiniteEnv):
    """
    Enviroment with 2 states and 3 actions
    Args:
        gamma (float): discount factor
        seed    (int): Random number generator seed
    """

    def __init__(self, gamma=0.5, seed=42):
        # Set seed
        self.RS = np.random.RandomState(seed)

        # Transition probabilities
        # shape (Ns, Na, Ns)
        # P[s, a, s'] = Prob(S_{t+1}=s'| S_t = s, A_t = a)

        Ns = 2
        Na = 3
        
        # Note we add a recharge option in state A with a negative reward (to have a well defined matrix-transition)
        P = np.array([[[1, 0], [3/4, 1/4], [1, 0]], [[0,1],[1,0], [1,0]]])
        print(P)
        print(P.shape)
        self._R = np.array([[0,1,-0.5], [0, -1, 0]])

        self.state_decoder  = {0: "A", 1: "B"}
        self.action_decoder = {0: "WAIT", 1: "SEARCH", 2: "RECHARGE"}
        
        # Initialize base class
        states = np.arange(Ns).tolist()
        action_sets = [np.arange(Na).tolist()]*Ns
        print(states)
        print(action_sets)
        super().__init__(states, action_sets, P, gamma)

    def reward_func(self, state, action, *_):
        return self._R[state, action]

    def reset(self, s=0):
        self.state = s
        return self.state

    def step(self, action):
        next_state = self.sample_transition(self.state, action)
        reward = self.reward_func(self.state, action, next_state)
        done = False
        info = {"str" : "In {} do {} arrive at {} get {}".format(
            self.state_decoder[state],
            self.action_decoder[action],
            self.state_decoder[next_state],
            reward )}
        self.state = next_state

        observation = next_state
        return observation, reward, done, info

    def sample_transition(self, s, a):
        prob = self.P[s,a,:]
        s_ = self.RS.choice(self.states, p = prob)
        return s_

    def render_policy(self, policy):
        if len(np.array(policy).shape) > 1:
            policy = densify_policy(policy)

        txt = ""
        for i, a in enumerate(policy):
            txt += "In state {} perform {}\n".format(self.state_decoder[i], self.action_decoder[a])
        return txt[:-1]

    @property
    def R(self):
        return self._R
  
    env = RobotEnv()

ModuleNotFoundError: No module named 'finite_env'

Setting up the environment

In [0]:
# Useful attributes
print("Set of states:", env.states)
print("Set of actions:", env.actions)
print("Number of states: ", env.Ns)
print("Number of actions: ", env.Na)
print("P has shape: ", env.P.shape)  # P[s'|s,a] = P[s, a, s'] = env.P[s, a, s']
print("R has shape: ", env.R.shape)  
print("discount factor: ", env.gamma)
print("")

# Usefult methods
state = env.reset() # get initial state
print("initial state: ", state)
print("reward at (s=0, a=1): ", env.reward_func(0,1))
print("")

# A random policy
policy = np.random.randint(env.Na, size = (env.Ns,))
print("random policy = ", policy)
print(env.render_policy(policy))

# Interacting with the environment
print("(s, a, s', r):")
for time in range(4):
    action = policy[state]
    next_state, reward, done, info = env.step(action)
    print(state, action, next_state, reward, "   --> " + info["str"] if "str" in info else "") 
    if done:
        break
    state = next_state
print("")


## Useful functions


In [0]:
# This functions ains at alternating between sparse and dense policy:
# sparse policy allows to perform matrix completion pi \in R^{SxA}. It is denoted pi in the code Ex: [[1, 0, 0], [0, 1, 0]])
# dense policy is a determinist policy whose value are the indexed actions. It is the argmax of pi. It is denoted dpi in the code. Ex [0, 1] 

def sparsify_policy(policy, Na):
    ### Turn a dense policy into a sparse one.
    #  Ex: [0, 1], Na=3  -> [[1, 0, 0], [0, 1, 0]]
    ###

    Ns = len(policy)
    sparse_policy = np.zeros(shape=(Ns, Na))
    for i, a in enumerate(policy):
        sparse_policy[i,a]=1
        return sparse_policy

def densify_policy(policy):
  ### Turn a sparse determinist policy into a dense one.
  #  Ex: [[1, 0, 0], [0, 1, 0]] -> [0, 1]
  ###
  return np.array(policy).argmax(axis=1)


## Exercice : Policy Evaluation
1. Evaluate the policy by solving the linear system
2. Evaluate the policy through recursion

In [0]:
# Policy evaluation (exact)

# Retrieve the environment MDP
P = env.P
R = env.R
gamma = env.gamma


# Policy to evaluate
# State A: Search
# State B: Wait
dpi = np.array([1, 0])
pi = sparsify_policy(dpi, Na=env.Na)
print("## old pi:")
print(pi)
print(env.render_policy(pi))

# states = [0, 1]
# actions = [0, 1, 2]
# Ppi = np.zeros([2,2])
# for s in states:
#   for s_next in states:
#     Ppi[s, s_next] = np.sum(pi[s] * P[s, :, s_next]) 

# Compute the dynamics given the policy
Ppi = np.sum(P * np.expand_dims(pi, axis=-1), axis=1)
Rpi = np.sum(R * pi, axis=1) #Note, we assume that R(s,a) does not depend of the next state

# Evaluate the policy
Vpi = np.linalg.inv( np.identity(env.Ns) - gamma*Ppi).dot(Rpi)
print("## Vpi: ")
print(Vpi)


# # Compute the Q values
Qpi = R + gamma * P.dot(Vpi)

# print("## Qpi:")
print("## Qpi: ")
print('**')
print(Qpi)


# # What is the next policy if we perform one step of policy improvment ?
dpi_new = Qpi.argmax(axis=1)

# print("## new pi:")
print("## pi: ")
print(sparsify_policy(dpi_new, Na=env.Na))
print(env.render_policy(dpi_new))

In [0]:
# Policy evaluation (recursive)

# Retrieve the environment MDP
P = env.P
R = env.R
gamma = env.gamma

# Policy to evaluate
dpi = np.array([1, 0])
pi = sparsify_policy(dpi, Na=env.Na)

# Compute the dynamics given the policy
Ppi = np.sum(P * np.expand_dims(pi, axis=-1), axis=1)
Rpi = np.sum(R * pi, axis=1) 

# Stopping criterion -> maximum number of steps
epsilon = 1e-3
r_max = np.max(R)
max_k = int(np.log(r_max/epsilon)/np.log(1/gamma))+1

v = np.array([-1, 1])
print("0:", v)
for k in range(max_k):
    v = Rpi + gamma*Ppi.dot(v)
    print("{}:".format(k), v)

print()

# Stopping criterion -> compute the infinite norm
v_new = np.array([-1, 1])
v_old = np.array([2*epsilon, 0])
k = 1
print("0:", v_new)
while np.absolute(v_new-v_old).max() > epsilon:
    v_old = v_new
    v_new = Rpi + gamma*Ppi.dot(v_new)
    print("{}:".format(k), v_new)
    k += 1

   

## New Environment!!!

In [0]:
# New Maze environment
# s: start
# g: goal
# x: negative reward state

grid1 = [
    ['', '', '', 'g'],
    ['', 'x', '', ''],
    ['s', '', '', '']
]
grid1_MAP = [
    "+-------+",
    "| : : :G|",
    "| :x: : |",
    "|S: : : |",
    "+-------+",
]

env = GridWorldWithPits(grid=grid1, txt_map=grid1_MAP, uniform_trans_proba=0)

In [0]:
# Useful attributes
print("Set of states:", env.states)
print("Set of actions:", env.actions)
print("Number of states: ", env.Ns)
print("Number of actions: ", env.Na)
print("P has shape: ", env.P.shape)  # P[s'|s,a] = P[s, a, s'] = env.P[s, a, s']
print("R has shape: ", env.R.shape)  
print("discount factor: ", env.gamma)
print("")

# Usefult methods
state = env.reset() # get initial state
print("initial state: ", state)
print("reward at (s=0, a=1,s'=1): ", env.reward_func(0,1,1))
print("")

# A random policy
dpi = np.random.randint(env.Na, size = (env.Ns,))
print("random policy = ", dpi)

# Interacting with the environment
print("(s, a, s', r):")
for time in range(4):
    action = dpi[state]
    next_state, reward, done, info = env.step(action)
    print(state, action, next_state, reward) 
    if done:
        break
    state = next_state
print("")

In [0]:
dpi = np.random.randint(env.Na, size = (env.Ns,))
env.render_policy(dpi)

state = env.reset()
env.render()
for i in range(5):
    action = dpi[state]
    state, reward, done, _ = env.step(action)
    env.render()
    if done:
        break

## Exercice: Value Iteration
1. Write a function applying the optimal Bellman operator on a provided Q function: $Q_1 = LQ_0, \; Q_0\in \mathbb{R}^{S\times A}$
2. Write a function implementing Value Iteration (VI) with $\infty$-norm stopping condition 
3. Evaluate the convergence of your estimate, i.e., plot the value $\|V_n - V^\star\|_{\infty}$
4. Evaluate the convergence of your estimate, i.e., plot the value $\|\pi_n - \pi^\star\|_{\infty}$

In [0]:
# useful function
def plot_infnorm(lst, star, name="V"):
  
    lst = np.array(lst)
    star = np.array(star)

     # Compute inf norm
    diff = np.absolute(lst - star).max(axis=1)
    plt.figure()
    plt.plot(diff)
    plt.xlabel('Iteration')
    plt.ylabel('Error')
    plt.title("||{} - {}*||_inf".format(name, name))

In [0]:
# Compute Value Iteration

# Retrieve the environment MDP
P = env.P
R = env.R
gamma = env.gamma
epsilon = 1e-3

# Prepare v, and storage
v = np.zeros(env.Ns)
v_all = []
pi_all = []

# iterate over the value
while True:
    
  
  # Compute v_k
    v, v_old = np.max(R + gamma * P.dot(v), axis=1), v
    v_all.append(v)

  # Esimate Intermediate policy
    q = R + gamma * P.dot(v)
    dpi = q.argmax(axis=1)
    print(dpi)

    pi_all.append(dpi)

    # stopping criterion 
    if np.absolute(v-v_old).max() < epsilon: 
    break

# Plot optimal policy
env.render_policy(dpi)

# You need to evaluate V* here from your optimal policy
# You have no guarantee that the last v-value is converge to the actual Vpi
pi = sparsify_policy(dpi, Na=env.Na)
print(pi)
Ppi = np.sum(P * np.expand_dims(pi, axis=-1), axis=1)
Rpi = np.sum(R * pi, axis=1)

v_star = np.linalg.inv( np.identity(env.Ns) - gamma*Ppi).dot(Rpi)
print("v (last):", v_all[-1])
print("v (star):", v_star)

# The difference is not exactly epsilon. We here we are comparing V*-V while the algorithm we are comparing V_new - V_old
print("epsilon: ", epsilon)
print("|v - v_star|_inf: ", np.absolute(v_all[-1]-v_star).max())

# The value function converge towards v* iteration after iteration
plot_infnorm(v_all, v_star, name="V")

# If we estimate the intermediate policies, we observe that the optimal policy is obtained earlier in the process.
# However, the value function has not yet converge. Thus ares still unaware that it is the optimal policty at iteration 4.
plot_infnorm(pi_all, dpi, name="Pi")


## Exercice: Policy Iteration (Homework)
1. Implement Policy iteration!
2. Evaluate the convergence of your estimate, i.e., plot the value $\|V_n - V^\star\|_{\infty}$
3. Evaluate the convergence of your estimate, i.e., plot the value $\|\pi_n - \pi^\star\|_{\infty}$

In [0]:
# Retrieve the environment MDP
P = env.P
R = env.R
gamma = env.gamma
epsilon = 5e-2
import copy

v_all = []
p_all = []

#Initialize policy
dpi = np.zeros(shape=(env.Ns,), dtype=np.int32) + 2
print(env.render_policy(dpi))

def policy_eval(env, policy, gamma=env.gamma, eps = 0.05):
    
    Ppi = np.sum(P * np.expand_dims(policy, axis=-1), axis=1)
    Rpi = np.sum(R * policy, axis=1)
    v_star = np.linalg.inv( np.identity(env.Ns) - gamma*Ppi).dot(Rpi)
    return v_star

def policy_inprovemnt(env, va_ameleorate):
    q = R + gamma * P.dot(va_ameleorate)
    dpi = q.argmax(axis=1)
    return dpi

def policy_iteration(env, gamma=env.gamma, eps = 0.05):
    old_policy = np.zeros(env.Ns, dtype=np.int32)
    while True:
    v = policy_eval(env, sparsify_policy(old_policy, env.Na))
    new_policy = policy_inprovemnt(env, v)
    if (new_policy == old_policy).all():
        break

    old_policy = copy.copy(new_policy)
    v_all.append(v)
    p_all.append(new_policy)
  return old_policy, v

save_, v = policy_iteration(env)

print(env.render_policy(save_))

plot_infnorm(v_all, v, name="V")


plot_infnorm(pi_all, save_, name="Pi")

## Exercice: Q learning
Q learning is a model-free algorithm for estimating the optimal Q-function online.
It is an off-policy algorithm since the samples are collected with a policy that is (potentially) not the one associated to the estimated Q-function.

1. Implement Q learning with $\epsilon$-greedy exploration.
  - Plot the error in Q-functions over iterations
  - Plot the cumulative sum of rewards

In [0]:
# ---------------------------
# Q-Learning
# ---------------------------
class QLearning:
    """
    Q learning with epsilon-greedy exploration
    """
    def __init__(self, env, gamma, learning_rate, epsilon, min_epsilon):
        self.env = env
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.min_epsilon = min_epsilon
        self.Q = np.zeros((env.Ns, env.Na))
        self.Nsa = np.zeros((env.Ns, env.Na))
    
    def sample_action(self, state, greedy=False):
        epsilon = self.epsilon / max(1, math.sqrt(self.Nsa[state].min()))
        
        # explore
        if not greedy and np.random.uniform(0, 1) < epsilon:
            return np.random.choice(self.env.actions)

        # exploit
        else:
            action = self.Q[state, :].argmax()
            return self.env.actions[action]
        
    
    def update(self, state, action, next_state, reward, done):
        alpha = self.learning_rate / max(1, math.sqrt(self.Nsa[state, action]))
        
        if not done:
          max_q = self.Q[next_state, :].max()
        else:
          max_q = 0.  # We do not bootstrap further
        
        q = self.Q[state, action]
        increment = (reward + self.gamma*max_q) - q

        # Update
        self.Q[state, action] = self.Q[state, action] + alpha*increment
        self.Nsa[state, action] += 1


In [0]:
q_learning = QLearning(env, gamma=env.gamma, learning_rate=1, epsilon=0.6, min_epsilon=0.1)

# Define storage and variable
q_all = []
r_all = []
pi_all = []
max_steps = int(5e4)


# main algorithmic loop
state = env.reset()
for t in range(max_steps):
    
    
  # Sample the action
    action = q_learning.sample_action(state, greedy=False)
  
  # Sample the environment
    next_state, reward, done, _ = env.step(action)
  
  # Update q-function
    q_learning.update(state=state, action=action, next_state=next_state, reward=reward, done=done)

    # Store information 
    r_all.append(reward)
    q_all.append(q_learning.Q)
    pi_all.append(q_learning.Q.argmax(axis=1))

    state = next_state
    if done:
        state = env.reset()

dpi = q_learning.Q.argmax(axis=1)
print(env.render_policy(dpi))


In [0]:
state = env.reset()
env.render()
for i in range(5):
    action = q_learning.sample_action(state, greedy=True)
    next_state, reward, done, _ = env.step(action)
    env.render()
    state = next_state

In [0]:
# Does the Q-value in the goal-state make sense?
print(q_learning.Q)

In [0]:
plot_infnorm(pi_all, pi_all[-1], name="Pi")

In [0]:
pi = sparsify_policy(pi_all[-1], Na=env.Na)
Ppi = np.sum(env.P * np.expand_dims(pi, axis=-1), axis=1)
Rpi = np.sum(env.R * pi, axis=1)

v_star = np.linalg.inv( np.identity(env.Ns) - gamma*Ppi).dot(Rpi)

v_all = []
for dpi_prev, q_prev in zip(pi_all, q_all):
    pi_prev = sparsify_policy(dpi_prev, Na=env.Na)
    v_prev = np.sum(pi_prev*q_prev, axis=1)
    v_all.append(v_prev)


# You may notive that some states have a high errors (especially the one with the negative rewards)
# Indeed, as there are less and less explored, the Q-values in "bad" states are not updated anymore as they are avoided by the agent.
plot_infnorm(v_all, v_star, name="V")


## Exercice: SARSA (Homework)
SARSA is another control algorithm. While Qlearning is off-policy, SARSA is on-policy.

1. Implement SARSA with softmax (Gibbs) exploration and test the convergence to $Q^\star$
2. Plot the value $\|V_n - V^\star\|_{\infty}$

In [0]:
# ---------------------------
# SARSA
# ---------------------------
class SARSA:
    """
    SARSA with deacreasing epsilon for exploration
    """

    def __init__(self, env, gamma, min_epsilon, learning_rate, epsilon):
        self.env = env
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.min_epsilon = min_epsilon
        self.epsilon = epsilon
        self.Q = np.zeros((env.Ns, env.Na))
        self.Nsa = np.ones((env.Ns, env.Na))
    
    def sample_action(self, state, greedy=False):
        epsilon = min(
            self.epsilon / math.sqrt(self.Nsa[state].min()),
            self.min_epsilon)
        
        # explore
        if not greedy and np.random.uniform(0, 1) < epsilon:
            return np.random.choice(self.env.actions)

        # exploit
        else:
            action = self.Q[state, :].argmax()
        return self.env.actions[action]


    def update(self, state, action, next_state, next_action, reward):
        alpha = self.learning_rate / math.sqrt(self.Nsa[state, action])
        
        if not done:
            max_q = self.Q[next_state, next_action]
        else:
            max_q = 0.  # We do not bootstrap further
        
        q = self.Q[state, action]
        increment = (reward + self.gamma*max_q) - q

        # Update
        self.Q[state, action] = self.Q[state, action] + alpha*increment
        self.Nsa[state, action] += 1



In [0]:

sarsa = SARSA(env, gamma=env.gamma, min_epsilon = 0.1, learning_rate=1., epsilon=1.)

max_steps = int(5e4)


# main algorithmic loop
state = env.reset()
action = sarsa.sample_action(state, greedy=False)

for t in range(max_steps):
    next_state, reward, done,_ = env.step(action)
    
  # Sample the action
    next_action = sarsa.sample_action(next_state, greedy=False)
  
  # Sample the environment
  
  # Update q-function
sarsa.update(state=state, action=action, next_state=next_state, next_action = next_action, reward=reward)

  # Store information 

state = next_state
action = next_action
 if done:
    state = env.reset()
    action = sarsa.sample_action(state, greedy=False)

dpi = sarsa.Q.argmax(axis=1)
print(env.render_policy(dpi))