# Fitted Q-learning in continuous state spaces

In this assignment, we consider an application of neural fitted Q-learning to a navigation problem on a grid. Our agent can move in 4 directions: up, down, left, right. The goal is a randomly picked cell of the grid. Instead of directly perceiving its position, the agent gets a 10-dimensional observation vector from the environment. The observations are real-valued vectors, so we are dealing with a continuous state space. The goal of the assignment is therefore to implement neural fitted Q-learning.

Hint: use PCA to reduce the dimensionality of the observations.
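
As a rough illustration of this hint, the sketch below fits PCA with a plain NumPy SVD on synthetic observations. It assumes the same kind of setup as the notebook (a random 10x2 measurement model applied to grid positions); the helper names `fit_pca` and `project` are hypothetical and not part of PyBrain or the notebook.

```python
import numpy as np

rng = np.random.default_rng(0)

# Assumed setup mirroring the notebook: a random 10x2 measurement model
# and observations generated from integer grid positions.
measurement_model = rng.random((10, 2))
positions = rng.integers(0, 10, size=(200, 2))
observations = positions @ measurement_model.T  # shape (200, 10)


def fit_pca(data, n_components):
    """Return the data mean, the top principal directions (as rows),
    and the fraction of variance explained by every component."""
    mean = data.mean(axis=0)
    # SVD of the centred data; the rows of vt are the principal directions.
    _, singular_values, vt = np.linalg.svd(data - mean, full_matrices=False)
    explained = singular_values**2 / np.sum(singular_values**2)
    return mean, vt[:n_components], explained


def project(observation, mean, components):
    """Map one 10-dimensional observation to its low-dimensional coordinates."""
    return components @ (np.asarray(observation) - mean)


mean, components, explained = fit_pca(observations, n_components=2)
reduced = project(observations[0], mean, components)
print(explained[:3])
```

Because every observation is a linear image of a 2-dimensional position, the first two components should carry essentially all of the variance in this synthetic setting, which is why a 2-dimensional projection is a natural choice here.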

First, let us import the necessary packages. We use PyBrain for learning, as it provides an implementation of NFQ (neural fitted Q-iteration).

In [8]:
from numpy import zeros, clip, asarray
from enum import IntEnum
import random
import numpy as np

from pybrain.rl.environments.environment import Environment
from pybrain.rl.environments import EpisodicTask

In PyBrain, an RL problem is specified by a pair of classes -- the Task and the Environment.

Task: This class implements the nature of the task, and focuses on the reward aspect. It decides what is a successful action, and what is a less successful one. 
Environment: This class manages the inputs (observations) and outputs (actions performed) that go to and come from the agent.

We provide our own derivations of these classes to describe the behaviour of our system.

In [9]:
class Action(IntEnum):
    UP = 0
    DOWN = 1
    LEFT = 2
    RIGHT = 3

This class specifies the behaviour of the environment (the grid). 

In [10]:
class NavigationEnv(Environment):
    def __init__(self, height, width, measurement_model):   
        self.width = width
        self.height = height
        self.position = (int(self.width/2), int(self.height/2))
        self.measurement_model = measurement_model

    
    def getSensors(self):
        """ 
        the currently visible state of the world 
            
        """ 
        observation = np.dot(self.measurement_model, np.array(self.position))        
        return  observation
        
        
    
    def getPosition(self):
        return  np.array(self.position)                
                    
                    
    def performAction(self, action):
        """ 
        perform an action on the world that changes its internal state (makes a move)
            
        """
        #moving the agent with respect to the action 
        if action == Action.UP:
            self.position = (self.position[0], self.position[1] - 1)
        elif action == Action.DOWN:
            self.position = (self.position[0], self.position[1] + 1)
        elif action == Action.LEFT:
            self.position = (self.position[0] - 1, self.position[1])
        elif action == Action.RIGHT:
            self.position = (self.position[0] + 1, self.position[1])
   
        #if the agent has reached the boundary of the grid, it cannot move further    
        if self.position[0] < 0:
            self.position = (0, self.position[1])
        elif self.position[1] < 0:
            self.position = (self.position[0], 0)
        elif self.position[0] >= self.width:
            self.position = (self.width-1, self.position[1])
        elif self.position[1] >= self.height:
            self.position = (self.position[0], self.height-1)
                 
    def reset(self):
        self.position = (int(self.width/2), int(self.height/2))
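
For quick experiments outside PyBrain, the move-then-clamp logic of performAction can be sketched as a standalone function. This is only an illustrative equivalent under the same action encoding (0 = UP, 1 = DOWN, 2 = LEFT, 3 = RIGHT); the names `MOVES` and `step` are hypothetical and do not appear in the notebook.

```python
import numpy as np

# Displacement (dx, dy) per action, using the notebook's encoding:
# 0 = UP, 1 = DOWN, 2 = LEFT, 3 = RIGHT. UP decreases the second coordinate.
MOVES = {0: (0, -1), 1: (0, 1), 2: (-1, 0), 3: (1, 0)}


def step(position, action, width, height):
    """Move one cell in the given direction and clamp the result to the grid."""
    dx, dy = MOVES[int(action)]
    x = int(np.clip(position[0] + dx, 0, width - 1))
    y = int(np.clip(position[1] + dy, 0, height - 1))
    return (x, y)


print(step((5, 5), 3, 10, 10))  # one step to the right
print(step((0, 0), 0, 10, 10))  # blocked by the upper boundary
```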


This class establishes the relationship between the agent and the environment -- i.e., specifies the task.  

In [11]:
class NavigationTask(EpisodicTask):
    def __init__(self, environment, goal):
        self.env = environment
        self.goal = goal
        self.t = 0
        self.episode_length = 250

    def performAction(self, action):
    #updates the timer and changes the position of the agent with respect to the action
        self.t += 1       
        self.env.performAction(action)
        
    def getObservation(self):
    #gets an observation from the environment
    #(you can modify this method if you wish to reduce the dimensionality of the observations)
        sensors = self.env.getSensors()
        return sensors
    
    def getReward(self):
    #gives 1000 points if the goal is reached and -1 otherwise
        position = self.env.getPosition() 
        if position[0] == self.goal[0] and position[1] == self.goal[1]:
            reward = 1000
        else:
            reward = -1
        return reward
 
        
    def reset(self):    
    #resets the timer and puts the agent to the initial position
        self.env.reset()
        self.t = 0 
        
        
    def isFinished(self):
    #this method should return True when the episode (a walk in the grid) is finished -- i.e., when the goal is reached or the time is out

        position = self.env.getPosition()
        if position[0] == self.goal[0] and position[1] == self.goal[1]:
            return True
        if self.t >= self.episode_length:
            return True
            
        return False


Here we initialize our problem and create the environment.

In [12]:
#set the size of the grid
width = 10
height = 10
#set the dimensionality of the distorted observations
measurement_dimension = 10
#the distortion model -- a random matrix of shape (measurement_dimension, 2)
measurement_model = np.random.rand(measurement_dimension, 2)
#create the environment
env = NavigationEnv(height, width, measurement_model)
#set the goal
goal = (random.randint(0, width-1), random.randint(0, height-1))
#initialize the task
task = NavigationTask(env, goal) 

As an example of a function approximator, we provide you with an implementation of a linear regressor:

In [13]:
class LinearRegressor(object):

    def __init__(self, measurement_dimension):

        self.p = np.zeros((4, measurement_dimension+1))
    
    def fit(self, measurements, actions, total_rewards):

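        for i in range(0, 4):
            inds = np.where(actions == i)[0]
            N = len(inds)
            #skip actions that have not been tried yet (there is nothing to fit)
            if N == 0:
                continue
            A = np.hstack((measurements[inds], np.ones((N, 1))))
            b = total_rewards[inds]
            #passing rcond=None avoids the FutureWarning emitted by older NumPy versions
            self.p[i] = np.linalg.lstsq(A, b, rcond=None)[0]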

    def predict(self, measurement):

        predicted_rewards = np.dot(self.p, np.append(measurement, 1.0))
        return predicted_rewards
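
Written out, the regressor above solves one least-squares problem per action. In the notation below (introduced here for illustration only), D_a is the set of samples in which action a was taken, x_i is the measurement of sample i, and R_i is the total reward passed in for that sample:

```latex
\[
p_a \in \arg\min_{p \in \mathbb{R}^{d+1}}
  \sum_{i \in D_a} \left( p^\top \begin{bmatrix} x_i \\ 1 \end{bmatrix} - R_i \right)^2,
\qquad
\hat{Q}(x, a) = p_a^\top \begin{bmatrix} x \\ 1 \end{bmatrix},
\]
```

where d is the measurement dimension and the appended 1 provides the bias term. When the minimizer is not unique, np.linalg.lstsq returns the one with the smallest norm.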


With that done, we can now apply fitted Q-learning with a linear regressor. 
Below we perform 100 episodes. In the first six episodes the agent acts randomly; afterwards it follows an epsilon-greedy policy based on the regressor's predictions. 
Each episode lasts at most episode_length = 250 steps. If the goal is reached, we interrupt the episode.

In [30]:
epsilon = 0.1 # the exploration rate for epsilon-greedy action selection
measurements = None # this will contain all of our measurements
actions = None # this will contain all of our actions
total_rewards = None # for every step, the total reward of the episode it belongs to
regressor = LinearRegressor(measurement_dimension)
reward_history = []

for n in range(0, 100):
   run_measurements = []
   run_actions = []
   total_reward = 0
   # random actions during the first six episodes, epsilon-greedy afterwards
   while True:
       u = random.random()
       measurement = env.getSensors()
       if u > epsilon and n > 5:
          predicted_rewards = regressor.predict(measurement)
          action = Action(np.argmax(predicted_rewards))
       else:
          action = Action(random.randint(0, 3))
       task.performAction(action)
       reward = task.getReward()
       total_reward += reward
       run_actions.append(int(action))  
       run_measurements.append(measurement)
       if task.isFinished():
          task.reset()
          break

   episode_len = len(run_measurements)

   if measurements is None:
      measurements = np.vstack(run_measurements)
      actions = np.array(run_actions, dtype=int)
      total_rewards = total_reward*np.ones((episode_len,))
   else:
      measurements = np.concatenate((measurements, np.vstack(run_measurements)), 0)
      actions = np.concatenate((actions, np.array(run_actions, dtype=int)), 0)
      total_rewards = np.concatenate((total_rewards, total_reward*np.ones((episode_len,))), 0)

   reward_history.append(total_reward)
   # here, we should update our estimates of the q-values
   regressor.fit(measurements, actions, total_rewards)

print("Reward history:")
print(reward_history)
print("Mean reward:")
print(np.mean(reward_history))

Reward history:
[942, -250, 845, 891, -250, -250, 778, -250, 865, 973, 812, 806, 875, -250, -250, 897, 821, 896, 865, 963, 784, 955, -250, 986, 933, 927, 974, 992, 978, 987, 987, 992, 990, 992, 990, 986, 990, 988, 988, 990, 992, 990, 990, 992, 990, 988, 992, 988, 992, 990, 990, 992, 992, 990, 990, 992, 990, 983, 945, 963, 955, 939, 990, 986, 992, 990, 976, 976, 978, 937, 980, 968, 908, 979, 919, 992, 992, 977, 979, 982, 961, 980, 972, 974, 899, 926, 992, 925, 987, 955, 932, 986, 984, 968, 988, 951, 974, 979, 988, 974]
Mean reward:
873.09


Now, your task is to apply neural fitted Q-learning to the problem. 
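
The linear example above regresses each visited state-action pair on the total reward of its episode. Fitted Q-iteration methods such as NFQ instead typically regress on bootstrapped one-step targets of the form r + gamma * max Q(s', a'). The following is a minimal NumPy sketch of that target computation, not PyBrain's implementation; the function name `fitted_q_targets` and the discount factor `gamma` are assumptions introduced for illustration.

```python
import numpy as np


def fitted_q_targets(rewards, next_q_values, done, gamma=0.9):
    """Compute one-step regression targets for fitted Q-iteration.

    rewards:       shape (n,), reward observed after each transition
    next_q_values: shape (n, n_actions), current Q estimates at the next state
    done:          shape (n,), True where the transition ended the episode
    gamma:         discount factor (an assumption; the notebook does not set one)
    """
    rewards = np.asarray(rewards, dtype=float)
    next_q_values = np.asarray(next_q_values, dtype=float)
    done = np.asarray(done, dtype=bool)
    best_next = next_q_values.max(axis=1)
    # Terminal transitions do not bootstrap from the next state.
    return rewards + gamma * np.where(done, 0.0, best_next)


rewards = [-1, -1, 1000]
next_q = [[0.0, 2.0, 1.0, -1.0],
          [5.0, 4.0, 3.0, 2.0],
          [9.0, 9.0, 9.0, 9.0]]
done = [False, False, True]
targets = fitted_q_targets(rewards, next_q, done, gamma=0.5)
print(targets)
```

In each iteration, a function approximator (a neural network in the case of NFQ) would be refit to map the observed state-action pairs to these targets.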

Hint 1: use PyBrain -- it has all you need! Class ActionValueNetwork implements a neural network, and class NFQ implements the algorithm itself.

Hint 2: You might want to make the episodes longer to give your network more experience. For that, you can modify the value episode_length of the NavigationTask class.

Hint 3: You might also want to make your task simpler by reducing the dimensionality of your measurements (e.g., use PCA). For that, you can possibly modify the getObservation() method of the NavigationTask class.

Hint 4: Finally, try to play around with the reward function -- method getReward of the NavigationTask class.
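
As one possible direction for Hint 4, the sketch below replaces the constant step penalty with the negative Manhattan distance to the goal, so that the agent receives a denser signal. This is only an illustrative alternative, not a recommended or tested choice for this task; the function name `shaped_reward` and the parameter `goal_bonus` are hypothetical.

```python
def shaped_reward(position, goal, goal_bonus=1000):
    """Return goal_bonus at the goal, otherwise minus the Manhattan distance to it."""
    distance = abs(position[0] - goal[0]) + abs(position[1] - goal[1])
    if distance == 0:
        return goal_bonus
    return -distance


print(shaped_reward((0, 0), (3, 4)))
print(shaped_reward((3, 4), (3, 4)))
```

The body of such a function could be adapted into the getReward method, using self.env.getPosition() and self.goal as inputs.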