In [1]:
from lunar_lander_old import modifiedLunarLander
import random
import numpy as np
from control import dlqr

# Fast Identification Demo using the Lunar Lander

In this notebook, we test out a fast identification strategy on the Lunar Lander from OpenAI Gym to enable successful landing in the first episode. Our overal strategy is composed of the following steps.
 - Step 1: Do a few exploration moves to generate a data set (D_x_plus, D_x, D_u) for system identification.
 - Step 2: Use least squares identification to obtain system matrices (A,B) from (D_x_plus, D_x, D_u).
 - Step 3: Obtain an LQR controller for the matrices (A,B).
 - Step 4: Use the LQR controller to stabilize a landing trajectory of constant velocity.

First, we define an algorithm with our fast exploration strategy. This strategy chooses inputs online and interacts with the system to obtain a dataset (D_x_plus, D_x, D_u), where
$$
D_x = (x_0,x_1,\ldots, x_{N-1}), \quad D_u = (u_0,u_1,\ldots, u_{N-1}),\quad D_x^+ = (x_1,x_2,\ldots , x_N).
$$
The strategies for obtaining this dataset is simple:
 - When $x_k$ is not linearly dependent on the previous states, the input is zero and the new state information is observed.
 - When $x_k$ is linearly dependent on the previous states, an input is set to one to perform further information.
 - When all inputs have been set to one and $x_k$ is linearly dependent, no more information can be extracted and the exploration terminates.
 
 Note that this strategy works for linear systems and not necessarily for nonlinear systems.

In [2]:
from scipy.optimize import minimize
from numpy import linalg

def objectiveFunction(u, D_x, D_u):
    X = np.array(D_x).transpose()
    U = np.array(D_u + [u]).transpose()
    XU = np.concatenate((X,U),axis=0)
    if XU.shape[1] > XU.shape[0]:
        matrix = XU @ XU.T
    else:
        matrix = XU.T @ XU
    regularized_matrix = matrix + np.eye(matrix.shape[0])
    det = np.linalg.det(regularized_matrix)
    return -det

def generate_grid_points(resolution=10):
    # Generate grid points between -1 and 1 for both x and y axes
    x = np.linspace(-1, 1, resolution)
    y = np.linspace(-1, 1, resolution)
    
    # Generate all combinations of x and y coordinates
    grid_points = np.array(np.meshgrid(x, y)).T.reshape(-1, 2)
    
    # Keep only the points that lie within the unit circle (norm < 1)
    valid_points = grid_points[np.linalg.norm(grid_points, axis=1) <= 1]
    
    return valid_points
    

def fastExploration(plant, linear_disturbance, use_standard, steps, render):
    s = 0
    k = 0
    D_x = []
    D_u = []
    x = plant.x
    n = plant.n
    m = plant.m
    while True:
        D_x_new = D_x.copy()
        D_x_new.append(x)
        if use_standard:
            u_k = np.zeros((m,))
            if np.linalg.matrix_rank(np.array(D_x)) == np.linalg.matrix_rank(np.array(D_x_new)):
                if s >= m:
                    D_x_plus = D_x[1:]
                    D_x_plus.append(x)
                    break
                elif s >= 0 and s<m:
                    u_k[s] = 1
                    s += 1
                else:
                    s += 1
        else:
            if (k >= steps):
                D_x_plus = D_x[1:]
                D_x_plus.append(x)
                break
            points = generate_grid_points(resolution=15)
            objective_values = np.array([objectiveFunction(p, D_x_new, D_u) for p in points])
            min_index = np.argmin(objective_values)
            u_k = points[min_index]    
        D_x = D_x_new
        
        if linear_disturbance:
            x = plant.step(u_k + np.random.normal(0, 0.1, u_k.shape)) + np.random.normal(0, 0.1, x.shape)
        else:
            x = plant.step(u_k)
        if render:
            plant.render()
        D_u.append(u_k)
        k += 1
    return (D_x_plus,D_x,D_u)

As explained, we identify a system from a dataset (D_x,D_u,D_x_plus). For this identification we just do least squares identification with a tiny bit of regularization.

In [3]:
def fastIdentification(D_x_plus,D_x,D_u,lambda_reg = 1e-6):
    n = D_x[0].shape[0]
    m = D_u[0].shape[0]
    X_plus = np.array(D_x_plus).transpose()
    X = np.array(D_x).transpose()
    U = np.array(D_u).transpose()
    XU = np.concatenate((X,U),axis=0)
    regularization = lambda_reg*np.diag([1,1,1,1,1,1,1,1])
    if lambda_reg == 0:
        AB = X_plus @ np.linalg.pinv(XU)
    else:
        AB = np.transpose(np.linalg.solve((XU@np.transpose(XU) + regularization),XU@np.transpose(X_plus)))
    A = AB[:,0:n]
    B = AB[:,n:]
    return (A,B)

