## Model Class
The class below is the model for any system. A parameter dictionary should be provided to define the system.

In [46]:
import math
import cvxpy
import numpy as np
import pandas as pd
from scipy import sparse
import matplotlib.pyplot as plt
from numpy.linalg import matrix_power


from qpsolvers import solve_qp
from scipy.integrate import solve_ivp
import quadprog
import sys
np.set_printoptions(threshold=sys.maxsize)
from sklearn.pipeline import Pipeline
from datafold.dynfold.transform import TSCRadialBasis
from datafold.pcfold import GaussianKernel, TSCDataFrame

In [2]:
def pendulum_dynamics_single(y, u):
    g = 9.8 
    L = 1.5 # Length of pendulum
    m = 1.0 #mass of bob (kg)
    M = 5.0  # mass of cart (kg)
    d1 = 1.0
    d2 = 0.5

    x_ddot = u - m*L*y[3]*y[3] * np.cos(y[2] ) + m*g*np.cos(y[2]) *  np.sin(y[2])
    x_ddot = x_ddot / ( M+m-m* np.sin(y[2])* np.sin(y[2]) )
    theta_ddot = -g/L * np.cos(y[2] ) -  np.sin( y[2]) / L*x_ddot

    damping_x =  - d1*y[1]
    damping_theta =  - d2*y[3]

    x_next = [y[1], x_ddot + damping_x, y[3], theta_ddot + damping_theta]

    return np.array(x_next)

In [3]:
def pendulum_dynamics(t, y):
    g = 9.8 
    L = 1.5 # Length of pendulum
    m = 1.0 #mass of bob (kg)
    M = 5.0  # mass of cart (kg)
    d1 = 1.0
    d2 = 0.5

    u = np.random.uniform(-4,4,1)[0]

    x_ddot = u - m*L*y[3]*y[3] * np.cos(y[2] ) + m*g*np.cos(y[2]) *  np.sin(y[2])
    x_ddot = x_ddot / ( M+m-m* np.sin(y[2])* np.sin(y[2]) )
    theta_ddot = -g/L * np.cos(y[2] ) -  np.sin( y[2]) / L*x_ddot

    damping_x =  - d1*y[1]
    damping_theta =  - d2*y[3]

    x_next = [y[1], x_ddot + damping_x, y[3], theta_ddot + damping_theta, u]

    return np.array(x_next)


params = {
    "Ntraj": 10, 
    "Nsims": 100,
    "dt": 0.1,
    "Nlift": 20, 
    "nstates": 4, 
    "mcontrolUnits": 1,
    "state_bounds": [-2,2],
    "input_bounds": [-12,12],
    "dynamics": pendulum_dynamics,
    "C": np.array([[1, 0, 0, 0], [0, 0, 1, 0]]),
    "basis_type": "radial",
    "Tmax": 5
}

In [4]:
class data:
    def __init__(self):
        self.x = []
        self.y = []
        self.u = []
    
    def plot_data_traj(self, traj=0, state= 0):
        for _id, df in self.x.itertimeseries():
            if(_id == traj):
                plt.plot(df.index.to_numpy(), df[state].to_numpy())
                plt.show()
    def return_traj_x(self, traj, state=0):
        for _id, df in self.x.itertimeseries():
            if(_id == traj):
                return df[state].to_numpy()
    def return_traj_y(self, traj, state=0):
        for _id, df in self.y.itertimeseries():
            if(_id == traj):
                return df[state].to_numpy()
    def return_traj_u(self, traj, state=0):
        for _id, df in self.u.itertimeseries():
            if(_id == traj):
                return df[state].to_numpy().reshape(-1,1).T
                
        
        
class cost:
    def __init__(self):
        self.A = []
        self.B = []
        self.C = []
        self.Q = []
        self.R = []
        self.H = []
        self.D = []
        self.G = []
        
    def print_shapes(self):  
        print("----------- cost --------------")
        print("A :", self.A.shape)
        print("B :", self.B.shape)
        print("C :", self.C.shape)
        print("Q :", self.Q.shape)
        print("R :", self.R.shape)
        print("H :", self.H.shape)
        print("D :", self.D.shape)
        print("G :", self.G.shape)
        
class constraints:
    def __init__(self):
        self.F = []
        self.E = []
        self.c = []
        self.L = []
        self.M = []

        
    def print_shapes(self): 
        print("----------- constraints --------------")
        print("F :", self.F.shape)
        print("E :", self.E.shape)
        print("c :", self.c.shape)
        print("L :", self.L.shape)
        print("M :", self.M.shape)



