In [None]:
!pip install gurobipy
!pip install cvxpylayers
!pip install gurobipy

In [2]:
import torch
import gurobipy as gp
from gurobipy import GRB
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer
import torch 
import torch.nn as nn
import time

In [None]:
# Colab only: mount Google Drive at /content/drive. readdata() reads
# '/content/drive/My Drive/Colab Notebooks/data.csv' from this mount.
from google.colab import drive
drive.mount('/content/drive')

In [4]:
import torch 
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import csv

# ---- Shared configuration read by the functions and cells below ----
stepNum = 200  # nominal simulation length; the test loops below hard-code range(200) instead of reading this
hxCoefficient = 1.0  # scale applied to the barrier value returned by cbfh()
dt = 0.1  # time step; NeuralNetwork.__init__ and dynamic() each redefine a local dt = 0.1
leadVelocity = 16  # used by Lfh() and by the constant drift term of dynamic()
kd = 0.1  # appears in the velocity row of the dynamics (1 - kd * dt) and in Lfh()
stableVelocity = 30  # velocity the cost functions penalise deviation from
inputBounds = [1.] # bound(s) read by the gauge functions (original note: r1, r2)
inputBoundsMatrix = [torch.tensor([[1.0]])]  # one matrix per entry of inputBounds, read by the gauge functions
n_in, n_h, n_out = 3, 10, 1  # NOTE(review): NeuralNetwork hard-codes its layer sizes and does not read these
uLimit = 1.  # control bound used by the Gurobi problems; dataPreProcess() divides the recorded inputs by it
learningRate = 0.01  # Adam learning rate for the later training cells
trainingEpoch = 1000  # NOTE(review): the visible training loops hard-code their epoch counts
DPChorizon = 100  # rollout length of the training loops (the "NN train" cell resets it to 10)
distCoefficient = 0.0  # weight of the squared x[2] term in the cost functions
controlCoefficient = 1.0  # weight of the 5 * u**2 term in the cost functions
velocityCoeffcient = 1.0  # weight of the velocity-tracking term (spelling kept: referenced by the cost functions)

class NeuralNetwork(nn.Module):
    """Feed-forward policy plus three frozen linear layers holding the plant matrices.

    `stack` maps a 3-dimensional state to one control value in (-1, 1) (final Tanh).
    `nnA`, `nnB` and `nnC` are bias-free linear layers whose weights are overwritten
    with fixed matrices built from the notebook-level `kd` and `leadVelocity`; they
    are excluded from training.
    """

    def __init__(self):
        super().__init__()
        step = 0.1

        # Layers are created in the original order (nnA, nnB, nnC, stack) so that a
        # seeded random initialisation of `stack` is unchanged.
        self.nnA = nn.Linear(3, 3, bias=False)
        self.nnB = nn.Linear(1, 3, bias=False)
        self.nnC = nn.Linear(1, 3, bias=False)
        self.stack = nn.Sequential(
            nn.Linear(3, 30),
            nn.Sigmoid(),
            nn.Linear(30, 30),
            nn.Sigmoid(),
            nn.Linear(30, 1),
            nn.Tanh()
        )

        # Fixed weights, built in double precision and cast on copy.
        plantWeights = (
            (self.nnA, [[1., step, 0.], [0., - kd * step + 1, 0.], [0., - step, 1.]]),
            (self.nnB, [[0.], [step * 2.5], [0.]]),
            (self.nnC, [[0.], [0.], [leadVelocity * step]]),
        )
        for layer, rows in plantWeights:
            with torch.no_grad():
                layer.weight.copy_(torch.tensor(rows, dtype=torch.float64))
            # Plant layers are never trained; only `stack` keeps requires_grad=True.
            for param in layer.parameters():
                param.requires_grad = False

    def forward(self, x):
        """Return (control, predicted next state) for a batch of states `x`.

        The prediction is nnA(x) + nnB(control); the nnC drift term is not added.
        """
        control = self.stack(x)
        predicted = self.nnA(x) + self.nnB(control)
        return control, predicted

def cbfh(x):
   """Barrier value (x[2] - 1.8 * x[1]) scaled by the notebook-level hxCoefficient.

   `x` is indexed as [position, velocity, distance], the layout built by dataPreProcess().
   """
   margin = x[2] - 1.8 * x[1]
   return margin * hxCoefficient

def Lfh(x):
   """Control-independent part of the CBF condition: 1.8 * kd * x[1] - x[1] + leadVelocity.

   Callers use it in the constraint Lfh + Lgh * u >= -h; only the velocity x[1] is read.
   """
   speed = x[1]
   return 1.8 * kd * speed - speed + leadVelocity

def Lgh(x):
   """Coefficient of the control in the CBF condition; constant, so `x` is not read."""
   controlGain = -1.8 * 2.5
   return controlGain

def trajCost(traj, input):
   """Accumulated cost of a simulated trajectory, divided by 100.

   For every control `input[i]` the matching state is read from `traj[i][0]`
   (each trajectory entry is indexed as a one-row container). Each step adds a
   velocity-tracking term, a squared x[2] term and 5 * u**2, each weighted by
   its notebook-level coefficient.
   """
   total = 0
   for step, control in enumerate(input):
      state = traj[step][0]
      total += ((state[1] - stableVelocity) ** 2) * velocityCoeffcient
      total += ((state[2] - 0.0) ** 2) * distCoefficient
      total += (5 * control ** 2) * controlCoefficient
   return total / 100

def DPCcost(next_x, next_u):
    """Batched training cost, divided by 100.

    `next_x` is indexed as (batch, 3) and `next_u` as (batch, >=1). The cost sums,
    over the batch: squared velocity error, squared x[2] (weighted by
    distCoefficient), 100 times the amount by which x[2] - 1.8 * x[1] falls
    below zero, and 5 * u**2.
    """
    velocity = next_x[:, 1]
    gap = next_x[:, 2]

    # Keep only negative barrier values; non-negative ones contribute nothing.
    barrier = gap - 1.8 * velocity
    violation = torch.where(barrier < 0.0, barrier, torch.tensor(0.0))

    trackingTerm = torch.sum((velocity - stableVelocity) ** 2) * velocityCoeffcient
    gapTerm = torch.sum((gap - 0.0) ** 2) * distCoefficient
    safeLoss = torch.sum((- violation)) * 100
    effortTerm = torch.sum(5 * next_u[:, 0] ** 2) * controlCoefficient

    total = trackingTerm + gapTerm + safeLoss + effortTerm
    return total / 100

