<a href="https://colab.research.google.com/github/lucasjirwin/Meta-Learning-aspiration-RL-agent/blob/main/RL_aspiration_metaLearner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Senior Thesis Project:** Developing a meta-learning RL agent which sets its own aspiration level 



*   **DeepGridWorld class:** contains the grid-world environment with methods for taking a step in the environment, getting available actions and rewards, checking if a state is terminal and printing the agent on the map.
*   **DeepQ_Agent class:** contains the Q-learning agent with methods for choosing an action (epsilon-greedily), updating the Q-table (learning), and transforming the reward using the aspiration level and the two weights.
*   **play function:** Runs iterations of the agent on the environment and updates the Q-table if required. 


**TODO:**
*   **Meta-learner:** PyTorch module, backpropagation through the Q_table (modelled as a tensor). Inputs: aspiration, weight_1, weight_2. Outputs: average cumulative reward 







In [1]:
import numpy as np 
import torch 
import operator
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
class DeepGridWorld:
    """Grid-world environment whose cells are addressed by one flat index.

    Cell ``i`` sits at row ``i // width`` and column ``i % width``. Every
    cell yields a reward of -1 except the bomb (-10) and the gold (+10),
    which are also the two terminal states.

    Action codes: 0 = up, 1 = down, 2 = left, 3 = right.
    """

    def __init__(self):
        # Grid dimensions.
        self.height = 5
        self.width = 5
        # Per-cell reward, flat-indexed; every ordinary cell costs -1.
        self.grid = np.zeros(self.height * self.width) - 1

        # Random start cell. randint's upper bound is exclusive, so this
        # draws from cells 20..23.
        self.current_location = np.random.randint(20, 24)

        # Flat indices of the bomb and the gold; reaching either ends an episode.
        self.bomb_location = 8
        self.gold_location = 3
        self.terminal_states = [self.bomb_location, self.gold_location]

        # Rewards for the special cells.
        self.grid[self.bomb_location] = -10
        self.grid[self.gold_location] = 10

        # Available actions (see the class docstring for their meaning).
        self.actions = [0, 1, 2, 3]

    def get_available_actions(self):
        """Returns possible actions"""
        return self.actions

    def agent_on_map(self):
        """Returns a (height, width) array of zeros with a 1 at the agent's cell (used for debugging)"""
        grid = np.zeros((self.height, self.width))
        # The location is a flat index, so it has to be split into
        # (row, column) before it can index the 2-D array.
        row, col = divmod(self.current_location, self.width)
        grid[row, col] = 1
        return grid

    def get_reward(self, new_location):
        """Returns the reward for an input position (flat cell index)"""
        return self.grid[new_location]

    def make_step(self, action):
        """Moves the agent in the specified direction. If agent is at a border, agent stays still
        but still collects the reward of the cell it is standing on.

        Returns the reward of the cell the agent occupies after the move.
        Raises ValueError if ``action`` is not one of 0, 1, 2, 3.
        """
        last_location = self.current_location
        # Column within the row; the modulus must be the row length (width).
        column = last_location % self.width

        if action == 0:  # UP
            blocked = last_location < self.width
            offset = -self.width
        elif action == 1:  # DOWN
            blocked = last_location >= self.width * (self.height - 1)
            offset = self.width
        elif action == 2:  # LEFT
            blocked = column == 0
            offset = -1
        elif action == 3:  # RIGHT
            blocked = column == self.width - 1
            offset = 1
        else:
            raise ValueError(f"Unknown action: {action!r}")

        if not blocked:
            self.current_location = last_location + offset
        return self.get_reward(self.current_location)

    def check_state(self):
        """Check if the agent is in a terminal state (gold or bomb), if so return 'TERMINAL'.

        Returns None when the agent is on any other cell.
        """
        if self.current_location in self.terminal_states:
            return 'TERMINAL'
        return None

In [3]:
class RandomAgent:
    """Baseline agent that ignores the environment state and acts uniformly at random."""

    def choose_action(self, available_actions):
        """Pick one entry of ``available_actions`` uniformly at random and return it."""
        picked = np.random.choice(available_actions)
        return picked

