# Topic 3 Value Based Q-Learning

## Activity: Value and Policy Iterations

In [None]:
import numpy as np
from gridWorldGame import standard_grid, negative_grid,print_values, print_policy

# Convergence threshold: value iteration stops once the largest change in V
# during a full sweep over the states drops below this.
SMALL_ENOUGH = 1e-3
# Discount factor applied to the value of the successor state.
GAMMA = 0.9
# Actions tried in every non-terminal state
# (presumably Up, Down, Left, Right — confirm against gridWorldGame).
ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')

In [None]:
# Build the grid world with negative_grid() and display its reward table.
# NOTE(review): per the original note this variant gives a reward of -0.1,
# presumably a per-step penalty that makes shorter paths to the goal
# preferable — confirm in gridWorldGame.

grid = negative_grid()
print("rewards:")
print_values(grid.rewards, grid)

In [None]:
# Policy: a mapping state -> action.
# Start from an arbitrary policy by drawing one uniformly random action for
# every state that has actions available (i.e. every non-terminal state).

policy = {s: np.random.choice(ALL_POSSIBLE_ACTIONS) for s in grid.actions.keys()}

# Show the starting policy before any improvement.
print("initial policy:")
print_policy(policy, grid)

In [None]:
## Initialize the value function V(s)

states = grid.all_states()

# States with available actions start at a random value in [0, 1);
# the remaining (terminal) states are pinned to 0.
V = {}
for s in states:
  V[s] = np.random.random() if s in grid.actions else 0

# Initial value of every state in the grid
print(V)
print_values(V, grid)

In [None]:
## Value function Iteration

iteration = 0
while True:
  iteration += 1
  # Progress report. The policy is not modified inside this loop, so the
  # policy printed here is still the initial one.
  print("values %d: " % iteration)
  print_values(V, grid)
  print("policy %d: " % iteration)
  print_policy(policy, grid)

  biggest_change = 0
  for s in states:
    # States without a policy entry (terminal states) keep their value.
    if s not in policy:
      continue

    old_v = V[s]
    # Bellman optimality backup: best one-step return over all actions.
    new_v = float('-inf')
    for a in ALL_POSSIBLE_ACTIONS:
      grid.set_state(s)
      r = grid.move(a)
      new_v = max(new_v, r + GAMMA * V[grid.current_state()])
    V[s] = new_v
    biggest_change = max(biggest_change, np.abs(old_v - V[s]))

  # Converged once no state moved by SMALL_ENOUGH or more in this sweep.
  if biggest_change < SMALL_ENOUGH:
    break

In [None]:
## find a policy that leads to optimal value function

for s in policy.keys():
  # Track the best (value, action) pair seen so far for this state.
  best_value, best_a = float('-inf'), None
  for a in ALL_POSSIBLE_ACTIONS:
    grid.set_state(s)
    r = grid.move(a)
    v = r + GAMMA * V[grid.current_state()]
    if v > best_value:
      best_value, best_a = v, a
  policy[s] = best_a

# The goal is to check that this matches the answer from policy iteration.
print("values:")
print_values(V, grid)
print("policy:")
print_policy(policy, grid)

## Activity: Q-Learning in 1D

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

# Learning rate of the Q update.
ALPHA = 0.1
# Discount factor applied to the best Q value of the next state.
GAMMA = 0.95
# Probability of acting greedily: choose_action explores only when a
# uniform draw exceeds this value (or the state's Q row is still all zero).
EPSILION = 0.9
# Number of cells in the 1-D world; the last cell is the goal 'T'.
N_STATE = 6
# Action labels, also used as the Q table's column names.
ACTIONS = ['left', 'right']
# Number of training episodes.
MAX_EPISODES = 10
# Seconds to pause after printing each frame of the environment.
FRESH_TIME = 0.1

In [None]:
## Build the Q Table 

def build_q_table(n_state, actions):
    """Return an all-zero Q table as a DataFrame.

    n_state: number of states; rows are labelled 0 .. n_state - 1.
    actions: sequence of action labels, used as the column names.
    """
    zeros = np.zeros((n_state, len(actions)))
    return pd.DataFrame(zeros, index=np.arange(n_state), columns=actions)