In [5]:
class sysModel:
    """
        Initialize the system with parameter dictionary.
        After initialization, the model will generate A, B and C matrices.
    """
    def __init__(self, params):
        self.Ntraj = params["Ntraj"] # k or number of trajectories
        self.Nsims = params["Nsims"]  # number of items per state
        self.dt = params["dt"] # time difference
        self.n = params["nstates"] # number of states
        self.m = params["mcontrolUnits"] # number of control inputs
        self.Nlift = params["Nlift"]  # number of centers, or basis functions
        self.state_bounds = params["state_bounds"] # interval for which states are uniformly randomly picked
        self.input_bounds = params["input_bounds"] # interval for which inputs are uniformly randomly picked
        self.dynamics = params["dynamics"] # dynamics equation for the system
        self.C_select = params["C"] # which outputs are relevant
        self.basis_type = params["basis_type"] # which basis function to use
        self.data = data()
        
        self.Tmax = params["Tmax"]
        self.tspan =  np.linspace(0, self.dt*self.Nsims+1, self.Nsims+1)
        
        # initial_conditions is [x1, x2, x3, x4,... u]
        initlst = []
        for ns in range(self.n):
            initlst.append(np.random.uniform(self.state_bounds[0], self.state_bounds[1], self.Ntraj))
        for ms in range(self.m):
            initlst.append(np.random.uniform(self.input_bounds[0], self.input_bounds[1], self.Ntraj))
        self.initial_conditions = np.array(initlst).T.reshape(-1, len(initlst))
        
        # list of randomly generated centers for the basis function
#         centlst = []
#         for cs in range(self.n):
#             centlst.append(np.random.uniform(self.state_bounds[0], self.state_bounds[1], self.Nlift))
#         self.centers = np.array(centlst).T.reshape(-1, len(centlst))
        
        
        # generate data
        self.data.x, self.data.y, self.data.u = self.generate_data()
        
        xs = self.data.x.to_numpy()
        indices = np.random.choice(xs.shape[0], self.Nlift, replace=False) 
        self.centers = xs[indices, :]
        
        
        # Final A, B, C matrices required for control
        self.A, self.B, self.C, self.xlift, self.ylift, self.x_data, self.y_data, self.u_data = self.compute_ABC()
        if(self.m == 1):
            self.B = np.expand_dims(self.B, axis=1)

    """
        Currently rbf basis is supported where centers are randomly allocated
    """
    def selectBasis(self):
        dict_step = [("rbf", TSCRadialBasis(kernel=GaussianKernel(epsilon=0.17), center_type="fit_params"))]
        return dict_step
    
    def simulate(self):
        Nsims = int(self.Tmax/self.dt)
        xtrue = np.array([ 3.0, 1, math.pi/2-0.15, 0.15 ]).reshape(-1,1)
        xlift = self.computeBasis(xtrue.T).T
        u_dt = self.data.return_traj_u(traj=2)
        for i in range(0, Nsims):
            ax = self.A@xlift[:, -1]
            bu = self.B@u_dt[:,i]
            xlift_temp = ax+bu
            # Koopman predictor

            xlift = np.append(xlift,xlift_temp.reshape(-1,1) ,1)

            # True dynamics
            x_true_tepm = pendulum_dynamics_single(xtrue[:, -1].reshape(-1,1), u_dt[:,i])
            xtrue = np.append(xtrue, x_true_tepm, 1)

        return xlift, xtrue
    
    def plotSimulate(self):
        Nsims = int(self.Tmax/self.dt)
        xl, xt = self.simulate()
        xkoop = self.C@xl
        t = np.arange(0,Nsims+1)
        plt.plot(t, xkoop, '-r')
        plt.plot(t, xt, 'b')
        
    
    def computeBasis(self, arr):
        dict_step = self.selectBasis()
        pipe = Pipeline(dict_step)
        pipe.fit(arr, rbf__centers = self.centers)
        Xtransformed = pipe.transform(arr)
        return Xtransformed
    """
        Returns the A,B,C matrices after lifting the data generated by solving the dynamics.
    """
    def compute_ABC(self):
        x_data, y_data, u_data = self.data.x.to_numpy(), self.data.y.to_numpy(), self.data.u.to_numpy()
        
    
        dict_step = self.selectBasis()
        pipe = Pipeline(dict_step)
        pipe.fit(x_data, rbf__centers = self.centers)
        Xtransformed = pipe.transform(x_data)
        
        ncc = self.centers@self.C_select.T
        pipe.fit(y_data, rbf__centers=ncc)
        Ytransformed = pipe.transform(y_data)
        
        CT = np.linalg.lstsq(Xtransformed, x_data, rcond=None)[0]
        
        xlift = Xtransformed.T
        ylift = Ytransformed.T
        u_1 = u_data.T
        
        Xliftcombined = np.concatenate([xlift, u_1])
        
        G = Xliftcombined@Xliftcombined.T
        V = ylift@(Xliftcombined.T)
        
        AB = np.linalg.lstsq(G.T, V.T, rcond=None)[0]
        AB_val = AB.T
        
        A = AB_val[:, :self.Nlift]
        B = AB_val[:, self.Nlift]
        C = CT.T
        
        return A, B, C, xlift, ylift, x_data, y_data, u_data
        
    """
        Generates the data from the given dynamics and time span.
    """
    def generate_data(self):
        npts, _ = self.initial_conditions.shape
        
        states_control = self.m + self.n

        assert self.initial_conditions.shape[1] == states_control

        xtime_series_dfs = []
        ytime_series_dfs = []
        utime_series_dfs = []

        for ic in self.initial_conditions:

            solution = solve_ivp(self.dynamics, t_span=(self.tspan[0], self.tspan[-1]), y0=ic, t_eval=self.tspan)

            data_u_values = solution["y"][self.n]
            data_values = solution["y"][:self.n]

            mdim, ndim = data_values.shape

            x_data = data_values[:, 0:ndim-1]
            sub_y = data_values[:, 1:ndim]
            u_data = data_u_values[0:ndim-1]

            y_data = self.C_select@sub_y
            intermidiate_time = solution["t"]
            time_x = intermidiate_time[0:ndim-1]

            solution = pd.DataFrame(data=x_data.T, index=time_x)
            y_values_df = pd.DataFrame(data=y_data.T, index=time_x)
            u_values_df = pd.DataFrame(data=u_data.T, index=time_x)

            utime_series_dfs.append(u_values_df)
            xtime_series_dfs.append(solution)
            ytime_series_dfs.append(y_values_df)

        return TSCDataFrame.from_frame_list(xtime_series_dfs), TSCDataFrame.from_frame_list(ytime_series_dfs), TSCDataFrame.from_frame_list(utime_series_dfs)
    
    def print_shapes(self):
        print("----------- Model --------------")
        print("states: {0}      control inputs: {1}".format(self.n, self.m))
        print("No of data points: ", self.Nsims)
        print("No of trajectories: ", self.Ntraj)
        print("No of basis/lift: ", self.Nlift)
        print("x_data :", self.x_data.shape)
        print("y_data :", self.y_data.shape)
        print("u_data :", self.u_data.shape)
        print("A :", self.A.shape)
        print("B :", self.B.shape)
        print("C :", self.C.shape)
        print("xlift :", self.xlift.shape)
        print("ylift :", self.ylift.shape)

