By: Muhammad Khalid - Samaa Khaled
Based on: Eng. Muhammad Alaref startup code

# Reinforcement Learning (RL)

Reinforcement learning is a technique used in _unknown_ environments. It is based on the idea of rewards that differentiate good states from bad ones. The environment changes with every action you take in nondeterministic ways, and the distribution of possible next states is _unknown_. The only _percepts_ received from the environment are the current state, the possible actions, and the reward for taking an action. Therefore, some form of _learning_ is needed to make progress in these environments.

We use the same definitions, concepts and notation as in decision processes: $s$, $a$, $s'$, $a'$, etc.

## Environment Formulation

We formulate an environment in a similar way to decision processes _without_ a way to know the transition model.


In [2]:
class Environment:
    '''
    Abstract base class for an (interactive) environment formulation.
    It declares the expected methods to be used to solve it.
    All the methods declared are just placeholders that throw errors if not overridden by child "concrete" classes!
    '''
    
    def __init__(self):
        '''Constructor that initializes the problem. Typically used to setup the initial state.'''
        self.state = None
    
    def actions(self):
        '''Returns an iterable with the applicable actions to the current environment state.'''
        raise NotImplementedError
    
    def apply(self, action):
        '''Applies the action to the current state of the environment, updates the environment state, and returns the reward received; not necessarily deterministic.'''
        raise NotImplementedError
    
    @classmethod
    def new_random_instance(cls):
        '''Factory method that creates a problem instance with a random initial state.'''
        raise NotImplementedError


Generally, to _learn_ any value, we can experiment in the environment and keep a tally of our experiences. Then, using the consecutive rewards collected, value estimates can be calculated. Also, by keeping track of transition statistics, the transition model can be estimated. This, in effect, reduces the problem to an MDP that can be solved using value or policy iteration techniques. This is called **model-based** learning.

$$V = E[V]$$
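
The tallying idea can be sketched as follows. This is only an illustration under simple assumptions: the class `TransitionTally` and its methods `record` and `estimate` are hypothetical names that are not part of this notebook, and the states `'A'`, `'B'`, `'C'` are toy values.

```python
from collections import defaultdict

class TransitionTally:
    '''Hypothetical helper: estimates transition probabilities and mean rewards from counts.'''

    def __init__(self):
        self.counts = defaultdict(lambda: defaultdict(int))  # (s, a) -> {s': times observed}
        self.reward_sums = defaultdict(float)                # (s, a) -> total reward observed
        self.visits = defaultdict(int)                       # (s, a) -> number of experiences

    def record(self, state, action, reward, next_state):
        '''Tally one experience (s, a, r, s').'''
        self.counts[(state, action)][next_state] += 1
        self.reward_sums[(state, action)] += reward
        self.visits[(state, action)] += 1

    def estimate(self, state, action):
        '''Return (estimated P(s' | s, a) as a dict, estimated mean reward).'''
        total = self.visits[(state, action)]
        if total == 0:
            return {}, 0.0  # nothing observed yet for this pair
        probabilities = {s2: c / total for s2, c in self.counts[(state, action)].items()}
        return probabilities, self.reward_sums[(state, action)] / total

# Toy usage: four experiences of taking 'go' in state 'A'.
tally = TransitionTally()
for next_state in ('B', 'B', 'C', 'B'):
    tally.record('A', 'go', -1, next_state)
probabilities, mean_reward = tally.estimate('A', 'go')
print(probabilities, mean_reward)
```

With enough experiences, estimates of this kind could stand in for the unknown transition model when running value or policy iteration.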

Another way to estimate a value is to start with an initial value (say 0) and update it with every new experience. This is called **temporal-difference (TD) learning**. _Note that this does NOT require the transition model for learning!_ This is called **model-free** learning and it will be the focus of this notebook.

$$V = (1 - \alpha)V_{old} + \alpha V_{new} $$
or equivalently,
$$V = V_{old} + \alpha (V_{new} - V_{old}) $$

_Provided $\alpha$ decreases as the number of experiences (of $V$) increases, $V$ will converge to the true value._
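
As a small illustration of the update rule, the sketch below folds a list of samples into one estimate. The function names `td_update` and `running_estimate` and the sample values are made up for this example. With the step size $\alpha = 1/n$, the result is exactly the sample mean, while a constant step size weights recent samples more heavily.

```python
def td_update(v_old, v_new, alpha):
    '''One update step: move v_old toward the new sample v_new by a fraction alpha.'''
    return v_old + alpha * (v_new - v_old)

def running_estimate(samples, alpha=lambda n: 1 / n):
    '''Fold the samples into one estimate; alpha maps the sample count n to a step size.'''
    v = 0.0  # initial value
    for n, sample in enumerate(samples, start=1):
        v = td_update(v, sample, alpha(n))
    return v

samples = [4.0, 8.0, 6.0, 2.0]
decreasing = running_estimate(samples)                       # alpha = 1/n
constant = running_estimate(samples, alpha=lambda n: 0.5)    # alpha fixed at 0.5
print(decreasing, sum(samples) / len(samples))  # both equal the sample mean
print(constant)                                 # biased toward the latest samples
```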

## Q-Learning

Recall how we get a policy from Q-values!

$$\pi^*(s) = \operatorname*{argmax}_{a \in A_s} Q(s,a)$$

Unlike value iteration and policy iteration that rely on utility values, extracting the policy from Q-values does NOT depend on the transition model in any way.

In [3]:
from shutil import get_terminal_size
terminal_width, _ = get_terminal_size()

_visualizers = {}

def _default_visualizer(_, state):
    '''Generic visualizer for unknown problems.'''
    print(state)

class Visualizer:
    '''Visualization and printing functionality encapsulation.'''

    def __init__(self, problem):
        '''Constructor with the problem to visualize.'''
        self.problem = problem
        self.counter = 0
    
    def visualize(self, frontier):
        '''Visualizes the frontier at every step.'''
        self.counter += 1
        print(f'Frontier at step {self.counter}')
        for state in frontier:
            print()
            _visualizers.get(type(self.problem), _default_visualizer)(self.problem, state)
        print('-' * terminal_width)

def _tom_and_jerry_visualizer(env, state):
    '''Custom visualizer for Tom and Jerry.'''
    tom, jerry, spike = env.state
    for j in range(env.bounds[1] - 1, -1, -1):
        for i in range(env.bounds[0]):
            print('🐱' if (i, j) == tom else '🐭' if (i, j) == jerry else '🐶' if (i, j) == spike else '⬜', end='')
        print()

In [4]:
def action_from_q(env, q, verbose=True):
    '''Get the best action for the current state of the environment from Q-values'''
    return max((action for action in env.actions()), key=lambda action: q.get((env.state, action), 0))

Combining Q-values with TD learning yields an iterative technique that does not need a transition model in learning nor in decision making!

$$Q_{i+1}(s,a) = Q_i(s,a) + \alpha (r_s + \gamma \times \max_{a'} Q_i(s',a') - Q_i(s,a))$$
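
To make the update concrete, here is a single step worked through in code. This is a minimal sketch: the helper `q_update`, the toy states `'A'` and `'B'`, and the numbers are assumptions chosen for illustration, not part of the environment used later.

```python
def q_update(q, state, action, reward, next_state, next_actions, alpha, gamma):
    '''Apply one Q-learning update in place and return the new Q(state, action).'''
    old = q.get((state, action), 0)
    # Best estimated value reachable from the next state (0 if it offers no actions).
    best_next = max((q.get((next_state, a), 0) for a in next_actions), default=0)
    q[(state, action)] = old + alpha * (reward + gamma * best_next - old)
    return q[(state, action)]

# Toy Q-table: only the next state 'B' has estimates so far.
q = {('B', 'left'): 2.0, ('B', 'right'): 10.0}
first = q_update(q, 'A', 'go', reward=-1, next_state='B',
                 next_actions=['left', 'right'], alpha=0.5, gamma=0.5)
# Target is -1 + 0.5 * 10 = 4, so Q moves halfway from 0 to 4.
second = q_update(q, 'A', 'go', reward=-1, next_state='B',
                  next_actions=['left', 'right'], alpha=0.5, gamma=0.5)
# Same target, so Q moves halfway from the previous estimate to 4.
print(first, second)
```

Note that nothing in the update refers to transition probabilities: only the observed reward and the observed next state are used.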

In [5]:
def q_learning(env, q=None, n=None, f=lambda q, n: (q+1)/(n+1), alpha=lambda n: 60/(n+59), error=1e-6, verbose=False):
    '''Q-learning implementation that trains on an environment till no more actions can be taken'''
    # Use fresh tables unless the caller supplies its own (avoids sharing mutable default arguments between calls).
    if q is None: q = {}
    if n is None: n = {}
    if verbose: visualizer = Visualizer(env)
    while env.state is not None:
        if verbose: visualizer.visualize([env.state])
        state = env.state
        # Choose the action that maximizes the exploration function f(Q-value, visit count).
        action = max(env.actions(),
                     key=lambda next_action: f(q.get((state, next_action), 0), n.get((state, next_action), 0)))
        n[(state, action)] = n.get((state, action), 0) + 1
        reward = env.apply(action)
        # TD update toward reward + discounted best Q-value of the new state (0 if the new state is terminal).
        q[(state, action)] = q.get((state, action), 0) \
                           + alpha(n[(state, action)]) \
                           * (reward
                              + env.discount * max((q.get((env.state, next_action), 0) for next_action in env.actions()), default=0)
                              - q.get((state, action), 0))
    return q, n

Now, we have one question left to answer: what is `f`? That is, _how_ are we going to act in the environment? What actions should be taken in each state to _learn_? Different strategies yield different properties!
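
To see how much the choice of `f` matters, the sketch below ranks the same three actions under three candidate functions. The helper `choose_action` mirrors the `max(...)` call in `q_learning`. The Q-values and visit counts are made-up numbers, and the names `greedy`, `exploring` and `default` are labels used only in this example.

```python
def choose_action(actions, q, n, state, f):
    '''Pick the action that maximizes f(Q-value, visit count) for the given state.'''
    return max(actions, key=lambda a: f(q.get((state, a), 0), n.get((state, a), 0)))

# Toy statistics for a single state (illustrative values only).
state = 's'
actions = ['a1', 'a2', 'a3']
q = {('s', 'a1'): 5.0, ('s', 'a2'): -0.5, ('s', 'a3'): 3.0}
n = {('s', 'a1'): 20, ('s', 'a2'): 0, ('s', 'a3'): 1}

greedy = lambda q_value, count: q_value                        # trust the current estimates
exploring = lambda q_value, count: 1 / (count + 1)             # prefer rarely tried actions
default = lambda q_value, count: (q_value + 1) / (count + 1)   # default f of q_learning

choices = {name: choose_action(actions, q, n, state, f)
           for name, f in (('greedy', greedy), ('exploring', exploring), ('default', default))}
print(choices)
```

In this toy state each function picks a different action: the greedy one follows the highest Q-value, the exploring one follows the lowest visit count, and the default trades the two off.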

Right now, let's leave it as is (we will come back to it later) and consider an example _(+ a little bit of helper code for simulation)_!

In [6]:
from math import inf
from time import time
from itertools import count

def simulate(env_ctor, n_iterations=inf, duration=inf, **q_learning_params):
    '''A helper function to train for a fixed number of iterations or fixed time'''
    # Make sure the Q-table and the visit counts exist so that they persist across episodes.
    for param in ('q', 'n'): q_learning_params[param] = q_learning_params.get(param, {})
    start_time = time()
    i = count()
    while time() < start_time + duration and next(i) < n_iterations:
        env = env_ctor()  # fresh environment for every episode
        q_learning(env, **q_learning_params)  # updates the shared q and n dictionaries in place
    return q_learning_params['q'], q_learning_params['n']

# ZC Agent

In [52]:
from random import randint

# Location codes: 6 is OSS, 9 is AB Gate A, 8 is AB Gate F, 7 is NB, 11 is HB, 13 is AB Gate B; 14 and 10 are cancelled.
class ZCRobot(Environment):
    '''Delivery robot world: items are picked up at NB and HB and delivered to OSS and AB.'''
    def __init__(self, maxReward, discount, movingReward, actionsLimit):
        # Road map: location -> {direction: neighbouring location}
        self.ResultsMovements = {1: {'SE': 3},
                                 2: {'SE': 4},
                                 3: {'NW': 1, 'NE': 4, 'SW': 17},
                                 4: {'NW': 2, 'SW': 3, 'NE': 5},
                                 5: {'NE': 6, 'SW': 4},
                                 6: {'SW': 5, 'S': 9, 'E': 7},
                                 7: {'W': 6, 'SW': 8},
                                 8: {'NE': 7},
                                 9: {'N': 6},
                                 11: {'W': 16},
                                 12: {'NW': 15, 'SE': 13},
                                 13: {'NW': 12},
                                 15: {'N': 16, 'SE': 12},
                                 16: {'S': 15, 'E': 11, 'NE': 17},
                                 17: {'NE': 3, 'SW': 16}}

        # State tuple:
        #   [0] current location; the robot starts at the main gate (1 or 2)
        #   [1] whether the robot is carrying an item (False at the start)
        #   [2] items remaining in NB (4 at the start)
        #   [3] items remaining in HB (4 at the start)
        #   [4] items delivered to OSS (0 at the start)
        #   [5] items delivered to AB (0 at the start)
        self.state = (randint(1, 2), False, 4, 4, 0, 0)
        self.maxReward = maxReward        # reward for delivering all items (its negative is the penalty for running out of actions)
        self.discount = discount          # discount rate
        self.movingReward = movingReward  # reward for each Pick or Deliver action
        self.actionsLimit = actionsLimit  # maximum number of actions taken in one iteration
    def actions(self):
        if self.state is None: return []  # the episode is over; q_learning stops on a None state
        location, carrying, nb_left, hb_left, oss_delivered, ab_delivered = self.state

        # At NB (7) or HB (11), carrying nothing, with 1 to 4 items left there: the only action is Pick.
        if location == 7 and not carrying and 1 <= nb_left <= 4: return ['Pick']
        if location == 11 and not carrying and 1 <= hb_left <= 4: return ['Pick']

        # At OSS (6) or an AB gate (8, 9, 13), carrying an item, with 0 to 3 items delivered there so far: the only action is Deliver.
        if location == 6 and carrying and 0 <= oss_delivered <= 3: return ['Deliver']
        if location in (8, 9, 13) and carrying and 0 <= ab_delivered <= 3: return ['Deliver']

        # Otherwise, return all the directions the robot can move in from its current location.
        return list(self.ResultsMovements[location])
          


    def apply(self, action):
        tempState = list(self.state)
        location = self.state[0]

        # Every applied action uses up one unit of the action budget (e.g. 1000 actions per iteration).
        self.actionsLimit = self.actionsLimit - 1

        # If the agent has used up the allowed number of actions, end the game with a penalty.
        if self.actionsLimit == 0:
            self.state = None
            return -self.maxReward

        # WON THE GAME
        # The agent is at OSS (6) or an AB gate (8, 9, 13), is carrying nothing,
        # NB and HB are empty, and four items have been delivered to each of OSS and AB.
        if location in (6, 8, 9, 13) and not self.state[1] and self.state[2:] == (0, 0, 4, 4):
            self.state = None
            return self.maxReward

        # Picking up: mark the robot as carrying and decrement the items left in NB or HB.
        if location == 7 and action == 'Pick':  # NB
            tempState[1] = True
            tempState[2] = tempState[2] - 1
            self.state = tuple(tempState)
            return self.movingReward

        if location == 11 and action == 'Pick':  # HB
            tempState[1] = True
            tempState[3] = tempState[3] - 1
            self.state = tuple(tempState)
            return self.movingReward

        # Delivering: mark the robot as empty-handed and increment the items delivered to OSS or AB.
        if location == 6 and action == 'Deliver':  # OSS
            tempState[1] = False
            tempState[4] = tempState[4] + 1
            self.state = tuple(tempState)
            return self.movingReward

        if location in (8, 9, 13) and action == 'Deliver':  # AB
            tempState[1] = False
            tempState[5] = tempState[5] + 1
            self.state = tuple(tempState)
            return self.movingReward

        # Otherwise, the agent moves along a road to a neighbouring location.
        tempState[0] = self.ResultsMovements[location][action]
        self.state = tuple(tempState)
        return -1  # negative reward: the cost of travelling between the two locations

# Random Q-Learning

In [56]:
q, n = {}, {}
# ZCRobot(maxReward, discount, movingReward, actionsLimit)
simulate(lambda: ZCRobot(50,0.1,30,3000), duration=60, q=q, n=n)
simulate(lambda: ZCRobot(50,0.1,30,3000), n_iterations=1, q=q, n=n, verbose=True)

Streaming output truncated to the last 5000 lines.
Frontier at step 1751

(2, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1752

(4, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1753

(2, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1754

(4, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1755

(2, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1756

(4, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1757

(2, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 1758

(4, False, 4, 4, 0, 0)


({((1, False, 4, 4, 0, 0), 'SE'): -1.1096774193548387,
  ((3, False, 4, 4, 0, 0), 'NW'): -1.1,
  ((3, False, 4, 4, 0, 0), 'NE'): -1.1098360655737705,
  ((4, False, 4, 4, 0, 0), 'NW'): -1.129035003824836,
  ((2, False, 4, 4, 0, 0), 'SE'): -1.1264567742073273,
  ((4, False, 4, 4, 0, 0), 'SW'): -1.098360655737705,
  ((3, False, 4, 4, 0, 0), 'SW'): -1.0967741935483872,
  ((17, False, 4, 4, 0, 0), 'NE'): -1.1,
  ((4, False, 4, 4, 0, 0), 'NE'): -1.098360655737705,
  ((5, False, 4, 4, 0, 0), 'NE'): -1.0,
  ((6, False, 4, 4, 0, 0), 'SW'): -1.0,
  ((5, False, 4, 4, 0, 0), 'SW'): -1.1,
  ((6, False, 4, 4, 0, 0), 'S'): -1.0,
  ((9, False, 4, 4, 0, 0), 'N'): -1.0,
  ((6, False, 4, 4, 0, 0), 'E'): -1.0,
  ((7, False, 4, 4, 0, 0), 'Pick'): 30.0,
  ((7, True, 3, 4, 0, 0), 'W'): -1.0,
  ((6, True, 3, 4, 0, 0), 'Deliver'): 30.0,
  ((6, False, 3, 4, 1, 0), 'SW'): -1.0,
  ((5, False, 3, 4, 1, 0), 'NE'): -1.0,
  ((6, False, 3, 4, 1, 0), 'S'): -1.0,
  ((9, False, 3, 4, 1, 0), 'N'): -1.0,
  ((6, False, 3, 4

# Greedy Q-Learning

In [54]:
q, n = {}, {}
simulate(lambda: ZCRobot(50,0.1,30,3000), duration=60, q=q, n=n, f=lambda q, n: q)
simulate(lambda: ZCRobot(50,0.1,30,3000), n_iterations=1, q=q, n=n, verbose=True, f=lambda q, n: q)

Frontier at step 1

(2, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 2

(4, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 3

(5, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 4

(6, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 5

(7, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 6

(7, True, 3, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 7

(6, True, 3, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 8

(6, False, 3, 4, 1, 0)
--------------------------------------------------------------------------------
Frontier a

({((2, False, 4, 4, 0, 0), 'SE'): -1.107979797979798,
  ((4, False, 4, 4, 0, 0), 'NW'): -1.1108194824871533,
  ((4, False, 4, 4, 0, 0), 'SW'): -1.1102150537634408,
  ((3, False, 4, 4, 0, 0), 'NW'): -1.110574001217066,
  ((1, False, 4, 4, 0, 0), 'SE'): -1.1080110801108012,
  ((3, False, 4, 4, 0, 0), 'NE'): -1.1096096718143424,
  ((4, False, 4, 4, 0, 0), 'NE'): -1.0797979797979798,
  ((5, False, 4, 4, 0, 0), 'NE'): -0.797979797979798,
  ((6, False, 4, 4, 0, 0), 'SW'): -1.1078794288736118,
  ((5, False, 4, 4, 0, 0), 'SW'): -1.1,
  ((3, False, 4, 4, 0, 0), 'SW'): -1.0801108011080112,
  ((17, False, 4, 4, 0, 0), 'NE'): -1.1,
  ((17, False, 4, 4, 0, 0), 'SW'): -0.8011080110801108,
  ((16, False, 4, 4, 0, 0), 'S'): -1.0967741935483872,
  ((15, False, 4, 4, 0, 0), 'N'): -1.0999471179270228,
  ((16, False, 4, 4, 0, 0), 'E'): 1.988919889198892,
  ((11, False, 4, 4, 0, 0), 'Pick'): 29.88919889198892,
  ((11, True, 4, 3, 0, 0), 'W'): -1.1080110801108012,
  ((16, True, 4, 3, 0, 0), 'S'): -1.0801108

# Exploring Q-Learning

In [57]:
q, n = {}, {}
simulate(lambda: ZCRobot(50,0.1,30,3000), duration=60, q=q, n=n, f=lambda q, n: 1/(n+1))
simulate(lambda: ZCRobot(50,0.1,30,3000), n_iterations=1, q=q, n=n, verbose=True, f=lambda q, n: q)

Frontier at step 1

(1, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 2

(3, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 3

(17, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 4

(16, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 5

(11, False, 4, 4, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 6

(11, True, 4, 3, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 7

(16, True, 4, 3, 0, 0)
--------------------------------------------------------------------------------
Frontier at step 8

(15, True, 4, 3, 0, 0)
--------------------------------------------------------------------------------
Front

({((1, False, 4, 4, 0, 0), 'SE'): -1.1080110801108014,
  ((3, False, 4, 4, 0, 0), 'NW'): -1.1108011080110805,
  ((3, False, 4, 4, 0, 0), 'NE'): -1.1079797979797985,
  ((4, False, 4, 4, 0, 0), 'NW'): -1.1107979797979801,
  ((2, False, 4, 4, 0, 0), 'SE'): -1.1079797979797985,
  ((4, False, 4, 4, 0, 0), 'SW'): -1.1080110801108014,
  ((3, False, 4, 4, 0, 0), 'SW'): -1.0801108011080114,
  ((17, False, 4, 4, 0, 0), 'NE'): -1.1080110801108012,
  ((4, False, 4, 4, 0, 0), 'NE'): -1.0797979797979802,
  ((5, False, 4, 4, 0, 0), 'NE'): -0.7979797979797982,
  ((6, False, 4, 4, 0, 0), 'SW'): -1.07979797979798,
  ((5, False, 4, 4, 0, 0), 'SW'): -1.1079797979797983,
  ((17, False, 4, 4, 0, 0), 'SW'): -0.8011080110801109,
  ((16, False, 4, 4, 0, 0), 'S'): -1.0801108011080112,
  ((15, False, 4, 4, 0, 0), 'N'): -0.8011080110801108,
  ((16, False, 4, 4, 0, 0), 'E'): 1.988919889198892,
  ((11, False, 4, 4, 0, 0), 'Pick'): 29.88919889198892,
  ((11, True, 4, 3, 0, 0), 'W'): -1.1080110801108014,
  ((16, True