# Pessimistic Neighbourhood Aggregation for States in Reinforcement Learning

*Author: Maleakhi Agung Wijaya  
Supervisors: Marcus Hutter, Sultan Javed Majeed  
Date Created: 21/12/2017*

In [1]:
import random
import math
import sys
import matplotlib.pyplot as plt
import numpy as np

## Mountain Car Environment

**Mountain Car** is a standard testing domain in Reinforcement Learning, in which an under-powered car must drive up a steep hill. Since gravity is stronger than the car's engine, the car cannot simply accelerate up the slope, even at full throttle. The car starts in a valley and must learn to build momentum by first driving up the opposite hill; only then can it reach the goal at the top of the rightmost hill.

**Technical Details**
- *State:* feature vector consisting of velocity and position, represented by the array [velocity, position]
- *Reward:* -1 for every step taken, 0 for achieving the goal
- *Action:* (left, neutral, right) represented by (-1, 0, 1)
- *Initial state:* velocity = 0.0, position = -0.5 represented by [0.0, -0.5]
- *Terminal state:* position >= 0.6
- *Boundaries:* velocity is clipped to [-0.07, 0.07], position is clipped to [-1.2, 0.6]
- *Update function:* velocity = velocity + action \* 0.001 + cos(3 \* position) \* (-0.0025), position = position + velocity
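
As a quick check of the update rule and boundaries listed above, the following minimal sketch applies a single step from the initial state with the action "right". The function name `mountain_car_step` is an assumption made for this illustration and is not part of the notebook's classes.

```python
import math

def mountain_car_step(velocity, position, action):
    """Apply one step of the update rule, clipping to the stated boundaries."""
    velocity += action * 0.001 + math.cos(3 * position) * (-0.0025)
    velocity = min(max(velocity, -0.07), 0.07)   # velocity boundaries
    position += velocity
    position = min(max(position, -1.2), 0.6)     # position boundaries
    return velocity, position

# One step from the initial state [0.0, -0.5] with action = 1 (right)
v, p = mountain_car_step(0.0, -0.5, 1)
print(v, p)
```

Gravity contributes cos(-1.5) \* (-0.0025), a small negative term, so a single push to the right produces a velocity slightly below 0.001.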

In [2]:
class MountainCarEnvironment:
    """
    Implementation of Sutton & Barto (1998) Mountain Car Problem environment.
    """
    velocity_boundaries = (-0.07, 0.07)
    position_boundaries = (-1.2, 0.6)  
   
    # Constructor for MountainCarEnvironment
    # Input: agent for the MountainCarEnvironment
    # Output: MountainCarEnvironment object
    def __init__(self, car):
        self.car = car
        self.reset()
        
    # Compute next state (feature)
    # Output: [new velocity, new position]
    def nextState(self, action):
        # Get current state (velocity, position) and the action chosen by the agent
        velocity = self.car.state[0]
        position = self.car.state[1]
        
        # Calculate the new velocity and new position
        velocity += action * 0.001 + math.cos(3*position) * (-0.0025)
        # Consider boundary for velocity
        if (velocity < MountainCarEnvironment.velocity_boundaries[0]):
            velocity = MountainCarEnvironment.velocity_boundaries[0]
        elif (velocity > MountainCarEnvironment.velocity_boundaries[1]):
            velocity = MountainCarEnvironment.velocity_boundaries[1]
            
        position += velocity
        # Consider boundary for position
        if (position < MountainCarEnvironment.position_boundaries[0]):
            position = MountainCarEnvironment.position_boundaries[0]
        elif (position > MountainCarEnvironment.position_boundaries[1]):
            position = MountainCarEnvironment.position_boundaries[1]
        
        new_state = [velocity, position]
        return(new_state)
    
    # Reset to the initial state    
    def reset(self):
        self.car.state[0] = 0.0
        self.car.state[1] = -0.5
        
    # Give the reward for the chosen action, depending on the state the agent ends up in
    # Output: 0 for the terminal state, -1 for a non-terminal state
    def calculateReward(self):
        # Get current position of the agent
        position = self.car.state[1]
        
        # Determine the reward given
        if (position >= 0.6):
            return(0)
        else:
            return(-1)

## KNN-TD Agent

**kNN-TD** combines *K-Nearest Neighbours* and *TD-Learning* to learn and evaluate Q values in both continuous and discrete state space RL problems. The method is especially useful in continuous state spaces, where the number of (state, action) pairs is too large to store and learn a value for each pair. After choosing a value of k and placing some initial points over the continuous state space, one can estimate the Q value of the agent's current state as the weighted average of the Q values of its k nearest neighbours, and use these estimates to decide the next move with a decision method (e.g. UCB or epsilon-greedy). For learning, one updates all of the k nearest neighbours that contributed to the Q estimate.

**Algorithm:**
1. Cover the whole state space with some initial Q(s,a) pairs, possibly scattered uniformly across the state space, and give each an initial value of 0 (the code below scatters the points at random and initialises them to -1)  
2. When the agent is in a particular state, get the feature vector representing the state and the possible actions from that state
3. For each possible action from the state, estimate Q(s,a) by taking the expected value of the stored Q values of the k nearest neighbours that share that action.  
*Steps for k-nearest neighbours:*
    - Standardise every feature in the feature vector to (-1, 1) or another common range, so that no single feature's scale dominates the distance calculation
    - Calculate the distance between the current state and all other points using a distance formula (e.g. Euclidean distance), and store the k nearest neighbours and their distances
    - Determine the weight (p(x)) of each neighbour for the expected value from the inverse of the distance (the code below uses 1 / (1 + distance^2))
    - Estimate Q(s,a) with the expectation formula, using the normalised weights and the stored Q values of the k nearest neighbours (weighted average)
4. Choose the next move using the epsilon-greedy decision method
5. Observe the reward and update the Q values of all of the neighbours using SARSA or Q Learning (the code below uses Q Learning)
6. Repeat steps 2-5
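
Steps 3 and 5 can be sketched on a handful of stored points as follows. This is a minimal illustration and not the agent class itself: the `points` data and the helper names `standardise`, `nearest_neighbours`, `estimate_q` and `td_update` are assumptions made for the example, while the weight 1 / (1 + distance^2) and the feature ranges follow the notebook.

```python
import math

# Hypothetical stored points: a state [velocity, position], an action and a Q value
points = [
    {"state": [0.00, -0.50], "action": 1, "value": -10.0},
    {"state": [0.01, -0.40], "action": 1, "value": -8.0},
    {"state": [-0.05, 0.30], "action": 1, "value": -2.0},
    {"state": [0.00, -0.50], "action": -1, "value": -20.0},
]

def standardise(state):
    """Rescale velocity from (-0.07, 0.07) and position from (-1.2, 0.6) to (-1, 1)."""
    velocity = 2 * (state[0] + 0.07) / 0.14 - 1
    position = 2 * (state[1] + 1.2) / 1.8 - 1
    return [velocity, position]

def nearest_neighbours(state, action, k):
    """Return (index, probability) pairs for the k nearest points with this action."""
    s = standardise(state)
    scored = []
    for index, point in enumerate(points):
        if point["action"] != action:
            continue
        t = standardise(point["state"])
        distance = math.hypot(s[0] - t[0], s[1] - t[1])
        scored.append((distance, index, 1 / (1 + distance ** 2)))
    nearest = sorted(scored, key=lambda item: item[0])[:k]
    total = sum(weight for _, _, weight in nearest)
    # Normalise the weights so that they form a probability distribution
    return [(index, weight / total) for _, index, weight in nearest]

def estimate_q(state, action, k):
    """Step 3: weighted average of the neighbours' stored Q values."""
    return sum(p * points[i]["value"] for i, p in nearest_neighbours(state, action, k))

def td_update(state, action, k, td_target, alpha=0.5):
    """Step 5: move each neighbour towards the TD target in proportion to its weight."""
    for i, p in nearest_neighbours(state, action, k):
        points[i]["value"] += alpha * p * (td_target - points[i]["value"])

print(estimate_q([0.0, -0.5], 1, 2))
```

With k = 2 the estimate lies between the two nearest stored values (-10 and -8), and closer to -10 because that point coincides with the query state and therefore receives the larger weight.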

In [3]:
class KNNAgent:
    """
    Implementation of agent (car) that will be used in the Mountain Car Environment using the kNN-TD underlying algorithm
    """
    
    # Constructor
    # Input: size of the storage for previous Q values, number of nearest neighbours (k) that the agent will use
    def __init__(self, size, k):
        self.state = [0.0, -0.5]
        self.actions = [-1, 0, 1]
        self.q_storage = []
        self.k = k # fixed number of nearest neighbours that we will use
        self.alpha = 0.5 # fixed learning rate; it could be varied later
        self.gamma = 1
        
        # Storage of the k nearest neighbour (data) and weight (inverse of distance) for a particular step
        self.knn = []
        self.weight = []
        
        # Initialise the storage with random point 
        for i in range(size):
            initial_value = -1
            initial_action = random.randint(-1, 1)
            initial_state = [random.uniform(-0.07, 0.07), random.uniform(-1.2, 0.6)]
            
            # Each data on the array will consist of state, action pair + value
            data = {"state": initial_state, "value": initial_value, "action": initial_action}
            self.q_storage.append(data)
    
    # Find all index for a given value
    # Input: value, list to search
    # Output: list of all index where you find that value on the list
    def findAllIndex(self, value, list_value):
        indices = []
        for i in range(len(list_value)):
            if (value == list_value[i]):
                indices.append(i)
        
        return indices
    
    # Standardise feature vector given
    # Input: feature vector to be standardised
    # Output: standardised feature vector
    def standardiseState(self, state):
        standardised_state = []
        standardised_velocity = 2 * ((state[0]+0.07) / (0.07+0.07)) - 1
        standardised_position = 2 * ((state[1]+1.2) / (0.6+1.2)) - 1
        standardised_state.append(standardised_velocity)
        standardised_state.append(standardised_position)
        
        return(standardised_state)
    
    # Calculate Euclidean distance between 2 vectors
    # Input: 2 feature vectors
    # Output: distance between them
    def calculateDistance(self, vector1, vector2):
        return(math.sqrt((vector1[0]-vector2[0])**2 + (vector1[1]-vector2[1])**2))
    
    # Calculate total weight
    # Input: list of weights
    # Output: total weight
    def calculateTotalWeight(self, weight_list):
        total_weight = 0
        for i in range(len(weight_list)):
            total_weight += weight_list[i][2]
        
        return(total_weight)
    
    # Apply the kNN algorithm for feature vector and store the data point on the neighbours array
    # Input: feature vector of current state, actions array consisting of all possible actions, list that will store knn data and weights data
    # Output: vector containing the value of taking each action (left, neutral, right)
    def kNNTD(self, state, actions, knn_list, weight_list):
        approximate_action = []
        
        # Get the standardised version of state
        standardised_state = self.standardiseState(state)
        
        # Loop through every element in the storage array and only calculate for particular action
        for action in actions:
            temp = [] # array consisting of tuple (distance, original index, weight) for each point in the q_storage
            for i in range(len(self.q_storage)):
                data = self.q_storage[i]
                # Only want to calculate the nearest neighbour state which has the same action
                if (data["action"] == action):
                    vector_2 = data["state"]
                    standardised_vector_2 = self.standardiseState(vector_2)
                    distance = self.calculateDistance(standardised_state, standardised_vector_2)
                    index = i
                    weight = 1 / (1+distance**2)
            
                    # Create the tuple and append that to temp
                    temp.append(tuple((distance, index, weight)))
                else:
                    continue
        
            # After looping through all of the points and calculating the standardised distances,
            # sort the tuples by distance, take only the k nearest and append them to the neighbours array
            # We also need the total weight to turn the weights into valid probabilities for the expectation
            sorted_temp = sorted(temp, key=lambda x: x[0])
            for i in range(self.k):
                try:
                    weight_list.append(sorted_temp[i])
                    knn_list.append(self.q_storage[sorted_temp[i][1]])
                except IndexError:
                    sys.exit(0)
            
            # Calculate the expected value of the action and append it to the approximate_action array
            expected_value = 0
            total_weight = self.calculateTotalWeight(weight_list[(action+1)*self.k:(action+1)*self.k + self.k])
            for i in range((action+1)*self.k, (action+1)*self.k + self.k):
                weight = weight_list[i][2]
                probability = weight / total_weight
                expected_value += probability * knn_list[i]["value"]
                
            approximate_action.append(expected_value)
        
        return(approximate_action)
    
    # Select which action to choose, whether left, neutral, or right (using epsilon greedy)
    # Output: -1 (left), 0 (neutral), 1 (right)
    def selectAction(self):
        # First call the knn-td algorithm to determine the value of each Q(s,a) pairs
        action_value = self.kNNTD(self.state, self.actions, self.knn, self.weight)
        
        # Use the epsilon-greedy method to choose value
        epsilon = 0.1
        random_number = random.uniform(0.0, 1.0)
        if (random_number <= epsilon):
            action_chosen = random.randint(-1, 1)
        else:
            # Return the action with highest Q(s,a)
            possible_index = self.findAllIndex(max(action_value), action_value)
            action_chosen = possible_index[random.randrange(len(possible_index))] - 1
        
        # Only store chosen data in the knn and weight list
        # Clearance step
        chosen_knn = []
        chosen_weight = []
        for i in range(self.k*(action_chosen+1), self.k*(action_chosen+1)+self.k):
            chosen_knn.append(self.knn[i])
            chosen_weight.append(self.weight[i])
        self.knn = chosen_knn
        self.weight = chosen_weight

        return action_chosen
    
    # Calculate TD target based on Q Learning/ SARSAMAX
    # Input: Immediate reward based on what the environment gave
    # Output: TD target based on off policy Q learning
    def calculateTDTarget(self, immediate_reward):
        # Consider condition on the final state, return 0 immediately
        if (immediate_reward == 0):
            return(immediate_reward)
        
        knn_prime = []
        weight_prime = []
        action_value = self.kNNTD(self.state, self.actions, knn_prime, weight_prime)
        
        return(immediate_reward + self.gamma*max(action_value))
    
    # Q Learning TD update on each of the k nearest neighbours, scaled by its probability weight (its contribution to the estimate)
    # Input: Immediate reward based on what the environment gave
    def TDUpdate(self, immediate_reward):
        # First, calculate the TD target
        td_target = self.calculateTDTarget(immediate_reward)
        
        # Iterate every kNN and update using Q learning method based on the weighting
        total_weight = self.calculateTotalWeight(self.weight)
        for i in range(len(self.weight)):
            index = self.weight[i][1]
            probability = self.weight[i][2] / total_weight
            
            # Begin updating
            td_error = td_target - self.q_storage[index]["value"]
            self.q_storage[index]["value"] = self.q_storage[index]["value"] + self.alpha*td_error*probability
        
        self.cleanList() # clean list to prepare for another step
            
    # Clear the knn list and also the weight list
    def cleanList(self):
        self.knn = []
        self.weight = []

## KNN Main Function

**KNN Main function** is responsible for initialising the KNN agent and the environment, and for handling the agent-environment interaction. It consists of an inner loop that directs the agent's decisions while receiving the reward and next state from the environment. The inner loop only breaks once the agent successfully gets out of the environment (in this case, the mountain) or when the episode is taking too many steps. An outer loop controls the number of episodes the agent performs before the main function ends.

In [None]:
"""
Main Function script to test kNN agent
"""

# Initialise the environment and the agent
size = 1000 # size of the q_storage 
k = 50 # knn parameter
agent = KNNAgent(size, k)
mountain_car_environment = MountainCarEnvironment(agent)

# Iterate the process, train the agent (training_iteration episodes)
training_iteration = 1000
for i in range(training_iteration):
    step = 0
    mountain_car_environment.reset()
    while (True):
        action = agent.selectAction()
        step += 1
        next_state = mountain_car_environment.nextState(action)
    
        # Change agent current state and getting reward
        agent.state = next_state
        immediate_reward = mountain_car_environment.calculateReward()
    
        # Test for successful learning
        if (immediate_reward == 0):
            print("Episode {}: successfully got out of the mountain. Steps taken: {}".format(i + 1, step))
            agent.TDUpdate(immediate_reward)
            break
        
        # Update using Q Learning and kNN
        agent.TDUpdate(immediate_reward)
        
        # Prevent too long convergence        
        if (step > 100000):
            print("Too many steps in this episode.")
            break

## PNA Agent

In [None]:
class PNAAgent:
    """
    Implementation of agent (car) that will be used in the Mountain Car Environment using the PNA underlying algorithm
    """
    
    # Constructor
    # Input: size of the storage for previous Q values
    def __init__(self, size):
        self.state = [0.0, -0.5]
        self.actions = [-1, 0, 1]
        self.q_storage = []
        self.k = 0 # number of nearest neighbours, chosen dynamically by chooseK for each action
        self.alpha = 0.5 # fixed learning rate; it could be varied later
        
        # Storage of the k nearest neighbour (data) and weight (inverse of distance) for a particular step
        self.knn = []
        self.weight = []
        self.c = 0.5 # UCB selection constant
        self.k_history = [] # used to store history of k chosen for each action
        
        # Initialise the storage with random point 
        for i in range(size):
            initial_value = -1
            initial_action = random.randint(-1, 1)
            initial_state = [random.uniform(-0.07, 0.07), random.uniform(-1.2, 0.6)]
            
            # Each data on the array will consist of state, action pair + value
            data = {"state": initial_state, "value": initial_value, "action": initial_action}
            self.q_storage.append(data)
    
    # Find all index for a given value
    # Input: value, list to search
    # Output: list of all index where you find that value on the list
    def findAllIndex(self, value, list_value):
        indices = []
        for i in range(len(list_value)):
            if (value == list_value[i]):
                indices.append(i)
        
        return indices
    
    # Standardise feature vector given
    # Input: feature vector to be standardised
    # Output: standardised feature vector
    def standardiseState(self, state):
        standardised_state = []
        standardised_velocity = 2 * ((state[0]+0.07) / (0.07+0.07)) - 1
        standardised_position = 2 * ((state[1]+1.2) / (0.6+1.2)) - 1
        standardised_state.append(standardised_velocity)
        standardised_state.append(standardised_position)
        
        return(standardised_state)
    
    # Calculate Euclidean distance between 2 vectors
    # Input: 2 feature vectors
    # Output: distance between them
    def calculateDistance(self, vector1, vector2):
        return(math.sqrt((vector1[0]-vector2[0])**2 + (vector1[1]-vector2[1])**2))
    
    # Calculate total weight
    # Input: list of weights
    # Output: total weight
    def calculateTotalWeight(self, weight_list):
        total_weight = 0
        for i in range(len(weight_list)):
            total_weight += weight_list[i][2]
        
        return(total_weight)
            
    # Clear the knn list and also the weight list
    def cleanList(self):
        self.knn = []
        self.weight = []
        self.k_history = []
    
    # Choose the appropriate k by minimising variance and maximising the number of data to learn
    # Input: sorted neighbourhood list based on distance (distance, index, weight)
    # Output: k (numbers of nearest neighbour) that minimise neighbourhood variance function
    def chooseK(self, neighbourhood_list):
        data_list = []
        # Extract the Q values of the neighbours in neighbourhood_list and append them to data_list
        for data in neighbourhood_list:
            data_list.append(self.q_storage[data[1]]["value"])
            
        # Initialise minimum variance
        minimum_k = 2 # Value to be returned: the k that minimises the variance of the neighbourhood
        minimum_var = self.neighbourhoodVariance(1, data_list[:2])
        
        # Iterate to find the k that minimises the neighbourhood variance function
        for i in range(3, len(data_list) + 1):
            var = self.neighbourhoodVariance(1, data_list[:i])
            k = i
            
            # Update k and the minimum variance if this k does at least as well as the previous best
            if (var <= minimum_var):
                minimum_k = k
                minimum_var = var
        
        # Return the k which minimises the neighbourhood variance function
        return(minimum_k)
    
    # PNA variance function that needs to be minimised
    # Input: constant c (currently unused in the calculation), list of Q values
    # Output: result of the neighbourhood variance function
    def neighbourhoodVariance(self, c, data_list):
        return(math.sqrt(np.var(data_list)/ len(data_list)))
    
    # Get starting index for the weight list
    # Input: action
    # Output: starting index for the weight list
    def getStartingIndex(self, action):
        count_action = action + 1
        if (count_action == 0):
            return(0)
        else:
            index = 0
            for i in range(count_action):
                index += self.k_history[i]
            return(index)
        
    # Apply the PNA algorithm for feature vector and store the data point on the neighbours array
    # Input: feature vector of current state, actions array consisting of all possible actions, list that will store knn data and weights data
    # Output: vector containing the value of taking each action (left, neutral, right)
    def PNA(self, state, actions, knn_list, weight_list):
        approximate_action = []
        
        # Get the standardised version of state
        standardised_state = self.standardiseState(state)
        
        # Loop through every element in the storage array and only calculate for particular action
        for action in actions:
            temp = [] # array consisting of tuple (distance, original index, weight) for each point in the q_storage
            for i in range(len(self.q_storage)):
                data = self.q_storage[i]
                # Only want to calculate the nearest neighbour state which has the same action
                if (data["action"] == action):
                    vector_2 = data["state"]
                    standardised_vector_2 = self.standardiseState(vector_2)
                    distance = self.calculateDistance(standardised_state, standardised_vector_2)
                    index = i
                    weight = 1 / (1+distance**2)
            
                    # Create the tuple and append that to temp
                    temp.append(tuple((distance, index, weight)))
                else:
                    continue
        
            # After looping through all of the points and calculating the standardised distances,
            # sort the tuples by distance; k is then chosen dynamically and the k nearest are appended to the neighbours array
            sorted_temp = sorted(temp, key=lambda x: x[0])
            # Get the value of the k dynamically
            self.k = self.chooseK(sorted_temp)
            self.k_history.append(self.k)
            
            for i in range(self.k):
                weight_list.append(sorted_temp[i])
                knn_list.append(self.q_storage[sorted_temp[i][1]])
            
            # Calculate the expected value of the action and append it to the approximate_action array
            expected_value = 0
            # We also need the total weight to turn the weights into valid probabilities for the expectation
            total_weight = self.calculateTotalWeight(weight_list[self.getStartingIndex(action):])
            for i in range(self.getStartingIndex(action), self.getStartingIndex(action)+self.k):
                weight = weight_list[i][2]
                probability = weight / total_weight
                expected_value += probability * knn_list[i]["value"]
                
            approximate_action.append(expected_value)
        
        return(approximate_action)
    
    # Calculate TD target based on Q Learning/ SARSAMAX
    # Input: Immediate reward based on what the environment gave
    # Output: TD target based on off policy Q learning
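    def calculateTDTarget(self, immediate_reward):
        # Terminal state: return the reward immediately, as in KNNAgent
        if (immediate_reward == 0):
            return(immediate_reward)
        
        # selectAction has already used k_history, so reset it: the start indices
        # computed inside PNA must refer to the fresh lists below
        self.k_history = []
        knn_prime = []
        weight_prime = []
        action_value = self.PNA(self.state, self.actions, knn_prime, weight_prime)
        
        return(immediate_reward + max(action_value))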
    
    # Q Learning TD update on each of the chosen neighbours, scaled by its probability weight (its contribution to the estimate)
    # Input: Immediate reward based on what the environment gave
    def TDUpdate(self, immediate_reward):
        # First, calculate the TD target
        td_target = self.calculateTDTarget(immediate_reward)
        
        # Iterate every kNN and update using Q learning method based on the weighting
        total_weight = self.calculateTotalWeight(self.weight)
        for i in range(len(self.weight)):
            index = self.weight[i][1]
            probability = self.weight[i][2] / total_weight
            
            # Begin updating
            td_error = td_target - self.q_storage[index]["value"]
            self.q_storage[index]["value"] = self.q_storage[index]["value"] + self.alpha*td_error*probability
        
        self.cleanList() # clean list to prepare for another step
    
    # Get the action that maximises the UCB value
    # Input: action_value list, bonus_variance list
    # Output: action that maximises action value plus bonus
    def maximumUCB(self, action_value, bonus_variance):
        max_index = 0
        max_value = action_value[0] + bonus_variance[0]
        
        for i in range(1, 3):
            value = action_value[i] + bonus_variance[i]
            
            if (value >= max_value):
                max_value = value
                max_index = i
        
        return(max_index - 1) # return the action which maximise
        
    # Select which action to choose, whether left, neutral, or right (using UCB)
    # Output: -1 (left), 0 (neutral), 1 (right)
    def selectAction(self):
        action_value = self.PNA(self.state, self.actions, self.knn, self.weight)
        
        # Second term of UCB: calculate the variance bonus for each action
        bonus_variance = []
        start_index = [] # start index of each action's neighbours in self.weight
        finish_index = [] # end index (exclusive) of each action's neighbours in self.weight
        for action in self.actions:
            # Neighbours of each action are stored contiguously, in the order of self.actions
            start = self.getStartingIndex(action)
            finish = start + self.k_history[action+1]
            start_index.append(start)
            finish_index.append(finish)
            
            # Extract the Q values of this action's neighbours
            data_list = []
            for i in range(start, finish):
                data_list.append(self.q_storage[self.weight[i][1]]["value"])
            bonus_variance.append(self.neighbourhoodVariance(self.c, data_list))
        
        # Choose the action based on ucb method
        action_chosen = self.maximumUCB(action_value, bonus_variance)
                               
        # Only store chosen data in the knn and weight list
        # Clearance step
        chosen_knn = []
        chosen_weight = []
        for i in range(start_index[action_chosen+1], finish_index[action_chosen+1]):
            chosen_knn.append(self.knn[i])
            chosen_weight.append(self.weight[i])
        self.knn = chosen_knn
        self.weight = chosen_weight
        
        return action_chosen
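
The dynamic choice of k in `chooseK` can be illustrated in isolation. The sketch below is a simplified stand-alone version under stated assumptions: it works directly on a list of Q values ordered from nearest to farthest, and the names `neighbourhood_spread` and `choose_k` as well as the sample values are hypothetical. The quantity minimised, sqrt(variance / count), is the one used in `neighbourhoodVariance`.

```python
import math
import numpy as np

def neighbourhood_spread(values):
    """Square root of (variance / count) for a list of Q values."""
    return math.sqrt(np.var(values) / len(values))

def choose_k(sorted_values, minimum_k=2):
    """Pick the neighbourhood size with the smallest spread; ties favour the larger k."""
    best_k = minimum_k
    best_spread = neighbourhood_spread(sorted_values[:minimum_k])
    for k in range(minimum_k + 1, len(sorted_values) + 1):
        spread = neighbourhood_spread(sorted_values[:k])
        if spread <= best_spread:
            best_k, best_spread = k, spread
    return best_k

# Hypothetical Q values of stored points, ordered from nearest to farthest
values = [-10.0, -10.0, -10.0, -10.0, -2.0, -30.0]
print(choose_k(values))  # prints 4
```

The four nearest values agree exactly, so their spread is zero, and adding the fifth or sixth value only increases it. Because ties are resolved in favour of the larger k, the neighbourhood grows for as long as the extra points do not add disagreement.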