In [47]:
class KMPC:
    ''' 
        Initialize the parameters for the Koopman MPC, with koopman system.
    '''
    def __init__(self, model):
        # take some properties/methods from the other class
        self.model = model
        
        # define default values of properties
        self.horizon = math.floor(1/self.model.dt) 
        
        self.input_bounds = np.array([-12,12]) # [min,max] can be 1x2 or mx2
        self.state_bounds = np.array([-10,10]) # [min,max] can be 1x2 or nx2
        
        self.cost_running = 0.1
        self.cost_terminal = 100
        self.cost_input = 0.01
        self.expand_bounds()
        #self.projmtx = self.model.C
        self.projmtx = self.model.C_select
        self.cost = cost()
        self.constraints = constraints()
        
    def expand_bounds(self):
        if self.model.m != self.input_bounds.shape[0]:
            self.input_bounds =np.kron(np.ones((self.model.m,1)), self.input_bounds)
        if self.model.n != self.state_bounds.shape[0]:
            self.state_bounds =np.kron(np.ones((self.model.n,1)), self.state_bounds)
        
    def get_zeta(self, arr):
        return self.model.computeBasis(arr)
    
    def is_pos_def(self, x):
        return np.all(np.linalg.eigvals(x) > 0)
    
    def get_costMatrices(self):
        '''
            get_costMatrices: Constructs cost the matrices for the mpc optimization problem.
            self.cost has fields H, G, D, A, B, C, Q, R
            
            define cost function matrices
            Cost function is defined as U'HU + ( z0'G + Yr'D )U
        '''
        model = self.model
        
        
        # calculation for matrix A ---------------------------------
        N = np.size(self.model.A, 0)
        A = np.zeros((N*(self.horizon+1), N))
        for i in range(0, self.horizon+1):
            A[(N*i):N*(i+1) , : ] = matrix_power(model.A,i)
            
            
        # calculation for matrix B ---------------------------------
        Bheight = N*(self.horizon+1);
        Bcolwidth = np.size(self.model.B,1)

        Bcol = np.zeros((Bheight, Bcolwidth))

        for i in range(1, self.horizon+1):
            Bcol[(N*i) : N*(i+1) , :] = matrix_power(self.model.A,(i-1))@self.model.B
        lshift = sparse.spdiags(np.ones(N*self.horizon), -N, N*(self.horizon+1) , N*(self.horizon+1))
   

        Bwidth = np.size(self.model.B,1)*self.horizon
        Bblkwidth = self.horizon 
        B = np.zeros((Bheight , Bwidth))
        B[: , :Bcolwidth] = Bcol
        for i in range(2, Bblkwidth+1):
            B[:,(i-1)*Bcolwidth:(i)*Bcolwidth] = lshift@B[:,(i-2)*Bcolwidth:(i-1)*Bcolwidth]
            
        # calculation for matrix C ---------------------------------------------------------
        #C = np.kron(np.eye(self.horizon+1) , self.projmtx)
        #nproj = np.size(self.projmtx , 0)
        nCshape = (self.projmtx.shape[0], self.model.Nlift)
        nC = np.zeros(nCshape)
        nC[0,0] = 1
        nC[1,2] = 1
        C = np.kron(np.eye(self.horizon+1) , nC)
        nproj = np.size(nCshape , 0)
        
        
        
        # Q: Error magnitude penalty ------------------------------------------------------
        Q = np.kron(np.eye(self.horizon+1) , np.eye(nproj) * self.cost_running) #error magnitude penalty (running cost) (default 0.1)
        endr, endc = Q.shape
        Q[endr-nproj: , endc-nproj:] = np.eye(nproj) * self.cost_terminal # (terminal cost) (default 100)
        
        
        # R: Input magnitude penalty -------------------------------------------------------
        R = np.kron(np.eye(self.horizon) , np.eye(model.m) * self.cost_input)  # input magnitude penalty
        

        # H, G and D
        H = B.T @ C.T @ Q @ C @ B + R
        G = 2 * A.T @ C.T @ Q @ C @ B
        D = -2 * Q @ C @ B
        
        # Setting outputs
        
        # constructed matrices
        
        self.cost.H = H
        self.cost.G = G
        self.cost.D = D
        
        
        # component matrices
        
        self.cost.A = A
        self.cost.B = B
        self.cost.C = C
        self.cost.Q = Q
        self.cost.R = R
        
    #---------------------------------------------------------------
    def get_constraintMatrices(self):
        '''
        F is input constraints
        E is the state constraints
        m no of inputs, n no of states (1,2)
        '''
        Np = self.horizon
        m = self.model.m
        n = self.model.n
        cost = self.cost
        F=[]
        E=[]
        c=[]
        
        # input_bounds
        if self.input_bounds.any():
            # define the number of the input bound constraints
            num = 2*m
            
            # F: input bounds
            Fbounds_i = np.vstack((-np.eye(m), np.eye(m))) # diagonal element of F, for bounded inputs
            Fbounds = np.zeros((num*(Np+1) , np.size(cost.B,1))) # no constraints, all zeros
            Fbounds[:num*Np , :Np*m] = np.kron(np.eye(Np), Fbounds_i)  
            F.append(Fbounds)
            
            # E: input bounds
            Ebounds = np.zeros((num*(Np+1) , np.size(cost.B,0))) # no constraints, all zeros
            E.append(Ebounds)
            
            # c: input bounds
            cbounds_i = np.vstack((self.input_bounds[:, [0]], self.input_bounds[:, [1]]))
            cbounds = np.zeros((num*(Np+1), 1))
            cbounds[:num*Np] = np.kron(np.ones((Np,1)), cbounds_i) 
            c.append(cbounds)
            

        # state_bounds
        if self.state_bounds.any():
            # define the number of the state bound constraints
            num = 2*n
            
            # E: state_bounds
            Esbounds_i = np.vstack((-np.eye(n),np.eye(n)))  
            Esbounds = np.zeros((num*(Np+1) , np.size(cost.A,0)));  # no constraints, all zeros
            Esbounds[:num*(Np+1), :(Np+1)*n] = np.kron(np.eye(Np+1), Esbounds_i)    # fill in nonzeros
            E.append(Esbounds)
            
            # F: state_bounds (all zeros)
            Fsbounds = np.zeros((np.size(Esbounds, 0) , np.size(cost.B, 1)))
            F.append(Fsbounds)
            
            # c: state_bounds
            #csbounds_i =  np.vstack((self.input_bounds[:, [0]], self.input_bounds[:, [1]])) # [-ymin ; ymax ]
            #csbounds = np.kron(np.ones((Np+1, 1)), csbounds_i)     # fill in nonzeros
            #c.append(csbounds)
            csbounds_i = np.vstack((self.input_bounds[:, [0]], self.input_bounds[:, [1]]))
            csbounds_i = np.tile(csbounds_i, (n,1))
            csbounds = np.zeros((num*(Np+1), 1))
            csbounds[:num*Np] = np.kron(np.ones((Np,1)), csbounds_i) 
            c.append(csbounds)
            
        
        
        self.constraints.F = np.vstack(F)
        self.constraints.E = np.vstack(E)
        self.constraints.c = np.vstack(c)
 
        self.constraints.L = self.constraints.F + self.constraints.E @ cost.B
        self.constraints.M = self.constraints.E @ cost.A
        
    def quadprog_solve_qp(self,P, q, G=None, h=None, A=None, b=None):
        qp_G = .5 * (P + P.T)   # make sure P is symmetric
        qp_a = -q
        if A is not None:
            qp_C = -numpy.vstack([A, G]).T
            qp_b = -numpy.hstack([b, h])
            meq = A.shape[0]
        else:  # no equality constraint
            qp_C = -G.T
            qp_b = -h
            meq = 0
        return quadprog.solve_qp(qp_G, qp_a, qp_C, qp_b, meq)[0] 
    

    def get_mpcInput(self, traj, ref):
        Np = self.horizon
        if not ref.any():
            ref = np.zeros(self.horizon)
            
        if ref.shape[1] != self.projmtx.shape[0]:
        #if ref.shape[1] != 2:
            print("Reference trajectory is not the correct dimension")
        elif ref.shape[1] > Np+1:
            ref = ref[:Np+1, :]
        elif ref.shape[1] < Np+1:
            ref_temp = np.kron(np.ones((Np+1, 1)), ref[-1,:]);
            ref_temp[:ref.shape[0], :] = ref
            ref = ref_temp
        
        z = model.computeBasis(traj).T
 
        
        Yr = np.reshape(ref.T, ((Np+1)*ref.shape[1], 1), order="F")
        #Yr = np.reshape(ref.T, ((Np+1)*2, 1), order="F")


        P1 = 2*self.cost.H
 
        q1 = (z.T@self.cost.G + Yr.T@self.cost.D ).T

        G1 = self.constraints.L.astype(float)
        h1 = - self.constraints.M@z + self.constraints.c
        A1 = np.ones(P1.shape[0])
        b1 = np.ones(1)
        q1 = np.squeeze(q1, axis=1).astype(float)
        h1 = np.squeeze(h1, axis=1).astype(float)
        #print(self.is_pos_def(P1))
      
        U = solve_qp(P1,q1,G1,h1, solver='quadprog')
        U1 = self.quadprog_solve_qp(P1, q1, G1, h1)
        return U, U1
        
        

