In [1]:
%matplotlib tk
import matplotlib.pyplot as plt
import cvxpy
import math
import numpy as np
import cubic_spline_planner


NX = 4  # x = x, y, v, yaw
NU = 2  # a = [accel, steer]
T = 5  # horizon length

# mpc parameters
R = np.diag([0.01, 0.01])  # input cost matrix
Rd = np.diag([0.01, 1.0])  # input difference cost matrix
Q = np.diag([1.0, 1.0, 0.5, 0.5])  # state cost matrix
Qf = Q  # state final matrix

GOAL_DIS = 1.5  # goal distance
STOP_SPEED = 0.5 / 3.6  # stop speed
MAX_TIME = 500.0  # max simulation time

# iterative paramter
MAX_ITER = 3  # Max iteration
DU_TH = 0.1  # iteration finish param

TARGET_SPEED = 50#10.0 / 3.6  # [m/s] target speed
N_IND_SEARCH = 10  # Search index number

DT = 0.2  # [s] time tick

# Vehicle parameters
WB = 110#2.5  # [m]

MAX_STEER = np.deg2rad(45.0)  # maximum steering angle [rad]
MAX_DSTEER = np.deg2rad(30.0)  # maximum steering speed [rad/s]
MAX_SPEED = 25#55.0 / 3.6  # maximum speed [m/s]
MIN_SPEED = -25#-20.0 / 3.6  # minimum speed [m/s]
MAX_ACCEL = 5.0  # maximum accel [m/ss]

show_animation = True

In [2]:
def iterative_linear_mpc_control(xref, Motion_Status, dref, oa, od,x0):
    """
    MPC contorl with updating operational point iteraitvely
    """  
    
    if oa is None or od is None:
        oa = [0.0] * T
        od = [0.0] * T
    state=Motion_Status.get_state()   
    for i in range(MAX_ITER):
        xbar = Motion_Status.get_trajectory(oa, od,T)
        poa, pod = oa[:], od[:]
        oa, od, ox, oy, oyaw, ov = linear_mpc_control(xref, xbar, state, dref,x0)
        du = sum(abs(oa - poa)) + sum(abs(od - pod))  # calc u change value
        if du <= DU_TH:
            break
    else:
        print("Iterative is max iter")
    
    return oa, od, ox, oy, oyaw, ov,xbar

def get_linear_model_matrix(v, phi, delta):

    A = np.zeros((NX, NX))
    A[0, 0] = 1.0
    A[1, 1] = 1.0
    A[2, 2] = 1.0
    A[3, 3] = 1.0
    A[0, 2] = DT * math.cos(phi)
    A[0, 3] = - DT * v * math.sin(phi)#
    A[1, 2] = DT * math.sin(phi)
    A[1, 3] = DT * v * math.cos(phi)#
    A[3, 2] = DT * math.tan(delta) / WB

    B = np.zeros((NX, NU))
    B[2, 0] = DT
    B[3, 1] = DT * v / (WB * math.cos(delta) ** 2)#

    C = np.zeros(NX)
    C[0] = DT * v * math.sin(phi) * phi
    C[1] = - DT * v * math.cos(phi) * phi
    C[3] = - DT * v * delta / (WB * math.cos(delta) ** 2)
    #C = np.zeros(NX)
    return A, B, C