Now we can run a first experiment to test our implementation. In this case, we use three parameters for our experiment:
 - We specify a desired speed for the landing. This speed is used to generate a very simple reference trajectory.
 - We specify a number of exploration runs. This parameter specifies how often our exploration strategy is called to generate the data set.
 - We specify whether stochastic disturbances are on or off.
 
 Note that we choose the matrices $Q$ and $R$ as
 $$
 Q = 100 I \quad \text{ and } \quad
 R = \begin{pmatrix}
 0.3 & 0\\
 0 & 0.03
 \end{pmatrix}.
 $$
 Here our choice for $Q$ is just a heuristic and the choice for $R$ is motivated by the description of the Lunar Lander, where firing the main engine induces a cost of $0.3$ and firing a side enging induces a cost of $0.03$.

In [4]:
# parameters
speed = 0.15 #speed for the landing trajectory
num_explore = 2 # number of calls of the data exploration strategy
dispersion_on = False #this parameter switches input noise on
use_standard = False # Whether standard or grid-based exploration is used

# initialize the lunar lander
lander = modifiedLunarLander(dispersion_on)
lander.reset()
lander.render()

#desired trajectory
x_des = np.zeros((6,4000))
x_des[1,:] = np.linspace(lander.x[1],lander.x[1]-100*speed,4000)
x_des[3,:] = -speed*np.ones((1,4000))

# step one: exploration
D_x_plus = []
D_x = []
D_u = []
if (use_standard and num_explore > 1):
    for ind in range(num_explore):
        (D_x_plus_n,D_x_n,D_u_n) = fastExploration(lander, linear_disturbance=False, use_standard = True, steps=0, render = True)
        D_x_plus += D_x_plus_n
        D_x += D_x_n
        D_u += D_u_n
else:
    (D_x_plus,D_x,D_u) = fastExploration(lander, linear_disturbance=False, use_standard = use_standard, steps=8, render = True)

# step two: identify system matrices
(A,B) = fastIdentification(D_x_plus,D_x,D_u)

# step three: generate a controller
Q = 100*np.diag([1,1,1,1,1,1])
R = np.diag([0.3,0.03])
(K,S,E) = dlqr(A, B, Q, R)

# step four: simulate the controller until the end of the episode
done = False
k = 0
while not done and k < 4000:
    # u = -np.array([[0.145,0], [0, 1]]) @ K@(lander.x - x_des[:,k])
    u = - K @ (lander.x - x_des[:,k])
    x = lander.step(u)
    lander.render()
    k += 1
    done = lander.done

lander.env.close()
# print the reward
print(lander.summed_reward)

-41.616717347577506


Now it is time to statistically evaluate the performance of the exploration strategy. To this end, we collect data on average performance and succesful landing rates for either stochastic noise on or of and different numbers of exploration cycles.

In [5]:
statistics = {'dispersion_on': [],
              'num_explore': [],
              'average performance': [],
              'percentage of succesful landings': []
}

study_cases = [
    (dispersion_on, use_standard, num_explore)
    for dispersion_on in [True, False]
    for use_standard in [True, False]
    for num_explore in ([1,2] if use_standard else [8, 12, 16])
]
numOfSamples = 100

