# In [7]:
import numpy as np

class RL_system(object):
    """
    Q-learning agent with a linear approximation of the Q function.

    Q(s, a) is the dot product between the learning vector (the weights being
    learned) and the feature vector that the environment's psi(s, a) yields for
    the state-action pair.

    The environment is expected to expose: getState, getActions, psi, getReward,
    perform_action, restart and game_over.
    """

    def __init__(self, _environment):
        """
        Binds the agent to an environment and sets the hyper-parameters.
            _environment is the environment the agent acts in and learns from.
        """
        self.environment = _environment
        self.eps_greedy = 0 # probability to play a random action
        self.discount_factor = 0.1 # specifies how much long term reward is kept
        self.learning_rate = 1
        # Built last because it queries the environment for the number of features.
        self.learning_vector = self.initial_learning_vector() # the list of parameters to learn


    def learn(self, iterations):
        """
        Performs the learning steps a specified number of times, stopping early
        as soon as the environment reports that the game is over.
        Returns the learning vector (the same array object that is stored on
        the instance, not a copy).
        """

        for _ in range(iterations):
            self.learning_step()
            if self._is_game_over():
                break

        return self.learning_vector


    def learning_step(self):
        """
        This is the Q-learning routine: observe the state, act, collect the
        reward and update the learning vector.
        """

        s = self._current_state() # state before acting
        a = self.policy(s) # action to perform according to the policy
        self.environment.perform_action(a)
        r = self.environment.getReward() # gained reward
        self.update_learning_vector(s, a, r)


    def policy(self, s):
        """
        Returns the action to perform in the state s according to a policy.
        """
        return self.best_Q_policy(s)


    def Q(self, s, a):
        """
        This is the Q function, it returns the expected future discounted reward
        for taking action a ∈ A in state s ∈ S.
        """
        return self.learning_vector.dot(self.action_state_features_vector(s, a))


    def update_learning_vector(self, s, a, r):
        """
        This function updates the learning vector in place.
            a is the last performed action,
            s is the previous state,
            r is the reward that has been generated by performing a in state s.
        The environment must already be in the state reached after performing a,
        because that state is read here to estimate the best future value.
        """

        next_state = self._current_state()
        max_Q = max(self.Q(next_state, action) for action in self._actions())
        difference = r + self.discount_factor * max_Q - self.Q(s, a)
        self.learning_vector += (
            self.learning_rate * difference * self.action_state_features_vector(s, a)
        )


    def best_Q_policy(self, s):
        """
        For a given state it returns the action that maximizes the Q function, but it
        can also return a random action with probability = eps_greedy.
        """

        actions = self._actions()

        if self.random_boolean(self.eps_greedy):
            # Sample an index rather than the action itself, so that actions
            # of any type are returned unchanged.
            return actions[np.random.randint(len(actions))]

        best = int(np.argmax([self.Q(s, a) for a in actions]))
        return actions[best]


    def random_boolean(self, probability_of_true):
        """
        It returns true with the given probability, false otherwise.
        """
        return np.random.random_sample() < probability_of_true


    def initial_learning_vector(self):
        """
        It returns the initial configuration of the learning vector: one zero
        per feature produced by psi for the current state and the first action.
        """
        first_action = self._actions()[0]
        features = self.action_state_features_vector(self._current_state(), first_action)
        return np.zeros(len(features))


    def current_state_vector(self):
        """
        It returns the vector of numerical values representing the current state.
        It basically extracts the values from the state dictionary of the environment.

        NOTE: this flat list is not what gets passed to psi; the learning
        routine hands psi the state dictionary itself, because psi may look
        features up by name.
        """
        return list(self.environment.getState().values())


    def action_state_features_vector(self, s, a):
        """
        It returns a vector of numerical values representing relevant features of the
        state-action pair (s, a).
        It basically extracts the values from the psi(s, a) dictionary of the environment.
        """
        return np.array(list(self.environment.psi(s, a).values()))


    def reset_environment(self):
        """
        It sets the environment to the initial configuration.
        """
        self.environment.restart()


    def reset_learning_vector(self):
        """
        It sets the learning vector to the initial predefined value.
        """
        self.learning_vector = self.initial_learning_vector()


    def _current_state(self):
        """
        Returns the environment state as the dictionary produced by getState().
        """
        return self.environment.getState()


    def _actions(self):
        """
        Returns the available actions as a list, so that they can be indexed
        even when the environment returns another kind of iterable.
        """
        return list(self.environment.getActions())


    def _is_game_over(self):
        """
        Returns true if the environment reports that the game is over.
        Accepts game_over both as a method and as a plain boolean attribute.
        """
        flag = self.environment.game_over
        return bool(flag() if callable(flag) else flag)
        

# In [3]:
from abc import ABC, abstractmethod