In [None]:
## Define the Action Policy

def choose_action(state, q_table):
    """Pick an action label for `state` with an epsilon-greedy rule.

    A random action from ACTIONS is returned when a uniform draw exceeds
    EPSILION or when every Q value of the state is still 0; otherwise the
    column label with the largest Q value in the state's row is returned.
    """
    q_row = q_table.loc[state, :]
    # The uniform draw is evaluated first; the all-zero check only runs
    # when the draw did not already call for exploration.
    explore = np.random.uniform() > EPSILION or (q_row == 0).all()
    if explore:
        return np.random.choice(ACTIONS)
    return q_row.idxmax()


In [None]:
## Define the Environment Feedback

def get_env_feedback(state, action):
    """Return (next_state, reward) for taking `action` in `state`.

    Moving 'right' from cell N_STATE - 2 ends the episode: the next state is
    the string 'terminal' and the reward is 10. Any other 'right' move
    advances one cell for a reward of 1. Every other action is treated as a
    move to the left: reward -1, and the position drops by one except at
    cell 0, where it stays at 0.
    """
    if action == 'right':
        if state == N_STATE - 2:
            return 'terminal', 10
        return state + 1, 1

    # Left move: blocked by the wall at cell 0.
    next_state = 0 if state == 0 else state - 1
    return next_state, -1

In [None]:
## Update Environment

def update_env(state,episode, step_counter):
    """Print the current frame of the 1-D world and report termination.

    state: an integer cell index, or the string 'terminal'.
    episode: zero-based episode index (printed one-based).
    step_counter: number of steps taken so far in this episode.

    Returns (is_terminal, step_counter). On 'terminal' only the episode
    summary is printed; otherwise the world is printed with '*' at the
    agent's cell, followed by a pause of FRESH_TIME seconds.
    """
    if state == 'terminal':
        print("Episode {}, the total step is {}".format(episode+1, step_counter))
        return True, step_counter

    # The frame is only needed when there is something to draw.
    env = ['-'] * (N_STATE - 1) + ['T']
    env[state] = '*'
    print(''.join(env))
    time.sleep(FRESH_TIME)
    return False, step_counter

In [None]:
## Define Q Learning Algorithm

def q_learning():
    """Train a tabular Q-learning agent on the 1-D world.

    Runs MAX_EPISODES episodes, each starting at state 0 and ending when the
    environment returns 'terminal'.

    Returns (q_table, step_counter_times): the learned Q table and a list
    with the number of steps each episode took.
    """
    q_table = build_q_table(N_STATE, ACTIONS)
    step_counter_times = []
    for episode in range(MAX_EPISODES):
        state = 0
        is_terminal = False
        step_counter = 0
        update_env(state, episode, step_counter)
        while not is_terminal:
            action = choose_action(state, q_table)
            next_state, reward = get_env_feedback(state, action)
            q_predict = q_table.loc[state, action]
            if next_state == 'terminal':
                # No successor to bootstrap from: the target is the reward.
                q_target = reward
            else:
                q_target = reward + GAMMA * q_table.iloc[next_state, :].max()
            # Apply the update on every transition, including the terminal
            # one; otherwise the goal reward never reaches the table.
            q_table.loc[state, action] += ALPHA * (q_target - q_predict)

            state = next_state
            step_counter += 1
            is_terminal, steps = update_env(state, episode, step_counter)
            if is_terminal:
                step_counter_times.append(steps)

    return q_table, step_counter_times


In [None]:
## Execute the Q-learning

def main():
    """Train the agent, plot steps per episode and print the greedy policy.

    The printed policy maps each state to the column index of its largest
    Q value, and the printed value function holds that largest Q value.
    """
    q_table, step_counter_times = q_learning()
    print("Q table\n{}\n".format(q_table))

    plt.plot(step_counter_times, 'g-')
    plt.ylabel("steps")
    plt.show()
    print("The step_counter_times is {}".format(step_counter_times))

    q_values = q_table.to_numpy()
    state_ids = range(N_STATE)
    policy = {S: np.argmax(q_values[S, :]) for S in state_ids}
    V = np.array([np.max(q_values[S, :]) for S in state_ids])
    print('policy :', policy)
    print('value function: ', V)

