In [1]:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

In [2]:
import autograd.numpy as np
%matplotlib notebook
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import math

In [3]:
def obse_pred_plot(X_mea, X_pred):
    fig, ax = plt.subplots()
    _x = np.linspace( np.min(X_mea), np.max(X_mea), 100 )
    ax.plot(_x, _x, 'r--', label='$x=y$')
    ax.plot(X_mea, X_pred, 'o')
    ax.set_title('Predictions vs Measurements', fontsize=15)
    ax.set_xlabel('Measured', fontsize=15)
    ax.set_ylabel('Predicted', fontsize=15)
    ax.tick_params( labelsize=15 )
    fig.tight_layout()


In [76]:
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from pymanopt.manifolds import Stiefel
from pymanopt.optimizers import ConjugateGradient, SteepestDescent, TrustRegions
from pymanopt import Problem
import pymanopt
from sklearn.model_selection import train_test_split
from scipy.linalg import cholesky, solve_triangular, cho_solve

class grf:
    def __init__(self, X, Y, m_ridge, n_restart=20, tol=1e-2, test_size=0.5):
        """
        X          -- input data
        Y          -- output data
        m_ridge    -- ridge function input dimension
        n_restart  -- number of times to restart fitting and pick the model with the lowest objective function value
        tol        -- error tolerance of cost to stop iteration
        test_size  -- size to split data to train and test sets [0, 1]
        """
        X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=20)
        self.X_train = X_train
        self.X_test = X_test
        self.y_train = y_train
        self.y_test = y_test
        self.n_restart = n_restart
        self.tol = tol
        dim = X.shape[1] # original dimension
        self.manifold = Stiefel(dim, m_ridge)
        # initialize projection matrix M
        self.M = np.random.rand(dim, m_ridge)
        # initialize covariance kernel
        self.kernel = RBF
    

#     def create_cost_and_derivative(self):
#         @pymanopt.function.numpy(self.manifold)
#         def cost(M):
#             U_train = self.X_train @ M
#             U_test = self.X_test @ M

#             G = self.kernel(U_train)
#             b = np.linalg.solve(G, self.y_train)

#             K_test = self.kernel(U_test, U_train)
#             g_test = K_test @ b

#             r = 0.5 * np.linalg.norm(self.y_test - g_test)**2 / self.y_test.shape[0]
#             return r

#         @pymanopt.function.numpy(self.manifold)
#         def dcost(M):
#             ell = self.kernel.get_params()['k1__k2__length_scale']
#             U_train = self.X_train @ M
#             U_test = self.X_test @ M
#             N_test = self.y_test.shape[0]

#             G = self.kernel(U_train)
#             b = np.linalg.solve(G, self.y_train)
#             K_test = self.kernel(U_test, U_train)
#             g_test = K_test @ b

#             inv_P = np.diag(1.0/ell**2)
#             dr = np.zeros(M.shape)
#             for i in range(N_test):
#                 U_tilde = U_test[i] - U_train
#                 dgdu = inv_P @ U_tilde.T @ (K_test[i,:] * b)
#                 dy = np.outer(dgdu, self.X_test[i,:]).T
#                 assert(dy.shape == M.shape)
#                 dr += (self.y_test[i] - g_test[i]) * (dy - M @ dy.T @ M)
#             return dr / N_test
#         return cost, dcost

    def create_cost(self):
        @pymanopt.function.autograd(self.manifold)
        def cost(M):
            U_train = self.X_train @ M
            U_test = self.X_test @ M

            N_train, m = self.X_train.shape

            lengthscales = self.kernel.get_params()['k1__k2__length_scale'] # rbf lengthscale
            sigma2_f = self.kernel.get_params()['k1__k1__constant_value'] # rbf variance
            sigma2_n = self.kernel.get_params()['k2__noise_level'] # noise variance
            L_inv = np.diag(1. / lengthscales.reshape(-1))
            dim = U_train.shape[1] # dimension of ridge function space

            U_train_tilde = U_train @ L_inv 
            # covariance on training data
            G = sigma2_f * np.exp(-0.5*(np.sum(U_train_tilde**2,1).reshape(-1,1) + np.sum(U_train_tilde**2,1) - 
                                       2 * np.dot(U_train_tilde, U_train_tilde.T)))

            G = G + sigma2_n * np.eye(N_train)
            b = np.linalg.solve(G, self.y_train)

            N_test = self.X_test.shape[0]
            U_test_tilde = U_test @ L_inv
            # covariance of testing and training data K(U_test, U_train)
            K_test = sigma2_f * np.exp(-0.5*(np.sum(U_test_tilde**2,1).reshape(-1,1) + np.sum(U_train_tilde**2,1) - 
                                       2 * np.dot(U_test_tilde, U_train_tilde.T)))
            g_test = K_test @ b
            r = 0.5 * np.linalg.norm(self.y_test - g_test)**2 / g_test.shape[0]
            return r
        return cost
    
   
       

    def pred(self, X_test_pred, M, kernel, return_var=False):
        """ 
        X_test_pred: test points to evaluate ridge function outputs
        M: projection matrix
        kernel: kernel with fitted hyper-parameters
        return_var: whether to return variance at test points
        
        Return:
        g_test: predictions of posterior mean using ridge function
        var_test: posterior variance at test points
        """
        U_train = self.X_train @ M
        U_test = X_test_pred @ M

        G = kernel(U_train)