def linear_mpc_control(xref, xbar, state, dref,x0):
    """
    linear mpc control

    xref: reference point
    xbar: operational point
    x0: initial state
    dref: reference steer angle
    """

    x = cvxpy.Variable((NX, T + 1))
    u = cvxpy.Variable((NU, T))

    cost = 0.0
    constraints = []

    for t in range(T):
        cost += cvxpy.quad_form(u[:, t], R)

        if t != 0:
            cost += cvxpy.quad_form(xref[:, t] - x[:, t], Q)

        A, B, C = get_linear_model_matrix(xbar[2, t], xbar[3, t], 0)
        constraints += [x[:, t + 1] == A * x[:, t] + B * u[:, t]+ C]

        if t < (T - 1):
            cost += cvxpy.quad_form(u[:, t + 1] - u[:, t], Rd)
            constraints += [cvxpy.abs(u[1, t + 1] - u[1, t]) <=
                            MAX_DSTEER * DT]

    cost += cvxpy.quad_form(xref[:, T] - x[:, T], Qf)

    constraints += [x[:, 0] == [state.x,state.y,state.v,state.yaw]]
    constraints += [x[2, :] <= MAX_SPEED]
    constraints += [x[2, :] >= MIN_SPEED]
    constraints += [cvxpy.abs(u[0, :]) <= MAX_ACCEL]
    constraints += [cvxpy.abs(u[1, :]) <= MAX_STEER]

    prob = cvxpy.Problem(cvxpy.Minimize(cost), constraints)
    prob.solve(solver=cvxpy.OSQP, warm_start=True)
    #prob.solve(solver=cvxpy.ECOS, verbose=False)

    if prob.status == cvxpy.OPTIMAL or prob.status == cvxpy.OPTIMAL_INACCURATE:
        ox = np.array(x.value[0, :]).flatten()
        oy = np.array(x.value[1, :]).flatten()
        ov = np.array(x.value[2, :]).flatten()
        oyaw = np.array(x.value[3, :]).flatten()
        oa = np.array(u.value[0, :]).flatten()
        odelta = np.array(u.value[1, :]).flatten()

    else:
        print("Error: Cannot solve mpc..")
        oa, odelta, ox, oy, oyaw, ov = None, None, None, None, None, None

    return oa, odelta, ox, oy, oyaw, ov



In [3]:
class State:
     
    def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0):
        self.x = x
        self.y = y
        self.yaw = yaw
        self.v = v
        self.predelta = None

class Motion():

    def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0):
        self.state=State(x=x, y=y, yaw=yaw, v=v)
        
    def get_state(self):
        return State(x=self.state.x, y=self.state.y, yaw=self.state.yaw, v=self.state.v)
        
    def set_state(self,state):
        self.state=state
    def get_next_state(self, a, delta, curr_state=-1):
        if curr_state==-1:
            curr_state=self.state
        # input check
        if delta >= MAX_STEER:
            delta = MAX_STEER
        elif delta <= -MAX_STEER:
            delta = -MAX_STEER

        next_x = curr_state.x + curr_state.v * math.cos(curr_state.yaw) * DT
        next_y = curr_state.y + curr_state.v * math.sin(curr_state.yaw) * DT
        next_yaw = curr_state.yaw + curr_state.v / WB * math.tan(delta) * DT
        next_v = curr_state.v + a * DT
        
        if next_v > MAX_SPEED:
            next_v = MAX_SPEED
        elif next_v < MIN_SPEED:
            next_v = MIN_SPEED

        return State(x=next_x,y=next_y,v=next_v,yaw=next_yaw)


    def get_trajectory(self,accels, deltas,time_step):
        xbar =np.zeros((NX, time_step + 1))
        xbar[:, 0] = self.state.x,self.state.y,self.state.v,self.state.yaw
        #_state=State(x=state.x,y=state.y,v=state.v,yaw=state.yaw)
        _state=Motion.get_state(self)
        for (accel, delta, i) in zip(accels, deltas, range(1, T + 1)):
            next_state = Motion.get_next_state(self,accel, delta, curr_state=_state)
            xbar[:, i] = next_state.x,next_state.y,next_state.v,next_state.yaw
            _state=next_state
        

        return xbar


In [4]:
def gen_points(cx,cy):
    dx=20#2
    dy=10#1
    pts=[[cx-dx,cy+dy],
    [cx+dx,cy+dy],
    [cx+dx,cy-dy],
    [cx-dx,cy-dy],
    [cx-dx,cy+dy]]
    pts=np.array(pts,dtype=np.float64)
    return pts
