In [1]:
import numpy as np
import casadi as cas

class MPC:
    def __init__(self, dt):
        self.dt = dt
        self.k_total = 1.0 # the overall weighting of the total costs
        self.theta_iamb = np.pi/4 # my theta wrt to ambulance
        self.L = 4.5
        self.W = 1.8
        self.n_circles = 2
        ## State Costs Constants
        
        self.k_x = 0
        self.k_y = 0
        self.k_phi = 0
        self.k_delta = 0
        
        self.k_v = 0
        self.k_s = -0.1
    
        ## Control Costs Constants
        self.k_u_delta = 10.0
        self.k_u_v = 1.0
        
        ## Derived State Costs Constants
        self.k_lat = 10.0
        self.k_lon = 1.0
        self.k_phi_error = 1.0
        self.k_phi_dot = 1.0

        self.k_x_dot = 0.0

        self.k_change_u_v = 1.0
        self.k_change_u_delta = 1.0

        self.k_final = 0

        
        # Constraints
        self.max_delta_u = 5 * np.pi/180
        self.max_acceleration = 4
        self.max_v_u = self.max_acceleration
        self.min_v_u = -self.max_v_u        

        self.max_v =  25 * 0.447 # m/s
        self.min_v = 0.0
        self.max_y = np.infty
        self.min_y = -np.infty

        self.max_X_dev = np.infty
        self.max_Y_dev = np.infty        

        self.f = self.gen_f_vehicle_dynamics()
        self.fd = None

        # Distance used for collision avoidance
        self.circle_radius = np.sqrt(2) * self.W/2.0
        self.min_dist = 2 * self.circle_radius   # 2 times the radius of 1.5

    def generate_lateral_cost(self, X, X_desired):
        lateral_cost = np.sum([(-cas.sin(X_desired[2,k]) * (X[0,k]-X_desired[0,k]) + 
            cas.cos(X_desired[2,k]) * (X[1,k]-X_desired[1,k]))**2
           for k in range(X.shape[1])])
        
        return lateral_cost
    
    def generate_longitudinal_cost(self, X, X_desired):
        longitudinal_cost = np.sum([(cas.cos(X_desired[2,k]) * (X[0,k]-X_desired[0,k]) + 
            cas.sin(X_desired[2,k]) * (X[1,k]-X_desired[1,k]))**2
           for k in range(X.shape[1])]) 
        
        return longitudinal_cost
    
    def generate_phidot_cost(self, X):
        phid = X[4,:] * cas.tan(X[3,:]) / self.L
        phid_cost = cas.sumsqr(phid)    
        
        return phid_cost
    
    def generate_costs(self, X, U, X_desired):
        self.u_delta_cost = cas.sumsqr(U[0,:])
        self.u_v_cost = cas.sumsqr(U[1,:])

        self.lon_cost = self.generate_longitudinal_cost(X, X_desired)
        
        self.phi_error_cost = cas.sumsqr(X_desired[2,:]-X[2,:]) 
        X_ONLY = False
        if X_ONLY:
            self.s_cost = cas.sumsqr(X[0,-1])
            self.lat_cost = cas.sumsqr(X[1,:])
        else:
            self.lat_cost = self.generate_lateral_cost(X, X_desired)
            self.s_cost = cas.sumsqr(X[5,-1])   
        self.final_costs = self.generate_lateral_cost(X[:,-5:],X_desired[:,-5:]) + cas.sumsqr(X_desired[2,-5:]-X[2,-5:])
        self.v_cost = cas.sumsqr(X[4, :])
        self.phidot_cost = self.generate_phidot_cost(X)
        N = U.shape[1] 
        self.change_u_delta = cas.sumsqr(U[0,1:N-1] - U[0,0:N-2])
        self.change_u_v = cas.sumsqr(U[1,1:N-1] - U[1,0:N-2])
        self.x_cost = cas.sumsqr(X[0,:])
        self.x_dot_cost = cas.sumsqr(self.dt * X[4, :] * cas.cos(X[2]))

    def total_cost(self):
        all_costs = [
            self.k_u_delta * self.u_delta_cost,
            self.k_u_v * self.u_v_cost,
            self.k_lat * self.lat_cost, 
            self.k_lon * self.lon_cost, 
            self.k_phi_error * self.phi_error_cost,
            self.k_phi_dot * self.phidot_cost,
            self.k_s * self.s_cost,
            self.k_v * self.v_cost,
            self.k_change_u_v * self.change_u_v,
            self.k_change_u_delta * self.change_u_delta, 
            self.k_final * self.final_costs,
            self.k_x * self.x_cost,
            self.k_x_dot * self.x_dot_cost
        ]
        all_costs = np.array(all_costs)
        total_cost = np.sum(all_costs)    
        return total_cost, all_costs

    def add_state_constraints(self, opti, X, U, X_desired, T, x0=None):
        if x0 is None:
            x_start = 0
        else:
            x_start = x0[0]
        
        opti.subject_to( opti.bounded(-1, X[0,:], np.infty )) #Constraints on X, Y
        opti.subject_to( opti.bounded(self.min_y+self.W/2.0, X[1,:], self.max_y-self.W/2.0) )
        opti.subject_to( opti.bounded(-np.pi/2, X[2,:], np.pi/2) ) #no crazy angle

        opti.subject_to(opti.bounded(-self.max_delta_u, U[0,:], self.max_delta_u))            
        opti.subject_to(opti.bounded(self.min_v_u, U[1,:], self.max_v_u)) # 0-60 around 4 m/s^2
        opti.subject_to(opti.bounded(self.min_v, X[4,:], self.max_v))    

        # Lane Deviations
        if self.max_X_dev < np.infty:
            opti.subject_to( opti.bounded(-self.max_X_dev, X[0,:] - X_desired[0,:], self.max_X_dev))
        opti.subject_to( opti.bounded(-self.max_Y_dev, X[1,:] - X_desired[1,:], self.max_Y_dev))


    def add_dynamics_constraints(self, opti, X, U, X_desired, x0):
        if self.fd == None:
            raise Exception("No Desired Trajectory Defined")

        # State Dynamics
        N = U.shape[1]
        for k in range(N):
            # opti.subject_to( X[:, k+1] == F(X[:, k], U[:, k], dt))
            opti.subject_to( X[:, k+1] == self.F_kutta(self.f, X[:, k], U[:, k]))
        
        for k in range(N+1):
            opti.subject_to( X_desired[:, k] == self.fd(X[-1, k]) ) #This should be the trajectory dynamic constraint             

        opti.subject_to(X[:,0] == x0)

    def F_kutta(self, f, x_k, u_k):
        k1 = f(x_k, u_k)
        k2 = f(x_k+self.dt/2*k1, u_k)
        k3 = f(x_k+self.dt/2*k2, u_k)
        k4 = f(x_k+self.dt*k3,   u_k)
        x_next = x_k + self.dt/6*(k1+2*k2+2*k3+k4) 
        return x_next
    
    def gen_f_vehicle_dynamics(self):
        X = cas.MX.sym('X')
        Y = cas.MX.sym('Y')
        Phi = cas.MX.sym('Phi')
        Delta = cas.MX.sym('Delta')
        V = cas.MX.sym('V')
        s = cas.MX.sym('s')

        delta_u = cas.MX.sym('delta_u')
        v_u = cas.MX.sym('v_u')
        x = cas.vertcat(X, Y, Phi, Delta, V, s)
        u = cas.vertcat(delta_u, v_u)

        ode = cas.vertcat(V * cas.cos(Phi),
                        V * cas.sin(Phi),
                        V * cas.tan(Delta) / self.L,
                        delta_u,
                        v_u,
                        V)

        f = cas.Function('f',[x,u],[ode],['x','u'],['ode'])
        return f
    
    def gen_f_desired_lane(self, world, lane_number, right_direction=True):
        if right_direction == False:
            raise Exception("Haven't implemented left lanes")

        s = cas.MX.sym('s')
        xd = s
        yd = world.get_lane_centerline_y(lane_number, right_direction)
        phid = 0
        des_traj = cas.vertcat(xd, yd, phid)
        fd = cas.Function('fd',[s],[des_traj],['s'],['des_traj'])

        return fd

    def get_car_circles(self, X, n_circles=2):
        if n_circles==2:
            x_circle_front = X[0:2,:] + (self.W/2) * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:])) 
            x_circle_rear = X[0:2,:] - (self.W/2) * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:])) 
            # x_circle_front = X[0:2,:] + (self.L/2 - self.W/2) * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:])) 
            # x_circle_rear = X[0:2,:] - (self.L/2 - self.W/2) * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:]))
            radius = 1.5
            min_dist = 2*radius
            centers = [x_circle_rear, x_circle_front]
        elif n_circles==3:
            x_circle_mid = X[0:2,:]
            dist_from_center = self.L/2.0 - self.W/2
            x_circle_rear = X[0:2,:]  -  dist_from_center * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:])) 
            x_circle_front = X[0:2,:] +  dist_from_center * cas.vertcat(cas.cos(X[2,:]), cas.sin(X[2,:])) 
            
            centers = (x_circle_rear, x_circle_mid, x_circle_front)
            radius = 1.1
            min_dist = 2*radius            

        return centers, radius

    def get_car_circles_np(self, X):
        if self.n_circles == 2:
            # x_circle_front = X[0:2,:] + (self.L/2 - self.W/2) * np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            # x_circle_rear = X[0:2,:] - (self.L/2 - self.W/2) * np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            x_circle_front = X[0:2,:] + (self.W/2) * np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            x_circle_rear = X[0:2,:] - (self.W/2) * np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            radius = 1.5
            # min_dist = 2*radius
            return [x_circle_rear, x_circle_front], radius
        elif self.n_circles == 3:
            x_circle_mid = X[0:2,:]
            dist_from_center = self.L/2.0 - self.W/2
            x_circle_rear = X[0:2,:]  -  dist_from_center* np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            x_circle_front = X[0:2,:] +  dist_from_center* np.array([np.cos(X[2,:]),np.sin(X[2,:])]) 
            
            centers = [x_circle_rear, x_circle_mid, x_circle_front]
            radius = 1.1
            # min_dist = 2*radius
        
        return centers, radius