#         b = np.linalg.solve(G, self.y_train)
        L_ = cholesky(G, lower=True, check_finite=False) # lower triangular
        b = cho_solve((L_, True), self.y_train, check_finite=False)

        K_test = kernel(U_test, U_train) # covariance of testing and training data
        g_test = K_test @ b # predicted posterior mean
        
        if not return_var:
            return g_test
        else:  
            v = solve_triangular(L_, K_test.T, lower=True, check_finite=False)
            var_test = self.kernel.diag(U_test).copy() - np.einsum("ij,ji->i", v.T, v)
            return g_test, var_test.reshape(-1,1)
            
            
            
    def set_XY(self, X_new, Y_new):
        """
        Update GPR model dataset
        """
        self.X_train = np.vstack((self.X_train, X_new))
        self.y_train = np.vstack((self.y_train, Y_new))
        
    @staticmethod
    def BIC(gpr):
        """
        Return BIC using log-likelihood
        """
        return gpr.log_marginal_likelihood_value_ - 0.5 * (gpr.n_features_in_ + 2) * math.log(gpr.y_train_.shape[0])

    def grf_fit(self):
        last_r =1e10
        err = np.inf
        d, m = self.M.shape
        n_iter = 0
        
        # re-initialize projection matrix M
        V = np.random.randn(d, m)
        q = np.linalg.qr(V)[0]
        self.M = q.copy()
        
        while err > self.tol:
            self.M_pred = self.M.copy()
            n_iter += 1
            U_train = self.X_train @ self.M_pred
            # prior covariance
            ker = 1.0 * RBF(length_scale=[1 for _ in range(m)], length_scale_bounds=(1e-7, 1e7)) \
            + WhiteKernel(noise_level=1e-6, noise_level_bounds=(1e-8, 1e2)) # noise_level: iid noise variance
            
            gpr = GaussianProcessRegressor(kernel=ker, n_restarts_optimizer=30, alpha=1e-9, normalize_y=False) 
            # n_restars_optimizer: number of optimizations for hyper-parameters
            # alpha: adding to diagonal of covariance matrix to prevent numerical issue during fitting
            gpr.fit(U_train, self.y_train)

            self.kernel = gpr.kernel_ # posterior kernel

#             my_cost, my_dcost = self.create_cost_and_derivative()
#             problem = Problem(manifold=self.manifold, cost=my_cost, euclidean_gradient=my_dcost)
            my_cost = self.create_cost()
            problem = Problem(manifold=self.manifold, cost=my_cost)
#             optimizer = ConjugateGradient(verbosity=0)
#             optimizer = SteepestDescent(verbosity=0)
            optimizer = TrustRegions(verbosity=0)
            M_new = optimizer.run(problem).point
            self.M = M_new.copy()

            r = my_cost(self.M)
            err = np.abs(last_r - r) / last_r
            last_r = r
        bic = self.BIC(gpr)
        return self.M_pred, gpr, r, bic, n_iter
    
    def __call__(self):
        r_min = np.inf
        for i in range(self.n_restart):
            M, gpr, r, bic, n_iter = self.grf_fit();
            print(i, r)
            if r < r_min:
                M_opt = M.copy()
                gpr_opt = gpr
                bic_opt = bic
                n_final = n_iter
                r_min = r
        return M_opt, gpr_opt, r_min, bic_opt, n_final

## Testing linear ridge function
Paper section 4.1