def DPCcostOne(next_x, next_u):
    """Single-sample training cost, divided by 100.

    `next_x` is indexed as a 3-element state and `next_u[0]` is the control.
    Terms: squared velocity error, squared x[2] (weighted by distCoefficient),
    the amount by which x[2] - 1.8 * x[1] falls below zero, and 5 * u**2.

    The barrier value is passed to torch.where directly. The previous
    `torch.tensor(cbfValues)` copy-constructed it, which detaches it from the
    autograd graph, so the safety penalty never produced a gradient.

    NOTE(review): the safety term is weighted by 1 here but by 100 in DPCcost;
    left unchanged, confirm which weight is intended.
    """
    cost = 0
    cbfValues = next_x[2] - 1.8 * next_x[1]
    # Keep only the negative part; stays attached to the graph.
    cbfValues = torch.where(cbfValues < 0.0, cbfValues, torch.tensor(0.0))
    cost += torch.sum((next_x[1] - stableVelocity) ** 2) * velocityCoeffcient
    cost += torch.sum((next_x[2] - 0.0) ** 2) * distCoefficient
    cost += torch.sum((- cbfValues)) * 1
    cost += torch.sum(5 * next_u[0] ** 2) * controlCoefficient
    return cost / 100

# dynamics evolution
def dynamic(currentX, input):
   """One step of the discrete-time model: A @ currentX + B @ input + C.

   `currentX` is multiplied as a (3, batch) matrix and `input` as (1, batch);
   the (3, 1) drift C broadcasts across the batch columns. The matrices are
   rebuilt on each call from the notebook-level `kd` and `leadVelocity`.
   """
   step = 0.1
   stateMatrix = torch.tensor([[1., step, 0.], [0., - kd * step + 1, 0.], [0., - step, 1.]])
   inputMatrix = torch.tensor([[0.], [step * 2.5], [0.]])
   drift = torch.tensor([[0.], [0.], [leadVelocity * step]])
   return stateMatrix @ currentX + inputMatrix @ input + drift

# write MPC-generated data to csv
def writedata(position, velocity, distance, inputlist):
    """Append four CSV rows (position, velocity, distance, inputs) to 'MPCdata.csv'.

    The file is opened in append mode in the current working directory, so
    repeated calls accumulate groups of four rows.
    """
    with open('MPCdata.csv', 'a', encoding='UTF8', newline='') as handle:
        csv.writer(handle).writerows([position, velocity, distance, inputlist])

# read existing training data
# return is (position, velocity, distance, inputList)
def readdata(dataDir='/content/drive/My Drive/Colab Notebooks', fileName='data.csv'):
    """Read the training CSV and return (position, velocity, distance, inputList).

    The file is parsed as consecutive groups of four rows, in the order written
    by writedata(): position, velocity, distance, input. Values from every group
    are concatenated into four flat lists of floats. Rows left over after the
    last complete group of four are ignored.

    Args:
        dataDir: directory containing the file. Defaults to the Colab Drive
            folder that was previously hard-coded.
        fileName: CSV file name inside `dataDir`.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a cell cannot be converted to float.
    """
    with open(f"{dataDir}/{fileName}", newline='') as f:
        rows = list(csv.reader(f))

    position, velocity, distance, inputs = [], [], [], []
    series = (position, velocity, distance, inputs)
    completeRows = len(rows) - len(rows) % 4
    for start in range(0, completeRows, 4):
        for target, row in zip(series, rows[start:start + 4]):
            target.extend(float(cell) for cell in row)
    return position, velocity, distance, inputs

def plotSafeFigure(cbfValue, methodName):
    """Plot the CBF values over the sample index together with a red zero line.

    `methodName` is only used by the disabled save-to-file line.
    """
    sampleCount = len(cbfValue)
    timeAxis = np.linspace(0, sampleCount, num=sampleCount)
    plt.plot(timeAxis, cbfValue)
    plt.plot(timeAxis, np.zeros(sampleCount), c = 'r')  # h = 0 reference line
    plt.xlabel('time')
    plt.ylabel("CBF Value")
    #plt.savefig("%s-cbfValue.png" % methodName)
    plt.show()

def plotVelocityFigure(velocity, methodName):
    """Plot the velocity trace with a red horizontal line at the notebook-level leadVelocity.

    `methodName` is only used by the disabled save-to-file line.
    """
    sampleCount = len(velocity)
    timeAxis = np.linspace(0, sampleCount, num=sampleCount)
    plt.plot(timeAxis, velocity)
    plt.plot(timeAxis, np.full(sampleCount, leadVelocity, dtype=float), c = 'r')
    plt.xlabel('time')
    plt.ylabel("Velocity")
    #plt.savefig("%s-velocityValue.png" % methodName)
    plt.show()

def plotDistGoalFigure(position_x, position_y, position_z, methodName):
    """Plot x**2 + y**2 + z**2 for every sample, with a red zero line.

    The plotted quantity is the squared norm (no square root is taken). The
    horizontal axis spans 0..10 regardless of the number of samples.
    `methodName` is only used by the disabled save-to-file line.
    """
    distanceList = [
        (position_x[k]) * (position_x[k]) + (position_y[k]) * (position_y[k]) + (position_z[k]) * (position_z[k])
        for k in range(len(position_x))
    ]
    sampleCount = len(distanceList)
    timeAxis = np.linspace(0, 10, num=sampleCount)
    plt.plot(timeAxis, distanceList)
    plt.plot(timeAxis, np.zeros(sampleCount), c = 'r')
    plt.xlabel('time')
    plt.ylabel("distance to goal")
    #plt.savefig("%s-DistGoal.png" % methodName)
    plt.show()