def load_state(file_name):

    x1 = np.load(file_name + "x1.npy",allow_pickle=False)
    u1 = np.load(file_name + "u1.npy",allow_pickle=False)
    x1_des = np.load(file_name + "x1_des.npy", allow_pickle=False)
    x2 = np.load(file_name + "x2.npy",allow_pickle=False)
    u2 = np.load(file_name + "u2.npy",allow_pickle=False)
    x2_des = np.load(file_name + "x2_des.npy",allow_pickle=False)
    xamb = np.load(file_name + "xamb.npy",allow_pickle=False)
    uamb = np.load(file_name + "uamb.npy",allow_pickle=False)
    xamb_des = np.load(file_name + "xamb_des.npy",allow_pickle=False)

    return x1, u1, x1_des, x2, u2, x2_des, xamb, uamb, xamb_des
    


def save_state(file_name, x1, u1, x1_des, x2, u2, x2_des, xamb, uamb, xamb_des):

    np.save(file_name + "x1", x1,allow_pickle=False)
    np.save(file_name + "u1", u1,allow_pickle=False)
    np.save(file_name + "x1_des", x1_des, allow_pickle=False)
    np.save(file_name + "x2", x2,allow_pickle=False)
    np.save(file_name + "u2", u2,allow_pickle=False)
    np.save(file_name + "x2_des", x2_des, allow_pickle=False)
    np.save(file_name + "xamb", xamb,allow_pickle=False)
    np.save(file_name + "uamb", uamb,allow_pickle=False)
    np.save(file_name + "xamb_des", xamb_des, allow_pickle=False)
    return file_name