class move_points():
    def __init__(self,points,XX,YY,phi):
        self.points=points.copy()
        self.XX=XX
        self.YY=YY
        self.ox=0
        self.oy=0
        phi=(math.pi*phi)/180
        self.rot_mat=np.array([[math.cos(phi),-math.sin(phi)],[math.sin(phi),math.cos(phi)]])
    def move(self):
        self.points[:,0]+=self.XX
        self.points[:,1]+=self.YY
    def get_origin(self):
        ox=0
        oy=0
        oo=np.mean(self.points,axis=0)
        self.ox=oo[0]
        self.oy=oo[1]
        print(self.ox,self.oy)
        return self.ox,self.oy
    def shift_origin(self):
        self.points[:,0]-=np.float64(self.ox)
        self.points[:,1]-=np.float64(self.oy)
        return self.points
    def reverse_shift_origin(self):
        self.points[:,0]+=self.ox
        self.points[:,1]+=self.oy
        return self.points
    def rotate(self):
        for i in range(len(self.points)):
            self.points[i]=self.points[i].dot(self.rot_mat)
        return self.points
    def apply(self):
        self.move()
        self.get_origin()
        self.shift_origin()
        self.rotate()
        self.reverse_shift_origin()
        return self.points

In [None]:
# initial yaw compensation
'''
if state.yaw - cyaw[0] >= math.pi:
        state.yaw -= math.pi * 2.0
elif state.yaw - cyaw[0] <= -math.pi:
        state.yaw += math.pi * 2.0
'''
#goal = [cx[-1], cy[-1]]


from collections import deque
class Data_Streamer():
    
    def __init__(self,qlen=15,_speed=1.0):
        self._stream=[]
        self.point_stream=deque(maxlen=qlen)
        self.qlen=qlen
        self.max_len=50
        self.target_speed=_speed
        self.x=None
        self.y=None
        self.v=None
        self.yaw=None
        self.index=0
        self.dind=0
        self.end=False
        
    def get_spline(self,tx,ty,dl):
        cx=tx
        cy=ty
        #cx, cy, _, _, _ = cubic_spline_planner.calc_spline_course(tx,ty,ds=dl)
        return tx,ty
    
    def get_yaw(self):
        
        dcx=np.gradient(self.x)
        dcy=np.gradient(self.y)
        cyaw=[np.arctan2(y,x) for x,y in zip(dcx,dcy)]
        return cyaw
        #sp = calc_speed_profile(cx, cy, cyaw, TARGET_SPEED)

    def get_speed_profile(self):

        speed_profile = np.ones(len(self.x),dtype=np.float32)
        speed_profile*=self.target_speed
        speed_profile[-1] = 0.0

        return speed_profile


    def get_planned_traj(self,_stream):
        self._stream=_stream
        tx,ty=self._stream[:self.max_len,0],self._stream[:self.max_len,1]
        self.x=tx
        self.y=ty
        self.yaw=self.get_yaw()
        self.v=self.get_speed_profile()
        
        for _ in range(self.qlen):
            ind=self.index
            self.point_stream.append([self.x[ind],self.y[ind],self.v[ind],self.yaw[ind]])
            self.index+=1
        return self.end
    
    def get_trajectory(self):
        return self.point_stream.copy()
    
    def add_points(self,num):
        
        for _ in range(num):
            ind=self.index
            if ind-self.qlen<len(self.x):
                self.point_stream.append([self.x[ind],self.y[ind],self.v[ind],self.yaw[ind]])
                self.index+=1
            else:
                self.end=True
                break
        return self.end
   
    def get_origin_shift(self,state):
        
        traj_points=self.get_trajectory()
        d = [(state.x-idx) ** 2 + (state.y-idy) ** 2 for (idx, idy, _, _) in traj_points]
        mind = min(d)
        self.dind = d.index(mind)
        return self.dind
    
    def shift_trajectory(self):
        
        if self.dind:
            self.end=self.add_points(dind)
            
        return self.end
            
        
    def get_ref_trajectory(self,state,_dl,_dt,t_step):
        
        traj_points=self.get_trajectory()
        xref = np.zeros((NX, t_step + 1))
        dref = np.zeros((1, t_step + 1))
        ncourse = len(self.x)

        xref[:, 0] = traj_points[0]
        dref[0, 0] = 0.0  # steer operational point should be 0

        travel = 0.0

        for i in range(t_step + 1):
            travel += abs(state.v) * _dt
            dind = int(round(travel / _dl))

            if (dind) < len(traj_points)-1:
                xref[:, i] = traj_points[i]
                dref[0, i] = 0.0
            else:
                xref[:, i] = traj_points[-1]
                dref[0, i] = 0.0

        return xref,dref