class RL_Environment(ABC):
    """
    Interface that an environment must implement to be driven by a learning system.
    Every method is abstract; the base implementations do nothing and return None.
    """

    @abstractmethod
    def getState(self):
        """
        Current state of the environment, expected as a dictionary
        mapping each feature name to its value.
        """

    @abstractmethod
    def getActions(self):
        """
        All the actions that can be performed, expected as a list.
        """

    @abstractmethod
    def psi(self, s, a):
        # Open design question: this may belong to the learning system instead,
        # since which features are relevant may depend on the learning procedure
        # more than being an absolute property of the environment.
        """
        Relevant features of the state-action pair (s, a), expected as a
        dictionary mapping each feature name to its value.
        """

    @abstractmethod
    def getReward(self):
        """
        The last reward that has been received.
        """

    @abstractmethod
    def perform_action(self, a):
        """
        Makes the environment perform the action a, which changes its state.
        """

    @abstractmethod
    def restart(self):
        """
        Brings the environment back to its initial configuration.
        """

    @abstractmethod
    def game_over(self):
        """
        Tells whether the game is over (true) or still running.
        """


# In [9]:
import gym

class pacman_RL_environment(RL_Environment):
    """
    RL_Environment implementation backed by the gym 'MsPacman-ram-v0' game.

    State features are computed from the game screen by a
    Pacman_features_extractor, which is rebuilt on every restart and updated
    after every action.
    """

    def __init__(self):
        """
        Creates the gym environment and brings it to the initial configuration.
        """
        self.env = gym.make('MsPacman-ram-v0')
        # restart() sets: state, current_reward, _game_over and features_extractor.
        self.restart()


    def getState(self):
        """
        Returns the current environment state as a dictionary of (feature name - feature value).
        There is one feature per (entity, movement) pair, named
        "nearest_<entity>_distance_after_going_<movement>".
        """

        # some examples of state features
        features = {}

        for entity in ['ghosts', 'foods', 'special_food']:
            for movement in ['up', 'down', 'right', 'left']:
                features["nearest_" + entity + "_distance_after_going_" + movement] = (
                    self.features_extractor.nearest_entity_distance_from_pacman_after_movement(
                        movement, entity
                    )
                )

        # TODO: candidate extra features: "ghost_are_scared", "actual_time_step",
        # "last_scared_ghost_time_step".

        return features


    def getActions(self):
        """
        Returns the list of all possible actions as strings.
        """
        return list(self.actions_dict())


    def psi(self, s, a):
        """
        Returns relevant features of the given state-action pair
        as a dictionary of (feature name - feature value).
            s is a state dictionary as returned by getState(),
            a is an action name, used to select the matching entries of s.
        """

        # these are just examples taken from the paper
        features = {}

        for entity in ['ghosts', 'foods', 'special_food']:
            features["distance_of_the_closest_" + entity] = (
                s["nearest_" + entity + "_distance_after_going_" + a]
            )

        # TODO: candidate extra features: "food_will_be_eaten",
        # "ghost_collision_is_possible".

        return features


    def getReward(self):
        """
        Returns the last reward received.
        """
        return self.current_reward


    def perform_action(self, a):
        """
        The environment performs the action a (given as a string) and its state changes.
        Raises KeyError if a is not one of the names in actions_dict().
        """

        encoded_action = self.actions_dict()[a] # translate the action from string to number
        # NOTE(review): this unpacks the 4-value (observation, reward, done, info)
        # step result; confirm the installed gym version still returns 4 values.
        self.state, self.current_reward, self._game_over, _info = self.env.step(encoded_action)

        # then we have to update the features extractor,
        # since features extraction doesn't depend only on the current screen
        self.features_extractor.update(self.getCurrentScreen())


    def restart(self):
        """
        Set the environment to the initial configuration.
        The last reward and the game-over flag are cleared too, otherwise a
        restarted game would still be reported as finished.
        """

        self.state = self.env.reset() # env ram representation of the current state
        self.current_reward = 0 # last reward received
        self._game_over = False

        # the features_extractor is here because it has (and needs) a state
        self.features_extractor = Pacman_features_extractor(self.getCurrentScreen())


    def game_over(self):
        """
        Returns true if the game is over.
        """
        # The flag lives in _game_over: an instance attribute named game_over
        # would shadow this method and make it impossible to call.
        return self._game_over


    def actions_dict(self):
        """
        Returns a dictionary of (action name - action encoded).
        The encoding is needed to give the commands to the env.
        """

        # MsPacman action meanings, in index order, are:
        # NOOP, UP, RIGHT, LEFT, DOWN, UPRIGHT, UPLEFT, DOWNRIGHT, DOWNLEFT.
        # Index 0 is NOOP, so "left" has to be 3.
        # The diagonal commands (5 to 8) are deliberately not exposed.
        return {"up": 1, "down": 4, "right": 2, "left": 3}


    def getCurrentScreen(self):
        """
        Returns the current game screen, reshaped to 210 x 160.
        """
        # `unwrapped` reaches the underlying emulator env however many
        # wrappers gym.make() has put around it.
        return self.env.unwrapped.ale.getScreen().reshape(210, 160)
        