def dataPreProcess():
    """Load the recorded data and return it as tensors (x, y).

    Returns:
        x: tensor indexed as (N, 3) whose columns are position, velocity, distance.
        y: tensor indexed as (N,) holding the recorded inputs divided by uLimit.

    Both tensors are built in one shot instead of growing them with torch.cat
    inside a loop (quadratic copying). Empty data now yields empty tensors of
    shape (0, 3) and (0,) instead of raising IndexError.

    Raises:
        AssertionError: if the four series returned by readdata() differ in length.
    """
    position, velocity, distance, inputList = readdata()
    assert len(position) == len(velocity)
    assert len(velocity) == len(distance)
    assert len(distance) == len(inputList)

    # Stack the three series as rows, then transpose to one row per sample.
    x = torch.tensor([position, velocity, distance]).T.contiguous()
    y = torch.tensor(inputList) / uLimit
    return x, y

In [None]:
## NN train
# Trains the policy alone (no QP or gauge safety layer): every epoch rolls the
# whole data set forward DPChorizon steps through dynamic() and accumulates DPCcost.

import torch
import time

torch.manual_seed(0)
# These repeat the setup-cell constants, except DPChorizon, which is 10 here
# (100 in the setup cell) and stays 10 for the cells that follow.
stepNum = 200
leadVelocity = 16
kd = 0.1
stableVelocity = 30
uLimit = 1.
DPChorizon = 10
# data pre-process
x, y = dataPreProcess()

# NN model
model = NeuralNetwork()
criterion = torch.nn.MSELoss()  # NOTE(review): not used by the loop below
optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
epochList, lossList  = [], []  # NOTE(review): never filled in this cell


# training
train_start_time = time.time()
data_num = x.shape[0]
print("data num: ", data_num)
for epoch in range(2000):
   loss = 0
   current_x = x
   for horizon in range(DPChorizon):
      # Only the control is used; the network's own state prediction (next_x) is ignored.
      next_u, next_x = model(current_x)
      loss += DPCcost(current_x, next_u)
      # dynamic() works on column states, hence the transposes around it.
      current_x = dynamic(torch.transpose(current_x, 0, 1), torch.transpose(next_u, 0, 1))
      current_x = torch.transpose(current_x, 0, 1)
   loss /= data_num
   if epoch % 10 == 0:
      print('epoch: ', epoch,' loss: ', loss.item())
   out = optimizer.zero_grad()
   loss.backward()
   optimizer.step()
print("total NN training time: ", time.time()-train_start_time)
print("-------------------------------")

In [None]:
# test NN
# Simulates 200 steps under the raw network output (no safety filter), then
# reports the minimum of distance - 1.8 * velocity and the trajectory cost.
# Requires `model` from a training cell run earlier.

currentState = torch.tensor([0., 30., 100.]) # initial testing state
prevState = currentState  # NOTE(review): not read afterwards in this cell
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList, stateList = [], []
test_start_time = time.time()

# system evolution under NN controller (the loop length is the literal 200, not stepNum)
currentState = currentState.reshape((1,3))
for i in range(200):
   input, next_x = model(currentState)
   inputList.append(input[0].item())
   stateList.append(currentState)
   # dynamic() takes and returns a (3, 1) column state
   currentState = dynamic(torch.transpose(currentState, 0, 1), input)
   position.append(currentState[0].item())
   velocity.append(currentState[1].item())
   distance.append(currentState[2].item())
   currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")


# barrier value along the trajectory (same expression as cbfh with hxCoefficient = 1)
cbfValue = []
for i in range(len(distance)):
    cbfValue.append(distance[i] - 1.8 * velocity[i])