In [None]:
%matplotlib tk
import gym
from matplotlib import pyplot as plt
import cv2
import numpy as np
from car_env import CarRacing
import time
import cubic_spline_planner
import pickle

class Car_Env():
    
    def __init__(self):
        self.ENV=CarRacing()
        
        self._save_path=False
        self._read_path=True
        self._state=State(x=0,y=0,yaw=0,v=0)
        self.points=[[0,0]]
        self.frame=np.zeros((100,200,3),dtype=np.uint8)
        self.Dstr=Data_Streamer(15,TARGET_SPEED)
        
    def read_path(self):
        f=open('path-1','rb')
        track=pickle.load(f)
        f.close()
        self.ENV.track=track
    
    
    def save_path(self):
        f=open('path-2','wb')
        pickle.dump(self.ENV.track,f)
        f.close()
    
    def accl_to_gas(self,accl,vel):
        dt=1.0
        SIZE = 0.02
        ENGINE_POWER= 100000000*SIZE*SIZE
        WHEEL_MOMENT_OF_INERTIA = 4000*SIZE*SIZE
        WHEEL_R  = 27 
        wheel_rad=WHEEL_R*SIZE
        mass=(2*WHEEL_MOMENT_OF_INERTIA)/wheel_rad
        #mass=.25
        power=4*mass*accl*(2*vel+accl*dt)/2#
        #power=4*mass*accl*(vel+accl/2)
        gas=power/ENGINE_POWER
        return gas
    
    def step(self,action,show_plot=False):

       
        
        action[1]=self.accl_to_gas(action[1],self._state.v)
        #action[0]=action[0]-np.deg2rad(90)
        action[0]*=-1
        
        
        self.gas=action[1]
        self.steer=action[0]
        self.brake=action[2]
        
        nst,rw,ter,info=self.ENV.step(action) # take a random action
        self.frame[0:96,0:96,:]=np.array(nst)

        pos=self.ENV.car.hull.position
        vel=np.sqrt(self.ENV.car.hull.linearVelocity[0]**2+self.ENV.car.hull.linearVelocity[1]**2)
        ang=self.ENV.car.hull.angle
        #ang*=-1
        ang=ang+np.deg2rad(90)#-ang
        
            
        self._state= State(x=pos[0],y=pos[1],yaw=ang,v=vel) 
        self.Dstr.get_origin_shift(self._state)
        self.Dstr.shift_trajectory()
        
        return [pos[0],pos[1],ang,vel],rw,ter,info

    
    
    def render(self):
            
            font = cv2.FONT_HERSHEY_SIMPLEX
            plt.cla()
            img = cv2.putText(np.array(self.frame), 'vel %f'%vel, (100,10), font,.3, (255,255,0),1, cv2.LINE_AA)
            img = cv2.putText(np.array(img), 'ang %f'%ang, (100,20), font,.3, (255,0,255),1, cv2.LINE_AA)
            image = cv2.putText(np.array(img), 'x,y %f,%f'%(pos[0],pos[1]), (100,30), font,.2, (255,0,255),1, cv2.LINE_AA)

            plt.imshow(image)
            plt.show()
            plt.pause(0.0001)


    def reset(self):
        
        self.ENV.reset()
        
        if self._save_path==True:
            self.save_path()
        if self._read_path ==True:
            self.read_path()
        
        points=[[x,y] for a,b,x,y in self.ENV.track]
        self.points=np.array(points)
        self.Dstr.get_planned_traj(self.points)
        
        plt.plot(self.points[:,0],self.points[:,1])
        plt.show()
        
    
       
        return [self.step([0,0,0]),self.points]

#st,points=env_reset(envc)
'''
for i in range(100):
    st,rw,ter,info=env_step(envc,[0,1,0])
    time.sleep(0.1)
    #print(st)
    '''
#envc.close()



'\nfor i in range(100):\n    st,rw,ter,info=env_step(envc,[0,1,0])\n    time.sleep(0.1)\n    #print(st)\n    '

