# Classes simple_actspace, mdp, maze, maze_mdp

The code below provides classes that represent the environments used to evaluate various dynamic programming (DP) and reinforcement learning (RL) algorithms. It is provided so that students can focus on the DP and RL algorithms themselves, but you can modify it to investigate the effect of environment properties on these algorithms (see [lab instructions](lab_instructions.ipynb)).

The code to display the effect of the algorithms in these environments is in [maze_plotter.ipynb](maze_plotter.ipynb).

A Markov Decision Process is used to describe a reinforcement learning environment. It is defined by a tuple $(S, A, P, r, \gamma)$ where $S$ is the state space, $A$ the action space, $P(state_t,  action_t, state_{t+1})$ the transition function, $r(state_t, action_t)$ the reward function and $\gamma \in [0, 1]$ the discount factor.
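To make the tuple concrete, here is a minimal sketch of a tiny MDP stored as NumPy arrays. The two-state layout, the probabilities and the reward values are hypothetical and chosen only for illustration; they are not the maze defined below.

```python
import numpy as np

# Hypothetical 2-state, 2-action MDP, used only to illustrate (S, A, P, r, gamma).
n_states, n_actions = 2, 2

# P[s, a, s_next] is the probability of reaching s_next after taking action a in state s.
P = np.zeros((n_states, n_actions, n_states))
P[0, 0] = [1.0, 0.0]  # action 0 in state 0: stay in state 0
P[0, 1] = [0.2, 0.8]  # action 1 in state 0: usually move to state 1
P[1, 0] = [0.0, 1.0]  # state 1 is absorbing under both actions
P[1, 1] = [0.0, 1.0]

# r[s, a] is the reward obtained when taking action a in state s.
r = np.array([[0.0, 0.0],
              [1.0, 1.0]])

gamma = 0.9  # discount factor

# Each P[s, a, :] must be a probability distribution over next states.
rows_are_distributions = bool(np.allclose(P.sum(axis=2), 1.0))

# Discounted return of a reward sequence r_0, r_1, r_2: sum over t of gamma^t * r_t.
rewards = [0.0, 0.0, 1.0]
discounted_return = sum(gamma ** t * rew for t, rew in enumerate(rewards))
```

Storing $P$ as a three-dimensional array indexed by (state, action, next state) matches the way the transition matrix is indexed in the `mdp` class below.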

In our maze environment, the states are the cells of the grid, and the agent can go north, south, east or west (actions 0, 1, 2 and 3 respectively). Besides the transition function, a distribution $P_0$ defines how the first state is drawn. By default, it is set to 1.0 in state 0 and 0.0 in all other states, meaning that the agent always starts in state 0. The transition function defined here is deterministic: an action always leads to the same outcome, so going north always moves the agent up, unless it is in the top row or the cell above is a wall, in which case it stays in the same state. The reward function is a matrix of shape (state, action).
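The deterministic moves can be sketched as a small standalone function. Judging from the index arithmetic in the class below, cells appear to be numbered column by column starting from the top-left corner, so state `i` sits at row `i % height` and column `i // height`. The function name `next_state` is hypothetical, and the action constants are assumed to match the `N, S, E, W` values imported from the toolbox.

```python
# Action indices as stated in the text (assumed to match N, S, E, W from toolbox).
N, S, E, W = 0, 1, 2, 3

def next_state(i, action, width, height, walls=()):
    """Deterministic successor of cell i in a grid numbered column by column."""
    if i in walls:
        return i  # wall cells never change state
    row, col = i % height, i // height
    if action == N:
        row -= 1
    elif action == S:
        row += 1
    elif action == E:
        col += 1
    elif action == W:
        col -= 1
    # Trying to leave the grid keeps the agent in place.
    if not (0 <= row < height and 0 <= col < width):
        return i
    target = col * height + row
    # Trying to enter a wall also keeps the agent in place.
    return i if target in walls else target

# Same layout as the example maze at the end of this notebook.
walls = [7, 8, 9, 10, 21, 27, 30, 31, 32, 33, 45, 46, 47]
moves_from_0 = [next_state(0, a, 9, 6, walls) for a in (N, S, E, W)]
blocked_by_wall = next_state(1, E, 9, 6, walls)  # cell 7 is a wall
```

This is only a reading aid: the class itself stores the same information as one-hot rows of the transition matrix rather than calling a function.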

In practice, to launch an episode, call the __reset()__ function, which draws the agent's first state either according to $P_0$ or uniformly over all the states. The second option has proven more efficient for exploration, as the agent starts from a different state at each episode. Once the episode is initialized, call the method __step(u)__ which, given an action $u$, draws the next state according to the distribution $P$ and returns it along with the reward of this step, a boolean stating whether the episode is over (either because the agent reached a terminal state or because the timeout has been reached) and some information that can be used when debugging.
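The reset/step cycle described above can be sketched as a generic episode loop. To keep the snippet runnable on its own, it uses `TinyChainEnv`, a hypothetical three-state stand-in that only mimics the `reset(uniform)` and `step(u)` signatures; its termination rule is simpler than the one in `mdp.done()`. The helper `run_episode` is also an illustrative name, not part of the provided classes.

```python
class TinyChainEnv:
    """Hypothetical 3-state chain exposing the same reset()/step(u) interface."""

    def __init__(self, timeout=10):
        self.timeout = timeout
        self.timestep = 0
        self.current_state = -1

    def reset(self, uniform=False):
        # The stand-in always starts in state 0, whatever the value of uniform.
        self.timestep = 0
        self.current_state = 0
        return self.current_state

    def step(self, u):
        # Action 1 moves one cell to the right, any other action stays in place.
        if u == 1:
            self.current_state = min(self.current_state + 1, 2)
        self.timestep += 1
        reward = 1.0 if self.current_state == 2 else 0.0
        done = self.current_state == 2 or self.timestep == self.timeout
        return [self.current_state, reward, done, {}]


def run_episode(env, policy, gamma=0.9, uniform=False):
    """Run one episode and return its discounted return."""
    state = env.reset(uniform=uniform)
    total, discount, done = 0.0, 1.0, False
    while not done:
        action = policy(state)
        state, reward, done, info = env.step(action)
        total += discount * reward
        discount *= gamma
    return total


return_going_right = run_episode(TinyChainEnv(), policy=lambda s: 1)
return_staying = run_episode(TinyChainEnv(), policy=lambda s: 0)
```

Since `maze_mdp` exposes the same two methods, the same loop should work with it unchanged, with `policy` returning one of the four action indices.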

To visualize the environment, use the __new_render()__ function to initialize the rendering, then __render(V, policy, agent_pos)__ to refresh the maze with either the newly calculated V and the policy, or the Q values, and the current position of the agent. The function __save_fig(title)__ saves the last render to disk.

Given a list of the different V or Q values, a list of policies, and the number of frames, you can generate a video (animation) of your results using the function __create_animation()__. It is particularly useful in the RL functions, where the number of episodes is high and rendering the results during the run slows it down.

Whenever you want to show something in your notebook, use the magic __%matplotlib notebook__ at the beginning of your cell, or just __%matplotlib__ if you want the output to appear in a separate window.

You can see an example of these different output methods in the Q-Learning results visualization implemented in the [reinforcement_learning.ipynb](reinforcement_learning.ipynb) notebook.


In [None]:
import numpy as np
from ipynb.fs.defs.toolbox import N, S, E, W, discreteProb
from ipynb.fs.defs.maze_plotter import maze_plotter # used to plot the maze

    
class simple_actspace(): #describes the action space of the Markov Decision Process
    def __init__(self, action_list=[], nactions=0):
        if len(action_list) == 0:
            self.actions = np.arange(nactions)
        else:
            self.actions = action_list
            
        self.size = len(self.actions)
        
    def sample(self, prob_list=None): #returns an action drawn according to the prob_list distribution, 
        # if the param is not set, then it is drawn from a uniform distribution 
        if prob_list is None :
            prob_list = np.ones((self.size))/self.size
            
        index = discreteProb(prob_list) 
        return  self.actions[index]
    

        
    
class mdp(): #defines a Markov Decision Process

    def __init__(self, observation_space, action_space, start_distribution, transition_matrix,
                  reward_matrix, plotter, gamma=0.9, terminal_states=[], timeout=50):
        
        self.observation_space = observation_space
        self.terminal_states = terminal_states
        self.action_space = action_space
        self.current_state = -1 #current position of the agent in the maze, it's set by the method reset()
        self.timeout = timeout #maximum length of an episode
        self.timestep = 0 
        self.P0 = start_distribution #distribution used to draw the first state of the agent, used in method reset()
        self.P = transition_matrix
        self.r = reward_matrix
        self.plotter = plotter #used to plot the maze
        self.gamma = gamma #discount factor
        self.last_action_achieved = False #used to tell whether the last state has been reached or not (see done())
    
    

    def reset(self, uniform=False): #initializes an episode and returns the state of the agent
        #if uniform is set to False, the first state is drawn according to the P0 distribution, 
        #else it's drawn on a uniform distribution over all the states
        
        if uniform :
            prob = np.ones((self.observation_space.size))/self.observation_space.size
            self.current_state = discreteProb(prob)
        else :
            self.current_state = discreteProb(self.P0)
            
        self.timestep = 0
        self.last_action_achieved = False
        
        return self.current_state
 
    
    def step(self, u, deviation=0): # performs a step forward in the environment
        # deviation is meant as the standard deviation σ of the zero-mean normal noise added to the reward;
        # the noise is disabled by default (see the commented-out expression below)
        
        noise = 0 # = deviation*np.random.randn() # generate noise, see an exercise in mbrl.ipynb
        reward = self.r[self.current_state,u] +noise # r is the reward of the transition, you can add some noise to it 
        
        # the state reached when performing action u from state x is sampled 
        # according to the discrete distribution self.P[x,u,:]
        observation = discreteProb(self.P[self.current_state,u,:]) 
        
        self.timestep += 1 
        
        
        info = {} #can be used when debugging
        info["State transition probabilities"] = self.P[self.current_state,u,:]
        info["reward's noise value"] = noise
        
        self.current_state = observation
        done = self.done() #checks if the episode is over
        
        return [observation,reward,done,info]
    
    
    def done(self): #returns True if the episode is over
        if self.last_action_achieved :
            return True
        if self.current_state in self.terminal_states: #done when a terminal state is reached
            #the terminal states are actually a set of states from which any action leads to an added imaginary state, 
            #the "well", with a reward of 1. To know if the episode is over, we have to check
            #whether the agent is on one of these last states and performed the action that gives it its last reward 
            self.last_action_achieved = True
            
        return self.timestep == self.timeout #done when timeout reached
    
    
    def new_render(self): #initializes a new environment rendering (a plot defined by a figure, an axis...)
        self.plotter.new_render()
    
    def render(self, V=[], policy=[], agent_pos=-1): #outputs the agent in the environment with values V (or Q)
        
        if agent_pos > -1:
            self.plotter.render(agent_state=agent_pos, V=V, policy=policy)
        elif self.current_state > -1:# and not self.last_action_achieved:
            self.plotter.render(agent_state=self.current_state, V=V, policy=policy)
        else :
            self.plotter.render(V=V, policy=policy)
        
    def save_fig(self, title): #saves the current output to disk
        self.plotter.save_fig(title)
            
    def create_animation(self,V_list=[],policy_list=[],nframes=0): #given a list of V or Q values, a list of policies, 
        # and optionally the number of frames wanted, it generates a video of the different steps
        return self.plotter.create_animation(V_list,policy_list,nframes)
    

class maze(): #describes a maze-like environment
    def __init__(self, width, height, walls=[]):
        self.width = width
        self.height = height
        self.states = np.array([s for s in range(width*height)])
        self.walls = walls
        self.size = width*height
     

    
class maze_mdp(mdp): #defines a Markov Decision Process whose observation space is a maze

    def __init__(self, width, height, walls=[], action_list=[], nactions=4,
                 gamma=0.9, timeout=50, start_states=[0], terminal_states=[]):
        #width, height : int numbers defining the maze attributes
        #walls : list of the states that represent walls in our maze environment
        #action_list : list of possible actions
        #nactions : used when action_list is empty, by default there are 4 of them (go north, south, east or west)
        #gamma : the discount factor of our mdp
        #timeout : defines the length of an episode (max timestep) --see done() function
        #start_states : list that defines the states where the agent can be at the beginning of an episode
        #terminal_states : list that defines the states corresponding to the end of an episode
        #                  (agent reaches a terminal state) --cf. done() function
        
        ###################### State Space ######################
        
        observation_space = maze(width, height, walls)
        
        ###################### Action Space ######################
        
        action_space = simple_actspace(action_list=action_list, nactions=nactions)    
        
        
        ###################### Distribution Over Initial States ######################
        
        start_distribution = np.zeros((observation_space.size)) #distribution over initial states
        
        for state in start_states:
            start_distribution[state] = 1.0/len(start_states)

        ###################### Transition Matrix ######################
        
        transition_matrix = np.zeros((observation_space.size+1,action_space.size,observation_space.size+1)) #a "well" state is added that only the terminal states can get into; zeros (rather than empty) so that no entry is left uninitialized
        
        # Transition Matrix when going north
        transition_matrix[:,N,:] = np.zeros((observation_space.size+1,observation_space.size+1))
        for i in observation_space.states : 
            if i == 0 or i%observation_space.height == 0 or i-1 in observation_space.walls or i in observation_space.walls: #the state doesn't change (highest cells + cells under a wall)
                transition_matrix[:,N,:][i][i] = 1.0
            else : #it goes up
                transition_matrix[:,N,:][i][i-1] = 1.0
        
        # Transition Matrix when going south
        transition_matrix[:,S,:] = np.zeros((observation_space.size+1,observation_space.size+1))
        for i in observation_space.states : 
            if i%observation_space.height == observation_space.height-1 or i+1 in observation_space.walls or i in observation_space.walls: #the state doesn't change (lowest cells + cells above a wall)
                transition_matrix[:,S,:][i][i] = 1.0
            else : #it goes down
                transition_matrix[:,S,:][i][i+1] = 1.0
    
        #self.P[:,S,:][49][50] = 0.2 #example for hacking local probabilities
        #self.P[:,S,:][49][48] = 0.8


        # Transition Matrix when going west
        transition_matrix[:,W,:] = np.zeros((observation_space.size+1,observation_space.size+1))
        for i in observation_space.states : 
            if i<observation_space.height or i-observation_space.height in observation_space.walls or i in observation_space.walls: #the state doesn't change (leftmost cells + cells on the right side of a wall)
                transition_matrix[:,W,:][i][i] = 1.0
            else : #it goes left
                transition_matrix[:,W,:][i][i-height] = 1.0
        

        # Transition Matrix when going east
        transition_matrix[:,E,:] = np.zeros((observation_space.size+1,observation_space.size+1))
        for i in observation_space.states : 
            if i>observation_space.size-observation_space.height-1 or i+observation_space.height in observation_space.walls or i in observation_space.walls: #the state doesn't change (rightmost cells + cells on the left side of a wall)
                transition_matrix[:,E,:][i][i] = 1.0
            else : #it goes right
                transition_matrix[:,E,:][i][i+height] = 1.0
                
        # Transition Matrix of final states 
        well = observation_space.size # all the final states' transitions go there
        for s in terminal_states:
            transition_matrix[s,:,:] = 0
            transition_matrix[s,:,well] = 1
            
        
        # Transition Matrix when not moving (action removed from the current version)
        #transition_matrix[:,NoOp,:] = np.eye(observation_space.size)

        ###################### Reward Matrix ######################

        reward_matrix = np.zeros((observation_space.size, action_space.size)) 
        for s in terminal_states:
            reward_matrix[s,:] = 1 # leaving a final state gets the agent a reward of 1
        #reward_matrix[-1][NoOp] = 1.0
        #reward_matrix[25][NoOp] = 0.9
        
        plotter = maze_plotter(observation_space, terminal_states) #renders the environment
        mdp.__init__(self, observation_space, action_space, start_distribution, transition_matrix,
                 reward_matrix, plotter, gamma=gamma, terminal_states=terminal_states, timeout=timeout)

    
    def reset(self, uniform=False): #initializes an episode
        #if uniform is set to False, the first state is drawn from the P0 distribution, 
        #else it is drawn from a uniform distribution over all the states except for walls
        if uniform:
            prob = np.ones((self.observation_space.size))/(self.observation_space.size-len(self.observation_space.walls))
            for state in self.observation_space.walls:
                prob[state]= 0.0 
            self.current_state = discreteProb(prob)
        else :
            self.current_state = discreteProb(self.P0)

        self.timestep = 0
        self.last_action_achieved = False
        return self.current_state
        
        

### Example of using the maze_mdp class

In [None]:
%matplotlib notebook
from ipynb.fs.defs.maze_plotter import maze_plotter # used to visualize the state value and policy evolution

walls = [7,8,9,10,21,27,30,31,32,33,45,46,47]
height = 6
width = 9
m = maze_mdp(width, height, walls=walls) # maze-like MDP definition
m.render()
#m.save_fig("sample_maze.png") #used to save a picture of the maze as a png file