In [None]:
# get training and testing data
d = 10 # dimension of input
m = 2 # ridge subspace dimension
N = 100
X = np.random.rand(N, d) * 2 - 1 # x in [-1,1]
X_test = np.random.rand(50, d) * 2 - 1
# training and testing data
Ureal = np.random.randn(d, m)
q = np.linalg.qr(Ureal)[0]
Ureal = q.copy() # orthogonal
U_data = X @ Ureal
U_test = X_test @ Ureal
Y = np.sum(U_data, axis=1)
y_test = np.sum(U_test, axis=1)
grf_test = grf(X, Y, 2, n_restart=1, tol=1e-1, test_size=0.7)
results_grf = grf_test()
obse_pred_plot(y_test, grf_test.pred(X_test))

In [None]:
bic = results_grf[4]
r = results_grf[2]
print(f'cost={r}')
print(f'BIC = {bic}')

In [None]:
# verify sklearn predict method to calculate posterior mean 
M_final = results_grf[0]
gpr_final = results_grf[1]
obse_pred_plot(grf_test.pred(X_test), gpr_final.predict(X_test @ M_final))

In [None]:
# plot ridge function
ax = plt.figure().add_subplot(projection='3d')
U_test_final = X_test @ M_opt # using optimized M
ax.scatter(U_test_final[:,0], U_test_final[:,1], y_test, c=y_test)
# ax.invert_yaxis()
ax.set_xlabel('$m_1$')
ax.set_ylabel('$m_2$')
ax.set_zlabel('$f$')
plt.show()

### Bayesian Optimization

In [None]:
from dymola.dymola_interface import DymolaInterface
dymola = None
dymola = DymolaInterface()
dymola.openModel(path="C:\Jiacheng Ma\Modelica libraries\DynamicVCC\DynamicVCC\package.mo",changeDirectory=False)

In [None]:
def L_HX(theta_in):
    problem = "DynamicVCC.Examples.Tests.Test_ShellTubeHX"
    startTime = 2000
    stopTime = 3500
    outputInterval = 10
#     numberOfIntervals = 500
    method = "Dassl"
    tolerance = 0.0001
    initialNames = ['u[{}]'.format(i) for i in range(1,len(theta_in)+1)]
    initialValues = theta_in
    dymola.experimentSetupOutput(events=False)
    result, finalVar = dymola.simulateExtendedModel(problem=problem,
                                          startTime=startTime, 
                                          stopTime=stopTime,
                                          outputInterval=outputInterval,
                                          method=method,
                                          tolerance=tolerance,
                                          initialNames=initialNames,
                                          initialValues=initialValues)
    if not result:
        print(theta_in)
        print("Simulation failed. Below is the translation log.")
        log = dymola.getLastErrorLog()
        print(log)
        exit(1)
        return None, None
    else:
        Nrows = dymola.readTrajectorySize("dsres.mat")
        outputNames = ['y[{}]'.format(i) for i in range(1,4)] + ['y_mea[{}]'.format(i) for i in range(1,4)]
        outputVar = dymola.readTrajectory("dsres.mat", outputNames, Nrows)
        pred = np.array(outputVar[:3])
        Mea = np.array(outputVar[3:])
        ner = np.linalg.norm(pred[:,10:] - Mea[:,10:], axis=1) / np.linalg.norm(Mea[:,10:], axis=1) # omit some initialization points
        W = np.eye(ner.shape[0])
        cost = np.dot(ner.T,W.dot(ner))
        return cost, outputVar

# Objective function to minimize
def J_calib(u, lb, ub):
    """
    u    -- Scaled HTC
    lb   -- HTC lower bound
    ub   -- HTC upper bound
    """
    u_truescale = np.round(lb + u * (ub - lb),1)
    cost, outputVar = L_HX(list(u_truescale))
    if not cost:
        exit(1)
    else:
        return -np.log(cost)

In [None]:
if dymola is not None:
    dymola.close()
    dymola = None

In [None]:
# Test dymola model
theta_test = [5e4, 5e4, 5e4, 5e4, 131146]
cost, outputVar = L_HX(theta_test)

Lower and upper bounds for calibration parameters

In [None]:
lb = np.array([1e4,1e4,1e4,1e3,65573]) # Lower bounds of input space
ub = np.array([1e6,1e6,1e6,1e6,196719]) # Upper bounds of input space

Use Latin Hypercube designs to generate random samples

