In [1]:
import numpy as np
import math
import tensorflow as tf
T=5
NX=4
NU=2
DT=0.2
WB=110

MAX_STEER = np.deg2rad(45.0)  # maximum steering angle [rad]
MAX_DSTEER = np.deg2rad(30.0)  # maximum steering speed [rad/s]
MAX_SPEED = 25#55.0 / 3.6  # maximum speed [m/s]
MIN_SPEED = -25#-20.0 / 3.6  # minimum speed [m/s]
MAX_ACCEL = 5.0  # maximum accel [m/ss]
class State:
     
    def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0):
        self.x = x
        self.y = y
        self.yaw = yaw
        self.v = v
        self.predelta = None

class Motion():

    def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0):
        self.state=State(x=x, y=y, yaw=yaw, v=v)
        
    def get_state(self):
        return State(x=self.state.x, y=self.state.y, yaw=self.state.yaw, v=self.state.v)
        
    def set_state(self,state):
        self.state=state
    def get_next_state(self, a, delta, curr_state=-1):
        if curr_state==-1:
            curr_state=self.state
        # input check
        if delta >= MAX_STEER:
            delta = MAX_STEER
        elif delta <= -MAX_STEER:
            delta = -MAX_STEER

        next_x = curr_state.x + curr_state.v * math.cos(curr_state.yaw) * DT
        next_y = curr_state.y + curr_state.v * math.sin(curr_state.yaw) * DT
        next_yaw = curr_state.yaw + curr_state.v / WB * math.tan(delta) * DT
        next_v = curr_state.v + a * DT
        print(curr_state.v,a,DT,next_v)
        if next_v > MAX_SPEED:
            next_v = MAX_SPEED
        elif next_v < MIN_SPEED:
            next_v = MIN_SPEED

        return State(x=next_x,y=next_y,v=next_v,yaw=next_yaw)
    
    def _get_next_state(self, a, delta, curr_state=-1):
        if curr_state==-1:
            curr_state=self.state
        # input check
        if delta >= MAX_STEER:
            delta = MAX_STEER
        elif delta <= -MAX_STEER:
            delta = -MAX_STEER
            
        A, B, C = get_linear_model_matrix(curr_state.v, curr_state.yaw, delta)
        
        X=np.array([curr_state.x,curr_state.y,curr_state.v,curr_state.yaw],dtype=np.float64)
        U=np.array([a,delta],dtype=np.float64)
        [next_x,next_y,next_v,next_yaw]=np.matmul(A,X) + np.matmul(B,U)+ C
        
        
        if next_v > MAX_SPEED:
            next_v = MAX_SPEED
        elif next_v < MIN_SPEED:
            next_v = MIN_SPEED

        return State(x=next_x,y=next_y,v=next_v,yaw=next_yaw),[A,B,C]


    def get_trajectory(self,accels, deltas,time_step):
        xbar =np.zeros((NX, time_step + 1))
        xbar[:, 0] = self.state.x,self.state.y,self.state.v,self.state.yaw
        #_state=State(x=state.x,y=state.y,v=state.v,yaw=state.yaw)
        _state=Motion.get_state(self)
        for (accel, delta, i) in zip(accels, deltas, range(1, T + 1)):
            next_state = Motion._get_next_state(self,accel, delta, curr_state=_state)
            xbar[:, i] = next_state.x,next_state.y,next_state.v,next_state.yaw
            _state=next_state
        

        return xbar


    
def get_linear_model_matrix(v, phi, delta):
        A = np.zeros((NX, NX))
        A[0, 0] = 1.0
        A[1, 1] = 1.0
        A[2, 2] = 1.0
        A[3, 3] = 1.0
        A[0, 2] = DT * math.cos(phi)
        A[0, 3] = - DT * v * math.sin(phi)#
        A[1, 2] = DT * math.sin(phi)
        A[1, 3] = DT * v * math.cos(phi)#
        A[3, 2] = DT * math.tan(delta) / WB

        B = np.zeros((NX, NU))
        B[2, 0] = DT
        B[3, 1] = DT * v / (WB * math.cos(delta) ** 2)#

        C = np.zeros(NX)
        C[0] = DT * v * math.sin(phi) * phi
        C[1] = - DT * v * math.cos(phi) * phi
        C[3] = - DT * v * delta / (WB * math.cos(delta) ** 2)
        #C = np.zeros(NX)
        return A, B, C
    