In [3]:
# import numpy as np
# import casadi as cas
# import src.MPC_Casadi as mpc




class IterativeBestResponseMPCMultiple:
    ### We always assume that car1 is the one being optimized
    def __init__(self, responseMPC, ambulanceMPC, otherMPClist):

        self.responseMPC = responseMPC
        self.otherMPClist = otherMPClist   
        self.ambMPC = ambulanceMPC

        self.opti = cas.Opti()
        self.min_dist = 2 * 1.5   # 2 times the radius of 1.5
        self.k_slack = 99999
        self.k_CA = 0
        self.collision_cost = 0


    def generate_optimization(self, N, T, x0, x0_amb, x0_other, print_level=5, slack=True):
        
        # t_amb_goal = self.opti.variable()
        n_state, n_ctrl = 6, 2
        #Variables
        self.x_opt = self.opti.variable(n_state, N+1)
        self.u_opt  = self.opti.variable(n_ctrl, N)
        self.x_desired  = self.opti.variable(3, N+1)
        
        p = self.opti.parameter(n_state, 1)

        # Presume to be given...and we will initialize soon
        if self.ambMPC:
            self.xamb_opt = self.opti.variable(n_state, N+1)
            self.uamb_opt = self.opti.parameter(n_ctrl, N)
            self.xamb_desired = self.opti.variable(3, N+1)
            pamb = self.opti.parameter(n_state, 1)

        self.allother_x_opt = [self.opti.variable(n_state, N+1) for i in self.otherMPClist] 
        self.allother_u_opt = [self.opti.parameter(n_ctrl, N) for i in self.otherMPClist] 
        self.allother_x_desired = [self.opti.variable(3, N+1) for i in self.otherMPClist] 
        self.allother_p = [self.opti.parameter(n_state, 1) for i in self.otherMPClist]

        #### Costs
            
        self.responseMPC.generate_costs(self.x_opt, self.u_opt, self.x_desired)
        self.car1_costs, self.car1_costs_list = self.responseMPC.total_cost()

        if self.ambMPC:
            self.ambMPC.generate_costs(self.xamb_opt, self.uamb_opt, self.xamb_desired)
            self.amb_costs, self.amb_costs_list = self.ambMPC.total_cost()
        else:
            self.amb_costs, self.amb_costs_list = 0, []

        ## We don't need the costs for the other vehicles

        # self.slack1, self.slack2, self.slack3 = self.generate_slack_variables(slack, N)

        # We will do collision avoidance for ego vehicle with all other vehicles
        self.slack_vars_list = self.generate_slack_variables(slack, N, len(self.otherMPClist), n_circles = self.responseMPC.n_circles)
        
        if self.ambMPC:    
            self.slack_amb = self.generate_slack_variables(slack, N, 1)[0]

        self.slack_cost = 0
        for slack_var in self.slack_vars_list:
            self.slack_cost += cas.sumsqr(slack_var)
        
        if self.ambMPC:    
            self.slack_cost += cas.sumsqr(self.slack_amb)


        self.response_svo_cost = np.cos(self.responseMPC.theta_iamb)*self.car1_costs
        self.other_svo_cost = np.sin(self.responseMPC.theta_iamb)*self.amb_costs
        self.total_svo_cost = self.response_svo_cost + self.other_svo_cost + self.k_slack * self.slack_cost + self.k_CA * self.collision_cost
        ######## optimization  ##################################
        
        self.opti.minimize(self.total_svo_cost)    
        ##########################################################

        #constraints
        self.responseMPC.add_dynamics_constraints(self.opti, self.x_opt, self.u_opt, self.x_desired, p)
        self.responseMPC.add_state_constraints(self.opti, self.x_opt, self.u_opt, self.x_desired, T, x0)
        
        if self.ambMPC:        
            self.ambMPC.add_dynamics_constraints(self.opti, self.xamb_opt, self.uamb_opt, self.xamb_desired, pamb)
        
        for i in range(len(self.otherMPClist)):
            self.otherMPClist[i].add_dynamics_constraints(self.opti, 
                                                        self.allother_x_opt[i], self.allother_u_opt[i], self.allother_x_desired[i], 
                                                        self.allother_p[i])

        ## Generate the circles
        


        # Proxy for the collision avoidance points on each vehicle
        self.c1_vars = [self.opti.variable(2, N+1) for c in range(self.responseMPC.n_circles)]
        if self.ambMPC:  
            ca_vars = [self.opti.variable(2, N+1) for c in range(self.responseMPC.n_circles)]  

        self.other_circles = [[self.opti.variable(2, N+1) for c in range(self.responseMPC.n_circles)] for i in range(len(self.allother_x_opt))]
        self.collision_cost = 0
        # Collision Avoidance
        for k in range(N+1):
            # center_offset
            centers, response_radius = self.responseMPC.get_car_circles(self.x_opt[:,k]) 
            for c1_circle in centers:
                for i in range(len(self.allother_x_opt)):
                    other_centers, other_radius = self.otherMPClist[i].get_car_circles(self.allother_x_opt[i][:,k])    
                    for ci in range(len(other_centers)):
                        dist_sqr = cas.sumsqr(c1_circle - other_centers[ci])
                        self.opti.subject_to(dist_sqr - (response_radius + other_radius)**2 > 0 - self.slack_vars_list[i][ci,k])
                        dist_btw_object = cas.sqrt(dist_sqr) - 1.1*(response_radius + other_radius)

                        self.collision_cost += 1/dist_btw_object**4
                # Don't forget the ambulance
                if self.ambMPC:    
                    amb_circles, amb_radius = self.ambMPC.get_car_circles(self.xamb_opt[:,k])    
                    for ci in range(len(amb_circles)):                    
                        self.opti.subject_to(cas.sumsqr(c1_circle - amb_circles[ci]) > (response_radius + amb_radius)**2 - self.slack_amb[ci,k])
                        dist_btw_object = cas.sqrt(cas.sumsqr(c1_circle - amb_circles[ci])) - 1.1*(response_radius + amb_radius)
                        self.collision_cost += 1/dist_btw_object**4
                        

        self.opti.set_value(p, x0)

        for i in range(len(self.allother_p)):
            self.opti.set_value(self.allother_p[i], x0_other[i])

        if self.ambMPC:    
            self.opti.set_value(pamb, x0_amb) 

        self.opti.solver('ipopt',{'warn_initial_bounds':True},{'print_level':print_level, 'max_iter':10000})


    def solve(self, uamb, uother):

        if self.ambMPC:    
            self.opti.set_value(self.uamb_opt, uamb)

        for i in range(len(self.allother_u_opt)):
            self.opti.set_value(self.allother_u_opt[i], uother[i])
        # self.opti.set_value(self.u2_opt, u2) 

        self.solution = self.opti.solve()         

    def get_bestresponse_solution(self):
        x1, u1, x1_des, = self.solution.value(self.x_opt), self.solution.value(self.u_opt), self.solution.value(self.x_desired) 

        return x1, u1, x1_des

    def get_solution(self):
        x1, u1, x1_des, = self.solution.value(self.x_opt), self.solution.value(self.u_opt), self.solution.value(self.x_desired) 
        if self.ambMPC:    
            xamb, uamb, xamb_des, = self.solution.value(self.xamb_opt), self.solution.value(self.uamb_opt), self.solution.value(self.xamb_desired) 
        else:
            xamb, uamb, xamb_des, = None, None, None

        other_x = [self.solution.value(self.allother_x_opt[i]) for i in range(len(self.allother_x_opt))]
        other_u = [self.solution.value(self.allother_u_opt[i]) for i in range(len(self.allother_u_opt))]
        other_des = [self.solution.value(self.allother_x_desired[i]) for i in range(len(self.allother_x_desired))]
        return x1, u1, x1_des, xamb, uamb, xamb_des, other_x, other_u, other_des

    def generate_slack_variables(self, slack, N, number_slack_vars=3, n_circles=2):
        if slack == True:
            slack_vars = [self.opti.variable(n_circles, N+1) for i in range(number_slack_vars)]
            for slack in slack_vars:
                self.opti.subject_to(cas.vec(slack)>=0)
                # self.opti.subject_to(slack<=1.0)
        else:
            slack_vars = [self.opti.parameter(n_circles, N+1) for i in range(number_slack_vars)]
            for slack in slack_vars:
                self.opti.set_value(slack, np.zeros((n_circles,N+1)))
        return slack_vars