main()

## Activity: Q-Learning in 1D (another version)

In [None]:
import numpy as np
import time

N_STATES = 6   # number of states in the 1-D world; the last one is terminal
N_ACTIONS = 2  # number of actions (1 moves right, anything else moves left)
EPSILON = 0.2  # probability of taking a random action (epsilon-greedy policy)
ALPHA = 0.1     # learning rate
GAMMA = 0.9    # discount factor
MAX_EPISODES = 10   # number of training episodes

In [None]:
## Initialize Q Tables

# One row per state and one column per action; every estimate starts at 0.
Q = np.zeros((N_STATES, N_ACTIONS))
print(Q)

In [None]:
## Env Feedback

def get_env_feedback(S, A):
    """Return (next state, reward) for taking action A in state S.

    Action 1 moves right: reaching the last state (N_STATES - 1) from
    N_STATES - 2 pays 10, any other right move pays 1. Every other action
    moves left for a reward of -1, staying in place at state 0.
    """
    if A != 1:
        # move left; the wall at state 0 keeps the agent where it is
        return (S if S == 0 else S - 1), -1

    # move right
    if S == N_STATES - 2:
        return N_STATES - 1, 10
    return S + 1, 1

In [None]:
## Update Env

def update_env(S, episode, step_counter):
    """Print the current frame of the 1-D world, then pause for 0.3 seconds.

    When S is the last state, the episode summary is printed. Otherwise the
    world is redrawn on the same console line with 'o' at the agent's cell.
    """
    if S == N_STATES - 1:
        print(' Episode {}: total_steps = {}'.format(episode+1,step_counter))
    else:
        cells = ['-'] * (N_STATES - 1) + ['T']   # '-----T' when N_STATES is 6
        cells[S] = 'o'
        print('\r{}'.format(''.join(cells)), end='')
    time.sleep(0.3)

In [None]:
## Q-Learning

for episode in range(MAX_EPISODES):
    S = 0
    step_counter = 0
    is_terminated = False
    update_env(S, episode, step_counter)
    while not is_terminated:
        # epsilon-greedy: random action with probability EPSILON
        explore = np.random.random() < EPSILON
        A = np.random.randint(0, N_ACTIONS) if explore else np.argmax(Q[S, :])

        S_, R = get_env_feedback(S, A)
        q_current = Q[S, A]
        # Bootstrap from the best next-state value unless S_ is terminal.
        is_terminated = (S_ == N_STATES - 1)
        q_target = R if is_terminated else R + GAMMA * np.max(Q[S_, :])

        Q[S, A] += ALPHA * (q_target - q_current)
        S = S_

        step_counter += 1
        update_env(S, episode, step_counter)

## Final Q Table
print(Q)

In [None]:
## Test the final Q Table

# Roll out the greedy policy from state 0. The loop ends only when the
# greedy actions lead to the terminal state.
S, step_counter = 0, 0
is_terminated = False
while not is_terminated:
    update_env(S, 0, step_counter)
    A = np.argmax(Q[S, :])
    S_, R = get_env_feedback(S, A)
    S = S_
    step_counter += 1
    is_terminated = (S == N_STATES - 1)
print(' Total steps = ',step_counter)

In [None]:
## Compute Optimal Policy and Value Function

# Greedy policy: index of the largest Q value in each state's row.
# State value: that largest Q value.
V = np.zeros(N_STATES)
policy = {}
for S in range(N_STATES):
    q_row = Q[S, :]
    policy[S], V[S] = np.argmax(q_row), np.max(q_row)
print('policy :', policy)
print('value function: ', V)

## Activity: Q-Learning for FrozenLake-v0 or Taxi-v3 Gym

In [None]:
import numpy as np
import gym
import time

# Learning rate of the Q update.
lr = 0.1
# Discount factor.
gamma = 0.8
# Probability of taking a random action during training.
epsilon = 0.1
# Number of training episodes.
episodes = 10000