def np_quad_form(a,b):
        res=a @ b @ a.T
        return res
def tf_quad_form(_a,_b):
        _at=tf.transpose(_a)
        _res=tf.matmul(_at,_b)
        return tf.matmul(_res,_a)
def cost( x, u,xref,t,_A,_B,_C):
        
        R = np.diag([0.01, 0.01])  # input cost matrix
        Rd = np.diag([0.01, 1.0])  # input difference cost matrix
        Q = np.diag([1.0, 1.0, 0.5, 0.5])  # state cost matrix
        Qf = Q  # state final matrix
        
        u_t=tf.Variable(u,dtype=tf.float64)
        u_t=tf.reshape(u_t,[NU,1])
        x_t=tf.Variable(x,dtype=tf.float64)
        x_t=tf.reshape(x_t,[NX,1])
        xref_t1=tf.Variable(xref,dtype=tf.float64)
        xref_t1=tf.reshape(xref_t1,[NX,1])
        x_t1=tf.matmul(_A,x_t) + tf.matmul(_B,u_t) #+ tf.reshape(_C,shape=[4,1])
        l = tf_quad_form(u_t, R) + tf_quad_form(xref_t1 - x_t1, Q)
            
        
        init=tf.global_variables_initializer()
        gg=tf.gradients(l,u_t)
        return l,init,x_t1,u_t,gg

In [2]:
_xref=np.ones((4,5+1),dtype=np.float64)
_xref[0,:]=5
_xref[1,:]=0
_xref[2,:]=5
_xref[3,:]=1
_dref=np.zeros((5),dtype=np.float64)
#oa=[0]*5
#od=[0]*5
_x0=np.zeros((NX),dtype=np.float64)
_x0[2]=1.0

_u=np.zeros((NU,T),dtype=np.float64)
_MS=Motion()

def sim_tn_steps(x0,U,xref):
    X=np.zeros((NX,T+1),dtype=np.float64)
    X[:,0]=x0
    for t in range(T):
        _state=State(x=X[0,t],y=X[1,t],v=X[2,t],yaw=X[3,t])
        _MS.set_state(_state)
        nst,Const=_MS._get_next_state(U[0,t],U[1,t])
        A,B,C=Const
        #nst=_MS.get_state()
        mm=np.array([nst.x,nst.y,nst.v,nst.yaw])
        x_t=X[:,t].copy()
        #x_t=mm
        
        #print("...",mm,X[:,t])
        u_t=U[:,t]
        xref_t=xref[:,t]
        l,init,_x_t1,_u_t,g=cost(x_t,u_t,xref_t,t,A,B,C)
        with tf.Session() as sess:
            sess.run(init)
            x_t1,u_t,l,g=sess.run([_x_t1,_u_t,l,g])
        print(l)
        X[:,t+1]=x_t1.reshape(4).copy()
        #print(X)
    return X,l,g

ret=sim_tn_steps(_x0,_u,_xref)
print(ret[0],'\n\n',ret[1],'\n',ret[2])

[[31.54]]
[[29.66]]
[[27.86]]
[[26.14]]
[[24.5]]
[[0.  0.2 0.4 0.6 0.8 1. ]
 [0.  0.  0.  0.  0.  0. ]
 [1.  1.  1.  1.  1.  1. ]
 [0.  0.  0.  0.  0.  0. ]] 

 [[24.5]] 
 [array([[-0.8       ],
       [-0.00181818]])]


In [3]:
def sim_tn_steps(x0,U,xref):
    X=np.zeros((NX,T+1),dtype=np.float64)
    X[:,0]=x0
    for t in range(T):
        _state=State(x=X[0,t],y=X[1,t],v=X[2,t],yaw=X[3,t])
        _MS.set_state(_state)
        nst,Const=_MS._get_next_state(U[0,t],U[1,t])
        B=Const[1]
        #nst=_MS.get_state()
        x_t1=[nst.x,nst.y,nst.v,nst.yaw]
        #print("...",x_t)
        u_t=U[:,t]
        xref_t=xref[:,t]
        cc=tf_cost(x_t1,u_t,xref_t,t,B)
        #print(cc[0])
        X[:,t+1]=x_t1.copy()
    return X,cc