for (dispersion_on,use_standard,num_explore) in study_cases:
    # fix random seed here
    print('-------------------------------------------------------------------------')
    print('Starting simulation campaign for dispersion_on = {0}, standard = {1} and num_explore = {2}'.format(dispersion_on,use_standard,num_explore))
    print('-------------------------------------------------------------------------')
    performances_current_run = []
    landing_successes = []
    lander = modifiedLunarLander(dispersion_on)
    lander.env.seed(0) # To make the results reproducable
    for kk in range(numOfSamples):
        if kk%100 == 0:
            print('Running simulation {0}'.format(kk))
        
        lander.reset()
        # step one: exploration
        D_x_plus = []
        D_x = []
        D_u = []
        if (use_standard and num_explore > 1):
            for ind in range(num_explore):
                (D_x_plus_n,D_x_n,D_u_n) = fastExploration(lander, linear_disturbance=False, use_standard = True, steps=0, render = False)
                D_x_plus += D_x_plus_n
                D_x += D_x_n
                D_u += D_u_n
        else:
            (D_x_plus,D_x,D_u) = fastExploration(lander, linear_disturbance=False, use_standard = use_standard, steps=num_explore, render = False)

        # step two: identify system matrices
        (A,B) = fastIdentification(D_x_plus,D_x,D_u)

        # step three: generate a controller
        Q = 1000*np.diag([1,1,1,1,1,1])
        R = np.diag([0.3,0.03])
        (K,S,E) = dlqr(A, B, Q, R)

        # step four: simulate the controller until the end of the episode
        done = False
        k = 0
        while not done and k < 4000:
            u = -K@(lander.x - x_des[:,k])
            x = lander.step(u)
            k += 1
            done = lander.done
        if not done:
            continue
        performances_current_run.append(lander.summed_reward)
        landing_successes.append(lander.summed_reward > 150)

    print('Obtained average performance: {0}'.format(np.mean(performances_current_run)))
    print('Percentage of successfull landings: {0} \n'.format(np.mean(landing_successes)))
    statistics['dispersion_on'].append(dispersion_on)
    statistics['num_explore'].append(num_explore)
    statistics['average performance'].append(np.mean(performances_current_run))
    statistics['percentage of succesful landings'].append(np.mean(landing_successes))

-------------------------------------------------------------------------
Starting simulation campaign for dispersion_on = True, standard = True and num_explore = 1
-------------------------------------------------------------------------
Running simulation 0
Obtained average performance: -515.571135468037
Percentage of successfull landings: 0.06 

-------------------------------------------------------------------------
Starting simulation campaign for dispersion_on = True, standard = True and num_explore = 2
-------------------------------------------------------------------------
Running simulation 0
Obtained average performance: -323.5708419343986
Percentage of successfull landings: 0.11 

-------------------------------------------------------------------------
Starting simulation campaign for dispersion_on = True, standard = False and num_explore = 8
-------------------------------------------------------------------------
Running simulation 0
Obtained average performance: -329.5

Let us plot a table with the results.

In [6]:
import pandas as pd
statistics_table = pd.DataFrame(statistics)
print(statistics_table)

   dispersion_on  num_explore  average performance  \
0           True            1          -515.571135   
1           True            2          -323.570842   
2           True            8          -329.527685   
3           True           12          -321.874965   
4           True           16          -234.696635   
5          False            1          -176.969405   
6          False            2           227.209289   
7          False            8           154.662203   
8          False           12           213.301637   
9          False           16           131.186865   

   percentage of succesful landings  
0                              0.06  
1                              0.11  
2                              0.12  
3                              0.07  
4                              0.10  
5                              0.45  
6                              0.98  
7                              0.75  
8                              0.93  
9                        

Finally, we do some tests with linear systems to verify that the stabilization works with a noisefree linear system. To this end, we first define a simple linear system class and try out the identification.

In [7]:
class linearSystem():
    def __init__(self,A,B):
        (n,m) = B.shape
        self.n = n
        self.m = m
        self.A = A
        self.B = B
        
        if np.random.randint(0,2,(1,)) == 0:
            self.x = np.random.randn(n)
        else:
            self.x = np.zeros((n,))
        # self.x = np.random.randn(n)
    
    def step(self,u):
        self.x = A@self.x + B@u
        return self.x

In [8]:
statistics = {'dispersion_on': [],
              'use_standard': [],
              'num_explore': [],
              'number of succesful landings': [],
              'number of unsuccesful landings': []
}

from control import ctrb
# define data
n = 6 # full state dimension
n_unc = 3 # number of uncontrollable modes
m = 2 # number of inputs

numOfSamples = 1000 # number of trial examples

successful_stabilizations = 0
unsuccessful_stabilizations = 0

study_cases = [
    (dispersion_on, use_standard, num_explore)
    for dispersion_on in [True, False]
    for use_standard in [True, False]
    for num_explore in ([8] if use_standard else [8,9,10,11,12,13,14,15,16])
]

performances_current_run = []
landing_successes = []