#env = gym.make('FrozenLake-v0')
# NOTE(review): `.env` takes the environment underneath the object returned
# by gym.make — presumably to bypass the wrapper's episode step limit;
# confirm for the installed gym version.
env = gym.make("Taxi-v3").env

In [None]:
## Initialize Q Table

# One row per discrete observation (state) and one column per discrete
# action; every estimate starts at 0.
Q = np.zeros((env.observation_space.n, env.action_space.n))
print(Q)

In [None]:
## Q-Learning

for i in range(episodes):
    print("Episode {}/{}".format(i + 1, episodes))
    s = env.reset()
    done = False

    while not done:
        # epsilon-greedy: random action with probability epsilon
        explore = np.random.random() < epsilon
        a = env.action_space.sample() if explore else np.argmax(Q[s, :])

        s_, r, done, _ = env.step(a)
        # Move Q[s, a] towards r + gamma * max_a' Q[s_, a'].
        td_error = r + gamma * np.max(Q[s_, :]) - Q[s, a]
        Q[s, a] += lr * td_error
        s = s_

# Print Final Q Table
print(Q)

In [None]:
## Compute the # of Steps and Total Rewards

# Upper bound on the greedy rollout. `env` was created with `.env`, so the
# episode may never report done on its own if the greedy policy cycles
# between states (e.g. when Q has not converged for the start state).
# NOTE(review): 200 is presumably the step limit Taxi-v3 has when wrapped — confirm.
MAX_EVAL_STEPS = 200

s = env.reset()
done = False
step_count = 0
total_reward = 0

while not done and step_count < MAX_EVAL_STEPS:
    env.render()
    a = np.argmax(Q[s,:])   # always act greedily during evaluation
    s_, r, done, _ = env.step(a)
    s = s_
    step_count += 1
    total_reward += r
    time.sleep(0.1)

if not done:
    print("Stopped after {} steps without reaching a terminal state".format(MAX_EVAL_STEPS))
print("Total steps: ",step_count)
print("Total rewards: ",total_reward)

## Activity: Q-Learning with Decay Learning Rate for FrozenLake-v0 or Taxi-v3 Gym

In [None]:
import numpy as np
import gym
import time

# Initial learning rate; multiplied by lrDecay after every step while it
# is still above lrMin.
lr = 0.33
lrMin = 0.001
lrDecay = 0.9999
# Discount factor.
gamma = 0.8
# Initial probability of a random action; multiplied by epsilonDecay after
# every step with a non-zero reward while it is still above epsilonMin.
epsilon = 1.0
epsilonMin = 0.001
epsilonDecay = 0.97
# Number of training episodes.
episodes = 10000

#env = gym.make('FrozenLake-v0')
# NOTE(review): `.env` takes the environment underneath the object returned
# by gym.make — presumably to bypass the wrapper's episode step limit;
# confirm for the installed gym version.
env = gym.make("Taxi-v3").env

In [None]:
## Initialize Q Table

# One row per discrete observation (state) and one column per discrete
# action; every estimate starts at 0.
Q = np.zeros((env.observation_space.n, env.action_space.n))
print(Q)

In [None]:
## Q-Learning

for i in range(episodes):
    print("Episode {}/{}".format(i + 1, episodes))
    s = env.reset()
    done = False

    while not done:
        # epsilon-greedy: random action with probability epsilon
        explore = np.random.random() < epsilon
        a = env.action_space.sample() if explore else np.argmax(Q[s, :])

        s_, r, done, _ = env.step(a)
        td_error = r + gamma * np.max(Q[s_, :]) - Q[s, a]
        Q[s, a] += lr * td_error
        s = s_

        # Anneal the learning rate after every step, down to lrMin.
        if lr > lrMin:
            lr *= lrDecay

        # Anneal exploration only after steps with a non-zero reward.
        if r != 0 and epsilon > epsilonMin:
            epsilon *= epsilonDecay

### Print Final Q Table
print(Q)

In [None]:
## Compute the # of Steps and Total Rewards