In [4]:
import numpy as np
Q=np.ones((4,4),dtype=np.float64)
Q=np.diag((1,1,1,1))
print(Q)
X=np.ones((4),dtype=np.float64)
print(X.T)
ret=np.matmul(X.T,Q)
print(ret)
np.matmul(ret,X)

[[1 0 0 0]
 [0 1 0 0]
 [0 0 1 0]
 [0 0 0 1]]
[1. 1. 1. 1.]
[1. 1. 1. 1.]


4.0

In [11]:
a=np.array([-1,-2,0,1,2],dtype=np.float64)
b=np.maximum(0,a)
a,b

(array([-1., -2.,  0.,  1.,  2.]), array([0., 0., 0., 1., 2.]))

In [5]:
a=np.ones((4))
b=2*np.ones((4))
c=3*np.ones((4))
a*b*c*5.0

array([30., 30., 30., 30.])

In [None]:
import numpy as np
from copy import copy, deepcopy

from model import TwoWheeledCar

class iLQRController():
    
    def ilqr(self, x0, U=None): 
        """ use iterative linear quadratic regulation to find a control 
        sequence that minimizes the cost function 
        
        Parameters
        --------------
        x0 : numpy.ndarray, shape(STATE_SIZE, )
            the initial state of the system
        U : numpy.ndarray(TIME, INPUT_SIZE)
            the initial control trajectory dimension
        """
        U = np.zeros((NX,T),dtype=np.float64) if U is None else U

        lamb = 1.0 # regularization parameter
        sim_new_trajectory = True
        tN = U.shape[0] # number of time steps

        for ii in range(self.max_iter):

            if sim_new_trajectory == True: 
                # simulate forward using the current control trajectory
                X, cost = self.simulate(x0, U)
                oldcost = np.copy(cost) # copy for exit condition check

                # 
                f_x = np.zeros((tN, self.STATE_SIZE, self.STATE_SIZE)) # df / dx
                f_u = np.zeros((tN, self.STATE_SIZE, self.INPUT_SIZE)) # df / du
                # for storing quadratized cost function 

                l = np.zeros((tN,1)) # immediate state cost 
                l_x = np.zeros((tN, self.STATE_SIZE)) # dl / dx
                l_xx = np.zeros((tN, self.STATE_SIZE, self.STATE_SIZE)) # d^2 l / dx^2
                l_u = np.zeros((tN, self.INPUT_SIZE)) # dl / du
                l_uu = np.zeros((tN, self.INPUT_SIZE, self.INPUT_SIZE)) # d^2 l / du^2
                l_ux = np.zeros((tN, self.INPUT_SIZE, self.STATE_SIZE)) # d^2 l / du / dx
                # for everything except final state
                for t in range(tN-1):
                    # x(t+1) = f(x(t), u(t)) = x(t) + dx(t) * dt
                    # linearized dx(t) = np.dot(A(t), x(t)) + np.dot(B(t), u(t))
                    # f_x = (np.eye + A(t)) * dt
                    # f_u = (B(t)) * dt
                    # continuous --> discrete
                    A, B = self.finite_differences(X[t], U[t])
                    f_x[t] = np.eye(self.STATE_SIZE) + A * self.dt
                    f_u[t] = B * self.dt

                    """ NOTE: why multiply dt in original program ??  
                    So the dt multiplication and + I is because we’re actually taking the derivative of the state with respect to the previous state. Which is not
                    dx = Ax + Bu,
                    but rather
                    x(t) = x(t-1) + (Ax(t-1) + Bu(t-1))*dt
                    And that’s where the identity matrix and dt come from!
                    So that part’s in the comments of the code, but the *dt on all the cost function stuff is not commented on at all!
                    So here the dt lets you normalize behaviour for different time steps (assuming you also scale the number of steps in the sequence).
                    So if you have a time step of .01 or .001 you’re not racking up 10 times as much cost function in the latter case.
                    And if you run the code with 50 steps in the sequence and dt=.01 and 500 steps in the sequence
                    and dt=.001 you’ll see that you get the same results, which is not the case at all when you don’t take dt into account in the cost function!
                    """
                    
                    (l[t], l_x[t], l_xx[t], l_u[t], l_uu[t], l_ux[t]) = self.cost(X[t], U[t])
                    
                    l[t] *= self.dt
                    l_x[t] *= self.dt
                    l_xx[t] *= self.dt
                    l_u[t] *= self.dt
                    l_uu[t] *= self.dt
                    l_ux[t] *= self.dt

                # and for final state
                l[-1], l_x[-1], l_xx[-1] = self.cost_final(X[-1])

                sim_new_trajectory = False

            V = l[-1].copy() # value function
            V_x = l_x[-1].copy() # dV / dx
            V_xx = l_xx[-1].copy() # d^2 V / dx^2
            k = np.zeros((tN, self.INPUT_SIZE)) # feedforward modification
            K = np.zeros((tN, self.INPUT_SIZE, self.STATE_SIZE)) # feedback gain

            # work backwards to solve for V, Q, k, and K
            for t in range(self.tN-2, -1, -1):
                Q_x = l_x[t] + np.dot(f_x[t].T, V_x) 
                Q_u = l_u[t] + np.dot(f_u[t].T, V_x)

                Q_xx = l_xx[t] + np.dot(f_x[t].T, np.dot(V_xx, f_x[t])) 
                Q_ux = l_ux[t] + np.dot(f_u[t].T, np.dot(V_xx, f_x[t]))
                Q_uu = l_uu[t] + np.dot(f_u[t].T, np.dot(V_xx, f_u[t]))

                Q_uu_evals, Q_uu_evecs = np.linalg.eig(Q_uu)
                Q_uu_evals[Q_uu_evals < 0] = 0.0
                Q_uu_evals += lamb
                Q_uu_inv = np.dot(Q_uu_evecs, np.dot(np.diag(1.0/Q_uu_evals), Q_uu_evecs.T))

                k[t] = -1. * np.dot(Q_uu_inv, Q_u)
                K[t] = -1. * np.dot(Q_uu_inv, Q_ux)

                V_x = Q_x - np.dot(K[t].T, np.dot(Q_uu, k[t]))
                V_xx = Q_xx - np.dot(K[t].T, np.dot(Q_uu, K[t]))

            U_new = np.zeros((tN, self.INPUT_SIZE))
            x_new = x0.copy()
            for t in range(tN - 1): 
                # use feedforward (k) and feedback (K) gain matrices 
                # calculated from our value function approximation
                U_new[t] = U[t] + k[t] + np.dot(K[t], x_new - X[t])
                _,x_new = self.plant_dynamics(x_new, U_new[t])

            # evaluate the new trajectory 
            X_new, cost_new = self.simulate(x0, U_new)

            if cost_new < cost: 
                # decrease lambda (get closer to Newton's method)
                lamb /= self.lamb_factor

                X = np.copy(X_new) # update trajectory 
                U = np.copy(U_new) # update control signal
                oldcost = np.copy(cost)
                cost = np.copy(cost_new)

                sim_new_trajectory = True # do another rollout

                # check to see if update is small enough to exit
                if ii > 0 and ((abs(oldcost-cost)/cost) < self.eps_converge):
                    print("Converged at iteration = %d; Cost = %.4f;"%(ii,cost_new) + 
                            " logLambda = %.1f"%np.log(lamb))
                    break

            else: 
                # increase lambda (get closer to gradient descent)
                lamb *= self.lamb_factor
                # print("cost: %.4f, increasing lambda to %.4f")%(cost, lamb)
                if lamb > self.lamb_max: 
                    print("lambda > max_lambda at iteration = %d;"%ii + 
                        " Cost = %.4f; logLambda = %.1f"%(cost, 
                                                          np.log(lamb)))
                    break

        return X, U, cost

    def plant_dynamics(self, x, u):
       
        self.simulator.initialize_state(x)

        # apply the control signal
        x_next = self.simulator.update_state(u, self.dt)
        
        # calculate the change in state
        xdot = ((x_next - x) / self.dt).squeeze()

        return xdot, x_next

    

In [59]:
ret[2][3]

array([0.02, 0.  ])