In [None]:
#cx, cy, cyaw, ck = get_switch_back_course(dl)
dl=1.0
Env=Car_Env()
[init_state,traj_points]=Env.reset()
ix,iy,iyaw,ivel=init_state[0]


Motion_Status = Motion(x=ix, y=iy, yaw=iyaw, v=ivel)
state=Motion_Status.get_state()
print(state)

Track generation: 1173..1470 -> 297-tiles track
<__main__.State object at 0x7f110f5b9048>


In [None]:
time = 0.0
x = [state.x]
y = [state.y]
yaw = [state.yaw]
v = [state.v]
t = [0.0]
d = [0.0]
a = [0.0]





#index, _ = get_closest_point(state, _cx, _cy, cyaw, 0)
odelta, oa = None, None
_index=0
#cyaw = smooth_yaw(cyaw)
accl,gas,steer=0,0,0
state_str=''
act_str=''
xref_str=''
xbar_str=''

sensor_stream=Data_Streamer(qlen=15,_speed=TARGET_SPEED)
_get_new=True
mapped=True
while MAX_TIME >= time:
        if _get_new==True:
            print('Planning trajectory ... ')
            _get_new=sensor_stream.get_planned_traj(traj_points)
            
                
        state=Motion_Status.get_state()
        x0 = [state.x, state.y, state.v, state.yaw]
        
        if mapped==True:
            dind=  sensor_stream.get_origin_shift(state)
            _get_new =  sensor_stream.shift_trajectory()
            if _get_new==True:
                break
        
        xref, dref = sensor_stream.get_ref_trajectory(state,dl,DT,T)
    
        _index+=dind
        
        oa, odelta, ox, oy, oyaw, ov ,xbar= iterative_linear_mpc_control(xref, Motion_Status, dref, oa, odelta,x0)
        
        state_str=str(state.x)+','+str(state.y)+','+str(state.v)+','+str(np.rad2deg(state.yaw))+'\n'
        xref_str=str(xref[0][0])+','+str(xref[1][0])+','+str(xref[2][0])+','+str(np.rad2deg(xref[3][0]))+','+str(_index)+'\n'
        xbar_str=str(xbar[0][0])+','+str(xbar[1][0])+','+str(xbar[2][0])+','+str(np.rad2deg(xbar[3][0]))+'\n'
        act_str=str(accl)+','+str(gas)+','+str(np.rad2deg(steer))+'\n'
    
        #xref_str=str(xref[3][0])+'\n'
        #xbar_str=str(xbar[3][:])+'\n'
        if odelta is not None:
            di, ai = odelta[0], oa[0]
        accl=ai
        steer=di
        
        #act_str=str(steer)+'\n'
        nst,rw,ter,info=Env.step([steer,accl,0])
        gas=Env.gas
        
        next_state=State(x=nst[0],y=nst[1],yaw=nst[2],v=nst[3])
        Motion_Status.set_state(next_state)
        
        time = time + DT

        x.append(state.x)
        y.append(state.y)
        yaw.append(state.yaw)
        v.append(state.v)
        t.append(time)
        d.append(di)
        a.append(ai)

        
        cx=traj_points[:50,0]
        cy=traj_points[:50,1]
        if show_animation:  # pragma: no cover
            plt.cla()
            plt.text(50,50,'state : '+state_str,fontsize=12)
            plt.text(50,40,'xref : '+xref_str,fontsize=12)
            plt.text(50,30,'xbar : '+xbar_str,fontsize=12)
            plt.text(50,20,'act : '+act_str,fontsize=12)
            
            if ox is not None:
                plt.plot(ox, oy, "xr", label="MPC")
            plt.plot(cx, cy, "-r", label="course")
            plt.plot(x, y, "ob", label="trajectory")
            plt.plot(xref[0, :], xref[1, :], "xk", label="xref")
            plt.plot(cx[_index], cy[_index], "xg", label="target")
            #plot_car(state.x, state.y, state.yaw, steer=di)
            
            points=gen_points(state.x,state.y)
            ang_yaw=-(state.yaw*180.0)/math.pi
            rp=move_points(points,0,0,ang_yaw)
            r_points=rp.apply()
            points=np.array(points,dtype=np.float64)
            r_points=np.array(r_points,dtype=np.float64)
            plt.plot(r_points[:,0].flatten(),r_points[:,1].flatten(),"-r")
            
            plt.axis("equal")
            plt.grid(True)
            plt.title("Time[s]:" + str(round(time, 2))
                      + ", speed[km/h]:" + str(round(state.v * 3.6, 2))
                      + '\nyaw'+str(state.yaw)
                      + '\nxref_yaw'+str(xref[3][0])
                      + '\nsteer'+str(steer))
            plt.pause(0.0001)