# Upper bound on the greedy rollout. `env` was created with `.env`, so the
# episode may never report done on its own if the greedy policy cycles
# between states (e.g. when Q has not converged for the start state).
# NOTE(review): 200 is presumably the step limit Taxi-v3 has when wrapped — confirm.
MAX_EVAL_STEPS = 200

s = env.reset()
done = False
step_count = 0
total_reward = 0

while not done and step_count < MAX_EVAL_STEPS:
    env.render()
    a = np.argmax(Q[s,:])   # always act greedily during evaluation
    s_, r, done, _ = env.step(a)
    s = s_
    step_count += 1
    total_reward += r
    time.sleep(0.1)

if not done:
    print("Stopped after {} steps without reaching a terminal state".format(MAX_EVAL_STEPS))
print("Total steps: ",step_count)
print("Total rewards: ",total_reward)

## Activity: SARSA in 1D

In [None]:
import numpy as np
import time

N_STATES = 6   # number of states in the 1-D world; the last one is terminal
N_ACTIONS = 2  # number of actions (1 moves right, anything else moves left)
EPSILON = 0.2  # probability of taking a random action (epsilon-greedy policy)
ALPHA = 0.1     # learning rate
GAMMA = 0.9    # discount factor
MAX_EPISODES = 10   # number of training episodes

In [None]:
## Initialize Q Tables

# One row per state and one column per action; every estimate starts at 0.
Q = np.zeros((N_STATES, N_ACTIONS))
print(Q)

In [None]:
## Define the Action Policy

def choose_action(S, Q):
    """Return an epsilon-greedy action index for state S.

    With probability EPSILON a uniformly random action in [0, N_ACTIONS) is
    returned; otherwise the index of the largest value in Q[S, :].
    """
    if np.random.random() < EPSILON:
        return np.random.randint(0, N_ACTIONS)
    return np.argmax(Q[S, :])

In [None]:
## Define the Environment Feedback

def get_env_feedback(S, A):
    """Return (next state, reward) for taking action A in state S.

    Action 1 moves right: reaching the last state (N_STATES - 1) from
    N_STATES - 2 pays 10, any other right move pays 1. Every other action
    moves left for a reward of -1, staying in place at state 0.
    """
    if A != 1:
        # move left; the wall at state 0 keeps the agent where it is
        return (S if S == 0 else S - 1), -1

    # move right
    if S == N_STATES - 2:
        return N_STATES - 1, 10
    return S + 1, 1

In [None]:
## Update Environment

def update_env(S, episode, step_counter):
    """Print the current frame of the 1-D world, then pause for 0.3 seconds.

    When S is the last state, the episode summary is printed. Otherwise the
    world is redrawn on the same console line with 'o' at the agent's cell.
    """
    if S == N_STATES - 1:
        print(' Episode {}: total_steps = {}'.format(episode+1,step_counter))
    else:
        cells = ['-'] * (N_STATES - 1) + ['T']   # '-----T' when N_STATES is 6
        cells[S] = 'o'
        print('\r{}'.format(''.join(cells)), end='')
    time.sleep(0.3)

In [None]:
## SARSA

for episode in range(MAX_EPISODES):
    S = 0
    step_counter = 0
    is_terminated = False
    update_env(S, episode, step_counter)

    # The first action is chosen before the loop; every later action is the
    # one that was already selected for the previous update.
    A = choose_action(S, Q)
    while not is_terminated:
        S_, R = get_env_feedback(S, A)
        A_ = choose_action(S_, Q)

        q_current = Q[S, A]
        # Bootstrap from the action actually chosen next, unless S_ is terminal.
        is_terminated = (S_ == N_STATES - 1)
        q_target = R if is_terminated else R + GAMMA * Q[S_, A_]

        Q[S, A] += ALPHA * (q_target - q_current)
        S, A = S_, A_

        step_counter += 1
        update_env(S, episode, step_counter)

print(Q)

In [None]:
## Test the final Q Table