In [None]:
# Generate some starting data
np.random.seed(12345) # repeatable
n_init = 200 # Number of data points

from pyDOE import lhs

# Generate scaled samples of the input space
# X_normalize = np.random.rand(n_init, len(lb))
X_normalize = lhs(len(lb), n_init, 'c')

# Get corresponding results at function space
Y = np.zeros(n_init)
for i in range(n_init):
    Y[i] = J_calib(X_normalize[i,:], lb, ub)
    print(i+1, Y[i])
X_normalize = X_normalize[~np.isnan(Y),:]
Y = Y[~np.isnan(Y)]

# Plot funciton values
fig, ax = plt.subplots()
ax.plot(Y,'kx',markersize=10, markeredgewidth=2)
ax.set_xlabel('$n$')
ax.set_ylabel('$J(u)$')

# save data
np.savetxt('YX_chillerCond.txt', np.hstack((Y[:,None], X_normalize)), delimiter=',')

Gaussian ridge function for calibration parameter space

In [56]:
YX_data = np.loadtxt('YX_chillerCond.txt', delimiter=',')
Y = YX_data[:100,0]
X_normalize = YX_data[:100,1:]

# normalize training data
from sklearn import preprocessing
scaler_y = preprocessing.StandardScaler().fit(Y.reshape(-1,1))
Y_scaled = scaler_y.transform(Y.reshape(-1,1))

X_scaled = X_normalize
# scaler_X = preprocessing.StandardScaler().fit(X_normalize)
# X_scaled = scaler_X.transform(X_normalize)

In [None]:
grf_HX = grf(X_scaled, Y_scaled, 2, n_restart=30, tol=1e-1, test_size=0.4)
results_HX = grf_HX()
M_final, gpr_final, r_final, bic_final, n_final = results_HX
obse_pred_plot(Y, scaler_y.inverse_transform(grf_HX.pred(X_scaled, M_final, gpr_final.kernel_)))

0 0.5006066780416748


In [79]:
# performance only on training data
obse_pred_plot(scaler_y.inverse_transform(grf_HX.y_train), scaler_y.inverse_transform(grf_HX.pred(grf_HX.X_train, M_final, gpr_final.kernel_)))

<IPython.core.display.Javascript object>

In [80]:
# compare gpr.predict() method and grf.pred() method
y_train_predict = scaler_y.inverse_transform(gpr_final.predict(grf_HX.X_train @ M_final))
y_train_pred = scaler_y.inverse_transform(grf_HX.pred(grf_HX.X_train, M_final, gpr_final.kernel_))
obse_pred_plot(y_train_pred, y_train_predict)
# for using gpr.predict, input should be projected onto M_final, which is used for training

<IPython.core.display.Javascript object>

In [81]:
# performance only on testing data
X_test = grf_HX.X_test
y_test_scaled = grf_HX.y_test
obse_pred_plot(scaler_y.inverse_transform(y_test_scaled), scaler_y.inverse_transform(grf_HX.pred(grf_HX.X_test, M_final, gpr_final.kernel_)))

<IPython.core.display.Javascript object>

In [82]:
r_final

0.31076931930911394

In [None]:
print(f'r={results_HX[2]}')
print(f'BIC={results_HX[3]}')

In [None]:
def cost(M):
    U_train = self.X_train @ M
    U_test = self.X_test @ M

    N_train, m = self.X_train.shape

    lengthscales = self.kernel.get_params()['k1__k2__length_scale'] # rbf lengthscale
    sigma2_f = self.kernel.get_params()['k1__k1__constant_value'] # rbf variance
    sigma2_n = self.kernel.get_params()['k2__noise_level'] # noise variance
    L_inv = np.diag(1. / lengthscales.reshape(-1))
    dim = U_train.shape[1] # dimension of ridge function space

    U_train_tilde = U_train @ L_inv 
    # covariance on training data
    G = sigma2_f * np.exp(-0.5*(np.sum(U_train_tilde**2,1).reshape(-1,1) + np.sum(U_train_tilde**2,1) - 
                               2 * np.dot(U_train_tilde, U_train_tilde.T)))

    G = G + sigma2_n * np.eye(N_train)
    b = np.linalg.solve(G, self.y_train)

    N_test = self.X_test.shape[0]
    U_test_tilde = U_test @ L_inv
    # covariance of testing and training data K(U_test, U_train)
    K_test = sigma2_f * np.exp(-0.5*(np.sum(U_test_tilde**2,1).reshape(-1,1) + np.sum(U_train_tilde**2,1) - 
                               2 * np.dot(U_test_tilde, U_train_tilde.T)))
    g_test = K_test @ b
    r = 0.5 * np.linalg.norm(self.y_test - g_test)**2 / g_test.shape[0]
    return r