In [7]:
model = sysModel(params)
model.print_shapes()

----------- Model --------------
states: 4      control inputs: 1
No of data points:  100
No of trajectories:  10
No of basis/lift:  20
x_data : (1000, 4)
y_data : (1000, 2)
u_data : (1000, 1)
A : (20, 20)
B : (20, 1)
C : (4, 20)
xlift : (20, 1000)
ylift : (20, 1000)


In [48]:
mpc = KMPC(model)
mpc.get_costMatrices()
mpc.get_constraintMatrices()
mpc.cost.print_shapes()
print("horizon :", mpc.horizon)
mpc.constraints.print_shapes()

----------- cost --------------
A : (220, 20)
B : (220, 10)
C : (22, 220)
Q : (22, 22)
R : (10, 10)
H : (10, 10)
D : (22, 10)
G : (20, 10)
horizon : 10
----------- constraints --------------
F : (110, 10)
E : (110, 220)
c : (110, 1)
L : (110, 10)
M : (110, 20)


In [49]:
data = model.data.x.to_numpy()

In [50]:

tr = datum.reshape(-1,1).T
rf = np.array([0,math.pi/2]).reshape(-1,1).T
U = mpc.get_mpcInput(tr,rf)
print(U)
 


ValueError: constraints are inconsistent, no solution

In [51]:
mpc.constraints.L

array([[-1.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00],
       [ 1.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00],
       [ 0.00000000e+00, -1.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00],
       [ 0.00000000e+00,  1.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00],
       [ 0.00000000e+00,  0.00000000e+00, -1.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
         0.