In [4]:
class DeepQ_Agent():
    """Tabular Q-learning agent whose reward is reshaped by an aspiration level.

    The Q-table is a torch tensor with one row per grid cell (flat index) and
    one column per action. Before each update the raw reward ``r`` is replaced
    by ``weight_1 * r + weight_2 * (r - aspiration)``.

    Parameters:
        environment: object exposing ``height``, ``width`` and
            ``current_location`` (a flat cell index); its ``actions`` list,
            when present, sets the number of Q-table columns.
        epsilon: probability of taking an exploratory random action.
        alpha: learning rate.
        gamma: discount factor.
        aspiration: aspiration level subtracted from the reward.
        weight_1: weight on the raw reward.
        weight_2: weight on the reward-minus-aspiration term.
    """

    def __init__(self, environment, epsilon=0.05, alpha=0.1, gamma=1, aspiration=0, weight_1=1, weight_2=1):
        self.environment = environment
        # One column per action; fall back to four columns when the
        # environment does not expose an ``actions`` list.
        n_actions = len(getattr(environment, "actions", (0, 1, 2, 3)))
        self.q_table = torch.zeros((environment.height * environment.width, n_actions))
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.aspiration = aspiration
        self.weight_1 = weight_1
        self.weight_2 = weight_2

    def choose_action(self, available_actions):
        """Returns the optimal action from Q-Value table. If multiple optimal actions, chooses random choice.
        Will make an exploratory random action dependent on epsilon."""
        if np.random.uniform(0, 1) < self.epsilon:
            # Explore: uniform draw from the supplied action list.
            return available_actions[np.random.randint(0, len(available_actions))]

        # Exploit: column indices whose Q-value equals the row maximum.
        state_values = self.q_table[self.environment.current_location]
        best_value = torch.max(state_values)
        tied = (state_values == best_value).nonzero().numpy()
        # Break ties at random; return a plain int, like the exploratory branch.
        return int(np.random.choice(tied[:, 0]))

    def learn(self, old_state, reward, new_state, action):
        """Updates the Q-value table using Q-learning.

        ``old_state`` and ``new_state`` are flat cell indices, ``action`` is a
        column index, and ``reward`` is the raw environment reward, which is
        reshaped by ``get_aspiration`` before it enters the update.
        """
        best_next_value = torch.max(self.q_table[new_state])
        current_q_value = self.q_table[old_state][action]
        shaped_reward = self.get_aspiration(reward)
        target = shaped_reward + self.gamma * best_next_value
        # Indexing a row yields a view, so this assignment writes into the table.
        self.q_table[old_state][action] = (1 - self.alpha) * current_q_value + self.alpha * target

    def get_aspiration(self, reward):
        """Returns the reshaped reward: weight_1 * reward + weight_2 * (reward - aspiration)."""
        return self.weight_1 * reward + self.weight_2 * (reward - self.aspiration)


In [5]:
def play(environment, agent, trials=500, max_steps_per_episode=1000, learn=False):
    """The play function runs iterations and updates Q-values if desired.

    Parameters:
        environment: object exposing ``current_location``, ``actions``,
            ``make_step(action)`` and ``check_state()``; it is reset by
            re-running its ``__init__``.
        agent: object exposing ``choose_action(actions)`` and, when ``learn``
            is set, ``learn(old_state, reward, new_state, action)``.
        trials: number of episodes to run.
        max_steps_per_episode: step budget after which an episode is cut off.
        learn: when truthy, the agent is updated after every step.

    Returns:
        A list with the cumulative (raw) reward of each episode.
    """
    reward_per_episode = []  # Performance log

    for _ in range(trials):
        cumulative_reward = 0
        for _step in range(max_steps_per_episode):
            old_state = environment.current_location
            action = agent.choose_action(environment.actions)
            reward = environment.make_step(action)
            new_state = environment.current_location

            if learn:  # Update Q-values if learning is specified
                agent.learn(old_state, reward, new_state, action)

            cumulative_reward += reward

            if environment.check_state() == 'TERMINAL':
                break

        # Reset after every episode, including ones cut off by the step
        # budget, so the next trial never starts from a leftover position.
        environment.__init__()
        reward_per_episode.append(cumulative_reward)

    return reward_per_episode

In [7]:
# Train one agent (aspiration 0.08) on one grid-world and print the mean, over
# 100 calls to play(), of the summed reward of each call's 500 episodes.
# The same agent and environment objects are reused for every call, so the
# Q-table keeps accumulating updates from one call to the next.
environment = DeepGridWorld()
agent = DeepQ_Agent(environment, aspiration=0.08)
av_reward = 0
for iteration in range(100):
    reward_per_episode = play(environment, agent, trials=500, learn=True)
    # plt.plot(reward_per_episode)
    av_reward = av_reward + sum(reward_per_episode)

print(av_reward / 100)

2171.42


### Testing aspiration (100 iterations, 500 trials)  
2184.75, aspiration = 0.5 \\
2182.38, aspiration = 0.2 \\
2195.28, aspiration = 0.08 \\
2181.2, aspiration = 0.05 \\
2188.52, aspiration = 0