def load_state(file_name, n_others):
    xamb = np.load(file_name + "xamb.npy",allow_pickle=False)
    uamb = np.load(file_name + "uamb.npy",allow_pickle=False)
    xamb_des = np.load(file_name + "xamb_des.npy",allow_pickle=False)


    xothers, uothers, xothers_des = [], [], []
    for i in range(n_others):
        x = np.load(file_name + "x%0d.npy"%i, allow_pickle=False)
        u = np.load(file_name + "u%0d.npy"%i, allow_pickle=False)
        x_des = np.load(file_name + "x_des%0d.npy"%i, allow_pickle=False)
        xothers += [x]
        uothers += [u]
        xothers_des += [x_des]

    return xamb, uamb, xamb_des, xothers, uothers, xothers_des


def save_state(file_name, xamb, uamb, xamb_des, xothers, uothers, xothers_des):
    np.save(file_name + "xamb", xamb,allow_pickle=False)
    np.save(file_name + "uamb", uamb,allow_pickle=False)
    np.save(file_name + "xamb_des", xamb_des, allow_pickle=False)

    for i in range(len(xothers)):
        x, u, x_des = xothers[i], uothers[i], xothers_des[i]    
        np.save(file_name + "x%0d"%i, x, allow_pickle=False)
        np.save(file_name + "u%0d"%i, u, allow_pickle=False)
        np.save(file_name + "x_des%0d"%i, x_des, allow_pickle=False)
    
    return file_name