if show_animation:  # pragma: no cover
            plt.close("all")
            plt.subplots()
            plt.plot(cx, cy, "-r", label="spline")
            plt.plot(x, y, "-g", label="tracking")
            plt.grid(True)
            plt.axis("equal")
            plt.xlabel("x[m]")
            plt.ylabel("y[m]")
            plt.legend()

            plt.subplots()
            plt.plot(t, v, "-r", label="speed")
            plt.grid(True)
            plt.xlabel("Time [s]")
            plt.ylabel("Speed [kmh]")

            plt.show()


Planning trajectory ... 
214.88699340820312 -52.55186462402344
214.8873748779297 -52.54826736450195
214.8880615234375 -52.54189682006836
214.88909912109375 -52.53269958496094
214.89060974121094 -52.520748138427734
214.8927001953125 -52.506107330322266
214.89553833007812 -52.488861083984375
214.899169921875 -52.469058990478516
214.9036407470703 -52.44675064086914
214.90899658203125 -52.421966552734375
214.91529846191406 -52.394744873046875
214.9224395751953 -52.365047454833984
214.9304656982422 -52.33287811279297
214.9393768310547 -52.29823303222656
214.94895935058594 -52.26097869873047
214.9591064453125 -52.22102355957031
214.9696502685547 -52.17827606201172
214.98028564453125 -52.13256072998047
214.99085998535156 -52.08382797241211
215.00120544433594 -52.03203201293945
215.01123046875 -51.97714614868164
215.0208282470703 -51.91918182373047
215.03009033203125 -51.85817337036133
215.0393524169922 -51.7942008972168
215.04824829101562 -51.727210998535156
215.05709838867188 -51.65725708007

In [None]:
len(cx)

In [None]:
cx, cy, cyaw, ck, s = cubic_spline_planner.calc_spline_course(tx,ty,ds=1.0)
i_yaw=cyaw[0]
cyaw=[yaw-i_yaw for yaw in cyaw]
cyaw

In [None]:
#%matplotlib inline
plt.plot(cx,cy)
plt.show()

In [None]:
plt.plot(range(len(cyaw)),cyaw)
plt.show()

In [None]:
dcx=np.gradient(cx)
dcy=np.gradient(cy)
cyaw=[np.arctan2(y,x) for x,y in zip(dcx,dcy)]

In [None]:
cx, cy, cyaw, ck, s = cubic_spline_planner.calc_spline_course(tx,ty,ds=dl)
yaw0=np.deg2rad(90)
cyaw=[yaw-yaw0 for yaw in cyaw]

In [None]:
import numpy as np
np.rad2deg(2.64)

In [None]:
cyaw

In [None]:
import cvxpy as cp
import numpy as np

# Problem data.
m = 1
n = 1
np.random.seed(1)
A = 10#np.random.randn(m, n)
b = 5#np.random.randn(m)

# Construct the problem.
x = cp.Variable(n)
objective = cp.Minimize(cp.sum_squares(A*x - b))
constraints = [0 <= x, x <= 1]
prob = cp.Problem(objective, constraints)

# The optimal objective value is returned by `prob.solve()`.
result = prob.solve()
# The optimal value for x is stored in `x.value`.
print(x.value)
# The optimal Lagrange multiplier for a constraint is stored in
# `constraint.dual_value`.
print(constraints[0].dual_value)

In [None]:
print(x.value)

In [None]:
x=np.array(range(0,100))
x=x/100

In [None]:
y=10*x-5
y=np.square(y)
plt.plot(x,y)
plt.show()