# Roll out the greedy policy from state 0. The loop ends only when the
# greedy actions lead to the terminal state.
S, step_counter = 0, 0
is_terminated = False
while not is_terminated:
    update_env(S, 0, step_counter)
    A = np.argmax(Q[S, :])
    S_, R = get_env_feedback(S, A)
    S = S_
    step_counter += 1
    is_terminated = (S == N_STATES - 1)
print(' Total steps = ',step_counter)

In [None]:
## Compute Optimal Policy and Value Function

# Greedy policy: index of the largest Q value in each state's row.
# State value: that largest Q value.
V = np.zeros(N_STATES)
policy = {}
for S in range(N_STATES):
    q_row = Q[S, :]
    policy[S], V[S] = np.argmax(q_row), np.max(q_row)
print('policy :', policy)
print('value function: ', V)

## Activity: SARSA with Decay Learning Rate for FrozenLake-v0 or Taxi-v3 Gym

In [None]:
import numpy as np
import gym
from IPython.display import clear_output
import time

# Initial learning rate; multiplied by lrDecay after every step while it
# is still above lrMin.
lr = 0.33
lrMin = 0.001
lrDecay = 0.9999
# Discount factor (1.0 means future rewards are not discounted).
gamma = 1.0
# Initial probability of a random action; multiplied by epsilonDecay after
# every step with a non-zero reward while it is still above epsilonMin.
epsilon = 1.0
epsilonMin = 0.001
epsilonDecay = 0.97
# Number of training episodes.
episodes = 2000

#env = gym.make('FrozenLake-v0')
# NOTE(review): `.env` takes the environment underneath the object returned
# by gym.make — presumably to bypass the wrapper's episode step limit;
# confirm for the installed gym version.
env = gym.make("Taxi-v3").env

In [None]:
## Initialize Q Table

# One row per discrete observation (state) and one column per discrete
# action; every estimate starts at 0.
Q = np.zeros((env.observation_space.n, env.action_space.n))
print(Q)

In [None]:
## Action Policy

def choose_action(s, Q):
    """Return an epsilon-greedy action index for state s.

    Reads the module-level `epsilon` at call time: with that probability a
    uniformly random action in [0, env.action_space.n) is returned;
    otherwise the index of the largest value in Q[s, :].
    """
    explore = np.random.random() < epsilon
    if explore:
        return np.random.randint(0, env.action_space.n)
    return np.argmax(Q[s, :])

In [None]:
## SARSA

for i in range(episodes):
    print("Episode {}/{}".format(i + 1, episodes))
    s = env.reset()
    done = False

    # The first action is chosen before the loop; every later action is the
    # one that was already selected for the previous update.
    a = choose_action(s, Q)
    while not done:
        s_, r, done, _ = env.step(a)
        a_ = choose_action(s_, Q)
        # Move Q[s, a] towards r + gamma * Q[s_, a_].
        td_error = r + gamma * (Q[s_, a_]) - Q[s, a]
        Q[s, a] = Q[s, a] + lr * td_error
        s, a = s_, a_

        # Anneal the learning rate after every step, down to lrMin.
        if lr > lrMin:
            lr *= lrDecay

        # Anneal exploration only after steps with a non-zero reward.
        if r != 0 and epsilon > epsilonMin:
            epsilon *= epsilonDecay

print(Q)

In [None]:
## Compute the # of Steps and Total Rewards

# Upper bound on the greedy rollout. `env` was created with `.env`, so the
# episode may never report done on its own if the greedy policy cycles
# between states (e.g. when Q has not converged for the start state).
# NOTE(review): 200 is presumably the step limit Taxi-v3 has when wrapped — confirm.
MAX_EVAL_STEPS = 200

s = env.reset()
done = False
step_count = 0
total_reward = 0

while not done and step_count < MAX_EVAL_STEPS:
    env.render()
    a = np.argmax(Q[s,:])   # always act greedily during evaluation
    s_, r, done, _ = env.step(a)
    s = s_
    step_count += 1
    total_reward += r
    time.sleep(0.1)

if not done:
    print("Stopped after {} steps without reaching a terminal state".format(MAX_EVAL_STEPS))
print("Total steps: ",step_count)
print("Total rewards: ",total_reward)