for (dispersion_on, use_standard, num_explore) in study_cases:
    np.random.seed(616)
    for ii in range(numOfSamples):
        # sample a random stabilizable system in Kalman decomposition
        A_11 = np.random.randn(n - n_unc,n - n_unc)
        A_12 = np.random.randn(n - n_unc,n_unc)
        A_21 = np.zeros((n_unc,n - n_unc))
        A_22 = np.diag(1-2*np.random.rand(n_unc))
        B_11 = np.random.randn(n - n_unc,m)
        B_21 = np.zeros((n_unc,m))

        A = np.block([[A_11,A_12],[A_21,A_22]])
        B = np.block([[B_11],[B_21]])

        T = np.random.randn(n,n) #random transformation

        # check controllability
        if np.linalg.matrix_rank(ctrb(A_11, B_11)) < n - n_unc:
            continue
        # check conditioning of the transformation matrix
        if np.linalg.cond(T) > 1e6:
            continue
        
        # transform matrices
        A = T@A@np.linalg.inv(T)
        B = T@B

        # define linear system
        sys = linearSystem(A,B)

        # explore system, but ensure reproducibility by storing random state
        state = np.random.get_state()  # Save the current random state
        (D_x_plus,D_x,D_u) = fastExploration(sys, dispersion_on, use_standard, num_explore, False)
        np.random.set_state(state)  # Restore the random state

        try:
            # identify system
            (A_est,B_est) = fastIdentification(D_x_plus,D_x,D_u,lambda_reg = 1e-8)

            # stabilize system
            Q = np.identity(n)
            R = np.identity(m)
        
            (K,S,E) = dlqr(A_est, B_est, Q, R)
            
            # check stability
            (W,V) = np.linalg.eig(A - B@K)

            # safe result
            if np.max(np.abs(W)) < 1:
                successful_stabilizations += 1
            else:
                unsuccessful_stabilizations += 1
        except:
            n = D_x[0].shape[0]
            m = D_u[0].shape[0]
            X_plus = np.array(D_x_plus).transpose()
            X = np.array(D_x).transpose()
            U = np.array(D_u).transpose()
            XU = np.concatenate((X,U),axis=0)
            
            unsuccessful_stabilizations += 1
            continue

        

    landing_successes.append(successful_stabilizations)
    statistics['dispersion_on'].append(dispersion_on)
    statistics['use_standard'].append(use_standard)
    statistics['num_explore'].append(np.mean(num_explore))
    statistics['number of succesful landings'].append(successful_stabilizations)
    statistics['number of unsuccesful landings'].append(unsuccessful_stabilizations)
    print('{0} stabilizations have been successful and {1} stabilizations have been unsuccessful.'.format(successful_stabilizations,unsuccessful_stabilizations))
    successful_stabilizations = 0
    unsuccessful_stabilizations = 0
    

291 stabilizations have been successful and 709 stabilizations have been unsuccessful.
476 stabilizations have been successful and 524 stabilizations have been unsuccessful.
676 stabilizations have been successful and 324 stabilizations have been unsuccessful.
745 stabilizations have been successful and 255 stabilizations have been unsuccessful.
788 stabilizations have been successful and 212 stabilizations have been unsuccessful.
829 stabilizations have been successful and 171 stabilizations have been unsuccessful.
859 stabilizations have been successful and 141 stabilizations have been unsuccessful.
864 stabilizations have been successful and 136 stabilizations have been unsuccessful.
870 stabilizations have been successful and 130 stabilizations have been unsuccessful.
880 stabilizations have been successful and 120 stabilizations have been unsuccessful.
999 stabilizations have been successful and 1 stabilizations have been unsuccessful.
1000 stabilizations have been successful and 

  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)
  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)


916 stabilizations have been successful and 84 stabilizations have been unsuccessful.


  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)
  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)


901 stabilizations have been successful and 99 stabilizations have been unsuccessful.


  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)
  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)
  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)


858 stabilizations have been successful and 142 stabilizations have been unsuccessful.


  G = solve(B.T @ X @ B + R, B.T @ X @ A + S.T)


836 stabilizations have been successful and 164 stabilizations have been unsuccessful.


The above script shows that our algorithm is able to stabilize random stabilizable linear systems. We suspect that the few unsuccessful trials are caused by numerical problems.

In [9]:
import pandas as pd
statistics_table = pd.DataFrame(statistics)
print(statistics_table)

    dispersion_on  use_standard  num_explore  number of succesful landings  \
0            True          True          8.0                           291   
1            True         False          8.0                           476   
2            True         False          9.0                           676   
3            True         False         10.0                           745   
4            True         False         11.0                           788   
5            True         False         12.0                           829   
6            True         False         13.0                           859   
7            True         False         14.0                           864   
8            True         False         15.0                           870   
9            True         False         16.0                           880   
10          False          True          8.0                           999   
11          False         False          8.0                    