## Learning Algorithm - Single agent

This script contains the algorithm referred to as 'Algorithm 3: Model free Q-Function Estimation and Policy Improvement' in the report

### 'Continuous_car' 

Instantiated with a controller gain K and two boolean flags that switch on system (model) noise and controller noise, respectively

##### 'MatrixMaker'

    Runs the episodes and returns the three arrays (Beta_k, Beta_kplus1, C_k) used to compute the Q-function approximation weights

##### 'LSOptimiser'

    Uses the three arrays returned by 'MatrixMaker' to compute and return the weight vector

##### 'PolicyMaximiser'

    Uses the learned weight vector to calculate the new controller

### 'PolicyImprovementAlgo'

Carries out a desired number of updates of the controller by calling the above functions







In [1]:
import numpy as np
import pdb
import matplotlib.pyplot as plt


# class for the agent
class Continuous_car():
    """Linear two-state (position, velocity) system driven by a state-feedback policy.

    Dynamics:   x_next = A x + B u + w      (x is a 2x1 column, u is a scalar)
    Stage cost: x'Ex + F u^2
    Policy:     u = K x (+ optional Gaussian controller noise)

    Parameters
    ----------
    K : array-like
        The two feedback gains, given either as shape (2,) or as a 1x2 row.
    noisy_model : bool
        When truthy, Gaussian noise with standard deviation 0.25 is added to the
        velocity state at every step; otherwise the model noise is zero.
    noisy_controller : bool
        When truthy, Gaussian noise with standard deviation 0.2 is added to
        every input returned by ``GetPolicyInput``.
    """

    def __init__(self, K, noisy_model=False, noisy_controller=False):

        # system
        self.A = np.array([[1, 1], [0, 1]])
        self.B = np.array([[0.5], [1]])

        # cost matrices
        self.E = np.array([[1, 0], [0, 0.5]])
        self.F = np.array([1])

        # Arbitrary gain choice used (stability checked)
        self.K = K

        # Optimal Gains found using dLQR on Matlab:
        # K_Optimal = [ 0.4634, 1.0170 ]

        self.disc_fact = 0.99
        self.n_states = 2

        # standard deviation of the model (velocity) noise
        self.sigma_model = 0.25 if noisy_model else 0

        # standard deviation of the controller noise (only used when enabled)
        self.sigma_controller = 0.2
        self.noisy_controller = noisy_controller

    def GetPolicyInput(self, x):
        """Return the scalar input u = K x for state ``x`` (2x1 array).

        K is flattened to a row and x to a column before multiplying, so the
        result is a true scalar whether K was supplied with shape (2,) or (1, 2).
        Controller noise is added when ``noisy_controller`` is set.
        """
        gains = np.asarray(self.K).reshape(1, -1)
        state = np.asarray(x).reshape(-1, 1)

        inp = np.matmul(gains, state)[0, 0]  # scalar

        if self.noisy_controller:
            inp += self.GetControllerNoise()  # scalar

        return inp  # scalar

    def GetControllerNoise(self):
        """Return one scalar sample of zero-mean Gaussian noise scaled by ``sigma_controller``."""
        return self.sigma_controller * np.random.randn(1)[0]

    def GetCost(self, x, u):
        """Return the scalar one-step cost x'Ex + F u^2.

        x is a 2x1 array, u is a scalar.
        """
        # x'Ex
        state_cost = np.matmul(np.matmul(x.transpose(), self.E), x)[0][0]

        # u'Fu
        input_cost = u * u * self.F[0]

        return state_cost + input_cost  # scalar

    def GetNoise(self):
        """Return a 2x1 model-noise vector; only the velocity entry is non-zero."""
        # A random sample is always drawn (and scaled by sigma_model, which is 0
        # when the model noise is disabled).
        w = np.array([0, self.sigma_model * np.random.randn(1)[0]]).reshape(2, 1)

        return w  # 2x1 array

    def GetNextState(self, current_state, current_input):
        """Return the next state A x + B u + w for a 2x1 state and a scalar input."""
        drift = np.matmul(self.A, current_state)
        forced = self.B * current_input
        noise = self.GetNoise()

        return drift + forced + noise

    def RunEpisode(self, episode_length, state_initial):
        '''
        Simulate one trajectory under the current policy.

        episode_length: integer, number of transitions to simulate
        state_initial: list form, e.g. [3,2] for position of 3 and velocity of 2

        Returns (state_list, cost_list, input_list, pos_list, vel_list).
        The state, position and velocity lists hold episode_length + 1 entries
        (the initial state included); the cost and input lists hold one entry
        per transition.
        '''

        x = np.array(state_initial).reshape(2, 1)

        state_list = [x]
        cost_list = []
        input_list = []
        pos_list = [x[0][0]]
        vel_list = [x[1][0]]

        for k in range(episode_length):

            u = self.GetPolicyInput(x)
            input_list.append(u)

            # The cost recorded for a transition is evaluated at the state that
            # is reached after applying u, together with u itself.
            x = self.GetNextState(x, u)
            cost = self.GetCost(x, u)

            state_list.append(x)
            cost_list.append(cost)

            pos_list.append(x[0][0])
            vel_list.append(x[1][0])

        return state_list, cost_list, input_list, pos_list, vel_list