In [None]:
U_train = grf_HX.X_train @ M_final
U_test = grf_HX.X_test @ M_final
G = gpr_final.kernel_(U_train)
b = np.linalg.solve(G, grf_HX.y_train)
K_test = gpr_final.kernel_(U_test, U_train)
g_test = K_test @ b
r = 0.5 * np.linalg.norm(grf_HX.y_test - g_test)**2 / g_test.shape[0]

In [None]:
results_HX[2]

In [None]:
obse_pred_plot(grf_HX.y_test, g_test)

### Bayesian optimization

In [None]:
def BGOmaximize(f, gpr, X_design, alpha, f_params={}, alpha_params={}, max_it=15, plot=False):
    """Optimize a function using Bayesian global optimization
    Arguments
    f              -- The function to optimize
    gpr            -- Gaussian process regression model to approximate the objective function
    X_design       -- The set of candidate points to evaluate the function for identifying the optimal point
    alpha          -- Information acquisition function
    alpha_params   -- Extra parameters to the information acquisition function
    max_it         -- The maximum number of iterations
    plot           -- Whether or not to plot function evaluations v.s. iterations at max_it
    """
    af_all = [] # Store values of acquisition function 
    x_all = []
    y_all = []
    gpr.n_restarts_optimizer=0
    for count in range(max_it):
        # Using GPR model to get posterior mean and variance at given design points
        g, var = gpr.pred(X_design, return_var=True) # posterior mean and variance
        # Evaluate information acquisition function
        af_values = alpha(g, np.sqrt(var), gpr.y_train.max(), **alpha_params)
        # Find index of the next point to evaluate
        i = np.argmax(af_values)
        # Evaluate the function and stack the new data point to observations
        x_new = X_design[i,:] # original input space, not ridge function input space
        y_new = f(x_new, **f_params)
        y_new_scaled = scaler_y.transform(y_new.reshape(-1,1))
        print(count+1, x_new, y_new)
        if not y_new:
            X_design = np.delete(X_design, i, axis=0)
        else:
            x_all.append(x_new)
            y_all.append(y_new)
            af_all.append(af_values[i])
            # Update GPR
            gpr.set_XY(x_new, y_new_scaled)
            
    if plot:
        fig, ax = plt.subplots()
        ax.plot(y_all, '-*', markersize=10, markeredgewidth=2)
        ax.set_xticks(range(1,max_it+1,2))
        ax.set_xlabel('Iterations')
        ax.set_ylabel('$f(x)$')
            
    return gpr, af_all, x_all, y_all

# maximum upper interval
def mui(m, sigma, ymax, psi=1.96):
    return m + psi * sigma

In [None]:
n_design = int(1e6)
max_it=50 # number of iterations
X_design_normalize = np.random.rand(n_design, len(lb))
gpr_grf, af_all, x_all, y_all = BGOmaximize(J_calib, grf_HX, X_design_normalize, mui, f_params={'lb':lb,'ub':ub},max_it=max_it, plot=1)

In [None]:
x_optimal = x_all[-1]
u_optimal = np.round(lb + x_optimal * (ub - lb),1)

In [None]:
cost, outputVar = L_HX(list(u_optimal))

In [None]:
fig, ax = plt.subplots(3,1, figsize=(7,7))
time = np.linspace(2100, 3500, 141)
ax[0].plot(time, np.array(outputVar[0][10:]) / 1e5, label='Predicted')
ax[0].plot(time, np.array(outputVar[3][10:]) / 1e5, label='Measured')
ax[0].set_ylabel('Pressure [bar]')
ax[0].grid()
ax[1].plot(time, outputVar[1][10:], label='Predicted')
ax[1].plot(time, outputVar[4][10:], label='Measured')
ax[1].set_ylabel('Subcooling [K]')
ax[1].grid()
ax[2].plot(time, outputVar[2][10:], label='Predicted')
ax[2].plot(time, outputVar[5][10:], label='Measured')
ax[2].set_ylabel('Temperature [K]')
ax[2].grid()
ax[2].set_xlabel('Time [s]')


In [None]:
outputVar[0]