<a href="https://colab.research.google.com/github/ougrid/my-knowledge-resource/blob/master/reinforcement_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Reinforcement Learning 

1. Value Iteration
2. Q-Learning 

ref: https://gibberblot.github.io/rl-notes/single-agent/MDPs.html


In [None]:
# Install/upgrade gdown, download the shared Google Drive folder, and copy
# everything under python_code/ into the working directory.
# NOTE(review): presumably this provides the gridworld / value_function /
# qtable modules imported by later cells -- confirm against the folder contents.
!pip3 install --upgrade gdown
!gdown --folder --no-check-certificate https://drive.google.com/drive/folders/1Cg-nfqseH-vB4eCRUY3hc1sFJ8Xnrvjw?usp=share_link  
!cp python_code/* .

In [None]:
from gridworld import *

In [None]:
# Build the grid-world MDP that the value-iteration cells query, and draw it.
gw = GridWorld()
gw.visualise()

Things can go wrong — sometimes the effects of the actions are not what we want:

* If the agent tries to move north, 80% of the time, this works as planned (provided the wall is not in the way)

* 10% of the time, trying to move north takes the agent west (provided the wall is not in the way);

* 10% of the time, trying to move north takes the agent east (provided the wall is not in the way);

* If the wall is in the way of the cell that would have been taken, the agent stays in the current cell.

In [None]:
from collections import defaultdict
from value_function import ValueFunction
from qtable import QFunction

class TabularValueFunction(ValueFunction):
    """State-value function stored as a table keyed by state.

    States that were never written read as ``default``.
    """

    def __init__(self, default=0.0):
        # A defaultdict supplies `default` for unseen states; note that
        # reading a missing state through it also stores that entry.
        self.value_table = defaultdict(lambda: default)

    def get_value(self, state):
        """Return the stored value of ``state`` (``default`` if unseen)."""
        return self.value_table[state]

    def update(self, state, value):
        """Overwrite the value stored for ``state``."""
        self.value_table[state] = value

    def merge(self, value_table):
        """Copy every entry of another tabular value function into this one."""
        source = value_table
        for seen_state in source.value_table:
            self.update(seen_state, source.get_value(seen_state))

class QTable(QFunction):
    """Action-value function stored as a table keyed by ``(state, action)``.

    Pairs that were never written read as ``default``.
    """

    def __init__(self, default=0.0):
        # A defaultdict supplies `default` for unseen pairs; note that
        # reading a missing pair through it also stores that entry.
        self.qtable = defaultdict(lambda: default)

    def get_q_value(self, state, action):
        """Return the stored Q-value of ``(state, action)``."""
        key = (state, action)
        return self.qtable[key]

    def update(self, state, action, qvalue):
        """Overwrite the Q-value stored for ``(state, action)``."""
        key = (state, action)
        self.qtable[key] = qvalue

    def merge(self, qtable):
        """Copy every entry of another Q-table into this one."""
        source = qtable
        for seen_state, seen_action in source.qtable:
            self.update(
                seen_state,
                seen_action,
                source.get_q_value(seen_state, seen_action),
            )


In [None]:
# Fresh tables: every state value and every Q-value reads as the 0.0 default.
values = TabularValueFunction()
qtable = QTable()

We start with an initial guess for the value function

In [None]:
# A second GridWorld instance, separate from `gw`; plot the current
# (still all-default) state values on it.
gridworld = GridWorld()
gridworld.visualise_value_function(values, "Value function")

In [None]:
# Plot the Q-values; none have been updated yet.
gridworld.visualise_q_function(qtable, "Q-Value function")

In [None]:
# Derive a policy from the current state values and plot it.
# NOTE(review): extract_policy is not defined in this notebook -- presumably
# inherited from ValueFunction; confirm.
policy = values.extract_policy(gridworld)
gridworld.visualise_policy(policy, "Policy")

$$Q(s, a) = \mathbb{E}\left[\, r + \gamma \, V(s') \,\right]$$

$$V(s') = \max_{a'} Q(s', a')$$

In [None]:
# Reset both tables to their defaults before running Bellman backups.
values = TabularValueFunction()
qtable = QTable()

In [None]:
# Show the discount factor that the backups below read from the MDP.
gw.get_discount_factor()

In [None]:
# One synchronous Bellman backup over all states. New estimates go into
# scratch tables so every backup in this sweep bootstraps from the `values`
# of the previous sweep; they are merged back at the end.
# NOTE(review): get_max_q is not defined on QTable in this notebook --
# presumably inherited from QFunction; confirm.
new_values = TabularValueFunction()
new_qtable = QTable()

for state in gw.get_states():
    for action in gw.get_actions(state):
        # Q(s, a) = sum over s' of P(s' | s, a) * (r + gamma * V(s'))
        new_value = 0.0
        for new_state, probability in gw.get_transitions(state, action):
            reward = gw.get_reward(state, action, new_state)
            discounted_next = gw.get_discount_factor() * values.get_value(new_state)
            new_value += probability * (reward + discounted_next)
        new_qtable.update(state, action, new_value)

    # V(s) = max over a of Q(s, a)
    _, max_q = new_qtable.get_max_q(state, gw.get_actions(state))
    new_values.update(state, max_q)

values.merge(new_values)
qtable.merge(new_qtable)

In [None]:
# After one backup, value has propagated one step back from the rewarded state.
# Re-run the previous cell a few times to watch the values change.
gridworld.visualise_value_function(values, "Value function")

In [None]:
# Plot the Q-values produced by the backup(s) run so far.
gridworld.visualise_q_function(qtable, "Q function")

In [None]:
# Value iteration: repeat synchronous Bellman backups until the largest
# change of any state value in a sweep drops below `threshold`, or until
# `max_iterations` sweeps have run.
# NOTE(review): get_max_q is not defined on QTable in this notebook --
# presumably inherited from QFunction; confirm.
values = TabularValueFunction()
qtable = QTable()

max_iterations = 1000
threshold = 0.1
for i in range(max_iterations):
    # Scratch tables so that every backup in this sweep bootstraps from the
    # previous sweep's `values`.
    new_values = TabularValueFunction()
    new_qtable = QTable()
    delta = 0.0

    for state in gw.get_states():
        for action in gw.get_actions(state):
            # Q(s, a) = sum over s' of P(s' | s, a) * (r + gamma * V(s'))
            new_value = 0.0
            for new_state, probability in gw.get_transitions(state, action):
                reward = gw.get_reward(state, action, new_state)
                discounted_next = gw.get_discount_factor() * values.get_value(new_state)
                new_value += probability * (reward + discounted_next)
            new_qtable.update(state, action, new_value)

        # V(s) = max over a of Q(s, a)
        _, max_q = new_qtable.get_max_q(state, gw.get_actions(state))

        # Track the largest absolute change seen in this sweep.
        change = abs(values.get_value(state) - max_q)
        if change > delta:
            delta = change
        new_values.update(state, max_q)

    values.merge(new_values)
    qtable.merge(new_qtable)

    if delta < threshold:
        # Converged: report the index of the final sweep.
        print(i)
        break


In [None]:
# Plot the state values left by the value-iteration loop above.
gridworld.visualise_value_function(values, "Value function")

In [None]:
# Plot the Q-values left by the value-iteration loop above.
gridworld.visualise_q_function(qtable, "Q function")

Reinforcement learning is needed when the transition probabilities and rewards are unknown.
The agent must interact with the environment to learn what the environment looks like.

In [None]:
class UnknownWorld():
    """Environment wrapper that exposes a GridWorld only through reset/step.

    Attributes:
        mdp: the wrapped GridWorld instance.
        state: the state the world is currently in.
        possible_actions: the actions the MDP reports for the initial state.
    """

    def __init__(self):
        self.mdp = GridWorld()
        self.state = self.mdp.get_initial_state()
        self.possible_actions = self.mdp.get_actions(self.state)

    def reset(self):
        """Move the world back to the MDP's initial state and return it."""
        self.state = self.mdp.get_initial_state()
        return self.state

    def step(self, action):
        """Apply ``action`` to the current state.

        Returns:
            A ``(next_state, reward, terminal)`` tuple. ``terminal`` tells
            whether the world was already in a terminal state *before* this
            call; in that case nothing is executed and the current state is
            returned with a reward of 0.
        """
        # Use the wrapper's own state. Reading a module-level `state` only
        # worked while the caller happened to keep such a global in sync.
        terminal = self.mdp.is_terminal(self.state)
        if terminal:
            return self.state, 0, terminal

        next_state, reward = self.mdp.execute(self.state, action)
        self.state = next_state
        return next_state, reward, terminal


In [None]:
import numpy as np

class RandomAgent():
    """Agent that ignores the state and picks one of its actions at random."""

    def __init__(self, possible_actions):
        self.possible_actions = possible_actions

    def step(self, state):
        """Return a random action drawn with ``np.random.choice``; ``state`` is unused."""
        candidates = self.possible_actions
        chosen = np.random.choice(candidates)
        return chosen
    

In [None]:
# Roll out one episode with the random agent: keep acting until the world
# reports a terminal state, or give up after 10000 steps.
world = UnknownWorld()
random_agent = RandomAgent(world.possible_actions)
state, terminal = world.state, False
for step in range(10000):
    action = random_agent.step(state)
    next_state, reward, terminal = world.step(action)
    state = next_state
    if terminal:
        break


In [None]:
class QLearning:
    """Tabular Q-learning agent with epsilon-greedy action selection.

    Args:
        possible_actions: actions the agent may choose from in every state.
        eps: probability of taking a random action instead of the greedy one.
        gamma: discount factor applied to the next state's best Q-value.
        step_size: learning rate of the temporal-difference update.
    """

    def __init__(self, possible_actions, *, eps=0.1, gamma=0.99, step_size=0.1):
        self.qfunction = QTable()
        self.eps = eps
        self.possible_actions = possible_actions
        self.gamma = gamma
        self.step_size = step_size

    def step(self, state):
        """Choose an action for ``state``: random with probability ``eps``, else greedy."""
        if np.random.rand() < self.eps:
            return np.random.choice(self.possible_actions)

        best_action, _ = self.qfunction.get_max_q(state, self.possible_actions)
        return best_action

    def learn_one_step(self, state, action, reward, next_state):
        """Update Q(state, action) from one observed transition.

        Applies Q(s, a) <- Q(s, a) + step_size * (target - Q(s, a)) with
        target = r + gamma * max_b Q(s', b).
        """
        current_q = self.qfunction.get_q_value(state, action)

        # Best Q-value reachable from the next state. The default keeps the
        # result at -inf when there are no actions to choose from.
        max_next_state_q = max(
            (self.qfunction.get_q_value(next_state, b) for b in self.possible_actions),
            default=-np.inf,
        )

        td_target = reward + self.gamma * max_next_state_q
        new_q = current_q + self.step_size * (td_target - current_q)
        self.qfunction.update(state, action, new_q)

  

In [None]:
# Train the Q-learning agent for 2000 episodes. Each episode starts from a
# reset world and ends when the world reports a terminal state, or after
# 10000 steps.
world = UnknownWorld()
agent = QLearning(world.possible_actions)

for episode in range(2000):
    state, terminal = world.reset(), False
    for step in range(10000):
        # Act, observe the transition, and learn from it before moving on.
        action = agent.step(state)
        next_state, reward, terminal = world.step(action)
        agent.learn_one_step(state, action, reward, next_state)

        state = next_state
        if terminal:
            break


In [None]:
# Plot the Q-table held by the Q-learning agent.
gridworld.visualise_q_function(agent.qfunction, "Q function")

Deep Reinforcement Learning