In [1]:
def MatrixMaker(sys, number_of_episodes=100, episode_length=10):
    '''
    For the defined system, this will run a certain number of episodes from random
    initial states and return the arrays used for LSTD estimation of the Q-function.

    Each transition contributes one column of 7 features, so the resulting Theta
    vector has 7 elements, the coefficients for the terms:

    x1^2
    x1 * x2
    x1 * u
    x2^2
    x2 * u
    u^2
    1 (constant)

    sys: object exposing `disc_fact`, the gains `K` (shape (2,) or 1x2) and
         `RunEpisode(episode_length, state_initial)` returning
         (state_list, cost_list, input_list, pos_list, vel_list)
    number_of_episodes: how many episodes to simulate
    episode_length: number of transitions per episode

    Returns (Beta_k, Beta_kplus1, C_k) where, with N the total number of transitions:
      Beta_k      : 7 x N, features of (current state, applied input)
      Beta_kplus1 : 7 x N, discount factor times the features of
                    (next state, policy input K x_next at the next state)
      C_k         : N x 1, the cost recorded for each transition
    '''
    df = sys.disc_fact

    # Flat copy of the gains so that K x_next below is a plain scalar/1-D
    # expression regardless of whether K was stored as (2,) or (1, 2).
    gains = np.asarray(sys.K, dtype=float).reshape(-1)

    mu_position = 0
    sigma_position = 10
    mu_velocity = 0
    sigma_velocity = 2

    def GetRandomSample():
        # returns a random initial state [position, velocity]
        x1 = sigma_position * np.random.randn(1)[0] + mu_position
        x2 = sigma_velocity * np.random.randn(1)[0] + mu_velocity

        return [x1, x2]

    def QuadraticFeatures(x1, x2, u):
        # stacks the 7 feature rows for a batch of samples (one column per sample)
        return np.vstack((x1 * x1,
                          x1 * x2,
                          x1 * u,
                          x2 * x2,
                          x2 * u,
                          u * u,
                          np.ones_like(x1)))

    # Each entry: (positions, velocities, inputs, costs) of one episode as flat
    # float arrays. An episode with k transitions has k+1 positions/velocities.
    episodes = []

    for episode in range(number_of_episodes):

        x_init = GetRandomSample()

        _, cost_list, input_list, pos_list, vel_list = sys.RunEpisode(episode_length, x_init)

        episodes.append(tuple(
            np.asarray(seq, dtype=float).reshape(-1)
            for seq in (pos_list, vel_list, input_list, cost_list)
        ))

        if episode%1000 == 0:
            print(f'...running, iteration {episode} completed')

    # Total number of transitions over all episodes
    N = sum(costs.size for _, _, _, costs in episodes)

    Beta_k = np.zeros((7, N))
    Beta_kplus1 = np.zeros((7, N))
    C_k = np.zeros((N, 1))

    # No sample is ever discarded; the count is still reported below.
    skipped_samples = 0

    # Every transition gives one equation: a column of each Beta matrix and a row of C_k.
    column = 0
    for positions, velocities, inputs, costs in episodes:
        n = costs.size
        stop = column + n

        x1_next = positions[1:n + 1]
        x2_next = velocities[1:n + 1]

        # Input the (noise-free) policy would choose at the next state
        input_next = gains[0] * x1_next + gains[1] * x2_next

        Beta_k[:, column:stop] = QuadraticFeatures(positions[:n], velocities[:n], inputs[:n])
        Beta_kplus1[:, column:stop] = df * QuadraticFeatures(x1_next, x2_next, input_next)
        C_k[column:stop, 0] = costs

        column = stop

    print(f'{skipped_samples} samples were skipped')

    return Beta_k, Beta_kplus1, C_k


def LSOptimiser(sys, A,B,C):
    '''
    Least-squares solve for the estimated value-function weights.

    A, B and C are the two matrices and the vector produced by 'Matrix Maker'.
    The weights satisfy

        Theta = (A (A-B)')^-1  A  C

    which is obtained by solving the linear system  [A (A-B)'] Theta = A C
    rather than forming the inverse explicitly. `sys` is not used.
    '''
    # left-hand side: A (A-B)'
    lhs = np.matmul(A, (A - B).transpose())

    # right-hand side: A C
    rhs = np.matmul(A, C)

    return np.linalg.solve(lhs, rhs)

def PolicyMaximiser(Theta):
    """Return the gains K that minimise the learned quadratic Q-function over u.

    Theta holds the Q-function coefficients as a column; entries 2, 4 and 5 are
    the coefficients of x1*u, x2*u and u^2. The returned array is
    [-Theta[2] / (2 Theta[5]), -Theta[4] / (2 Theta[5])].
    """
    twice_u_squared_coeff = 2*Theta[5]

    gain_position = (Theta[2]/twice_u_squared_coeff)[0]
    gain_velocity = (Theta[4]/twice_u_squared_coeff)[0]

    return -np.array([gain_position, gain_velocity])