def save_costs(file_name, ibr):
    ### Get the value for each cost variable
    car1_costs_list = np.array([ibr.opti.debug.value(cost) for cost in ibr.car1_costs_list])
    amb_costs_list = np.array([ibr.opti.debug.value(cost) for cost in ibr.amb_costs_list])
    svo_cost = ibr.opti.debug.value(ibr.response_svo_cost)
    other_svo_cost = ibr.opti.debug.value(ibr.other_svo_cost)
    total_svo_cost = ibr.opti.debug.value(ibr.total_svo_cost)


    np.save(file_name + "car1_costs_list", car1_costs_list, allow_pickle=False)
    np.save(file_name + "amb_costs_list", amb_costs_list, allow_pickle=False)
    np.save(file_name + "svo_cost", svo_cost, allow_pickle=False)
    np.save(file_name + "other_svo_cost", other_svo_cost, allow_pickle=False)
    np.save(file_name + "total_svo_cost", total_svo_cost, allow_pickle=False)
    return file_name

def load_costs(file_name):

    car1_costs_list = np.load(file_name + "car1_costs_list.npy", allow_pickle=False)
    amb_costs_list = np.load(file_name + "amb_costs_list.npy", allow_pickle=False)
    svo_cost = np.load(file_name + "svo_cost.npy", allow_pickle=False)
    other_svo_cost = np.load(file_name + "other_svo_cost.npy", allow_pickle=False)
    total_svo_cost = np.load(file_name + "total_svo_cost.npy", allow_pickle=False)


    return car1_costs_list, amb_costs_list, svo_cost, other_svo_cost , total_svo_cost


def load_costs_int(i):

    car1_costs_list = np.load("%03dcar1_costs_list.npy"%i, allow_pickle=False)
    amb_costs_list = np.load("%03damb_costs_list.npy"%i, allow_pickle=False)
    svo_cost = np.load("%03dsvo_cost.npy"%i, allow_pickle=False)
    other_svo_cost = np.load("%03dother_svo_cost.npy"%i, allow_pickle=False)
    total_svo_cost = np.load("%03dtotal_svo_cost.npy"%i, allow_pickle=False)


    return car1_costs_list, amb_costs_list, svo_cost, other_svo_cost , total_svo_cost


    