methodName = "NN"
print("-------------------------------")
print("min cbfValue: ", min(cbfValue))
print("-------------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)

In [None]:
# test NN-QP

def project(input, state):
   """Return the control closest to `input` that satisfies the CBF constraint.

   Solves, with Gurobi,  min (u - input)^2  subject to  -uLimit <= u <= uLimit
   and  -h(state) - Lfh(state) <= u * Lgh(state).

   Args:
      input: torch tensor holding the network's control; detached and converted to numpy.
      state: tensor that is reshaped to 3 elements before calling cbfh/Lfh/Lgh.

   Returns:
      `u.getAttr('x')[0]`, i.e. the first row of the optimised variable's values.

   NOTE(review): the solver status is not checked after optimize(); presumably
   reading 'x' fails when the problem is infeasible - confirm.
   NOTE(review): identical definitions of project() appear in two later cells.
   """

   QP_start_time = time.time()  # only used by the disabled timing print
   
   # cbfLfh + cbfLgh * u >= - cbfhx
   cbfhx = cbfh(state.reshape((3)))
   cbfLfh = Lfh(state.reshape((3)))
   cbfLgh = Lgh(state.reshape((3)))
   input = input.detach().numpy()

   m = gp.Model("QP")
   m.setParam('OutputFlag', 0)
   # NOTE(review): the objective below looks convex, so this is probably unnecessary - confirm
   m.params.NonConvex = 2
   
   # control limit
   umin = np.array([-uLimit])
   umax = np.array([uLimit])
   umin = np.tile(umin, (1))
   umax = np.tile(umax, (1))
   
   # constraints
   u = m.addMVar(shape=(1,1), lb=umin, ub=umax, name='u')
   m.addConstr(- cbfhx - cbfLfh <= u[:, 0] * cbfLgh) # CBF constraint

   obj = (u[0, 0] - input[0]) * (u[0, 0] - input[0]) # QP objective
   m.setObjective(obj, GRB.MINIMIZE)

   m.optimize()

   uModified = u.getAttr('x')[0]

   return uModified

# Simulates 200 steps where every network output is filtered by project()
# (Gurobi CBF-QP) before being applied. Requires `model` from a training cell.
currentState = torch.tensor([0., 35., 100.]) # initial state
prevState = currentState  # NOTE(review): not read afterwards in this cell
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList, stateList = [], []
test_start_time = time.time()



currentState = currentState.reshape((1,3))
for i in range(200):
   input, next_x = model(currentState)
   # replace the raw control by its projection onto the safe set
   input = torch.tensor(project(input, currentState)).float()
   inputList.append(input[0].item())
   stateList.append(currentState)
   currentState = dynamic(torch.transpose(currentState, 0, 1), input.reshape((1,1)))
   position.append(currentState[0, 0].item())
   velocity.append(currentState[1, 0].item())
   distance.append(currentState[2, 0].item())
   currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")

# collect CBF values
cbfValue = []
for i in range(len(distance)):
    cbfValue.append(distance[i] - 1.8 * velocity[i])

methodName = "NN-CBF-QP"
print("min cbfValue: ", min(cbfValue))
print("--------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)

In [None]:
torch.manual_seed(0)

def project(input, state):
   """Return the control closest to `input` that satisfies the CBF constraint.

   Solves, with Gurobi,  min (u - input)^2  subject to  -uLimit <= u <= uLimit
   and  -h(state) - Lfh(state) <= u * Lgh(state).

   Args:
      input: torch tensor holding the network's control; detached and converted to numpy.
      state: tensor that is reshaped to 3 elements before calling cbfh/Lfh/Lgh.

   Returns:
      `u.getAttr('x')[0]`, i.e. the first row of the optimised variable's values.

   NOTE(review): the solver status is not checked after optimize(); presumably
   reading 'x' fails when the problem is infeasible - confirm.
   NOTE(review): this repeats the project() definition of the "test NN-QP" cell.
   """

   QP_start_time = time.time()  # only used by the disabled timing print
   
   # cbfLfh + cbfLgh * u >= - cbfhx
   cbfhx = cbfh(state.reshape((3)))
   cbfLfh = Lfh(state.reshape((3)))
   cbfLgh = Lgh(state.reshape((3)))
   input = input.detach().numpy()

   m = gp.Model("QP")
   m.setParam('OutputFlag', 0)
   # NOTE(review): the objective below looks convex, so this is probably unnecessary - confirm
   m.params.NonConvex = 2
   
   # control limit
   umin = np.array([-uLimit])
   umax = np.array([uLimit])
   umin = np.tile(umin, (1))
   umax = np.tile(umax, (1))
   
   # constraints
   u = m.addMVar(shape=(1,1), lb=umin, ub=umax, name='u')
   m.addConstr(- cbfhx - cbfLfh <= u[:, 0] * cbfLgh) # CBF constraint

   obj = (u[0, 0] - input[0]) * (u[0, 0] - input[0]) # QP objective
   m.setObjective(obj, GRB.MINIMIZE)

   m.optimize()

   uModified = u.getAttr('x')[0]

   return uModified




# Train the policy through a differentiable, soft-constrained CBF-QP layer.

# data pre-process
x, y = dataPreProcess()

# initialize NN model
model = NeuralNetwork()
criterion = torch.nn.MSELoss()  # NOTE(review): not used by the loop below
optimizer = torch.optim.Adam(model.parameters(), lr = learningRate)
epochList, lossList = [], []

trainingDataNum = y.shape[0]
print("training data num: ", trainingDataNum)

## Differentiable QP setup
## Decision vector u_mod = [u, slack]; constraint: cbf_f * u <= cbf_g + slack
## with cbf_f = -Lgh and cbf_g = Lfh + cbfh.
u_dim = 1
u_mod = cp.Variable(u_dim+1)
u_init = cp.Parameter(u_dim)
cbf_f = cp.Parameter(u_dim)
cbf_g = cp.Parameter(1)
unsafe_penalty = 1000.0
# The slack must be non-negative. Without that bound the penalty term rewards a
# negative slack, which drives u to -1 whatever the network proposes and makes
# the layer output independent of u_init.
constraints = [
    u_mod[0] >= -1.,
    u_mod[0] <= 1.,
    u_mod[1] >= 0.,
    cbf_f @ u_mod[0:1] <= cbf_g + u_mod[1],
] # control input bound, slack sign and CBF constraint
objective = cp.Minimize(cp.pnorm(u_mod[0] - u_init, p=2) + unsafe_penalty * u_mod[1]) # stay close to the NN output, penalise violation
problem = cp.Problem(objective, constraints)
cvxpylayer = CvxpyLayer(problem, parameters=[u_init, cbf_f, cbf_g], variables=[u_mod])

data_num = x.shape[0]
## training
train_start_time = time.time()
for epoch in range(70):
    loss = 0
    current_x = x
    for horizon in range(50):
        next_u, next_x = model(current_x)

        # CBF data for the current states: trainF * u <= trainG
        trainF = torch.full((trainingDataNum,), 1.8 * 2.5)
        trainG = (current_x[:, 2] - 1.8 * current_x[:, 1]) * hxCoefficient + 1.8 * kd * current_x[:, 1] - current_x[:, 1] + leadVelocity
        # the layer is called one sample at a time
        for each_data_index in range(trainingDataNum):
            solution, = cvxpylayer(next_u[each_data_index, :], trainF[each_data_index].unsqueeze(0), trainG[each_data_index].unsqueeze(0))
            # solution = [u, slack]; DPCcostOne only reads solution[0]
            loss += DPCcostOne(current_x[each_data_index, :], solution)
        # NOTE(review): the rollout uses the raw network output next_u, not the
        # projected control; confirm this is intended.
        current_x = dynamic(torch.transpose(current_x, 0, 1), torch.transpose(next_u, 0, 1))
        current_x = torch.transpose(current_x, 0, 1)
    loss /= data_num
    print('epoch: ', epoch,' loss: ', loss.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
print("training time: ", time.time()-train_start_time)
print("--------------------------")


In [None]:
# NN-diff-QP testing
# Simulates 200 steps with the model trained through the differentiable QP;
# at test time the control is filtered by the Gurobi-based project().
currentState = torch.tensor([0., 30., 130.]) # initial state
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList = []
stateList = []
test_start_time = time.time()
currentState = currentState.reshape((1,3))
for i in range(200):
   input, next_x = model(currentState)
   # replace the raw control by its projection onto the safe set
   input = torch.tensor(project(input, currentState)).float()
   inputList.append(input[0].item())
   stateList.append(currentState)
   currentState = dynamic(torch.transpose(currentState, 0, 1), input.reshape((1,1)))
   position.append(currentState[0, 0].item())
   velocity.append(currentState[1, 0].item())
   distance.append(currentState[2, 0].item())
   currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")

# collect CBF values
cbfValue = []
for i in range(len(distance)):
    cbfValue.append(distance[i] - 1.8 * velocity[i])

methodName = "NN-Diff-CBF-QP"
print("min cbfValue: ", min(cbfValue))
print("-------------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)

In [None]:
def chebyshevCenter(x):
   """Chebyshev-style center of the safe control set via a differentiable program (soft CBF constraint).

   Decision vector: [center, radius, slack]. The program maximises the radius
   (plus a small penalty on center**2 and a large penalty on the slack) subject to
      radius >= 0, slack >= 0,
      center + radius <= 1,  -center + radius <= 1,
      -Lgh * center + Lgh**2 * radius <= h + Lfh + slack.
   The last row uses the same signs as the Gurobi version chebyshevCenterTest.
   Previously the radius term entered with a negative sign and the slack had no
   lower bound.

   Reads the notebook-level `center_regularizer`, which is defined in a later
   cell, so that cell must have been run before this function is called. A later
   cell also redefines chebyshevCenter.

   Returns:
      (control_center, cbfhx, cbfLfh, cbfLgh), each reshaped to (1, 1).
   """
   ## Differentiable LP setup
   u_dim = 2
   center_delta = cp.Variable(u_dim+1)
   cbf_f = cp.Parameter(u_dim)
   cbf_g = cp.Parameter(1)
   unsafe_penalty = 1000.0
   constraints = [
      center_delta[1] >= 0.0,
      center_delta[2] >= 0.0,
      center_delta[0] + center_delta[1] <= 1.0,
      - center_delta[0] + center_delta[1] <= 1.0,
      cbf_f @ center_delta[0:2] <= cbf_g + center_delta[2],
   ] # radius/slack sign, control input bound and CBF constraint
   objective = cp.Minimize(- center_delta[1] + center_regularizer * center_delta[0] ** 2 + unsafe_penalty * center_delta[2]) # maximize the radius
   problem = cp.Problem(objective, constraints)
   cvxpylayer = CvxpyLayer(problem, parameters=[cbf_f, cbf_g], variables=[center_delta])

   # as_tensor avoids copy-constructing (and detaching) when x already holds tensors
   cbfhx = torch.as_tensor(cbfh(x), dtype=torch.float32).reshape((1,1))
   cbfLfh = torch.as_tensor(Lfh(x), dtype=torch.float32).reshape((1,1))
   cbfLgh = torch.as_tensor(Lgh(x), dtype=torch.float32).reshape((1,1))
   # coefficients of [center, radius] in the CBF row: [-Lgh, Lgh^2]
   halfspace = torch.cat((- cbfLgh, cbfLgh ** 2), 1)
   solution , = cvxpylayer(halfspace, cbfhx + cbfLfh)

   control_center = solution[0, 0].reshape((1,1))
   return control_center, cbfhx, cbfLfh, cbfLgh

In [None]:
# train NN-Gauge

import torch
import numpy as np
import time
import gurobipy as gp
from gurobipy import GRB
import cvxpy as cp 
from cvxpylayers.torch import CvxpyLayer
torch.manual_seed(0)

# Weight of the squared-center term in the objectives of chebyshevCenter and chebyshevCenterTest.
center_regularizer = 0.0001

def chebyshevCenter(x):
   """Chebyshev-style center of the safe control set via a differentiable program (hard CBF constraint).

   Decision vector: [center, radius]. The program maximises the radius (plus a
   small penalty on center**2, weighted by center_regularizer) subject to
      radius >= 0,
      center + radius <= 1,  -center + radius <= 1,
      -Lgh * center + Lgh**2 * radius <= h + Lfh.
   The last row uses the same signs as the Gurobi version chebyshevCenterTest.
   Previously the radius term entered with a negative sign, and a leftover
   debugging `raise ValueError("stop here")` made the function unusable.

   NOTE(review): unlike chebyshevCenterTest there is no slack variable, so the
   program is presumably infeasible for states where no control in [-1, 1]
   satisfies the CBF row - confirm how callers should handle that.

   Returns:
      (control_center, cbfhx, cbfLfh, cbfLgh), each reshaped to (1, 1).
   """
   ## Differentiable LP setup
   u_dim = 2
   center = cp.Variable(u_dim)
   cbf_f = cp.Parameter(u_dim)
   cbf_g = cp.Parameter(1)
   constraints = [
      center[1] >= 0.0,
      center[0] + center[1] <= 1.0,
      - center[0] + center[1] <= 1.0,
      cbf_f @ center <= cbf_g,
   ] # control input bound and CBF constraint
   objective = cp.Minimize(- center[1] + center_regularizer * center[0] ** 2) # maximize the radius
   problem = cp.Problem(objective, constraints)
   cvxpylayer = CvxpyLayer(problem, parameters=[cbf_f, cbf_g], variables=[center])

   # as_tensor avoids copy-constructing (and detaching) when x already holds tensors
   cbfhx = torch.as_tensor(cbfh(x), dtype=torch.float32).reshape((1,1))
   cbfLfh = torch.as_tensor(Lfh(x), dtype=torch.float32).reshape((1,1))
   cbfLgh = torch.as_tensor(Lgh(x), dtype=torch.float32).reshape((1,1))
   # coefficients of [center, radius] in the CBF row: [-Lgh, Lgh^2]
   halfspace = torch.cat((- cbfLgh, cbfLgh ** 2), 1)
   solution , = cvxpylayer(halfspace, cbfhx + cbfLfh)
   control_center = solution[0, 0].reshape((1,1))

   return control_center, cbfhx, cbfLfh, cbfLgh


def chebyshevCenterTest(x):
   """Chebyshev-style center of the safe control set, solved with Gurobi.

   Variables: control u in [-uLimit, uLimit], radius R and slack delta (both
   created with addVars and no explicit bounds). Minimises
      -R + center_regularizer * u^2 + unsafe_penalty * delta
   subject to
      -u * Lgh + Lgh^2 * R <= h + Lfh + delta,   u + R <= uLimit,   -u + R <= uLimit.

   Args:
      x: 3-element state, indexed x[1] and x[2] by cbfh/Lfh.

   Returns:
      (control, cbfhx, cbfLfh, cbfLgh) where control is a (1, 1) tensor built
      from the solver value and the other three are the raw cbfh/Lfh/Lgh results.
   """
   
   # cbfLfh + cbfLgh * u >= - cbfhx
   cbfhx = cbfh(x)
   cbfLfh = Lfh(x)
   cbfLgh = Lgh(x)

   m = gp.Model("LP")
   m.setParam('OutputFlag', 0)
   # NOTE(review): the objective looks convex, so this is probably unnecessary - confirm
   m.params.NonConvex = 2

   R = m.addVars(1, name="R") # Chebyshev radius
   delta = m.addVars(1, name="delta")  # slack on the CBF constraint
   unsafe_penalty = 1000
   # control limit
   umin = np.array([-uLimit])
   umax = np.array([uLimit])
   umin = np.tile(umin, (1))
   umax = np.tile(umax, (1))

   # constraints
   u = m.addMVar(shape=(1,1), lb=umin, ub=umax, name='u')
   m.addConstr(- R[0] <= 0) # radius >= 0
   m.addConstr(- u[0, 0] * cbfLgh + cbfLgh ** 2 * R[0] <= cbfhx + cbfLfh + delta[0]) # CBF constraint adapted to Chebyshev
   m.addConstr(u[0, 0] + R[0] <= uLimit) # control limit constraint adapted to Chebyshev
   m.addConstr(- u[0, 0] + R[0] <= uLimit) # control limit constraint adapted to Chebyshev

   # maximize radius
   m.setObjective(-R[0] + center_regularizer * u[0, 0] ** 2 + unsafe_penalty * delta[0], GRB.MINIMIZE)

   m.optimize()

   # get optimized result
   control = u.getAttr("x")
   control = torch.tensor([[control[0][0]]])

   return control, cbfhx, cbfLfh, cbfLgh

## gauge map used for training
def gaugeMap(v, F, g, uCenter):
   """Rescale each row of `v` by (max |v| in that row) / gaugeFunction(v, F, g, uCenter).

   Used during training; the caller adds `uCenter` to the result afterwards.
   """
   rowInfNorm = torch.max(abs(v), 1)[0].unsqueeze(1)
   gauge = gaugeFunction(v, F, g, uCenter)
   return rowInfNorm / gauge * v

def gaugeFunction(v, F, g, uCenter):
   """Gauge of `v` with respect to the safe control set shifted to `uCenter` (batched).

   The set is {u : F u <= g, -b <= M u <= b} with b, M taken from inputBounds
   and inputBoundsMatrix. After shifting by the center c the rows become
      F v <= g - F c,   M v <= b - M c,   -M v <= b + M c,
   and the gauge is the largest ratio (left side / right side) over the rows.

   The lower-bound row previously lacked parentheses and evaluated
   `-v / b - c` instead of `-v / (b + c)`, which let mapped controls leave the
   input bounds.

   Args:
      v: (N, u_dim) directions.
      F: multiplied element-wise with `v` and summed over dim 1.
         NOTE(review): a (1, N) F broadcasts against an (N, 1) v and scales the
         result by N; pass one row per sample, i.e. (N, u_dim) - confirm callers.
      g: (1, N); transposed internally to (N, 1).
      uCenter: (N, u_dim) centers; cast to float32.

   Returns:
      (N, 1) tensor of gauge values.
   """
   vF = torch.sum(v * F, 1).unsqueeze(1)
   gSub = torch.sum(uCenter.float() * F, 1).unsqueeze(1)
   result = vF / (torch.transpose(g, 0, 1) - gSub)
   for i in range(len(inputBoundsMatrix)): # take the input constraints inequality into consideration
      inputConstraint = v @ inputBoundsMatrix[i]
      centerOffset = uCenter.float() @ inputBoundsMatrix[i]
      upperRatio = inputConstraint / (inputBounds[i] - centerOffset)
      lowerRatio = - inputConstraint / (inputBounds[i] + centerOffset)
      result = torch.max(torch.max(upperRatio, lowerRatio), result)
   return result

## gauge map used for testing
## (mostly the same with the above gauge for training, only F and g dim differences)
def gaugeMapPred(v, F, g, uCenter):
   """Test-time gauge map: rescale `v` by (max |v| over all entries) / gaugeFunctionPred(...).

   The caller adds the center to the result afterwards.
   """
   infNorm = torch.max(abs(v))
   gauge = gaugeFunctionPred(v, F, g, uCenter)
   mapped = infNorm / gauge * v
   # Disabled feasibility check, kept as an inert string as in the original.
   '''
   if F * (result + uCenter) > g:
      raise ValueError("CBF violation!")
   '''
   return mapped

def gaugeFunctionPred(v, F, g, uCenter):
   """Gauge of `v` with respect to the safe control set shifted to `uCenter` (single sample).

   The set is {u : F u <= g, -b <= M u <= b} with b, M taken from inputBounds
   and inputBoundsMatrix. After shifting by the center c the rows become
      F v <= g - F c,   M v <= b - M c,   -M v <= b + M c,
   and the gauge is the largest ratio (left side / right side) over the rows.

   The lower-bound row previously divided by (b - M c) as well, so the map
   could not reach the lower input bound when the center was off zero.

   Args:
      v: control direction; multiplied by F and summed over dim 0.
      F: CBF coefficient (the caller passes -Lgh).
      g: CBF right-hand side (the caller passes h + Lfh).
      uCenter: 2-D center tensor; transposed and reduced with .item(), so
         F * uCenter must hold exactly one element.

   Returns:
      Tensor of gauge values (broadcast of the CBF ratio and the bound ratios).
   """
   vF = torch.sum(v * F, 0)
   uCenter = torch.transpose(uCenter, 0, 1)
   gSub = (uCenter * F).item()
   result = vF / (g - gSub)
   for i in range(len(inputBoundsMatrix)): # take the input constraints inequality into consideration
      inputConstraint = v @ inputBoundsMatrix[i]
      centerOffset = uCenter.float() @ inputBoundsMatrix[i]
      constraint1 = inputConstraint / (inputBounds[i] - centerOffset)
      constraint2 = - inputConstraint / (inputBounds[i] + centerOffset)
      constraint = torch.max(constraint1, constraint2)
      result = torch.max(constraint, result)
   return result

# calculate a series of centers for a batch of data
# used for calculate the chebyshev centers list for the whole training data
# return is the all centers.
def chebyCenterList(x):
   """Centers for a batch given as three parallel series x = [positions, velocities, distances].

   Calls chebyshevCenterTest once per sample and returns the centers stacked
   as one row per sample (the (1, 1) results are concatenated, then transposed).
   """
   centers = [chebyshevCenterTest([x[0][0], x[1][0], x[2][0]])[0]]
   centers.extend(
      chebyshevCenterTest([x[0][k], x[1][k], x[2][k]])[0]
      for k in range(1, len(x[0]))
   )
   return torch.transpose(torch.cat(centers, 1), 0, 1)

def chebyCenterListHorizon(x):
   """Centers for a batch given as a matrix with one state per row (x[i, 0..2]).

   Calls chebyshevCenterTest once per row and returns the centers stacked as
   one row per sample.
   """
   centers = [chebyshevCenterTest([x[0, 0], x[0, 1], x[0, 2]])[0]]
   centers.extend(
      chebyshevCenterTest([x[row, 0], x[row, 1], x[row, 2]])[0]
      for row in range(1, x.shape[0])
   )
   return torch.transpose(torch.cat(centers, 1), 0, 1)


def cbfh(x):
   """Barrier value (x[2] - 1.8 * x[1]) * hxCoefficient.

   Same definition as the cbfh() of the setup cell; this one replaces it when the cell runs.
   """
   margin = x[2] - 1.8 * x[1]
   return margin * hxCoefficient

def Lfh(x):
   """Control-independent part of the CBF condition: 1.8 * kd * x[1] - x[1] + leadVelocity.

   Same definition as the Lfh() of the setup cell; this one replaces it when the cell runs.
   """
   speed = x[1]
   return 1.8 * kd * speed - speed + leadVelocity

def Lgh(x):
   """Constant coefficient of the control in the CBF condition; `x` is not read.

   Same definition as the Lgh() of the setup cell; this one replaces it when the cell runs.
   """
   controlGain = -1.8 * 2.5
   return controlGain


In [None]:
# train NN-gauge
# Every rollout step maps the network output into the safe control set with the
# gauge map around the Chebyshev center of that step's states.

# data pre-process
x, y = dataPreProcess()

model = NeuralNetwork()
criterion = torch.nn.MSELoss()  # NOTE(review): not used by the loop below
optimizer = torch.optim.Adam(model.parameters(), lr = learningRate)
data_num = x.shape[0]

## training
train_start_time = time.time()
for epoch in range(10):
  loss = 0
  current_x = x
  for horizon in range(DPChorizon):
      # CBF row F * u <= g for each sample. F holds one row per sample so that
      # gaugeFunction's element-wise product with next_u (data_num, 1) does not
      # broadcast to (data_num, data_num) and scale the gauge by the batch size.
      F = torch.full((data_num, 1), 1.8 * 2.5)
      g = ((current_x[:, 2] - 1.8 * current_x[:, 1]) * hxCoefficient + 1.8 * kd * current_x[:, 1] - current_x[:, 1] + leadVelocity).unsqueeze(0)
      # Centers are recomputed for the current states on every step; the former
      # up-front chebyCenterList pass over the raw data was overwritten here
      # before being used, so it has been dropped.
      uCenter = chebyCenterListHorizon(current_x)
      next_u, next_x = model(current_x)
      next_u = gaugeMap(next_u, F, g, uCenter) + uCenter
      loss += DPCcost(current_x, next_u)
      current_x = dynamic(torch.transpose(current_x, 0, 1).float(), torch.transpose(next_u, 0, 1).float())
      current_x = torch.transpose(current_x, 0, 1)
  loss /= data_num
  print('epoch: ', epoch,' loss: ', loss.item())
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()

print("training time: ", time.time()-train_start_time)
print("--------------------------")
   

In [None]:

# testing
# Simulates 200 steps where the network output is mapped into the safe control
# set by gaugeMapPred around the center returned by chebyshevCenterTest.


currentState = torch.tensor([0., 25., 100.]).unsqueeze(1)
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList, stateList = [], []
test_start_time = time.time()
currentState = torch.transpose(currentState, 0, 1)
for i in range(200):
  input, next_x = model(currentState)
  yCen, cbfhx, cbfLfh, cbfLgh = chebyshevCenterTest(currentState.reshape((3)))
  # safe set written as cbfF * u <= cbfg
  cbfF = - cbfLgh
  cbfg = cbfhx + cbfLfh
  # mapped direction plus center gives the applied control
  input = gaugeMapPred(input, cbfF, cbfg, yCen)[0] + torch.transpose(yCen.float(), 0, 1)[0]
  inputList.append(input[0].item())
  stateList.append(currentState)
  currentState = dynamic(torch.transpose(currentState, 0, 1), input.reshape((1,1)))
  position.append(currentState[0, 0].item())
  velocity.append(currentState[1, 0].item())
  distance.append(currentState[2, 0].item())
  currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")

# barrier value along the trajectory
cbfValue = []
for i in range(len(distance)):
  cbfValue.append(distance[i] - 1.8 * velocity[i])
methodName = "NN-CBF-Gauge"
print("min cbfValue: ", min(cbfValue))
print("-------------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)

In [None]:
# testing \pi_cen
# Baseline: applies the center returned by chebyshevCenterTest directly as the
# control for 200 steps (the network is not used).


currentState = torch.tensor([0., 30., 100.]).unsqueeze(1)
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList, stateList = [], []
test_start_time = time.time()
currentState = torch.transpose(currentState, 0, 1)
for i in range(200):
  yCen, cbfhx, cbfLfh, cbfLgh = chebyshevCenterTest(currentState.reshape((3)))
  input = yCen.float()
  inputList.append(input[0].item())
  stateList.append(currentState)
  currentState = dynamic(torch.transpose(currentState, 0, 1), input.reshape((1,1)))
  position.append(currentState[0, 0].item())
  velocity.append(currentState[1, 0].item())
  distance.append(currentState[2, 0].item())
  currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")

# barrier value along the trajectory
cbfValue = []
for i in range(len(distance)):
  cbfValue.append(distance[i] - 1.8 * velocity[i])
methodName = "center"
print("min cbfValue: ", min(cbfValue))
print("-------------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)

In [None]:
import torch 
import numpy as np
import time
import gurobipy as gp
from gurobipy import GRB
import cvxpy as cp 
from cvxpylayers.torch import CvxpyLayer

torch.manual_seed(0)

def project(input, state):
   """Return the control closest to `input` that satisfies the CBF constraint.

   Solves, with Gurobi,  min (u - input)^2  subject to  -uLimit <= u <= uLimit
   and  -h(state) - Lfh(state) <= u * Lgh(state).

   Args:
      input: torch tensor holding the network's control; detached and converted to numpy.
      state: tensor that is reshaped to 3 elements before calling cbfh/Lfh/Lgh.

   Returns:
      `u.getAttr('x')[0]`, i.e. the first row of the optimised variable's values.

   NOTE(review): the solver status is not checked after optimize(); presumably
   reading 'x' fails when the problem is infeasible - confirm.
   NOTE(review): this repeats the project() definitions of two earlier cells.
   """

   QP_start_time = time.time()  # only used by the disabled timing print
   
   # cbfLfh + cbfLgh * u >= - cbfhx
   cbfhx = cbfh(state.reshape((3)))
   cbfLfh = Lfh(state.reshape((3)))
   cbfLgh = Lgh(state.reshape((3)))
   input = input.detach().numpy()

   m = gp.Model("QP")
   m.setParam('OutputFlag', 0)
   # NOTE(review): the objective below looks convex, so this is probably unnecessary - confirm
   m.params.NonConvex = 2
   
   # control limit
   umin = np.array([-uLimit])
   umax = np.array([uLimit])
   umin = np.tile(umin, (1))
   umax = np.tile(umax, (1))
   
   # constraints
   u = m.addMVar(shape=(1,1), lb=umin, ub=umax, name='u')
   m.addConstr(- cbfhx - cbfLfh <= u[:, 0] * cbfLgh) # CBF constraint

   obj = (u[0, 0] - input[0]) * (u[0, 0] - input[0]) # QP objective
   m.setObjective(obj, GRB.MINIMIZE)

   m.optimize()

   uModified = u.getAttr('x')[0]

   return uModified


# data pre-process
# Train the policy through a differentiable CBF-QP layer with a hard CBF constraint.
x, y = dataPreProcess()

# initialize NN model
model = NeuralNetwork()
criterion = torch.nn.MSELoss()  # NOTE(review): not used by the loop below
optimizer = torch.optim.Adam(model.parameters(), lr = learningRate)
epochList, lossList = [], []

trainingDataNum = y.shape[0]

## Differentiable QP setup
## constraint: cbf_f * u <= cbf_g, with cbf_f = -Lgh and cbf_g = Lfh + cbfh
## NOTE(review): there is no slack variable, so the layer presumably fails for
## states where no u in [-1, 1] satisfies the CBF row - confirm.
u_dim = 1
u_mod = cp.Variable(u_dim)
u_init = cp.Parameter(u_dim)
cbf_f = cp.Parameter(u_dim)
cbf_g = cp.Parameter(1)
constraints = [u_mod >= -1., u_mod <= 1., cbf_f @ u_mod <= cbf_g] # control input bound and CBF constraint
objective = cp.Minimize(cp.pnorm(u_mod - u_init, p=2)) # stay close to the NN output
problem = cp.Problem(objective, constraints)
cvxpylayer = CvxpyLayer(problem, parameters=[u_init, cbf_f, cbf_g], variables=[u_mod])

# The control coefficient does not depend on the state.
trainF = torch.full((trainingDataNum,), 1.8 * 2.5)

data_num = x.shape[0]
## training
train_start_time = time.time()
for epoch in range(50):
    loss = 0
    current_x = x
    for horizon in range(DPChorizon):
        next_u, next_x = model(current_x)
        # The CBF right-hand side depends on the state, so it is rebuilt from the
        # current rollout states. It used to be computed once from the initial
        # data and reused, stale, for every later horizon step.
        trainG = (current_x[:, 2] - 1.8 * current_x[:, 1]) * hxCoefficient + 1.8 * kd * current_x[:, 1] - current_x[:, 1] + leadVelocity
        # the layer is called one sample at a time
        for each_data_index in range(trainingDataNum):
            solution, = cvxpylayer(next_u[each_data_index, :], trainF[each_data_index].unsqueeze(0), trainG[each_data_index].unsqueeze(0))
            loss += DPCcostOne(current_x[each_data_index, :], solution)
        # NOTE(review): the rollout uses the raw network output next_u, not the
        # projected control; confirm this is intended.
        current_x = dynamic(torch.transpose(current_x, 0, 1), torch.transpose(next_u, 0, 1))
        current_x = torch.transpose(current_x, 0, 1)
    loss /= data_num
    print('epoch: ', epoch,' loss: ', loss.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
print("training time: ", time.time()-train_start_time)
print("--------------------------")





In [None]:
# testing
# Simulates 200 steps with the model trained in the previous cell; at test time
# the control is filtered by the Gurobi-based project().
currentState = torch.tensor([0., 30., 100.]) # initial state
position, velocity, distance = [], [], []
position.append(currentState[0].item())
velocity.append(currentState[1].item())
distance.append(currentState[2].item())
inputList = []
stateList = []
test_start_time = time.time()
currentState = currentState.reshape((1,3))
for i in range(200):
   input, next_x = model(currentState)
   # replace the raw control by its projection onto the safe set
   input = torch.tensor(project(input, currentState)).float()
   inputList.append(input[0].item())
   stateList.append(currentState)
   currentState = dynamic(torch.transpose(currentState, 0, 1), input.reshape((1,1)))
   position.append(currentState[0, 0].item())
   velocity.append(currentState[1, 0].item())
   distance.append(currentState[2, 0].item())
   currentState = torch.transpose(currentState, 0, 1)
print("total NN solve time --- %s seconds ---" % (time.time() - test_start_time))
print("-------------------------------")

# collect CBF values
cbfValue = []
for i in range(len(distance)):
    cbfValue.append(distance[i] - 1.8 * velocity[i])

methodName = "NN-Diff-CBF-QP"
print("min cbfValue: ", min(cbfValue))
print("-------------------------------")
print("traj cost: ", trajCost(stateList, inputList))
plotSafeFigure(cbfValue, methodName)
plotVelocityFigure(velocity, methodName)