In [None]:
def PolicyImprovementAlgo(K_initial=np.array([-1, -1.6]),i=5000,T=50,k=5):
    '''
    Function that will take an initial controller gain in the form above and, for
    each of the k updates:
     - simulate many trajectories (with both model and controller noise enabled)
     - compute the action value function using LS (Q function)
     - minimise over the action space to find a new improved policy
     - use that policy as the controller for the next update

    # K_initial: the two initial gains (array-like)
    # i: number of episodes (samples) run before improvement
    # T: length (in time steps) of each episode
    # k: number of policy improvements

    Returns the list of controllers: the initial gains followed by the gains
    obtained after each update (k + 1 entries).
    '''

    # Work on a copy so the returned history never aliases the caller's array
    # (or the shared default argument).
    K = np.array(K_initial, dtype=float)
    K_stored = [K]

    for t in range(1, k + 1):

        print(f'K currently is {K}')
        sys = Continuous_car(K, True,True)
        print(sys.K)

        Bk, Bk1, Ck = MatrixMaker(sys, number_of_episodes=i,episode_length=T)

        # LSOptimiser returns the weight vector only
        Theta = LSOptimiser(sys, Bk, Bk1, Ck)

        K = PolicyMaximiser(Theta)
        K_stored.append(K)

        print(f'\nRunning, now onto iteration {t}')

    print('All done!')
    return K_stored #returns list of controllers


# The following functions are used to extract the elements of the gains from the list and plot them

def GainExtractor(Klist):
    """Split a list of two-element gains into (first elements, second elements)."""
    first_gains = [gain[0] for gain in Klist]
    second_gains = [gain[1] for gain in Klist]
    return first_gains, second_gains

def PlotGains(LoK, save=False, figname=None):
    """Plot both gains of a controller history against the iteration number.

    LoK: list of two-element gains (one entry per iteration)
    save: when true, the figure is written to '<figname>.png'
    figname: file name without extension; required when save is true

    Horizontal reference lines are drawn at -0.4634 and -1.0170, labelled
    'LQR optimal gain'. Raises NotImplementedError when save is requested
    without a figname (after the figure has been drawn).
    """
    first_gain, second_gain = GainExtractor(LoK)
    iterations = range(len(first_gain))

    # constant reference lines, one value per plotted iteration
    first_reference = np.full(len(first_gain), -0.4634)
    second_reference = np.full(len(second_gain), -1.0170)

    figure = plt.figure(dpi=150)
    ax = figure.add_axes([0, 0, 1, 1])

    ax.plot(iterations, first_gain, 'tab:blue', label='K[1]')
    ax.plot(iterations, first_reference, 'tab:red', linewidth=0.75)
    ax.plot(iterations, second_gain, 'tab:green', label='K[2]')
    ax.plot(iterations, second_reference, 'tab:red', linewidth=0.75, label='LQR optimal gain')
    ax.set_xlabel('Iteration Number')
    ax.set_ylabel('Gain')
    ax.legend()

    if not save:
        return
    if figname is None:
        raise NotImplementedError('No file name given')
    plt.savefig(f'{figname}.png', bbox_inches='tight')
    
def Plot2Gains(LoK1, LoK2, save=False, figname=None):
    """Plot two controller histories on the same axes for comparison.

    LoK1, LoK2: lists of two-element gains (one entry per iteration)
    save: when true, the figure is written to '<figname>.png'
    figname: file name without extension; required when save is true

    The first history is drawn in blue, the second in red, and horizontal
    reference lines at -0.4634 and -1.0170 are labelled 'LQR optimal gain'.
    Raises NotImplementedError when save is requested without a figname.
    """
    K1, K2 = GainExtractor(LoK1)
    K1_, K2_ = GainExtractor(LoK2)

    K1_opt = -0.4634*np.ones(len(K1))
    K2_opt = -1.0170*np.ones(len(K2))

    fig = plt.figure(dpi=200)
    axes = fig.add_axes([0, 0, 1, 1])

    # Each history gets its own x-range so the two lists may differ in length.
    X = range(len(K1))
    X_ = range(len(K1_))

    axes.plot(X, K1, 'b', label='K_initial(1)')
    axes.plot(X, K1_opt, 'y')
    axes.plot(X, K2, 'b')
    axes.plot(X, K2_opt, 'y', label='LQR optimal gain')
    axes.plot(X_, K1_, 'r', label='K_initial(2)')
    axes.plot(X_, K2_, 'r')
    axes.set_xlabel('Iteration Number')
    axes.set_ylabel('Gain')
    axes.legend()

    if save:
        if figname is None:
            raise NotImplementedError('No file name given')
        # The axes fill the whole figure, so the tick and axis labels lie outside
        # it; a tight bounding box keeps them in the saved image.
        plt.savefig(f'{figname}.png', bbox_inches='tight')


